00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00009 #define OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00010
00011 #include "dds/DdsDcpsPublicationC.h"
00012 #include "dds/DdsDcpsSubscriptionExtC.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsDomainC.h"
00015 #include "dds/DdsDcpsInfoUtilsC.h"
00016 #include "dds/DCPS/GuidUtils.h"
00017 #include "dds/DdsDcpsInfrastructureC.h"
00018
00019 #if !defined (DDS_HAS_MINIMUM_BIT)
00020 #include "dds/DdsDcpsCoreTypeSupportC.h"
00021 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00022
00023 #include "EntityImpl.h"
00024 #include "Definitions.h"
00025 #include "TopicImpl.h"
00026 #include "InstanceHandle.h"
00027 #include "OwnershipManager.h"
00028 #include "GuidBuilder.h"
00029
00030 #include "dds/DCPS/transport/framework/TransportImpl_rch.h"
00031
00032 #include "dds/DCPS/PoolAllocator.h"
00033
00034 #include "Recorder.h"
00035 #include "Replayer.h"
00036
00037 #include "dds/DCPS/security/framework/SecurityConfig_rch.h"
00038
00039 #include "ace/Null_Mutex.h"
00040 #include "ace/Condition_Thread_Mutex.h"
00041 #include "ace/Recursive_Thread_Mutex.h"
00042
00043 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00044 #pragma once
00045 #endif
00046
00047 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00048
00049 namespace OpenDDS {
00050 namespace DCPS {
00051 // Forward declarations: these types are only named in this header, so their full definitions are not needed here.
00052 class PublisherImpl; // template argument of the Publisher_Pair typedef
00053 class SubscriberImpl; // template argument of the Subscriber_Pair typedef
00054 class DataWriterImpl; // passed by pointer to the liveliness-timer methods
00055 class DomainParticipantFactoryImpl; // held by pointer in factory_
00056 class Monitor; // held by pointer in monitor_
00057
00058
00059 class RecorderImpl; // not referenced again in the visible part of this header
00060 class ReplayerImpl; // not referenced again in the visible part of this header
00061
00062 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00063 class FilterEvaluator; // used as RcHandle<FilterEvaluator>; only declared when the content-subscription profile is enabled
00064 #endif
00065
00066 // DomainParticipantImpl is the implementation class for the
00067 // DDS::DomainParticipant interface.
00068 //
00069 // It inherits virtually from LocalObject<DDS::DomainParticipant>,
00070 // EntityImpl and ACE_Event_Handler. The public section declares the
00071 // participant operations plus OpenDDS-specific helpers; the private
00072 // section holds default QoS values, the publisher/subscriber/topic
00073 // containers and the mutexes declared next to them.
00074 //
00075 // NOTE(review): locking and ownership rules live in the implementation
00076 // file, which is not visible from this header -- confirm them there.
00077
00078 class OpenDDS_Dcps_Export DomainParticipantImpl
00079 : public virtual OpenDDS::DCPS::LocalObject<DDS::DomainParticipant>
00080 , public virtual OpenDDS::DCPS::EntityImpl
00081 , public virtual ACE_Event_Handler
00082 {
00083 public:
00084 typedef Objref_Servant_Pair <SubscriberImpl, DDS::Subscriber,
00085 DDS::Subscriber_ptr, DDS::Subscriber_var> Subscriber_Pair; // Objref_Servant_Pair instantiated for subscribers
00086
00087 typedef Objref_Servant_Pair <PublisherImpl, DDS::Publisher,
00088 DDS::Publisher_ptr, DDS::Publisher_var> Publisher_Pair; // Objref_Servant_Pair instantiated for publishers
00089
00090 typedef Objref_Servant_Pair <TopicImpl, DDS::Topic,
00091 DDS::Topic_ptr, DDS::Topic_var> Topic_Pair; // Objref_Servant_Pair instantiated for topics
00092 // Set types for the pairs above; they are the types of subscribers_ and publishers_.
00093 typedef OPENDDS_SET(Subscriber_Pair) SubscriberSet;
00094 typedef OPENDDS_SET(Publisher_Pair) PublisherSet;
00095 // Helper that is constructed from a base RepoId and hands out RepoId values through next().
00096 class OpenDDS_Dcps_Export RepoIdSequence {
00097 public:
00098 explicit RepoIdSequence(const RepoId& base); // explicit: a RepoId is never implicitly converted into a sequence
00099 RepoId next(); // returns a RepoId by value; NOTE(review): presumably a different id on every call -- confirm in the implementation
00100 private:
00101 RepoId base_; // NOTE(review): presumably a copy of the constructor's 'base' argument -- confirm
00102 long serial_; // NOTE(review): looks like the running counter used by next() -- confirm
00103 GuidBuilder builder_; // GuidBuilder helper; NOTE(review): presumably used by next() to build the returned id -- confirm
00104 };
00105 // A Topic_Pair stored together with a reference counter (client_refs_).
00106 struct RefCounted_Topic {
00107 RefCounted_Topic() // default constructor: pair_ is default-constructed and the counter starts at 0
00108 : client_refs_(0)
00109 {
00110 }
00111
00112 explicit RefCounted_Topic(const Topic_Pair& pair) // copies 'pair' and starts the counter at 1
00113 : pair_(pair),
00114 client_refs_(1)
00115 {
00116 }
00117
00118 // The wrapped topic pair.
00119 Topic_Pair pair_;
00120 // Reference counter; NOTE(review): presumably the number of outstanding client references to the topic -- confirm.
00121 CORBA::ULong client_refs_;
00122 };
00123 // Map typedefs used by the data members declared further down in this class.
00124 typedef OPENDDS_MAP(OPENDDS_STRING, RefCounted_Topic) TopicMap; // string -> RefCounted_Topic; type of topics_ (key presumably the topic name)
00125
00126 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::TopicDescription_var) TopicDescriptionMap; // string -> TopicDescription_var; type of topic_descrs_
00127
00128 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) HandleMap; // RepoId -> instance handle, ordered with GUID_tKeyLessThan
00129 typedef OPENDDS_MAP(DDS::InstanceHandle_t, RepoId) RepoIdMap; // instance handle -> RepoId (the opposite direction of HandleMap)
00130 // Constructor: takes the creating factory, the domain id, the participant QoS and a listener with its status mask.
00131 DomainParticipantImpl(DomainParticipantFactoryImpl * factory, // NOTE(review): presumably stored in factory_ -- confirm
00132 const DDS::DomainId_t& domain_id, // NOTE(review): presumably initializes the const member domain_id_ -- confirm
00133 const DDS::DomainParticipantQos & qos, // participant QoS, passed by const reference
00134 DDS::DomainParticipantListener_ptr a_listener, // listener object reference
00135 const DDS::StatusMask & mask); // status mask that accompanies a_listener
00136 // Virtual destructor.
00137 virtual ~DomainParticipantImpl();
00138 // Returns a DDS::InstanceHandle_t by value; NOTE(review): presumably the handle of this participant entity -- confirm.
00139 virtual DDS::InstanceHandle_t get_instance_handle();
00140 // The virtual operations below appear to implement the same-named DDS::DomainParticipant operations; see the OMG DDS specification for their contracts. create_publisher returns a DDS::Publisher_ptr.
00141 virtual DDS::Publisher_ptr create_publisher(
00142 const DDS::PublisherQos & qos, // QoS for the new publisher
00143 DDS::PublisherListener_ptr a_listener, // listener object reference
00144 DDS::StatusMask mask); // status mask that accompanies a_listener
00145 // delete_publisher: takes the publisher 'p' and reports the outcome as a DDS::ReturnCode_t.
00146 virtual DDS::ReturnCode_t delete_publisher(
00147 DDS::Publisher_ptr p);
00148 // create_subscriber: same parameter shape as create_publisher; returns a DDS::Subscriber_ptr.
00149 virtual DDS::Subscriber_ptr create_subscriber(
00150 const DDS::SubscriberQos & qos, // QoS for the new subscriber
00151 DDS::SubscriberListener_ptr a_listener, // listener object reference
00152 DDS::StatusMask mask); // status mask that accompanies a_listener
00153 // delete_subscriber: takes the subscriber 's' and reports the outcome as a DDS::ReturnCode_t.
00154 virtual DDS::ReturnCode_t delete_subscriber(
00155 DDS::Subscriber_ptr s);
00156 // get_builtin_subscriber: returns a DDS::Subscriber_ptr; NOTE(review): presumably the one held in bit_subscriber_ -- confirm.
00157 virtual DDS::Subscriber_ptr get_builtin_subscriber();
00158 // create_topic: takes a topic name, a type name, the topic QoS and a listener with its mask; returns a DDS::Topic_ptr.
00159 virtual DDS::Topic_ptr create_topic(
00160 const char * topic_name,
00161 const char * type_name,
00162 const DDS::TopicQos & qos,
00163 DDS::TopicListener_ptr a_listener,
00164 DDS::StatusMask mask);
00165 // delete_topic: takes the topic 'a_topic' and reports the outcome as a DDS::ReturnCode_t.
00166 virtual DDS::ReturnCode_t delete_topic(
00167 DDS::Topic_ptr a_topic);
00168 // find_topic: looks a topic up by name; 'timeout' is a DDS::Duration_t (NOTE(review): presumably the maximum wait -- confirm).
00169 virtual DDS::Topic_ptr find_topic(
00170 const char * topic_name,
00171 const DDS::Duration_t & timeout);
00172 // lookup_topicdescription: takes a name and returns a DDS::TopicDescription_ptr; unlike find_topic it has no timeout parameter.
00173 virtual DDS::TopicDescription_ptr lookup_topicdescription(
00174 const char * name);
00175 // Content-filtered topic operations; compiled out when OPENDDS_NO_CONTENT_FILTERED_TOPIC is defined.
00176 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00177 // Takes a name, the related topic, a filter expression and its parameters; returns a DDS::ContentFilteredTopic_ptr.
00178 virtual DDS::ContentFilteredTopic_ptr create_contentfilteredtopic(
00179 const char * name,
00180 DDS::Topic_ptr related_topic,
00181 const char * filter_expression,
00182 const DDS::StringSeq & expression_parameters);
00183 // Takes the content-filtered topic to delete and reports the outcome as a DDS::ReturnCode_t.
00184 virtual DDS::ReturnCode_t delete_contentfilteredtopic(
00185 DDS::ContentFilteredTopic_ptr a_contentfilteredtopic);
00186
00187 #endif
00188 // Multi-topic operations; compiled out when OPENDDS_NO_MULTI_TOPIC is defined.
00189 #ifndef OPENDDS_NO_MULTI_TOPIC
00190 // Takes a name, a type name, a subscription expression and its parameters; returns a DDS::MultiTopic_ptr.
00191 virtual DDS::MultiTopic_ptr create_multitopic(
00192 const char * name,
00193 const char * type_name,
00194 const char * subscription_expression,
00195 const DDS::StringSeq & expression_parameters);
00196 // Takes the multi-topic to delete and reports the outcome as a DDS::ReturnCode_t.
00197 virtual DDS::ReturnCode_t delete_multitopic(DDS::MultiTopic_ptr a_multitopic);
00198
00199 #endif
00200 // Filter-evaluator helpers (non-virtual); compiled out when OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE is defined.
00201 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00202 // get_filter_eval returns an RcHandle<FilterEvaluator> for the filter string; NOTE(review): presumably served from filter_cache_ -- confirm.
00203 RcHandle<FilterEvaluator> get_filter_eval(const char* filter);
00204 void deref_filter_eval(const char* filter); // NOTE(review): presumably the counterpart that releases what get_filter_eval handed out -- confirm
00205
00206 #endif
00207 // The virtual operations below appear to implement the same-named DDS::DomainParticipant operations; see the OMG DDS specification for their contracts.
00208 virtual DDS::ReturnCode_t delete_contained_entities();
00209 // Takes an instance handle and returns a CORBA::Boolean.
00210 virtual CORBA::Boolean contains_entity(DDS::InstanceHandle_t a_handle);
00211 // set_qos takes the QoS by const reference (input).
00212 virtual DDS::ReturnCode_t set_qos(
00213 const DDS::DomainParticipantQos & qos);
00214 // get_qos takes the QoS by non-const reference, i.e. as an output parameter.
00215 virtual DDS::ReturnCode_t get_qos(
00216 DDS::DomainParticipantQos & qos);
00217 // set_listener takes a listener object reference and a status mask; NOTE(review): presumably stored in listener_ / listener_mask_ -- confirm.
00218 virtual DDS::ReturnCode_t set_listener(
00219 DDS::DomainParticipantListener_ptr a_listener,
00220 DDS::StatusMask mask);
00221 // get_listener returns a DDS::DomainParticipantListener_ptr.
00222 virtual DDS::DomainParticipantListener_ptr get_listener();
00223 // The four ignore_* operations each take the instance handle of the entity to ignore and return a DDS::ReturnCode_t.
00224 virtual DDS::ReturnCode_t ignore_participant(
00225 DDS::InstanceHandle_t handle);
00226
00227 virtual DDS::ReturnCode_t ignore_topic(
00228 DDS::InstanceHandle_t handle);
00229
00230 virtual DDS::ReturnCode_t ignore_publication(
00231 DDS::InstanceHandle_t handle);
00232
00233 virtual DDS::ReturnCode_t ignore_subscription(
00234 DDS::InstanceHandle_t handle);
00235 // Returns a DDS::DomainId_t by value; NOTE(review): presumably the const member domain_id_ -- confirm.
00236 virtual DDS::DomainId_t get_domain_id();
00237 // Takes no arguments and returns a DDS::ReturnCode_t.
00238 virtual DDS::ReturnCode_t assert_liveliness();
00239 // Default-QoS accessors: each set_* takes the QoS by const reference, each get_* fills a caller-supplied QoS object. NOTE(review): presumably backed by the default_*_qos_ members -- confirm.
00240 virtual DDS::ReturnCode_t set_default_publisher_qos(
00241 const DDS::PublisherQos & qos);
00242
00243 virtual DDS::ReturnCode_t get_default_publisher_qos(
00244 DDS::PublisherQos & qos);
00245
00246 virtual DDS::ReturnCode_t set_default_subscriber_qos(
00247 const DDS::SubscriberQos & qos);
00248
00249 virtual DDS::ReturnCode_t get_default_subscriber_qos(
00250 DDS::SubscriberQos & qos);
00251
00252 virtual DDS::ReturnCode_t set_default_topic_qos(
00253 const DDS::TopicQos & qos);
00254
00255 virtual DDS::ReturnCode_t get_default_topic_qos(
00256 DDS::TopicQos & qos);
00257 // get_current_time reports the time through the output parameter 'current_time'.
00258 virtual DDS::ReturnCode_t get_current_time(
00259 DDS::Time_t & current_time);
00260 // Discovery queries; only declared when DDS_HAS_MINIMUM_BIT is not defined.
00261 #if !defined (DDS_HAS_MINIMUM_BIT)
00262 // Fills 'participant_handles' (output parameter) with instance handles.
00263 virtual DDS::ReturnCode_t get_discovered_participants(
00264 DDS::InstanceHandleSeq & participant_handles);
00265 // Fills 'participant_data' (output parameter) for the participant identified by 'participant_handle'.
00266 virtual DDS::ReturnCode_t get_discovered_participant_data(
00267 DDS::ParticipantBuiltinTopicData & participant_data,
00268 DDS::InstanceHandle_t participant_handle);
00269 // Fills 'topic_handles' (output parameter) with instance handles.
00270 virtual DDS::ReturnCode_t get_discovered_topics(
00271 DDS::InstanceHandleSeq & topic_handles);
00272 // Fills 'topic_data' (output parameter) for the topic identified by 'topic_handle'.
00273 virtual DDS::ReturnCode_t get_discovered_topic_data(
00274 DDS::TopicBuiltinTopicData & topic_data,
00275 DDS::InstanceHandle_t topic_handle);
00276
00277 #endif
00278 // enable is declared unconditionally (outside the DDS_HAS_MINIMUM_BIT guard) and returns a DDS::ReturnCode_t.
00279 virtual DDS::ReturnCode_t enable();
00280 // The members from here to get_topic_ids are non-virtual helpers declared by this class itself.
00281
00282
00283
00284
00285
00286 // Returns a RepoId by value; NOTE(review): presumably the participant id held in dp_id_ -- confirm.
00287 RepoId get_id();
00288
00289
00290
00291 // Returns an OPENDDS_STRING by value; NOTE(review): presumably a textual form of the participant id -- confirm.
00292 OPENDDS_STRING get_unique_id();
00293
00294
00295
00296 // Maps a RepoId to a DDS::InstanceHandle_t; NOTE(review): presumably backed by handles_ -- confirm.
00297 DDS::InstanceHandle_t id_to_handle(const RepoId& id);
00298
00299
00300
00301
00302 // Maps a DDS::InstanceHandle_t back to a RepoId; NOTE(review): presumably backed by repoIds_ -- confirm.
00303 RepoId get_repoid(const DDS::InstanceHandle_t& id);
00304
00305
00306
00307 // Const query returning bool; NOTE(review): presumably true when no contained entities are left -- confirm.
00308 bool is_clean() const;
00309
00310
00311
00312
00313
00314
00315 // Takes a status kind and returns a listener object reference; NOTE(review): presumably selected through listener_mask_ -- confirm.
00316 DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind);
00317 // Vector of RepoId values; the parameter type of get_topic_ids.
00318 typedef OPENDDS_VECTOR(RepoId) TopicIdVec;
00319
00320
00321
00322 // Fills the caller-supplied vector 'topics' (non-const reference, i.e. an output parameter).
00323 void get_topic_ids(TopicIdVec& topics);
00324 // Ownership support; compiled out when OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE is defined.
00325 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00326
00327
00328 // Returns a raw OwnershipManager pointer; NOTE(review): presumably the address of the member owner_man_, so the caller would not own it -- confirm.
00329 OwnershipManager* ownership_manager();
00330
00331
00332
00333
00334
00335 // Takes a publication id and an ownership strength; returns nothing.
00336 void update_ownership_strength(const PublicationId& pub_id,
00337 const CORBA::Long& ownership_strength);
00338
00339 #endif
00340 // Inline accessor: returns the value of the federated_ flag.
00341 bool federated() const {
00342 return this->federated_;
00343 }
00344 // Recorder / Replayer support (types come from Recorder.h and Replayer.h, included above).
00345 // create_recorder takes a topic, subscriber and data-reader QoS, and a listener with its mask; returns a Recorder_ptr.
00346 Recorder_ptr create_recorder(DDS::Topic_ptr a_topic,
00347 const DDS::SubscriberQos & subscriber_qos,
00348 const DDS::DataReaderQos & datareader_qos,
00349 const RecorderListener_rch & a_listener,
00350 DDS::StatusMask mask);
00351 // create_replayer takes a topic, publisher and data-writer QoS, and a listener with its mask; returns a Replayer_ptr.
00352 Replayer_ptr create_replayer(DDS::Topic_ptr a_topic,
00353 const DDS::PublisherQos & publisher_qos,
00354 const DDS::DataWriterQos & datawriter_qos,
00355 const ReplayerListener_rch & a_listener,
00356 DDS::StatusMask mask);
00357 // Same parameters as create_topic plus 'type_has_keys'; NOTE(review): presumably forwards to create_topic_i with TOPIC_TYPELESS set -- confirm.
00358 DDS::Topic_ptr create_typeless_topic(
00359 const char * topic_name,
00360 const char * type_name,
00361 bool type_has_keys,
00362 const DDS::TopicQos & qos,
00363 DDS::TopicListener_ptr a_listener,
00364 DDS::StatusMask mask);
00365 // Deletion counterparts of create_recorder / create_replayer; they return nothing, so no status is reported to the caller.
00366 void delete_recorder(Recorder_ptr recorder);
00367 void delete_replayer(Replayer_ptr replayer);
00368 // Liveliness-timer maintenance; NOTE(review): presumably forwards to the two LivelinessTimer members of this class -- confirm.
00369 void add_adjust_liveliness_timers(DataWriterImpl* writer);
00370 void remove_adjust_liveliness_timers();
00371 // Security support; only declared when OPENDDS_SECURITY is defined.
00372 #if defined(OPENDDS_SECURITY)
00373 void set_security_config(const Security::SecurityConfig_rch& config); // NOTE(review): presumably stores 'config' in security_config_ -- confirm
00374 // Inline accessor: returns the value of part_crypto_handle_.
00375 DDS::Security::ParticipantCryptoHandle crypto_handle() const
00376 {
00377 return part_crypto_handle_;
00378 }
00379 #endif
00380 // Everything from here to the end of the class is private.
00381 private:
00382 // QoS validators returning bool; the QoS is taken by non-const reference. NOTE(review): whether they modify it cannot be told from this header -- confirm.
00383 bool validate_publisher_qos(DDS::PublisherQos & publisher_qos);
00384 bool validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos);
00385 // Bit flags for a topic's type; the values 1 and 2 are distinct bits, so they can be OR-ed together.
00386 // NOTE(review): as written, this declares an unnamed enum plus a data member
00387 // called TopicTypeMask -- it does not name the enum type. If a type name was
00388 // intended, the form would be 'enum TopicTypeMask { ... };'. Check the implementation before changing it.
00389 enum {
00390 TOPIC_TYPE_HAS_KEYS =1, // bit 0
00391 TOPIC_TYPELESS = 2 // bit 1
00392 } TopicTypeMask;
00393 // Internal topic creation: same parameters as create_topic plus an int 'topic_mask'.
00394 DDS::Topic_ptr create_topic_i(
00395 const char * topic_name,
00396 const char * type_name,
00397 const DDS::TopicQos & qos,
00398 DDS::TopicListener_ptr a_listener,
00399 DDS::StatusMask mask,
00400 int topic_mask); // NOTE(review): presumably a combination of TOPIC_TYPE_HAS_KEYS / TOPIC_TYPELESS -- confirm
00401 // Internal helper taking an already known topic id and a type support object; returns a DDS::Topic_ptr.
00402 DDS::Topic_ptr create_new_topic(
00403 const RepoId topic_id, // NOTE(review): passed by const value, so the RepoId is copied on each call; other methods here take const RepoId&
00404 const char * topic_name,
00405 const char * type_name,
00406 const DDS::TopicQos & qos,
00407 DDS::TopicListener_ptr a_listener,
00408 const DDS::StatusMask & mask,
00409 OpenDDS::DCPS::TypeSupport_ptr type_support);
00410
00411
00412
00413 // Internal topic deletion: takes the topic and a 'remove_objref' flag. NOTE(review): the flag's exact effect is defined in the implementation -- confirm.
00414 DDS::ReturnCode_t delete_topic_i(
00415 DDS::Topic_ptr a_topic,
00416 bool remove_objref);
00417 // Raw pointer to the factory; NOTE(review): presumably non-owning and set from the constructor's 'factory' argument -- confirm.
00418 DomainParticipantFactoryImpl* factory_;
00419 // Default QoS values for topics, publishers and subscribers (see the set_default_*/get_default_* operations).
00420 DDS::TopicQos default_topic_qos_;
00421
00422 DDS::PublisherQos default_publisher_qos_;
00423
00424 DDS::SubscriberQos default_subscriber_qos_;
00425
00426 // QoS of this participant (see set_qos / get_qos).
00427 DDS::DomainParticipantQos qos_;
00428 // Listener held through a _var.
00429 DDS::DomainParticipantListener_var listener_;
00430
00431 // Status mask that accompanies listener_.
00432 DDS::StatusMask listener_mask_;
00433 // Security handles; only present when OPENDDS_SECURITY is defined.
00434 #if defined(OPENDDS_SECURITY)
00435
00436 DDS::Security::IdentityHandle id_handle_;
00437
00438 DDS::Security::PermissionsHandle perm_handle_;
00439 // Returned by crypto_handle().
00440 DDS::Security::ParticipantCryptoHandle part_crypto_handle_;
00441 #endif
00442
00443 // Domain id; const, so it can only be set in the constructor's initializer list.
00444 const DDS::DomainId_t domain_id_;
00445 // RepoId of this participant; NOTE(review): presumably what get_id() returns -- confirm.
00446 RepoId dp_id_;
00447
00448
00449 // Flag returned by federated().
00450 bool federated_;
00451 // Containers of the participant's publishers, subscribers and topics.
00452
00453 PublisherSet publishers_; // set of Publisher_Pair
00454
00455 SubscriberSet subscribers_; // set of Subscriber_Pair
00456
00457 TopicMap topics_; // string -> RefCounted_Topic
00458 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00459 // Only present when content-filtered topics or multi-topics are compiled in.
00460 TopicDescriptionMap topic_descrs_;
00461 #endif
00462 // Two maps with opposite key/value types: RepoId -> handle and handle -> RepoId.
00463 HandleMap handles_;
00464 RepoIdMap repoIds_;
00465 // NOTE(review): presumably filled by ignore_participant() -- confirm.
00466 HandleMap ignored_participants_;
00467 // NOTE(review): presumably filled by ignore_topic() -- confirm.
00468 HandleMap ignored_topics_;
00469 // Recursive mutexes; NOTE(review): by name each guards the like-named container (publishers_, subscribers_, topics_, handles_) -- confirm in the implementation.
00470 ACE_Recursive_Thread_Mutex publishers_protector_;
00471
00472 ACE_Recursive_Thread_Mutex subscribers_protector_;
00473
00474 ACE_Recursive_Thread_Mutex topics_protector_;
00475
00476 ACE_Recursive_Thread_Mutex handle_protector_;
00477 // Shutdown state: a mutex, a condition on ACE_Thread_Mutex, a result code and a completion flag. NOTE(review): presumably used together to wait for shutdown to finish -- confirm.
00478 ACE_Thread_Mutex shutdown_mutex_;
00479 ACE_Condition<ACE_Thread_Mutex> shutdown_condition_;
00480 DDS::ReturnCode_t shutdown_result_;
00481 bool shutdown_complete_;
00482
00483 // Subscriber held through a _var; NOTE(review): presumably the built-in-topic subscriber returned by get_builtin_subscriber() -- confirm.
00484 DDS::Subscriber_var bit_subscriber_;
00485
00486
00487 // Instance-handle generator (type from InstanceHandle.h, included above).
00488 InstanceHandleGenerator participant_handles_;
00489 // Raw Monitor pointer; NOTE(review): ownership cannot be told from this header -- confirm.
00490 Monitor* monitor_;
00491
00492 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00493 OwnershipManager owner_man_; // held by value; only present when exclusive ownership is compiled in
00494 #endif
00495 // RepoIdSequence member plus a helper that returns a RepoId.
00496 // NOTE(review): presumably nextPubId() draws its result from pub_id_gen_ -- confirm.
00497 RepoIdSequence pub_id_gen_;
00498 RepoId nextPubId();
00499 // Filter-evaluator cache; only present when the content-subscription profile is compiled in.
00500 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00501 ACE_Thread_Mutex filter_cache_lock_; // NOTE(review): by name guards filter_cache_ -- confirm
00502 OPENDDS_MAP(OPENDDS_STRING, RcHandle<FilterEvaluator> ) filter_cache_; // string -> RcHandle<FilterEvaluator>
00503 #endif
00504 // Sets of Recorder_var / Replayer_var ordered with VarLess<>.
00505 typedef OPENDDS_SET_CMP(Recorder_var, VarLess<Recorder> ) RecorderSet;
00506 typedef OPENDDS_SET_CMP(Replayer_var, VarLess<Replayer> ) ReplayerSet;
00507
00508 RecorderSet recorders_;
00509 ReplayerSet replayers_;
00510
00511 #if defined(OPENDDS_SECURITY)
00512 Security::SecurityConfig_rch security_config_; // only present when OPENDDS_SECURITY is defined
00513 #endif
00514
00515 // Recursive mutexes; NOTE(review): by name they guard recorders_ and replayers_ -- confirm.
00516 ACE_Recursive_Thread_Mutex recorders_protector_;
00517
00518 ACE_Recursive_Thread_Mutex replayers_protector_;
00519 // Abstract base class (dispatch() is pure virtual) for the liveliness timers; it is an ACE_Event_Handler.
00520 class LivelinessTimer : public ACE_Event_Handler {
00521 public:
00522 LivelinessTimer(DomainParticipantImpl& impl, DDS::LivelinessQosPolicyKind kind); // takes the participant and a liveliness kind (see impl_ and kind_)
00523 virtual ~LivelinessTimer();
00524 void add_adjust(OpenDDS::DCPS::DataWriterImpl* writer); // NOTE(review): presumably (re)schedules the timer to account for 'writer' -- confirm
00525 void remove_adjust(); // NOTE(review): presumably flags the interval for recalculation -- confirm
00526 int handle_timeout(const ACE_Time_Value &tv, const void * ); // same name and parameters as the ACE_Event_Handler timer callback; second parameter is unnamed
00527 virtual void dispatch(const ACE_Time_Value& tv) = 0; // implemented by the two derived timer classes
00528
00529 protected:
00530 DomainParticipantImpl& impl_; // reference member, so the participant must outlive the timer
00531 const DDS::LivelinessQosPolicyKind kind_; // const: fixed at construction
00532
00533 ACE_Time_Value interval () const { return interval_; } // read-only access to the private interval_
00534
00535 private:
00536 ACE_Time_Value interval_; // value returned by interval()
00537 bool recalculate_interval_; // NOTE(review): presumably set when interval_ must be recomputed -- confirm
00538 ACE_Time_Value last_liveliness_check_; // NOTE(review): presumably the time of the previous check -- confirm
00539 bool scheduled_; // NOTE(review): presumably true while a timer is pending -- confirm
00540 ACE_Thread_Mutex lock_; // NOTE(review): presumably guards the private state above -- confirm
00541 };
00542 // Concrete LivelinessTimer that provides dispatch(); NOTE(review): by name it serves the automatic liveliness kind -- confirm.
00543 class AutomaticLivelinessTimer : public LivelinessTimer {
00544 public:
00545 AutomaticLivelinessTimer(DomainParticipantImpl& impl); // NOTE(review): single-argument constructor that is not 'explicit'
00546 virtual void dispatch(const ACE_Time_Value& tv); // provides the dispatch() that LivelinessTimer declares pure virtual
00547 };
00548 AutomaticLivelinessTimer automatic_liveliness_timer_; // the participant holds one instance by value
00549 // Concrete LivelinessTimer that provides dispatch(); NOTE(review): by name it serves the manual-by-participant liveliness kind -- confirm.
00550 class ParticipantLivelinessTimer : public LivelinessTimer {
00551 public:
00552 ParticipantLivelinessTimer(DomainParticipantImpl& impl); // NOTE(review): single-argument constructor that is not 'explicit'
00553 virtual void dispatch(const ACE_Time_Value& tv); // provides the dispatch() that LivelinessTimer declares pure virtual
00554 };
00555 ParticipantLivelinessTimer participant_liveliness_timer_; // the participant holds one instance by value
00556 // Private liveliness helpers; NOTE(review): presumably called by the timer classes above through their impl_ reference -- confirm.
00557 ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind); // returns an interval for the given liveliness kind
00558 bool participant_liveliness_activity_after(const ACE_Time_Value& tv); // NOTE(review): presumably compares 'tv' with last_liveliness_activity_ -- confirm
00559 void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00560 // NOTE(review): presumably the time of the most recent liveliness activity -- confirm.
00561 ACE_Time_Value last_liveliness_activity_;
00562 // Same name and parameter as ACE_Event_Handler::handle_exception; declared private here.
00563 virtual int handle_exception(ACE_HANDLE fd);
00564 };
00565
00566 }
00567 }
00568
00569 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00570
00571 #endif