Service_Participant.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DDS_DCPS_SERVICE_PARTICIPANT_H
00009 #define OPENDDS_DDS_DCPS_SERVICE_PARTICIPANT_H
00010
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsDomainC.h"
00013 #include "dds/DdsDcpsInfoUtilsC.h"
00014
00015 #include "dds/DCPS/Definitions.h"
00016 #include "dds/DCPS/MonitorFactory.h"
00017 #include "dds/DCPS/Discovery.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019 #include "dds/DCPS/DomainParticipantFactoryImpl.h"
00020 #include "dds/DCPS/unique_ptr.h"
00021
00022
00023 #include "ace/Task.h"
00024 #include "ace/Configuration.h"
00025 #include "ace/Time_Value.h"
00026 #include "ace/ARGV.h"
00027 #include "ace/Barrier.h"
00028
00029 #include "Recorder.h"
00030 #include "Replayer.h"
00031 #include <memory>
00032
00033 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00034 #pragma once
00035 #endif
00036
00037 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00038
00039 namespace OpenDDS {
00040 namespace DCPS {
00041
00042 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00043 class DataDurabilityCache;
00044 #endif
00045 class Monitor;
00046
00047 const char DEFAULT_ORB_NAME[] = "OpenDDS_DCPS";
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061 class OpenDDS_Dcps_Export Service_Participant {
00062 public:
00063
00064
00065 enum { ANY_DOMAIN = -1 };
00066
00067 Service_Participant();
00068
00069 ~Service_Participant();
00070
00071
00072 static Service_Participant* instance();
00073
00074
00075
00076 ACE_Reactor_Timer_Interface* timer() const;
00077
00078 ACE_Reactor* reactor() const;
00079
00080 ACE_thread_t reactor_owner() const;
00081
00082
00083
00084
00085
00086
00087
00088 DDS::DomainParticipantFactory_ptr get_domain_participant_factory(
00089 int &argc = zero_argc,
00090 ACE_TCHAR *argv[] = 0);
00091
00092 #ifdef ACE_USES_WCHAR
00093 DDS::DomainParticipantFactory_ptr
00094 get_domain_participant_factory(int &argc, char *argv[]);
00095 #endif
00096
00097
00098
00099
00100
00101
00102
00103 void shutdown();
00104
00105
00106 bool is_shut_down() const;
00107
00108
00109 Discovery_rch get_discovery(const DDS::DomainId_t domain);
00110
00111
00112 DDS::UserDataQosPolicy initial_UserDataQosPolicy() const;
00113 DDS::TopicDataQosPolicy initial_TopicDataQosPolicy() const;
00114 DDS::GroupDataQosPolicy initial_GroupDataQosPolicy() const;
00115 DDS::TransportPriorityQosPolicy initial_TransportPriorityQosPolicy() const;
00116 DDS::LifespanQosPolicy initial_LifespanQosPolicy() const;
00117 DDS::DurabilityQosPolicy initial_DurabilityQosPolicy() const;
00118 DDS::DurabilityServiceQosPolicy initial_DurabilityServiceQosPolicy() const;
00119 DDS::PresentationQosPolicy initial_PresentationQosPolicy() const;
00120 DDS::DeadlineQosPolicy initial_DeadlineQosPolicy() const;
00121 DDS::LatencyBudgetQosPolicy initial_LatencyBudgetQosPolicy() const;
00122 DDS::OwnershipQosPolicy initial_OwnershipQosPolicy() const;
00123 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00124 DDS::OwnershipStrengthQosPolicy initial_OwnershipStrengthQosPolicy() const;
00125 #endif
00126 DDS::LivelinessQosPolicy initial_LivelinessQosPolicy() const;
00127 DDS::TimeBasedFilterQosPolicy initial_TimeBasedFilterQosPolicy() const;
00128 DDS::PartitionQosPolicy initial_PartitionQosPolicy() const;
00129 DDS::ReliabilityQosPolicy initial_ReliabilityQosPolicy() const;
00130 DDS::DestinationOrderQosPolicy initial_DestinationOrderQosPolicy() const;
00131 DDS::HistoryQosPolicy initial_HistoryQosPolicy() const;
00132 DDS::ResourceLimitsQosPolicy initial_ResourceLimitsQosPolicy() const;
00133 DDS::EntityFactoryQosPolicy initial_EntityFactoryQosPolicy() const;
00134 DDS::WriterDataLifecycleQosPolicy initial_WriterDataLifecycleQosPolicy() const;
00135 DDS::ReaderDataLifecycleQosPolicy initial_ReaderDataLifecycleQosPolicy() const;
00136 DDS::PropertyQosPolicy initial_PropertyQosPolicy() const;
00137
00138 DDS::DomainParticipantFactoryQos initial_DomainParticipantFactoryQos() const;
00139 DDS::DomainParticipantQos initial_DomainParticipantQos() const;
00140 DDS::TopicQos initial_TopicQos() const;
00141 DDS::DataWriterQos initial_DataWriterQos() const;
00142 DDS::PublisherQos initial_PublisherQos() const;
00143 DDS::DataReaderQos initial_DataReaderQos() const;
00144 DDS::SubscriberQos initial_SubscriberQos() const;
00145
00146
00147
00148
00149
00150
00151
00152 size_t n_chunks() const;
00153
00154
00155
00156
00157
00158 void n_chunks(size_t chunks);
00159
00160
00161
00162
00163
00164
00165 size_t association_chunk_multiplier() const;
00166
00167
00168
00169
00170
00171
00172 void association_chunk_multiplier(size_t multiplier);
00173
00174
00175
00176
00177 void liveliness_factor(int factor);
00178
00179
00180
00181
00182 int liveliness_factor() const;
00183
00184
00185 void add_discovery(Discovery_rch discovery);
00186
00187 bool set_repo_ior(const char* ior,
00188 Discovery::RepoKey key = Discovery::DEFAULT_REPO,
00189 bool attach_participant = true);
00190
00191 #ifdef DDS_HAS_WCHAR
00192
00193 bool set_repo_ior(const wchar_t* ior,
00194 Discovery::RepoKey key = Discovery::DEFAULT_REPO,
00195 bool attach_participant = true);
00196 #endif
00197
00198 bool use_bidir_giop() const;
00199
00200
00201 void remap_domains(Discovery::RepoKey oldKey,
00202 Discovery::RepoKey newKey,
00203 bool attach_participant = true);
00204
00205
00206 void set_repo_domain(const DDS::DomainId_t domain,
00207 Discovery::RepoKey repo,
00208 bool attach_participant = true);
00209
00210 void set_default_discovery(const Discovery::RepoKey& defaultDiscovery);
00211 Discovery::RepoKey get_default_discovery();
00212
00213
00214 Discovery::RepoKey domain_to_repo(const DDS::DomainId_t domain) const;
00215
00216
00217 void repository_lost(Discovery::RepoKey key);
00218
00219
00220
00221 int& federation_recovery_duration();
00222 int federation_recovery_duration() const;
00223
00224
00225
00226
00227 int& federation_initial_backoff_seconds();
00228 int federation_initial_backoff_seconds() const;
00229
00230
00231
00232
00233 int& federation_backoff_multiplier();
00234 int federation_backoff_multiplier() const;
00235
00236
00237
00238
00239 int& federation_liveliness();
00240 int federation_liveliness() const;
00241
00242
00243
00244
00245 long& scheduler();
00246 long scheduler() const;
00247
00248
00249
00250
00251 bool& publisher_content_filter();
00252 bool publisher_content_filter() const;
00253
00254
00255
00256 ACE_Time_Value pending_timeout() const;
00257
00258
00259
00260 int priority_min() const;
00261 int priority_max() const;
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274 int bit_transport_port() const;
00275 void bit_transport_port(int port);
00276
00277
00278 OPENDDS_STRING bit_transport_ip() const;
00279
00280
00281
00282
00283
00284
00285
00286
00287 int bit_lookup_duration_msec() const;
00288 void bit_lookup_duration_msec(int msec);
00289
00290
00291 #if defined(OPENDDS_SECURITY)
00292 bool get_security() {
00293 return security_enabled_;
00294 }
00295
00296 void set_security(bool b) {
00297 security_enabled_ = b;
00298 }
00299 #endif
00300
00301 bool get_BIT() {
00302 return bit_enabled_;
00303 }
00304
00305 void set_BIT(bool b) {
00306 bit_enabled_ = b;
00307 }
00308
00309 ACE_CString default_address() const;
00310
00311 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00312
00313
00314 DataDurabilityCache * get_data_durability_cache(
00315 DDS::DurabilityQosPolicy const & durability);
00316 #endif
00317
00318
00319 typedef OPENDDS_MAP(Discovery::RepoKey, Discovery_rch) RepoKeyDiscoveryMap;
00320 const RepoKeyDiscoveryMap& discoveryMap() const;
00321 typedef OPENDDS_MAP(DDS::DomainId_t, Discovery::RepoKey) DomainRepoMap;
00322 const DomainRepoMap& domainRepoMap() const;
00323
00324 void register_discovery_type(const char* section_name,
00325 Discovery::Config* cfg);
00326
00327 #ifndef OPENDDS_SAFETY_PROFILE
00328 ACE_ARGV* ORB_argv() { return &ORB_argv_; }
00329 #endif
00330
00331
00332
00333
00334 Recorder_ptr create_recorder(DDS::DomainParticipant_ptr participant,
00335 DDS::Topic_ptr a_topic,
00336 const DDS::SubscriberQos & subscriber_qos,
00337 const DDS::DataReaderQos & datareader_qos,
00338 const RecorderListener_rch & a_listener );
00339
00340
00341
00342
00343
00344
00345 DDS::ReturnCode_t delete_recorder(Recorder_ptr recorder);
00346
00347
00348
00349
00350 Replayer_ptr create_replayer(DDS::DomainParticipant_ptr participant,
00351 DDS::Topic_ptr a_topic,
00352 const DDS::PublisherQos & publisher_qos,
00353 const DDS::DataWriterQos & datawriter_qos,
00354 const ReplayerListener_rch & a_listener );
00355
00356
00357
00358
00359 DDS::ReturnCode_t delete_replayer(Replayer_ptr replayer);
00360
00361
00362
00363
00364 DDS::Topic_ptr create_typeless_topic(DDS::DomainParticipant_ptr participant,
00365 const char * topic_name,
00366 const char * type_name,
00367 bool type_has_keys,
00368 const DDS::TopicQos & qos,
00369 DDS::TopicListener_ptr a_listener = 0,
00370 DDS::StatusMask mask = 0);
00371
00372
00373
00374
00375
00376
00377
00378
00379 int load_configuration(ACE_Configuration_Heap& cf,
00380 const ACE_TCHAR* filename);
00381
00382 #ifdef OPENDDS_SAFETY_PROFILE
00383
00384
00385
00386 void configure_pool();
00387 #endif
00388
00389 private:
00390
00391
00392 void initialize();
00393
00394
00395 void initializeScheduling();
00396
00397
00398
00399
00400
00401 int parse_args(int &argc, ACE_TCHAR *argv[]);
00402
00403
00404
00405
00406
00407
00408
00409
00410 int load_configuration();
00411
00412
00413
00414
00415
00416
00417
00418
00419 int load_common_configuration(ACE_Configuration_Heap& cf,
00420 const ACE_TCHAR* filename);
00421
00422
00423
00424
00425
00426 int load_domain_configuration(ACE_Configuration_Heap& cf,
00427 const ACE_TCHAR* filename);
00428
00429
00430
00431
00432
00433 int load_discovery_configuration(ACE_Configuration_Heap& cf,
00434 const ACE_TCHAR* section_name);
00435
00436 typedef OPENDDS_MAP(OPENDDS_STRING, container_supported_unique_ptr<Discovery::Config>) DiscoveryTypes;
00437 DiscoveryTypes discovery_types_;
00438
00439 #ifndef OPENDDS_SAFETY_PROFILE
00440 ACE_ARGV ORB_argv_;
00441 #endif
00442
00443 unique_ptr<ACE_Reactor> reactor_;
00444 ACE_thread_t reactor_owner_;
00445
00446 struct ReactorTask : ACE_Task_Base {
00447 ReactorTask()
00448 : barrier_(2)
00449 { }
00450 int svc();
00451 void wait_for_startup() { barrier_.wait(); }
00452 private:
00453 ACE_Barrier barrier_;
00454 } reactor_task_;
00455
00456 RcHandle<DomainParticipantFactoryImpl> dp_factory_servant_;
00457
00458
00459 RepoKeyDiscoveryMap discoveryMap_;
00460
00461
00462 DomainRepoMap domainRepoMap_;
00463
00464 Discovery::RepoKey defaultDiscovery_;
00465
00466
00467
00468 TAO_SYNCH_MUTEX factory_lock_;
00469
00470
00471 DDS::UserDataQosPolicy initial_UserDataQosPolicy_;
00472 DDS::TopicDataQosPolicy initial_TopicDataQosPolicy_;
00473 DDS::GroupDataQosPolicy initial_GroupDataQosPolicy_;
00474 DDS::TransportPriorityQosPolicy initial_TransportPriorityQosPolicy_;
00475 DDS::LifespanQosPolicy initial_LifespanQosPolicy_;
00476 DDS::DurabilityQosPolicy initial_DurabilityQosPolicy_;
00477 DDS::DurabilityServiceQosPolicy initial_DurabilityServiceQosPolicy_;
00478 DDS::PresentationQosPolicy initial_PresentationQosPolicy_;
00479 DDS::DeadlineQosPolicy initial_DeadlineQosPolicy_;
00480 DDS::LatencyBudgetQosPolicy initial_LatencyBudgetQosPolicy_;
00481 DDS::OwnershipQosPolicy initial_OwnershipQosPolicy_;
00482 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00483 DDS::OwnershipStrengthQosPolicy initial_OwnershipStrengthQosPolicy_;
00484 #endif
00485 DDS::LivelinessQosPolicy initial_LivelinessQosPolicy_;
00486 DDS::TimeBasedFilterQosPolicy initial_TimeBasedFilterQosPolicy_;
00487 DDS::PartitionQosPolicy initial_PartitionQosPolicy_;
00488 DDS::ReliabilityQosPolicy initial_ReliabilityQosPolicy_;
00489 DDS::DestinationOrderQosPolicy initial_DestinationOrderQosPolicy_;
00490 DDS::HistoryQosPolicy initial_HistoryQosPolicy_;
00491 DDS::ResourceLimitsQosPolicy initial_ResourceLimitsQosPolicy_;
00492 DDS::EntityFactoryQosPolicy initial_EntityFactoryQosPolicy_;
00493 DDS::WriterDataLifecycleQosPolicy initial_WriterDataLifecycleQosPolicy_;
00494 DDS::ReaderDataLifecycleQosPolicy initial_ReaderDataLifecycleQosPolicy_;
00495 DDS::PropertyQosPolicy initial_PropertyQosPolicy_;
00496
00497 DDS::DomainParticipantQos initial_DomainParticipantQos_;
00498 DDS::TopicQos initial_TopicQos_;
00499 DDS::DataWriterQos initial_DataWriterQos_;
00500 DDS::PublisherQos initial_PublisherQos_;
00501 DDS::DataReaderQos initial_DataReaderQos_;
00502 DDS::SubscriberQos initial_SubscriberQos_;
00503 DDS::DomainParticipantFactoryQos initial_DomainParticipantFactoryQos_;
00504
00505
00506
00507 size_t n_chunks_;
00508
00509
00510
00511
00512 size_t association_chunk_multiplier_;
00513
00514
00515 int liveliness_factor_;
00516
00517
00518 ACE_TString bit_transport_ip_;
00519
00520
00521 int bit_transport_port_;
00522
00523 bool bit_enabled_;
00524
00525 #if defined(OPENDDS_SECURITY)
00526 bool security_enabled_;
00527 #endif
00528
00529
00530
00531 int bit_lookup_duration_msec_;
00532
00533
00534 ACE_CString default_address_;
00535
00536
00537
00538 ACE_Configuration_Heap cf_;
00539
00540
00541
00542
00543 ACE_TString global_transport_config_;
00544
00545 public:
00546
00547
00548 MonitorFactory* monitor_factory_;
00549
00550
00551 unique_ptr<Monitor> monitor_;
00552
00553 private:
00554
00555 int federation_recovery_duration_;
00556
00557
00558 int federation_initial_backoff_seconds_;
00559
00560
00561 int federation_backoff_multiplier_;
00562
00563
00564 int federation_liveliness_;
00565
00566
00567 ACE_TString schedulerString_;
00568
00569
00570 ACE_Time_Value schedulerQuantum_;
00571
00572 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00573
00574 size_t pool_size_;
00575
00576
00577 size_t pool_granularity_;
00578 #endif
00579
00580
00581 long scheduler_;
00582
00583
00584 int priority_min_;
00585
00586
00587 int priority_max_;
00588
00589
00590 bool publisher_content_filter_;
00591
00592 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00593
00594
00595 unique_ptr<DataDurabilityCache> transient_data_cache_;
00596
00597
00598 unique_ptr<DataDurabilityCache> persistent_data_cache_;
00599
00600
00601 ACE_CString persistent_data_dir_;
00602
00603 #endif
00604
00605
00606
00607 ACE_Time_Value pending_timeout_;
00608
00609
00610 bool bidir_giop_;
00611
00612
00613 bool monitor_enabled_;
00614
00615
00616 bool shut_down_;
00617
00618
00619 ACE_Recursive_Thread_Mutex maps_lock_;
00620
00621 static int zero_argc;
00622 };
00623
00624 # define TheServiceParticipant OpenDDS::DCPS::Service_Participant::instance()
00625
00626 # define TheParticipantFactory TheServiceParticipant->get_domain_participant_factory()
00627
00628 # define TheParticipantFactoryWithArgs(argc, argv) TheServiceParticipant->get_domain_participant_factory(argc, argv)
00629
00630 }
00631 }
00632
00633 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00634
00635 #if defined(__ACE_INLINE__)
00636 #include "Service_Participant.inl"
00637 #endif
00638
00639 #endif