DomainParticipantImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00009 #define OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00010 
00011 #include "dds/DdsDcpsPublicationC.h"
00012 #include "dds/DdsDcpsSubscriptionExtC.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsDomainC.h"
00015 #include "dds/DdsDcpsInfoUtilsC.h"
00016 #include "dds/DCPS/GuidUtils.h"
00017 #include "dds/DdsDcpsInfrastructureC.h"
00018 
00019 #if !defined (DDS_HAS_MINIMUM_BIT)
00020 #include "dds/DdsDcpsCoreTypeSupportC.h"
00021 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00022 
00023 #include "EntityImpl.h"
00024 #include "Definitions.h"
00025 #include "TopicImpl.h"
00026 #include "InstanceHandle.h"
00027 #include "OwnershipManager.h"
00028 #include "GuidBuilder.h"
00029 
00030 #include "dds/DCPS/transport/framework/TransportImpl_rch.h"
00031 
00032 #include "dds/DCPS/PoolAllocator.h"
00033 
00034 #include "Recorder.h"
00035 #include "Replayer.h"
00036 
00037 #include "dds/DCPS/security/framework/SecurityConfig_rch.h"
00038 
00039 #include "ace/Null_Mutex.h"
00040 #include "ace/Condition_Thread_Mutex.h"
00041 #include "ace/Recursive_Thread_Mutex.h"
00042 
00043 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00044 #pragma once
00045 #endif /* ACE_LACKS_PRAGMA_ONCE */
00046 
00047 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00048 
00049 namespace OpenDDS {
00050 namespace DCPS {
00051 
00052 class PublisherImpl;
00053 class SubscriberImpl;
00054 class DataWriterImpl;
00055 class DomainParticipantFactoryImpl;
00056 class Monitor;
00057 
00058 
00059 class RecorderImpl;
00060 class ReplayerImpl;
00061 
00062 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00063 class FilterEvaluator;
00064 #endif
00065 
00066 /**
00067  * @class DomainParticipantImpl
00068  *
00069  * @brief Implements the DDS::DomainParticipant interface.
00070  *
00071  * This class acts as an entrypoint of the service and a factory
00072  * for publisher, subscriber and topic. It also acts as a container
00073  * for the publisher, subscriber and topic objects.
00074  *
00075  * See the DDS specification, OMG formal/04-12-02, for a description of
00076  * the interface this class is implementing.
00077  */
00078 class OpenDDS_Dcps_Export DomainParticipantImpl
00079   : public virtual OpenDDS::DCPS::LocalObject<DDS::DomainParticipant>
00080   , public virtual OpenDDS::DCPS::EntityImpl
00081   , public virtual ACE_Event_Handler
00082 {
00083 public:
00084   typedef Objref_Servant_Pair <SubscriberImpl, DDS::Subscriber,
00085                                DDS::Subscriber_ptr, DDS::Subscriber_var> Subscriber_Pair;
00086 
00087   typedef Objref_Servant_Pair <PublisherImpl, DDS::Publisher,
00088                                DDS::Publisher_ptr, DDS::Publisher_var> Publisher_Pair;
00089 
00090   typedef Objref_Servant_Pair <TopicImpl, DDS::Topic,
00091                                DDS::Topic_ptr, DDS::Topic_var> Topic_Pair;
00092 
00093   typedef OPENDDS_SET(Subscriber_Pair) SubscriberSet;
00094   typedef OPENDDS_SET(Publisher_Pair) PublisherSet;
00095 
00096   class OpenDDS_Dcps_Export RepoIdSequence { // generator of RepoId values derived from a single base id
00097   public:
00098     explicit RepoIdSequence(const RepoId& base); // base: the id from which every generated value is derived
00099     RepoId next(); // produce the next id of the sequence (defined outside this header)
00100   private:
00101     RepoId base_;          // combined with serial_ to produce the value returned by next()
00102     long serial_;          // counter, incremented each time -- presumably by next(); confirm in the .cpp
00103     GuidBuilder builder_;  // used to modify base_
00104   };
00105 
00106   struct RefCounted_Topic { // a Topic_Pair together with a count of client references
00107     RefCounted_Topic()
00108       : client_refs_(0) // no references yet; pair_ is default-constructed
00109     {
00110     }
00111 
00112     explicit RefCounted_Topic(const Topic_Pair& pair)
00113       : pair_(pair),
00114       client_refs_(1) // a newly wrapped pair starts with one reference
00115     {
00116     }
00117 
00118     /// The topic servant / object reference pair being counted.
00119     Topic_Pair pair_;
00120     /// The number of client references held on pair_.
00121     CORBA::ULong client_refs_;
00122   };
00123 
00124   typedef OPENDDS_MAP(OPENDDS_STRING, RefCounted_Topic) TopicMap;
00125 
00126   typedef OPENDDS_MAP(OPENDDS_STRING, DDS::TopicDescription_var) TopicDescriptionMap;
00127 
00128   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) HandleMap;
00129   typedef OPENDDS_MAP(DDS::InstanceHandle_t, RepoId) RepoIdMap;
00130 
00131   DomainParticipantImpl(DomainParticipantFactoryImpl *     factory,
00132                         const DDS::DomainId_t&             domain_id,
00133                         const DDS::DomainParticipantQos &  qos,
00134                         DDS::DomainParticipantListener_ptr a_listener,
00135                         const DDS::StatusMask &            mask);
00136 
00137   virtual ~DomainParticipantImpl();
00138 
00139   virtual DDS::InstanceHandle_t get_instance_handle();
00140 
00141   virtual DDS::Publisher_ptr create_publisher(
00142     const DDS::PublisherQos &  qos,
00143     DDS::PublisherListener_ptr a_listener,
00144     DDS::StatusMask            mask);
00145 
00146   virtual DDS::ReturnCode_t delete_publisher(
00147     DDS::Publisher_ptr p);
00148 
00149   virtual DDS::Subscriber_ptr create_subscriber(
00150     const DDS::SubscriberQos &  qos,
00151     DDS::SubscriberListener_ptr a_listener,
00152     DDS::StatusMask             mask);
00153 
00154   virtual DDS::ReturnCode_t delete_subscriber(
00155     DDS::Subscriber_ptr s);
00156 
00157   virtual DDS::Subscriber_ptr get_builtin_subscriber();
00158 
00159   virtual DDS::Topic_ptr create_topic(
00160     const char *           topic_name,
00161     const char *           type_name,
00162     const DDS::TopicQos &  qos,
00163     DDS::TopicListener_ptr a_listener,
00164     DDS::StatusMask        mask);
00165 
00166   virtual DDS::ReturnCode_t delete_topic(
00167     DDS::Topic_ptr a_topic);
00168 
00169   virtual DDS::Topic_ptr find_topic(
00170     const char *            topic_name,
00171     const DDS::Duration_t & timeout);
00172 
00173   virtual DDS::TopicDescription_ptr lookup_topicdescription(
00174     const char * name);
00175 
00176 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00177 
00178   virtual DDS::ContentFilteredTopic_ptr create_contentfilteredtopic(
00179     const char *           name,
00180     DDS::Topic_ptr         related_topic,
00181     const char *           filter_expression,
00182     const DDS::StringSeq & expression_parameters);
00183 
00184   virtual DDS::ReturnCode_t delete_contentfilteredtopic(
00185     DDS::ContentFilteredTopic_ptr a_contentfilteredtopic);
00186 
00187 #endif
00188 
00189 #ifndef OPENDDS_NO_MULTI_TOPIC
00190 
00191   virtual DDS::MultiTopic_ptr create_multitopic(
00192     const char *           name,
00193     const char *           type_name,
00194     const char *           subscription_expression,
00195     const DDS::StringSeq & expression_parameters);
00196 
00197   virtual DDS::ReturnCode_t delete_multitopic(DDS::MultiTopic_ptr a_multitopic);
00198 
00199 #endif
00200 
00201 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00202 
00203   RcHandle<FilterEvaluator> get_filter_eval(const char* filter);
00204   void deref_filter_eval(const char* filter);
00205 
00206 #endif
00207 
00208   virtual DDS::ReturnCode_t delete_contained_entities();
00209 
00210   virtual CORBA::Boolean contains_entity(DDS::InstanceHandle_t a_handle);
00211 
00212   virtual DDS::ReturnCode_t set_qos(
00213     const DDS::DomainParticipantQos & qos);
00214 
00215   virtual DDS::ReturnCode_t get_qos(
00216     DDS::DomainParticipantQos & qos);
00217 
00218   virtual DDS::ReturnCode_t set_listener(
00219     DDS::DomainParticipantListener_ptr a_listener,
00220     DDS::StatusMask                    mask);
00221 
00222   virtual DDS::DomainParticipantListener_ptr get_listener();
00223 
00224   virtual DDS::ReturnCode_t ignore_participant(
00225     DDS::InstanceHandle_t handle);
00226 
00227   virtual DDS::ReturnCode_t ignore_topic(
00228     DDS::InstanceHandle_t handle);
00229 
00230   virtual DDS::ReturnCode_t ignore_publication(
00231     DDS::InstanceHandle_t handle);
00232 
00233   virtual DDS::ReturnCode_t ignore_subscription(
00234     DDS::InstanceHandle_t handle);
00235 
00236   virtual DDS::DomainId_t get_domain_id();
00237 
00238   virtual DDS::ReturnCode_t assert_liveliness();
00239 
00240   virtual DDS::ReturnCode_t set_default_publisher_qos(
00241     const DDS::PublisherQos & qos);
00242 
00243   virtual DDS::ReturnCode_t get_default_publisher_qos(
00244     DDS::PublisherQos & qos);
00245 
00246   virtual DDS::ReturnCode_t set_default_subscriber_qos(
00247     const DDS::SubscriberQos & qos);
00248 
00249   virtual DDS::ReturnCode_t get_default_subscriber_qos(
00250     DDS::SubscriberQos & qos);
00251 
00252   virtual DDS::ReturnCode_t set_default_topic_qos(
00253     const DDS::TopicQos & qos);
00254 
00255   virtual DDS::ReturnCode_t get_default_topic_qos(
00256     DDS::TopicQos & qos);
00257 
00258   virtual DDS::ReturnCode_t get_current_time(
00259     DDS::Time_t & current_time);
00260 
00261 #if !defined (DDS_HAS_MINIMUM_BIT)
00262 
00263   virtual DDS::ReturnCode_t get_discovered_participants(
00264     DDS::InstanceHandleSeq & participant_handles);
00265 
00266   virtual DDS::ReturnCode_t get_discovered_participant_data(
00267     DDS::ParticipantBuiltinTopicData & participant_data,
00268     DDS::InstanceHandle_t              participant_handle);
00269 
00270   virtual DDS::ReturnCode_t get_discovered_topics(
00271     DDS::InstanceHandleSeq & topic_handles);
00272 
00273   virtual DDS::ReturnCode_t get_discovered_topic_data(
00274     DDS::TopicBuiltinTopicData & topic_data,
00275     DDS::InstanceHandle_t        topic_handle);
00276 
00277 #endif
00278 
00279   virtual DDS::ReturnCode_t enable();
00280 
00281   /// The following methods are not part of the IDL interface;
00282   /// they are local operations.
00283 
00284   /**
00285    *  Return the id given by discovery.
00286    */
00287   RepoId get_id();
00288 
00289   /**
00290    * Return a unique string based on repo ID.
00291    */
00292   OPENDDS_STRING get_unique_id();
00293 
00294   /**
00295    * Obtain a local handle representing a GUID.
00296    */
00297   DDS::InstanceHandle_t id_to_handle(const RepoId& id);
00298 
00299   /**
00300    * Obtain a GUID representing a local handle.
00301    * @return GUID_UNKNOWN if not found.
00302    */
00303   RepoId get_repoid(const DDS::InstanceHandle_t& id);
00304 
00305   /**
00306    *  Check if the topic is used by any datareader or datawriter.
00307    */
00308   bool is_clean() const;
00309 
00310   /**
00311    * This is used to retrieve the listener for a certain status change.
00312    * If this DomainParticipant has a registered listener and the status
00313    * kind is in the listener mask then the listener is returned.
00314    * Otherwise, return nil.
00315    */
00316   DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind);
00317 
00318   typedef OPENDDS_VECTOR(RepoId) TopicIdVec;
00319   /**
00320    * Populates an std::vector with the RepoId of the topics this
00321    * participant has created/found.
00322    */
00323   void get_topic_ids(TopicIdVec& topics);
00324 
00325 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00326 
00327   /** Accessor for ownership manager.
00328    */
00329   OwnershipManager* ownership_manager();
00330 
00331 
00332   /**
00333    * Called upon receiving new BIT publication data to
00334    * update the ownership strength of a publication.
00335    */
00336   void update_ownership_strength(const PublicationId& pub_id,
00337                                  const CORBA::Long&   ownership_strength);
00338 
00339 #endif
00340 
00341   bool federated() const { // whether this participant is attached to a federated repository
00342     return federated_;
00343   }
00344 
00345 
00346   Recorder_ptr create_recorder(DDS::Topic_ptr               a_topic,
00347                                const DDS::SubscriberQos &   subscriber_qos,
00348                                const DDS::DataReaderQos &   datareader_qos,
00349                                const RecorderListener_rch & a_listener,
00350                                DDS::StatusMask              mask);
00351 
00352   Replayer_ptr create_replayer(DDS::Topic_ptr               a_topic,
00353                                const DDS::PublisherQos &    publisher_qos,
00354                                const DDS::DataWriterQos &   datawriter_qos,
00355                                const ReplayerListener_rch & a_listener,
00356                                DDS::StatusMask              mask);
00357 
00358   DDS::Topic_ptr create_typeless_topic(
00359     const char *           topic_name,
00360     const char *           type_name,
00361     bool                   type_has_keys,
00362     const DDS::TopicQos &  qos,
00363     DDS::TopicListener_ptr a_listener,
00364     DDS::StatusMask        mask);
00365 
00366   void delete_recorder(Recorder_ptr recorder);
00367   void delete_replayer(Replayer_ptr replayer);
00368 
00369   void add_adjust_liveliness_timers(DataWriterImpl* writer);
00370   void remove_adjust_liveliness_timers();
00371 
00372 #if defined(OPENDDS_SECURITY)
00373   void set_security_config(const Security::SecurityConfig_rch& config);
00374 
00375   DDS::Security::ParticipantCryptoHandle crypto_handle() const // read-only accessor; compiled only when OPENDDS_SECURITY is defined
00376   {
00377     return part_crypto_handle_; // returned by value
00378   }
00379 #endif
00380 
00381 private:
00382 
00383   bool validate_publisher_qos(DDS::PublisherQos & publisher_qos);
00384   bool validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos);
00385 
00386   /// Topic type flags (distinct bits). Presumably OR-ed together into the
00387   /// topic_mask argument of create_topic_i() -- confirm against the .cpp.
00388   /// Declared as an enum type so that no per-object data member is created.
00389   enum TopicTypeMask {
00390     TOPIC_TYPE_HAS_KEYS = 1,
00391     TOPIC_TYPELESS = 2
00392   };
00393   /// The implementation of create_topic.
00394   DDS::Topic_ptr create_topic_i(
00395     const char *           topic_name,
00396     const char *           type_name,
00397     const DDS::TopicQos &  qos,
00398     DDS::TopicListener_ptr a_listener,
00399     DDS::StatusMask        mask,
00400     int                    topic_mask); // assumed to carry TopicTypeMask values -- TODO confirm
00401 
00402   DDS::Topic_ptr create_new_topic(
00403     const RepoId                   topic_id,
00404     const char *                   topic_name,
00405     const char *                   type_name,
00406     const DDS::TopicQos &          qos,
00407     DDS::TopicListener_ptr         a_listener,
00408     const DDS::StatusMask &        mask,
00409     OpenDDS::DCPS::TypeSupport_ptr type_support);
00410 
00411   /** Delete the topic with option of whether the
00412    *  topic object reference should be removed.
00413    */
00414   DDS::ReturnCode_t delete_topic_i(
00415     DDS::Topic_ptr a_topic,
00416     bool           remove_objref);
00417 
00418   DomainParticipantFactoryImpl* factory_;
00419   /// The default topic qos.
00420   DDS::TopicQos default_topic_qos_;
00421   /// The default publisher qos.
00422   DDS::PublisherQos default_publisher_qos_;
00423   /// The default subscriber qos.
00424   DDS::SubscriberQos default_subscriber_qos_;
00425 
00426   /// The qos of this DomainParticipant.
00427   DDS::DomainParticipantQos qos_;
00428   /// Used to notify the entity for relevant events.
00429   DDS::DomainParticipantListener_var listener_;
00430   /// The StatusKind bit mask indicates which status condition change
00431   /// can be notified by the listener of this entity.
00432   DDS::StatusMask listener_mask_;
00433 
00434   #if defined(OPENDDS_SECURITY)
00435   /// This participant id handle given by authentication.
00436   DDS::Security::IdentityHandle id_handle_;
00437   /// This participant permissions handle given by access control.
00438   DDS::Security::PermissionsHandle perm_handle_;
00439   /// This participant crypto handle given by crypto
00440   DDS::Security::ParticipantCryptoHandle part_crypto_handle_;
00441   #endif
00442 
00443   /// The id of the domain that creates this participant.
00444   const DDS::DomainId_t domain_id_;
00445   /// This participant id given by discovery.
00446   RepoId dp_id_;
00447 
00448   /// Whether this DomainParticipant is attached to a federated
00449   /// repository.
00450   bool federated_;
00451 
00452   /// Collection of publishers.
00453   PublisherSet publishers_;
00454   /// Collection of subscribers.
00455   SubscriberSet subscribers_;
00456   /// Collection of topics.
00457   TopicMap topics_;
00458 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00459   /// Collection of TopicDescriptions which are not also Topics
00460   TopicDescriptionMap topic_descrs_;
00461 #endif
00462   /// Bidirectional collection of handles <--> RepoIds.
00463   HandleMap handles_;
00464   RepoIdMap repoIds_;
00465   /// Collection of ignored participants.
00466   HandleMap ignored_participants_;
00467   /// Collection of ignored topics.
00468   HandleMap ignored_topics_;
00469   /// Protect the publisher collection.
00470   ACE_Recursive_Thread_Mutex publishers_protector_;
00471   /// Protect the subscriber collection.
00472   ACE_Recursive_Thread_Mutex subscribers_protector_;
00473   /// Protect the topic collection.
00474   ACE_Recursive_Thread_Mutex topics_protector_;
00475   /// Protect the handle collection.
00476   ACE_Recursive_Thread_Mutex handle_protector_;
00477   /// Protect the shutdown.
00478   ACE_Thread_Mutex shutdown_mutex_;
00479   ACE_Condition<ACE_Thread_Mutex> shutdown_condition_;
00480   DDS::ReturnCode_t shutdown_result_;
00481   bool shutdown_complete_;
00482 
00483   /// The built in topic subscriber.
00484   DDS::Subscriber_var bit_subscriber_;
00485 
00486   /// Instance handle generators for non-repo backed entities
00487   /// (i.e. subscribers and publishers).
00488   InstanceHandleGenerator participant_handles_;
00489 
00490   Monitor* monitor_;
00491 
00492 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00493   OwnershipManager owner_man_;
00494 #endif
00495 
00496   /// Publisher ID generator.
00497   RepoIdSequence pub_id_gen_;
00498   RepoId nextPubId();
00499 
00500 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00501   ACE_Thread_Mutex filter_cache_lock_;
00502   OPENDDS_MAP(OPENDDS_STRING, RcHandle<FilterEvaluator> ) filter_cache_;
00503 #endif
00504 
00505   typedef OPENDDS_SET_CMP(Recorder_var, VarLess<Recorder> ) RecorderSet;
00506   typedef OPENDDS_SET_CMP(Replayer_var, VarLess<Replayer> ) ReplayerSet;
00507 
00508   RecorderSet recorders_;
00509   ReplayerSet replayers_;
00510 
00511 #if defined(OPENDDS_SECURITY)
00512   Security::SecurityConfig_rch security_config_;
00513 #endif
00514 
00515   /// Protect the recorders collection.
00516   ACE_Recursive_Thread_Mutex recorders_protector_;
00517   /// Protect the replayers collection.
00518   ACE_Recursive_Thread_Mutex replayers_protector_;
00519 
00520   class LivelinessTimer : public ACE_Event_Handler { // abstract base: dispatch() is pure virtual
00521   public:
00522     LivelinessTimer(DomainParticipantImpl& impl, DDS::LivelinessQosPolicyKind kind); // presumably stored in impl_ / kind_ -- confirm in the .cpp
00523     virtual ~LivelinessTimer();
00524     void add_adjust(OpenDDS::DCPS::DataWriterImpl* writer);
00525     void remove_adjust();
00526     int handle_timeout(const ACE_Time_Value &tv, const void * /* arg */); // same signature as ACE_Event_Handler's timer-expiry hook
00527     virtual void dispatch(const ACE_Time_Value& tv) = 0; // supplied by each concrete timer class
00528 
00529   protected:
00530     DomainParticipantImpl& impl_; // reference member: the referenced participant must outlive this timer
00531     const DDS::LivelinessQosPolicyKind kind_; // const: fixed at construction
00532 
00533     ACE_Time_Value interval () const { return interval_; } // read access to the private interval_ for derived classes
00534 
00535   private:
00536     ACE_Time_Value interval_;
00537     bool recalculate_interval_;
00538     ACE_Time_Value last_liveliness_check_;
00539     bool scheduled_; // presumably true while a timeout is pending -- confirm in the .cpp
00540     ACE_Thread_Mutex lock_; // presumably guards the private state above -- confirm in the .cpp
00541   };
00542 
00543   class AutomaticLivelinessTimer : public LivelinessTimer { // concrete LivelinessTimer
00544   public:
00545     explicit AutomaticLivelinessTimer(DomainParticipantImpl& impl); // explicit: a participant must not convert implicitly into a timer
00546     virtual void dispatch(const ACE_Time_Value& tv); // provides the dispatch() that LivelinessTimer leaves pure virtual
00547   };
00548   AutomaticLivelinessTimer automatic_liveliness_timer_;
00549 
00550   class ParticipantLivelinessTimer : public LivelinessTimer { // concrete LivelinessTimer
00551   public:
00552     explicit ParticipantLivelinessTimer(DomainParticipantImpl& impl); // explicit: a participant must not convert implicitly into a timer
00553     virtual void dispatch(const ACE_Time_Value& tv); // provides the dispatch() that LivelinessTimer leaves pure virtual
00554   };
00555   ParticipantLivelinessTimer participant_liveliness_timer_;
00556 
00557   ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00558   bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00559   void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00560 
00561   ACE_Time_Value last_liveliness_activity_;
00562 
00563   virtual int handle_exception(ACE_HANDLE fd);
00564 };
00565 
00566 } // namespace DCPS
00567 } // namespace OpenDDS
00568 
00569 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00570 
00571 #endif /* OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1