00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DDS_DCPS_SERVICE_PARTICIPANT_H
00009 #define OPENDDS_DDS_DCPS_SERVICE_PARTICIPANT_H
00010
00011 #include "ace/config-macros.h"
00012 #include "DomainParticipantFactoryImpl.h"
00013 #include "dds/DdsDcpsInfrastructureC.h"
00014 #include "dds/DdsDcpsDomainC.h"
00015 #include "dds/DdsDcpsInfoUtilsC.h"
00016 #include "DomainParticipantFactoryImpl.h"
00017 #include "ace/Barrier.h"
00018 #include "dds/DCPS/Definitions.h"
00019 #include "dds/DCPS/MonitorFactory.h"
00020 #include "dds/DCPS/Discovery.h"
00021 #include "dds/DCPS/PoolAllocator.h"
00022
00023 #include "ace/Task.h"
00024 #include "ace/Configuration.h"
00025 #include "ace/Time_Value.h"
00026 #include "ace/ARGV.h"
00027
00028 #include "Recorder.h"
00029 #include "Replayer.h"
00030 #include <memory>
00031
00032 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00033 #pragma once
00034 #endif
00035
00036 namespace OpenDDS {
00037 namespace DCPS {
00038
00039 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00040 class DataDurabilityCache; // forward declaration; used below through a pointer return type and std::auto_ptr members
00041 #endif
00042 class Monitor; // forward declaration; Service_Participant only stores a Monitor pointer
00043 /// The string "OpenDDS_DCPS". NOTE(review): presumably the default ORB identifier - confirm at the point of use.
00044 const char DEFAULT_ORB_NAME[] = "OpenDDS_DCPS";
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 class OpenDDS_Dcps_Export Service_Participant {
00059 public:
00060
00061
00062 enum { ANY_DOMAIN = -1 };
00063
00064
00065 Service_Participant();
00066
00067
00068 ~Service_Participant();
00069
00070
00071 static Service_Participant* instance();
00072
00073
00074
00075 ACE_Reactor_Timer_Interface* timer() const;
00076
00077 ACE_Reactor* reactor() const;
00078
00079 ACE_thread_t reactor_owner() const;
00080
00081
00082
00083
00084
00085
00086
00087 DDS::DomainParticipantFactory_ptr get_domain_participant_factory(
00088 int &argc = zero_argc,
00089 ACE_TCHAR *argv[] = 0);
00090
00091 #ifdef ACE_USES_WCHAR
00092 DDS::DomainParticipantFactory_ptr
00093 get_domain_participant_factory(int &argc, char *argv[]);
00094 #endif
00095
00096
00097
00098
00099
00100
00101
00102 void shutdown();
00103
00104
00105 bool is_shut_down() const;
00106
00107
00108 Discovery_rch get_discovery(const DDS::DomainId_t domain);
00109
00110
00111 DDS::UserDataQosPolicy initial_UserDataQosPolicy() const;
00112 DDS::TopicDataQosPolicy initial_TopicDataQosPolicy() const;
00113 DDS::GroupDataQosPolicy initial_GroupDataQosPolicy() const;
00114 DDS::TransportPriorityQosPolicy initial_TransportPriorityQosPolicy() const;
00115 DDS::LifespanQosPolicy initial_LifespanQosPolicy() const;
00116 DDS::DurabilityQosPolicy initial_DurabilityQosPolicy() const;
00117 DDS::DurabilityServiceQosPolicy initial_DurabilityServiceQosPolicy() const;
00118 DDS::PresentationQosPolicy initial_PresentationQosPolicy() const;
00119 DDS::DeadlineQosPolicy initial_DeadlineQosPolicy() const;
00120 DDS::LatencyBudgetQosPolicy initial_LatencyBudgetQosPolicy() const;
00121 DDS::OwnershipQosPolicy initial_OwnershipQosPolicy() const;
00122 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00123 DDS::OwnershipStrengthQosPolicy initial_OwnershipStrengthQosPolicy() const;
00124 #endif
00125 DDS::LivelinessQosPolicy initial_LivelinessQosPolicy() const;
00126 DDS::TimeBasedFilterQosPolicy initial_TimeBasedFilterQosPolicy() const;
00127 DDS::PartitionQosPolicy initial_PartitionQosPolicy() const;
00128 DDS::ReliabilityQosPolicy initial_ReliabilityQosPolicy() const;
00129 DDS::DestinationOrderQosPolicy initial_DestinationOrderQosPolicy() const;
00130 DDS::HistoryQosPolicy initial_HistoryQosPolicy() const;
00131 DDS::ResourceLimitsQosPolicy initial_ResourceLimitsQosPolicy() const;
00132 DDS::EntityFactoryQosPolicy initial_EntityFactoryQosPolicy() const;
00133 DDS::WriterDataLifecycleQosPolicy initial_WriterDataLifecycleQosPolicy() const;
00134 DDS::ReaderDataLifecycleQosPolicy initial_ReaderDataLifecycleQosPolicy() const;
00135
00136 DDS::DomainParticipantFactoryQos initial_DomainParticipantFactoryQos() const;
00137 DDS::DomainParticipantQos initial_DomainParticipantQos() const;
00138 DDS::TopicQos initial_TopicQos() const;
00139 DDS::DataWriterQos initial_DataWriterQos() const;
00140 DDS::PublisherQos initial_PublisherQos() const;
00141 DDS::DataReaderQos initial_DataReaderQos() const;
00142 DDS::SubscriberQos initial_SubscriberQos() const;
00143
00144
00145
00146
00147
00148
00149
00150 size_t n_chunks() const;
00151
00152
00153
00154
00155
00156 void n_chunks(size_t chunks);
00157
00158
00159
00160
00161
00162
00163 size_t association_chunk_multiplier() const;
00164
00165
00166
00167
00168
00169
00170 void association_chunk_multiplier(size_t multiplier);
00171
00172
00173
00174
00175 void liveliness_factor(int factor);
00176
00177
00178
00179
00180 int liveliness_factor() const;
00181
00182
00183 void add_discovery(Discovery_rch discovery);
00184
00185 bool set_repo_ior(const char* ior,
00186 Discovery::RepoKey key = Discovery::DEFAULT_REPO,
00187 bool attach_participant = true);
00188
00189 #ifdef DDS_HAS_WCHAR
00190
00191 bool set_repo_ior(const wchar_t* ior,
00192 Discovery::RepoKey key = Discovery::DEFAULT_REPO,
00193 bool attach_participant = true);
00194 #endif
00195
00196
00197 void remap_domains(Discovery::RepoKey oldKey,
00198 Discovery::RepoKey newKey,
00199 bool attach_participant = true);
00200
00201
00202 void set_repo_domain(const DDS::DomainId_t domain,
00203 Discovery::RepoKey repo,
00204 bool attach_participant = true);
00205
00206 void set_default_discovery(const Discovery::RepoKey& defaultDiscovery);
00207 Discovery::RepoKey get_default_discovery();
00208
00209
00210 Discovery::RepoKey domain_to_repo(const DDS::DomainId_t domain) const;
00211
00212
00213 void repository_lost(Discovery::RepoKey key);
00214
00215
00216
00217 int& federation_recovery_duration();
00218 int federation_recovery_duration() const;
00219
00220
00221
00222
00223 int& federation_initial_backoff_seconds();
00224 int federation_initial_backoff_seconds() const;
00225
00226
00227
00228
00229 int& federation_backoff_multiplier();
00230 int federation_backoff_multiplier() const;
00231
00232
00233
00234
00235 int& federation_liveliness();
00236 int federation_liveliness() const;
00237
00238
00239
00240
00241 long& scheduler();
00242 long scheduler() const;
00243
00244
00245
00246
00247 bool& publisher_content_filter();
00248 bool publisher_content_filter() const;
00249
00250
00251
00252 ACE_Time_Value pending_timeout() const;
00253
00254
00255
00256 int priority_min() const;
00257 int priority_max() const;
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270 int bit_transport_port() const;
00271 void bit_transport_port(int port);
00272
00273
00274 OPENDDS_STRING bit_transport_ip() const;
00275
00276
00277
00278
00279
00280
00281
00282
00283 int bit_lookup_duration_msec() const;
00284 void bit_lookup_duration_msec(int msec);
00285
00286
00287 bool get_BIT() {
00288 return bit_enabled_;
00289 }
00290
00291 void set_BIT(bool b) {
00292 bit_enabled_ = b;
00293 }
00294
00295 ACE_CString default_address() const;
00296
00297 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00298
00299
00300 DataDurabilityCache * get_data_durability_cache(
00301 DDS::DurabilityQosPolicy const & durability);
00302 #endif
00303
00304
00305 typedef OPENDDS_MAP(Discovery::RepoKey, Discovery_rch) RepoKeyDiscoveryMap;
00306 const RepoKeyDiscoveryMap& discoveryMap() const;
00307 typedef OPENDDS_MAP(DDS::DomainId_t, Discovery::RepoKey) DomainRepoMap;
00308 const DomainRepoMap& domainRepoMap() const;
00309
00310 void register_discovery_type(const char* section_name,
00311 Discovery::Config* cfg);
00312
00313 #ifndef OPENDDS_SAFETY_PROFILE
00314 ACE_ARGV* ORB_argv() { return &ORB_argv_; }
00315 #endif
00316
00317
00318
00319
00320 Recorder_ptr create_recorder(DDS::DomainParticipant_ptr participant,
00321 DDS::Topic_ptr a_topic,
00322 const DDS::SubscriberQos & subscriber_qos,
00323 const DDS::DataReaderQos & datareader_qos,
00324 const RecorderListener_rch & a_listener );
00325
00326
00327
00328
00329
00330
00331 DDS::ReturnCode_t delete_recorder(Recorder_ptr recorder);
00332
00333
00334
00335
00336 Replayer_ptr create_replayer(DDS::DomainParticipant_ptr participant,
00337 DDS::Topic_ptr a_topic,
00338 const DDS::PublisherQos & publisher_qos,
00339 const DDS::DataWriterQos & datawriter_qos,
00340 const ReplayerListener_rch & a_listener );
00341
00342
00343
00344
00345 DDS::ReturnCode_t delete_replayer(Replayer_ptr replayer);
00346
00347
00348
00349
00350 DDS::Topic_ptr create_typeless_topic(DDS::DomainParticipant_ptr participant,
00351 const char * topic_name,
00352 const char * type_name,
00353 bool type_has_keys,
00354 const DDS::TopicQos & qos,
00355 DDS::TopicListener_ptr a_listener = 0,
00356 DDS::StatusMask mask = 0);
00357
00358
00359
00360
00361
00362
00363
00364
00365 int load_configuration(ACE_Configuration_Heap& cf,
00366 const ACE_TCHAR* filename);
00367
00368 #ifdef OPENDDS_SAFETY_PROFILE
00369
00370
00371
00372 void configure_pool();
00373 #endif
00374
00375 private:
00376
00377
00378 void initialize();
00379
00380
00381 void initializeScheduling();
00382
00383
00384
00385
00386
00387 int parse_args(int &argc, ACE_TCHAR *argv[]);
00388
00389
00390
00391
00392
00393
00394
00395
00396 int load_configuration();
00397
00398
00399
00400
00401
00402
00403
00404
00405 int load_common_configuration(ACE_Configuration_Heap& cf,
00406 const ACE_TCHAR* filename);
00407
00408
00409
00410
00411
00412 int load_domain_configuration(ACE_Configuration_Heap& cf,
00413 const ACE_TCHAR* filename);
00414
00415
00416
00417
00418
00419 int load_discovery_configuration(ACE_Configuration_Heap& cf,
00420 const ACE_TCHAR* section_name);
00421
00422 OPENDDS_MAP(OPENDDS_STRING, Discovery::Config*) discovery_types_;
00423
00424 #ifndef OPENDDS_SAFETY_PROFILE
00425 ACE_ARGV ORB_argv_;
00426 #endif
00427
00428 ACE_Reactor* reactor_;
00429 ACE_thread_t reactor_owner_;
00430
00431 struct ReactorTask : ACE_Task_Base {
00432 ReactorTask()
00433 : barrier_(2)
00434 { }
00435 int svc();
00436 void wait_for_startup() { barrier_.wait(); }
00437 private:
00438 ACE_Barrier barrier_;
00439 } reactor_task_;
00440
00441 DomainParticipantFactoryImpl* dp_factory_servant_;
00442 DDS::DomainParticipantFactory_var dp_factory_;
00443
00444
00445 RepoKeyDiscoveryMap discoveryMap_;
00446
00447
00448 DomainRepoMap domainRepoMap_;
00449
00450 Discovery::RepoKey defaultDiscovery_;
00451
00452
00453
00454 TAO_SYNCH_MUTEX factory_lock_;
00455
00456
00457 DDS::UserDataQosPolicy initial_UserDataQosPolicy_;
00458 DDS::TopicDataQosPolicy initial_TopicDataQosPolicy_;
00459 DDS::GroupDataQosPolicy initial_GroupDataQosPolicy_;
00460 DDS::TransportPriorityQosPolicy initial_TransportPriorityQosPolicy_;
00461 DDS::LifespanQosPolicy initial_LifespanQosPolicy_;
00462 DDS::DurabilityQosPolicy initial_DurabilityQosPolicy_;
00463 DDS::DurabilityServiceQosPolicy initial_DurabilityServiceQosPolicy_;
00464 DDS::PresentationQosPolicy initial_PresentationQosPolicy_;
00465 DDS::DeadlineQosPolicy initial_DeadlineQosPolicy_;
00466 DDS::LatencyBudgetQosPolicy initial_LatencyBudgetQosPolicy_;
00467 DDS::OwnershipQosPolicy initial_OwnershipQosPolicy_;
00468 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00469 DDS::OwnershipStrengthQosPolicy initial_OwnershipStrengthQosPolicy_;
00470 #endif
00471 DDS::LivelinessQosPolicy initial_LivelinessQosPolicy_;
00472 DDS::TimeBasedFilterQosPolicy initial_TimeBasedFilterQosPolicy_;
00473 DDS::PartitionQosPolicy initial_PartitionQosPolicy_;
00474 DDS::ReliabilityQosPolicy initial_ReliabilityQosPolicy_;
00475 DDS::DestinationOrderQosPolicy initial_DestinationOrderQosPolicy_;
00476 DDS::HistoryQosPolicy initial_HistoryQosPolicy_;
00477 DDS::ResourceLimitsQosPolicy initial_ResourceLimitsQosPolicy_;
00478 DDS::EntityFactoryQosPolicy initial_EntityFactoryQosPolicy_;
00479 DDS::WriterDataLifecycleQosPolicy initial_WriterDataLifecycleQosPolicy_;
00480 DDS::ReaderDataLifecycleQosPolicy initial_ReaderDataLifecycleQosPolicy_;
00481
00482 DDS::DomainParticipantQos initial_DomainParticipantQos_;
00483 DDS::TopicQos initial_TopicQos_;
00484 DDS::DataWriterQos initial_DataWriterQos_;
00485 DDS::PublisherQos initial_PublisherQos_;
00486 DDS::DataReaderQos initial_DataReaderQos_;
00487 DDS::SubscriberQos initial_SubscriberQos_;
00488 DDS::DomainParticipantFactoryQos initial_DomainParticipantFactoryQos_;
00489
00490
00491
00492 size_t n_chunks_;
00493
00494
00495
00496
00497 size_t association_chunk_multiplier_;
00498
00499
00500 int liveliness_factor_;
00501
00502
00503 ACE_TString bit_transport_ip_;
00504
00505
00506 int bit_transport_port_;
00507
00508 bool bit_enabled_;
00509
00510
00511
00512 int bit_lookup_duration_msec_;
00513
00514
00515 ACE_CString default_address_;
00516
00517
00518
00519 ACE_Configuration_Heap cf_;
00520
00521
00522
00523
00524 ACE_TString global_transport_config_;
00525
00526 public:
00527
00528
00529 MonitorFactory* monitor_factory_;
00530
00531
00532 Monitor* monitor_;
00533
00534 private:
00535
00536 int federation_recovery_duration_;
00537
00538
00539 int federation_initial_backoff_seconds_;
00540
00541
00542 int federation_backoff_multiplier_;
00543
00544
00545 int federation_liveliness_;
00546
00547
00548 ACE_TString schedulerString_;
00549
00550
00551 ACE_Time_Value schedulerQuantum_;
00552
00553 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00554
00555 size_t pool_size_;
00556
00557
00558 size_t pool_granularity_;
00559 #endif
00560
00561
00562 long scheduler_;
00563
00564
00565 int priority_min_;
00566
00567
00568 int priority_max_;
00569
00570
00571 bool publisher_content_filter_;
00572
00573 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00574
00575
00576 std::auto_ptr<DataDurabilityCache> transient_data_cache_;
00577
00578
00579 std::auto_ptr<DataDurabilityCache> persistent_data_cache_;
00580
00581
00582 ACE_CString persistent_data_dir_;
00583
00584 #endif
00585
00586
00587
00588 ACE_Time_Value pending_timeout_;
00589
00590
00591 bool shut_down_;
00592
00593
00594 ACE_Recursive_Thread_Mutex maps_lock_;
00595
00596 static int zero_argc;
00597 };
00598 /// Shorthand for the object returned by Service_Participant::instance().
00599 # define TheServiceParticipant OpenDDS::DCPS::Service_Participant::instance()
00600 /// Shorthand for get_domain_participant_factory() called with its default arguments.
00601 # define TheParticipantFactory TheServiceParticipant->get_domain_participant_factory()
00602 /// Same call as TheParticipantFactory, but forwarding the caller's argc / argv.
00603 # define TheParticipantFactoryWithArgs(argc, argv) TheServiceParticipant->get_domain_participant_factory(argc, argv)
00604
00605 }
00606 }
00607
00608 #if defined(__ACE_INLINE__)
00609 #include "Service_Participant.inl"
00610 #endif
00611
00612 #endif