00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00009 #define OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00010
00011 #include "EntityImpl.h"
00012 #include "Definitions.h"
00013 #include "TopicImpl.h"
00014 #include "InstanceHandle.h"
00015 #include "OwnershipManager.h"
00016 #include "GuidBuilder.h"
00017 #include "dds/DdsDcpsPublicationC.h"
00018 #include "dds/DdsDcpsSubscriptionExtC.h"
00019 #include "dds/DdsDcpsTopicC.h"
00020 #include "dds/DdsDcpsDomainC.h"
00021 #include "dds/DdsDcpsInfoUtilsC.h"
00022 #include "dds/DCPS/GuidUtils.h"
00023 #include "dds/DdsDcpsInfrastructureC.h"
00024
00025 #if !defined (DDS_HAS_MINIMUM_BIT)
00026 #include "dds/DdsDcpsCoreTypeSupportC.h"
00027 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00028
00029 #include "dds/DCPS/transport/framework/TransportImpl_rch.h"
00030 #include "ace/Null_Mutex.h"
00031 #include "ace/Recursive_Thread_Mutex.h"
00032 #include "ace/Condition_T.h"
00033
00034 #include "dds/DCPS/PoolAllocator.h"
00035
00036 #include "Recorder.h"
00037 #include "Replayer.h"
00038
00039 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00040 #pragma once
00041 #endif
00042
00043 namespace OpenDDS {
00044 namespace DCPS {
00045
00046 class PublisherImpl;
00047 class SubscriberImpl;
00048 class DataWriterImpl;
00049 class DomainParticipantFactoryImpl;
00050 class Monitor;
00051
00052
00053 class RecorderImpl;
00054 class ReplayerImpl;
00055
00056 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00057 class FilterEvaluator;
00058 #endif
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072 class OpenDDS_Dcps_Export DomainParticipantImpl
00073 : public virtual OpenDDS::DCPS::LocalObject<DDS::DomainParticipant>
00074 , public virtual OpenDDS::DCPS::EntityImpl
00075 , public virtual ACE_Event_Handler
00076 {
00077 public:
00078 typedef Objref_Servant_Pair <SubscriberImpl, DDS::Subscriber,
00079 DDS::Subscriber_ptr, DDS::Subscriber_var> Subscriber_Pair;
00080
00081 typedef Objref_Servant_Pair <PublisherImpl, DDS::Publisher,
00082 DDS::Publisher_ptr, DDS::Publisher_var> Publisher_Pair;
00083
00084 typedef Objref_Servant_Pair <TopicImpl, DDS::Topic,
00085 DDS::Topic_ptr, DDS::Topic_var> Topic_Pair;
00086
00087 typedef OPENDDS_SET(Subscriber_Pair) SubscriberSet;
00088 typedef OPENDDS_SET(Publisher_Pair) PublisherSet;
00089
00090 class OpenDDS_Dcps_Export RepoIdSequence {
00091 public:
00092 explicit RepoIdSequence(RepoId& base);
00093 RepoId next();
00094 private:
00095 RepoId base_;
00096 long serial_;
00097 GuidBuilder builder_;
00098 };
00099
00100 struct RefCounted_Topic {
00101 RefCounted_Topic()
00102 : client_refs_(0)
00103 {
00104 }
00105
00106 explicit RefCounted_Topic(const Topic_Pair& pair)
00107 : pair_(pair),
00108 client_refs_(1)
00109 {
00110 }
00111
00112
00113 Topic_Pair pair_;
00114
00115 CORBA::Long client_refs_;
00116 };
00117
00118 typedef OPENDDS_MAP(OPENDDS_STRING, RefCounted_Topic) TopicMap;
00119
00120 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::TopicDescription_var) TopicDescriptionMap;
00121
00122 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) HandleMap;
00123 typedef OPENDDS_MAP(DDS::InstanceHandle_t, RepoId) RepoIdMap;
00124
00125
00126 DomainParticipantImpl(DomainParticipantFactoryImpl * factory,
00127 const DDS::DomainId_t& domain_id,
00128 const RepoId& dp_id,
00129 const DDS::DomainParticipantQos & qos,
00130 DDS::DomainParticipantListener_ptr a_listener,
00131 const DDS::StatusMask & mask,
00132 bool federated = false);
00133
00134
00135 virtual ~DomainParticipantImpl();
00136
00137 virtual DDS::InstanceHandle_t get_instance_handle();
00138
00139 virtual DDS::Publisher_ptr create_publisher(
00140 const DDS::PublisherQos & qos,
00141 DDS::PublisherListener_ptr a_listener,
00142 DDS::StatusMask mask);
00143
00144 virtual DDS::ReturnCode_t delete_publisher(
00145 DDS::Publisher_ptr p);
00146
00147 virtual DDS::Subscriber_ptr create_subscriber(
00148 const DDS::SubscriberQos & qos,
00149 DDS::SubscriberListener_ptr a_listener,
00150 DDS::StatusMask mask);
00151
00152 virtual DDS::ReturnCode_t delete_subscriber(
00153 DDS::Subscriber_ptr s);
00154
00155 virtual DDS::Subscriber_ptr get_builtin_subscriber();
00156
00157 virtual DDS::Topic_ptr create_topic(
00158 const char * topic_name,
00159 const char * type_name,
00160 const DDS::TopicQos & qos,
00161 DDS::TopicListener_ptr a_listener,
00162 DDS::StatusMask mask);
00163
00164 virtual DDS::ReturnCode_t delete_topic(
00165 DDS::Topic_ptr a_topic);
00166
00167 virtual DDS::Topic_ptr find_topic(
00168 const char * topic_name,
00169 const DDS::Duration_t & timeout);
00170
00171 virtual DDS::TopicDescription_ptr lookup_topicdescription(
00172 const char * name);
00173
00174 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00175
00176 virtual DDS::ContentFilteredTopic_ptr create_contentfilteredtopic(
00177 const char * name,
00178 DDS::Topic_ptr related_topic,
00179 const char * filter_expression,
00180 const DDS::StringSeq & expression_parameters);
00181
00182 virtual DDS::ReturnCode_t delete_contentfilteredtopic(
00183 DDS::ContentFilteredTopic_ptr a_contentfilteredtopic);
00184
00185 #endif
00186
00187 #ifndef OPENDDS_NO_MULTI_TOPIC
00188
00189 virtual DDS::MultiTopic_ptr create_multitopic(
00190 const char * name,
00191 const char * type_name,
00192 const char * subscription_expression,
00193 const DDS::StringSeq & expression_parameters);
00194
00195 virtual DDS::ReturnCode_t delete_multitopic(DDS::MultiTopic_ptr a_multitopic);
00196
00197 #endif
00198
00199 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00200
00201 RcHandle<FilterEvaluator> get_filter_eval(const char* filter);
00202 void deref_filter_eval(const char* filter);
00203
00204 #endif
00205
00206 virtual DDS::ReturnCode_t delete_contained_entities();
00207
00208 virtual CORBA::Boolean contains_entity(DDS::InstanceHandle_t a_handle);
00209
00210 virtual DDS::ReturnCode_t set_qos(
00211 const DDS::DomainParticipantQos & qos);
00212
00213 virtual DDS::ReturnCode_t get_qos(
00214 DDS::DomainParticipantQos & qos);
00215
00216 virtual DDS::ReturnCode_t set_listener(
00217 DDS::DomainParticipantListener_ptr a_listener,
00218 DDS::StatusMask mask);
00219
00220 virtual DDS::DomainParticipantListener_ptr get_listener();
00221
00222 virtual DDS::ReturnCode_t ignore_participant(
00223 DDS::InstanceHandle_t handle);
00224
00225 virtual DDS::ReturnCode_t ignore_topic(
00226 DDS::InstanceHandle_t handle);
00227
00228 virtual DDS::ReturnCode_t ignore_publication(
00229 DDS::InstanceHandle_t handle);
00230
00231 virtual DDS::ReturnCode_t ignore_subscription(
00232 DDS::InstanceHandle_t handle);
00233
00234 virtual DDS::DomainId_t get_domain_id();
00235
00236 virtual DDS::ReturnCode_t assert_liveliness();
00237
00238 virtual DDS::ReturnCode_t set_default_publisher_qos(
00239 const DDS::PublisherQos & qos);
00240
00241 virtual DDS::ReturnCode_t get_default_publisher_qos(
00242 DDS::PublisherQos & qos);
00243
00244 virtual DDS::ReturnCode_t set_default_subscriber_qos(
00245 const DDS::SubscriberQos & qos);
00246
00247 virtual DDS::ReturnCode_t get_default_subscriber_qos(
00248 DDS::SubscriberQos & qos);
00249
00250 virtual DDS::ReturnCode_t set_default_topic_qos(
00251 const DDS::TopicQos & qos);
00252
00253 virtual DDS::ReturnCode_t get_default_topic_qos(
00254 DDS::TopicQos & qos);
00255
00256 virtual DDS::ReturnCode_t get_current_time(
00257 DDS::Time_t & current_time);
00258
00259 #if !defined (DDS_HAS_MINIMUM_BIT)
00260
00261 virtual DDS::ReturnCode_t get_discovered_participants(
00262 DDS::InstanceHandleSeq & participant_handles);
00263
00264 virtual DDS::ReturnCode_t get_discovered_participant_data(
00265 DDS::ParticipantBuiltinTopicData & participant_data,
00266 DDS::InstanceHandle_t participant_handle);
00267
00268 virtual DDS::ReturnCode_t get_discovered_topics(
00269 DDS::InstanceHandleSeq & topic_handles);
00270
00271 virtual DDS::ReturnCode_t get_discovered_topic_data(
00272 DDS::TopicBuiltinTopicData & topic_data,
00273 DDS::InstanceHandle_t topic_handle);
00274
00275 #endif
00276
00277 virtual DDS::ReturnCode_t enable();
00278
00279
00280
00281
00282
00283
00284
00285 RepoId get_id();
00286
00287
00288
00289
00290 OPENDDS_STRING get_unique_id();
00291
00292
00293
00294
00295 DDS::InstanceHandle_t id_to_handle(const RepoId& id);
00296
00297
00298
00299
00300
00301 RepoId get_repoid(const DDS::InstanceHandle_t& id);
00302
00303
00304
00305
00306
00307 void set_object_reference(const DDS::DomainParticipant_ptr& dp);
00308
00309
00310
00311
00312 int is_clean() const;
00313
00314
00315
00316
00317
00318
00319
00320 DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind);
00321
00322 typedef OPENDDS_VECTOR(RepoId) TopicIdVec;
00323
00324
00325
00326
00327 void get_topic_ids(TopicIdVec& topics);
00328
00329 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00330
00331
00332
00333 OwnershipManager* ownership_manager();
00334
00335
00336
00337
00338
00339
00340 void update_ownership_strength(const PublicationId& pub_id,
00341 const CORBA::Long& ownership_strength);
00342
00343 #endif
00344
00345 bool federated() const {
00346 return this->federated_;
00347 }
00348
00349
00350 Recorder_ptr create_recorder(DDS::Topic_ptr a_topic,
00351 const DDS::SubscriberQos & subscriber_qos,
00352 const DDS::DataReaderQos & datareader_qos,
00353 const RecorderListener_rch & a_listener,
00354 DDS::StatusMask mask);
00355
00356 Replayer_ptr create_replayer(DDS::Topic_ptr a_topic,
00357 const DDS::PublisherQos & publisher_qos,
00358 const DDS::DataWriterQos & datawriter_qos,
00359 const ReplayerListener_rch & a_listener,
00360 DDS::StatusMask mask);
00361
00362 DDS::Topic_ptr create_typeless_topic(
00363 const char * topic_name,
00364 const char * type_name,
00365 bool type_has_keys,
00366 const DDS::TopicQos & qos,
00367 DDS::TopicListener_ptr a_listener,
00368 DDS::StatusMask mask);
00369
00370 void delete_recorder(Recorder_ptr recorder);
00371 void delete_replayer(Replayer_ptr replayer);
00372
00373 void add_adjust_liveliness_timers(DataWriterImpl* writer);
00374 void remove_adjust_liveliness_timers();
00375
00376 private:
00377
00378 bool validate_publisher_qos(DDS::PublisherQos & publisher_qos);
00379 bool validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos);
00380
00381
00382
00383
00384 enum {
00385 TOPIC_TYPE_HAS_KEYS =1,
00386 TOPIC_TYPELESS = 2
00387 } TopicTypeMask;
00388
00389 DDS::Topic_ptr create_topic_i(
00390 const char * topic_name,
00391 const char * type_name,
00392 const DDS::TopicQos & qos,
00393 DDS::TopicListener_ptr a_listener,
00394 DDS::StatusMask mask,
00395 int topic_mask);
00396
00397 DDS::Topic_ptr create_new_topic(
00398 const RepoId topic_id,
00399 const char * topic_name,
00400 const char * type_name,
00401 const DDS::TopicQos & qos,
00402 DDS::TopicListener_ptr a_listener,
00403 const DDS::StatusMask & mask,
00404 OpenDDS::DCPS::TypeSupport_ptr type_support);
00405
00406
00407
00408
00409 DDS::ReturnCode_t delete_topic_i(
00410 DDS::Topic_ptr a_topic,
00411 bool remove_objref);
00412
00413 DomainParticipantFactoryImpl* factory_;
00414
00415 DDS::TopicQos default_topic_qos_;
00416
00417 DDS::PublisherQos default_publisher_qos_;
00418
00419 DDS::SubscriberQos default_subscriber_qos_;
00420
00421
00422 DDS::DomainParticipantQos qos_;
00423
00424 DDS::DomainParticipantListener_var listener_;
00425
00426
00427 DDS::StatusMask listener_mask_;
00428
00429 DDS::DomainId_t domain_id_;
00430
00431 RepoId dp_id_;
00432
00433
00434
00435 bool federated_;
00436
00437
00438 PublisherSet publishers_;
00439
00440 SubscriberSet subscribers_;
00441
00442 TopicMap topics_;
00443 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00444
00445 TopicDescriptionMap topic_descrs_;
00446 #endif
00447
00448 HandleMap handles_;
00449 RepoIdMap repoIds_;
00450
00451 HandleMap ignored_participants_;
00452
00453 HandleMap ignored_topics_;
00454
00455 ACE_Recursive_Thread_Mutex publishers_protector_;
00456
00457 ACE_Recursive_Thread_Mutex subscribers_protector_;
00458
00459 ACE_Recursive_Thread_Mutex topics_protector_;
00460
00461 ACE_Recursive_Thread_Mutex handle_protector_;
00462
00463 ACE_Thread_Mutex shutdown_mutex_;
00464 ACE_Condition<ACE_Thread_Mutex> shutdown_condition_;
00465 DDS::ReturnCode_t shutdown_result_;
00466 bool shutdown_complete_;
00467
00468
00469 DDS::DomainParticipant_var participant_objref_;
00470
00471
00472 DDS::Subscriber_var bit_subscriber_;
00473
00474
00475
00476 InstanceHandleGenerator participant_handles_;
00477
00478 Monitor* monitor_;
00479
00480 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00481 OwnershipManager owner_man_;
00482 #endif
00483
00484
00485 RepoIdSequence pub_id_gen_;
00486 RepoId nextPubId();
00487
00488 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00489 ACE_Thread_Mutex filter_cache_lock_;
00490 OPENDDS_MAP(OPENDDS_STRING, RcHandle<FilterEvaluator> ) filter_cache_;
00491 #endif
00492
00493 typedef OPENDDS_SET_CMP(Recorder_var, VarLess<Recorder> ) RecorderSet;
00494 typedef OPENDDS_SET_CMP(Replayer_var, VarLess<Replayer> ) ReplayerSet;
00495
00496 RecorderSet recorders_;
00497 ReplayerSet replayers_;
00498
00499
00500 ACE_Recursive_Thread_Mutex recorders_protector_;
00501
00502 ACE_Recursive_Thread_Mutex replayers_protector_;
00503
00504 class LivelinessTimer : public ACE_Event_Handler {
00505 public:
00506 LivelinessTimer(DomainParticipantImpl& impl, DDS::LivelinessQosPolicyKind kind);
00507 virtual ~LivelinessTimer();
00508 void add_adjust(OpenDDS::DCPS::DataWriterImpl* writer);
00509 void remove_adjust();
00510 int handle_timeout(const ACE_Time_Value &tv, const void * );
00511 virtual void dispatch(const ACE_Time_Value& tv) = 0;
00512
00513 protected:
00514 DomainParticipantImpl& impl_;
00515 const DDS::LivelinessQosPolicyKind kind_;
00516
00517 ACE_Time_Value interval () const { return interval_; }
00518
00519 private:
00520 ACE_Time_Value interval_;
00521 bool recalculate_interval_;
00522 ACE_Time_Value last_liveliness_check_;
00523 bool scheduled_;
00524 ACE_Thread_Mutex lock_;
00525 };
00526
00527 class AutomaticLivelinessTimer : public LivelinessTimer {
00528 public:
00529 AutomaticLivelinessTimer(DomainParticipantImpl& impl);
00530 virtual void dispatch(const ACE_Time_Value& tv);
00531 };
00532 AutomaticLivelinessTimer automatic_liveliness_timer_;
00533
00534 class ParticipantLivelinessTimer : public LivelinessTimer {
00535 public:
00536 ParticipantLivelinessTimer(DomainParticipantImpl& impl);
00537 virtual void dispatch(const ACE_Time_Value& tv);
00538 };
00539 ParticipantLivelinessTimer participant_liveliness_timer_;
00540
00541 ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00542 bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00543 void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00544
00545 ACE_Time_Value last_liveliness_activity_;
00546
00547 virtual int handle_exception(ACE_HANDLE fd);
00548 };
00549
00550 }
00551 }
00552
00553 #endif