DomainParticipantImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00009 #define OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
00010 
00011 #include "EntityImpl.h"
00012 #include "Definitions.h"
00013 #include "TopicImpl.h"
00014 #include "InstanceHandle.h"
00015 #include "OwnershipManager.h"
00016 #include "GuidBuilder.h"
00017 #include "dds/DdsDcpsPublicationC.h"
00018 #include "dds/DdsDcpsSubscriptionExtC.h"
00019 #include "dds/DdsDcpsTopicC.h"
00020 #include "dds/DdsDcpsDomainC.h"
00021 #include "dds/DdsDcpsInfoUtilsC.h"
00022 #include "dds/DCPS/GuidUtils.h"
00023 #include "dds/DdsDcpsInfrastructureC.h"
00024 
00025 #if !defined (DDS_HAS_MINIMUM_BIT)
00026 #include "dds/DdsDcpsCoreTypeSupportC.h"
00027 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00028 
00029 #include "dds/DCPS/transport/framework/TransportImpl_rch.h"
00030 #include "ace/Null_Mutex.h"
00031 #include "ace/Recursive_Thread_Mutex.h"
00032 #include "ace/Condition_T.h"
00033 
00034 #include "dds/DCPS/PoolAllocator.h"
00035 
00036 #include "Recorder.h"
00037 #include "Replayer.h"
00038 
00039 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00040 #pragma once
00041 #endif /* ACE_LACKS_PRAGMA_ONCE */
00042 
00043 namespace OpenDDS {
00044 namespace DCPS {
00045 
00046 class PublisherImpl;
00047 class SubscriberImpl;
00048 class DataWriterImpl;
00049 class DomainParticipantFactoryImpl;
00050 class Monitor;
00051 
00052 
00053 class RecorderImpl;
00054 class ReplayerImpl;
00055 
00056 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00057 class FilterEvaluator;
00058 #endif
00059 
00060 /**
00061  * @class DomainParticipantImpl
00062  *
00063  * @brief Implements the OpenDDS::DCPS::DomainParticipant interfaces.
00064  *
00065  * This class acts as an entrypoint of the service and a factory
00066  * for publisher, subscriber and topic. It also acts as a container
00067  * for the publisher, subscriber and topic objects.
00068  *
00069  * See the DDS specification, OMG formal/04-12-02, for a description of
00070  * the interface this class is implementing.
00071  */
00072 class OpenDDS_Dcps_Export DomainParticipantImpl
00073   : public virtual OpenDDS::DCPS::LocalObject<DDS::DomainParticipant>
00074   , public virtual OpenDDS::DCPS::EntityImpl
00075   , public virtual ACE_Event_Handler
00076 {
00077 public:
00078   typedef Objref_Servant_Pair <SubscriberImpl, DDS::Subscriber,
00079                                DDS::Subscriber_ptr, DDS::Subscriber_var> Subscriber_Pair;
00080 
00081   typedef Objref_Servant_Pair <PublisherImpl, DDS::Publisher,
00082                                DDS::Publisher_ptr, DDS::Publisher_var> Publisher_Pair;
00083 
00084   typedef Objref_Servant_Pair <TopicImpl, DDS::Topic,
00085                                DDS::Topic_ptr, DDS::Topic_var> Topic_Pair;
00086 
00087   typedef OPENDDS_SET(Subscriber_Pair) SubscriberSet;
00088   typedef OPENDDS_SET(Publisher_Pair) PublisherSet;
00089 
00090   class OpenDDS_Dcps_Export RepoIdSequence {
00091 public:
00092     explicit RepoIdSequence(RepoId& base);
00093     RepoId next();
00094 private:
00095     RepoId base_;          // will be combined with serial to produce next
00096     long serial_;          // will be incremented each time
00097     GuidBuilder builder_;  // used to modify base
00098   };
00099 
00100   struct RefCounted_Topic {
00101     RefCounted_Topic()
00102       : client_refs_(0)
00103     {
00104     }
00105 
00106     explicit RefCounted_Topic(const Topic_Pair& pair)
00107       : pair_(pair),
00108       client_refs_(1)
00109     {
00110     }
00111 
00112     /// The topic object reference.
00113     Topic_Pair pair_;
00114     /// The reference count on the obj_.
00115     CORBA::Long client_refs_;
00116   };
00117 
00118   typedef OPENDDS_MAP(OPENDDS_STRING, RefCounted_Topic) TopicMap;
00119 
00120   typedef OPENDDS_MAP(OPENDDS_STRING, DDS::TopicDescription_var) TopicDescriptionMap;
00121 
00122   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) HandleMap;
00123   typedef OPENDDS_MAP(DDS::InstanceHandle_t, RepoId) RepoIdMap;
00124 
00125   ///Constructor
00126   DomainParticipantImpl(DomainParticipantFactoryImpl *     factory,
00127                         const DDS::DomainId_t&             domain_id,
00128                         const RepoId&                      dp_id,
00129                         const DDS::DomainParticipantQos &  qos,
00130                         DDS::DomainParticipantListener_ptr a_listener,
00131                         const DDS::StatusMask &            mask,
00132                         bool                               federated = false);
00133 
00134   ///Destructor
00135   virtual ~DomainParticipantImpl();
00136 
00137   virtual DDS::InstanceHandle_t get_instance_handle();
00138 
00139   virtual DDS::Publisher_ptr create_publisher(
00140     const DDS::PublisherQos &  qos,
00141     DDS::PublisherListener_ptr a_listener,
00142     DDS::StatusMask            mask);
00143 
00144   virtual DDS::ReturnCode_t delete_publisher(
00145     DDS::Publisher_ptr p);
00146 
00147   virtual DDS::Subscriber_ptr create_subscriber(
00148     const DDS::SubscriberQos &  qos,
00149     DDS::SubscriberListener_ptr a_listener,
00150     DDS::StatusMask             mask);
00151 
00152   virtual DDS::ReturnCode_t delete_subscriber(
00153     DDS::Subscriber_ptr s);
00154 
00155   virtual DDS::Subscriber_ptr get_builtin_subscriber();
00156 
00157   virtual DDS::Topic_ptr create_topic(
00158     const char *           topic_name,
00159     const char *           type_name,
00160     const DDS::TopicQos &  qos,
00161     DDS::TopicListener_ptr a_listener,
00162     DDS::StatusMask        mask);
00163 
00164   virtual DDS::ReturnCode_t delete_topic(
00165     DDS::Topic_ptr a_topic);
00166 
00167   virtual DDS::Topic_ptr find_topic(
00168     const char *            topic_name,
00169     const DDS::Duration_t & timeout);
00170 
00171   virtual DDS::TopicDescription_ptr lookup_topicdescription(
00172     const char * name);
00173 
00174 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00175 
00176   virtual DDS::ContentFilteredTopic_ptr create_contentfilteredtopic(
00177     const char *           name,
00178     DDS::Topic_ptr         related_topic,
00179     const char *           filter_expression,
00180     const DDS::StringSeq & expression_parameters);
00181 
00182   virtual DDS::ReturnCode_t delete_contentfilteredtopic(
00183     DDS::ContentFilteredTopic_ptr a_contentfilteredtopic);
00184 
00185 #endif
00186 
00187 #ifndef OPENDDS_NO_MULTI_TOPIC
00188 
00189   virtual DDS::MultiTopic_ptr create_multitopic(
00190     const char *           name,
00191     const char *           type_name,
00192     const char *           subscription_expression,
00193     const DDS::StringSeq & expression_parameters);
00194 
00195   virtual DDS::ReturnCode_t delete_multitopic(DDS::MultiTopic_ptr a_multitopic);
00196 
00197 #endif
00198 
00199 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00200 
00201   RcHandle<FilterEvaluator> get_filter_eval(const char* filter);
00202   void deref_filter_eval(const char* filter);
00203 
00204 #endif
00205 
00206   virtual DDS::ReturnCode_t delete_contained_entities();
00207 
00208   virtual CORBA::Boolean contains_entity(DDS::InstanceHandle_t a_handle);
00209 
00210   virtual DDS::ReturnCode_t set_qos(
00211     const DDS::DomainParticipantQos & qos);
00212 
00213   virtual DDS::ReturnCode_t get_qos(
00214     DDS::DomainParticipantQos & qos);
00215 
00216   virtual DDS::ReturnCode_t set_listener(
00217     DDS::DomainParticipantListener_ptr a_listener,
00218     DDS::StatusMask                    mask);
00219 
00220   virtual DDS::DomainParticipantListener_ptr get_listener();
00221 
00222   virtual DDS::ReturnCode_t ignore_participant(
00223     DDS::InstanceHandle_t handle);
00224 
00225   virtual DDS::ReturnCode_t ignore_topic(
00226     DDS::InstanceHandle_t handle);
00227 
00228   virtual DDS::ReturnCode_t ignore_publication(
00229     DDS::InstanceHandle_t handle);
00230 
00231   virtual DDS::ReturnCode_t ignore_subscription(
00232     DDS::InstanceHandle_t handle);
00233 
00234   virtual DDS::DomainId_t get_domain_id();
00235 
00236   virtual DDS::ReturnCode_t assert_liveliness();
00237 
00238   virtual DDS::ReturnCode_t set_default_publisher_qos(
00239     const DDS::PublisherQos & qos);
00240 
00241   virtual DDS::ReturnCode_t get_default_publisher_qos(
00242     DDS::PublisherQos & qos);
00243 
00244   virtual DDS::ReturnCode_t set_default_subscriber_qos(
00245     const DDS::SubscriberQos & qos);
00246 
00247   virtual DDS::ReturnCode_t get_default_subscriber_qos(
00248     DDS::SubscriberQos & qos);
00249 
00250   virtual DDS::ReturnCode_t set_default_topic_qos(
00251     const DDS::TopicQos & qos);
00252 
00253   virtual DDS::ReturnCode_t get_default_topic_qos(
00254     DDS::TopicQos & qos);
00255 
00256   virtual DDS::ReturnCode_t get_current_time(
00257     DDS::Time_t & current_time);
00258 
00259 #if !defined (DDS_HAS_MINIMUM_BIT)
00260 
00261   virtual DDS::ReturnCode_t get_discovered_participants(
00262     DDS::InstanceHandleSeq & participant_handles);
00263 
00264   virtual DDS::ReturnCode_t get_discovered_participant_data(
00265     DDS::ParticipantBuiltinTopicData & participant_data,
00266     DDS::InstanceHandle_t              participant_handle);
00267 
00268   virtual DDS::ReturnCode_t get_discovered_topics(
00269     DDS::InstanceHandleSeq & topic_handles);
00270 
00271   virtual DDS::ReturnCode_t get_discovered_topic_data(
00272     DDS::TopicBuiltinTopicData & topic_data,
00273     DDS::InstanceHandle_t        topic_handle);
00274 
00275 #endif
00276 
00277   virtual DDS::ReturnCode_t enable();
00278 
00279   /// Following methods are not the idl interfaces and are
00280   /// local operations.
00281 
00282   /**
00283    *  Return the id given by discovery.
00284    */
00285   RepoId get_id();
00286 
00287   /**
00288    * Return a unique string based on repo ID.
00289    */
00290   OPENDDS_STRING get_unique_id();
00291 
00292   /**
00293    * Obtain a local handle representing a GUID.
00294    */
00295   DDS::InstanceHandle_t id_to_handle(const RepoId& id);
00296 
00297   /**
00298    * Obtain a GUID representing a local hande.
00299    * @return GUID_UNKNOWN if not found.
00300    */
00301   RepoId get_repoid(const DDS::InstanceHandle_t& id);
00302 
00303   /**
00304    *  Associate the servant with the object reference.
00305    *  This is required to pass to the topic servant.
00306    */
00307   void set_object_reference(const DDS::DomainParticipant_ptr& dp);
00308 
00309   /**
00310    *  Check if the topic is used by any datareader or datawriter.
00311    */
00312   int is_clean() const;
00313 
00314   /**
00315    * This is used to retrieve the listener for a certain status change.
00316    * If this DomainParticipant has a registered listener and the status
00317    * kind is in the listener mask then the listener is returned.
00318    * Otherwise, return nil.
00319    */
00320   DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind);
00321 
00322   typedef OPENDDS_VECTOR(RepoId) TopicIdVec;
00323   /**
00324    * Populates an std::vector with the RepoId of the topics this
00325    * participant has created/found.
00326    */
00327   void get_topic_ids(TopicIdVec& topics);
00328 
00329 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00330 
00331   /** Accessor for ownership manager.
00332    */
00333   OwnershipManager* ownership_manager();
00334 
00335 
00336   /**
00337    * Called upon receiving new BIT publication data to
00338    * update the ownership strength of a publication.
00339    */
00340   void update_ownership_strength(const PublicationId& pub_id,
00341                                  const CORBA::Long&   ownership_strength);
00342 
00343 #endif
00344 
00345   bool federated() const {
00346     return this->federated_;
00347   }
00348 
00349 
00350   Recorder_ptr create_recorder(DDS::Topic_ptr               a_topic,
00351                                const DDS::SubscriberQos &   subscriber_qos,
00352                                const DDS::DataReaderQos &   datareader_qos,
00353                                const RecorderListener_rch & a_listener,
00354                                DDS::StatusMask              mask);
00355 
00356   Replayer_ptr create_replayer(DDS::Topic_ptr               a_topic,
00357                                const DDS::PublisherQos &    publisher_qos,
00358                                const DDS::DataWriterQos &   datawriter_qos,
00359                                const ReplayerListener_rch & a_listener,
00360                                DDS::StatusMask              mask);
00361 
00362   DDS::Topic_ptr create_typeless_topic(
00363     const char *           topic_name,
00364     const char *           type_name,
00365     bool                   type_has_keys,
00366     const DDS::TopicQos &  qos,
00367     DDS::TopicListener_ptr a_listener,
00368     DDS::StatusMask        mask);
00369 
00370   void delete_recorder(Recorder_ptr recorder);
00371   void delete_replayer(Replayer_ptr replayer);
00372 
00373   void add_adjust_liveliness_timers(DataWriterImpl* writer);
00374   void remove_adjust_liveliness_timers();
00375 
00376 private:
00377 
00378   bool validate_publisher_qos(DDS::PublisherQos & publisher_qos);
00379   bool validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos);
00380 
00381   /** The implementation of create_topic.
00382    */
00383 
00384   enum {
00385     TOPIC_TYPE_HAS_KEYS =1,
00386     TOPIC_TYPELESS = 2
00387   } TopicTypeMask;
00388 
00389   DDS::Topic_ptr create_topic_i(
00390     const char *           topic_name,
00391     const char *           type_name,
00392     const DDS::TopicQos &  qos,
00393     DDS::TopicListener_ptr a_listener,
00394     DDS::StatusMask        mask,
00395     int                    topic_mask);
00396 
00397   DDS::Topic_ptr create_new_topic(
00398     const RepoId                   topic_id,
00399     const char *                   topic_name,
00400     const char *                   type_name,
00401     const DDS::TopicQos &          qos,
00402     DDS::TopicListener_ptr         a_listener,
00403     const DDS::StatusMask &        mask,
00404     OpenDDS::DCPS::TypeSupport_ptr type_support);
00405 
00406   /** Delete the topic with option of whether the
00407    *  topic object reference should be removed.
00408    */
00409   DDS::ReturnCode_t delete_topic_i(
00410     DDS::Topic_ptr a_topic,
00411     bool           remove_objref);
00412 
00413   DomainParticipantFactoryImpl* factory_;
00414   /// The default topic qos.
00415   DDS::TopicQos default_topic_qos_;
00416   /// The default publisher qos.
00417   DDS::PublisherQos default_publisher_qos_;
00418   /// The default subscriber qos.
00419   DDS::SubscriberQos default_subscriber_qos_;
00420 
00421   /// The qos of this DomainParticipant.
00422   DDS::DomainParticipantQos qos_;
00423   /// Used to notify the entity for relevant events.
00424   DDS::DomainParticipantListener_var listener_;
00425   /// The StatusKind bit mask indicates which status condition change
00426   /// can be notified by the listener of this entity.
00427   DDS::StatusMask listener_mask_;
00428   /// The id of the domain that creates this participant.
00429   DDS::DomainId_t domain_id_;
00430   /// This participant id given by discovery.
00431   RepoId dp_id_;
00432 
00433   /// Whether this DomainParticipant is attached to a federated
00434   /// repository.
00435   bool federated_;
00436 
00437   /// Collection of publishers.
00438   PublisherSet publishers_;
00439   /// Collection of subscribers.
00440   SubscriberSet subscribers_;
00441   /// Collection of topics.
00442   TopicMap topics_;
00443 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
00444   /// Collection of TopicDescriptions which are not also Topics
00445   TopicDescriptionMap topic_descrs_;
00446 #endif
00447   /// Bidirectional collection of handles <--> RepoIds.
00448   HandleMap handles_;
00449   RepoIdMap repoIds_;
00450   /// Collection of ignored participants.
00451   HandleMap ignored_participants_;
00452   /// Collection of ignored topics.
00453   HandleMap ignored_topics_;
00454   /// Protect the publisher collection.
00455   ACE_Recursive_Thread_Mutex publishers_protector_;
00456   /// Protect the subscriber collection.
00457   ACE_Recursive_Thread_Mutex subscribers_protector_;
00458   /// Protect the topic collection.
00459   ACE_Recursive_Thread_Mutex topics_protector_;
00460   /// Protect the handle collection.
00461   ACE_Recursive_Thread_Mutex handle_protector_;
00462   /// Protect the shutdown.
00463   ACE_Thread_Mutex shutdown_mutex_;
00464   ACE_Condition<ACE_Thread_Mutex> shutdown_condition_;
00465   DDS::ReturnCode_t shutdown_result_;
00466   bool shutdown_complete_;
00467 
00468   /// The object reference activated from this servant.
00469   DDS::DomainParticipant_var participant_objref_;
00470 
00471   /// The built in topic subscriber.
00472   DDS::Subscriber_var bit_subscriber_;
00473 
00474   /// Instance handle generators for non-repo backed entities
00475   /// (i.e. subscribers and publishers).
00476   InstanceHandleGenerator participant_handles_;
00477 
00478   Monitor* monitor_;
00479 
00480 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00481   OwnershipManager owner_man_;
00482 #endif
00483 
00484   /// Publisher ID generator.
00485   RepoIdSequence pub_id_gen_;
00486   RepoId nextPubId();
00487 
00488 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00489   ACE_Thread_Mutex filter_cache_lock_;
00490   OPENDDS_MAP(OPENDDS_STRING, RcHandle<FilterEvaluator> ) filter_cache_;
00491 #endif
00492 
00493   typedef OPENDDS_SET_CMP(Recorder_var, VarLess<Recorder> ) RecorderSet;
00494   typedef OPENDDS_SET_CMP(Replayer_var, VarLess<Replayer> ) ReplayerSet;
00495 
00496   RecorderSet recorders_;
00497   ReplayerSet replayers_;
00498 
00499   /// Protect the recorders collection.
00500   ACE_Recursive_Thread_Mutex recorders_protector_;
00501   /// Protect the replayers collection.
00502   ACE_Recursive_Thread_Mutex replayers_protector_;
00503 
00504   class LivelinessTimer : public ACE_Event_Handler {
00505   public:
00506     LivelinessTimer(DomainParticipantImpl& impl, DDS::LivelinessQosPolicyKind kind);
00507     virtual ~LivelinessTimer();
00508     void add_adjust(OpenDDS::DCPS::DataWriterImpl* writer);
00509     void remove_adjust();
00510     int handle_timeout(const ACE_Time_Value &tv, const void * /* arg */);
00511     virtual void dispatch(const ACE_Time_Value& tv) = 0;
00512 
00513   protected:
00514     DomainParticipantImpl& impl_;
00515     const DDS::LivelinessQosPolicyKind kind_;
00516 
00517     ACE_Time_Value interval () const { return interval_; }
00518 
00519   private:
00520     ACE_Time_Value interval_;
00521     bool recalculate_interval_;
00522     ACE_Time_Value last_liveliness_check_;
00523     bool scheduled_;
00524     ACE_Thread_Mutex lock_;
00525   };
00526 
00527   class AutomaticLivelinessTimer : public LivelinessTimer {
00528   public:
00529     AutomaticLivelinessTimer(DomainParticipantImpl& impl);
00530     virtual void dispatch(const ACE_Time_Value& tv);
00531   };
00532   AutomaticLivelinessTimer automatic_liveliness_timer_;
00533 
00534   class ParticipantLivelinessTimer : public LivelinessTimer {
00535   public:
00536     ParticipantLivelinessTimer(DomainParticipantImpl& impl);
00537     virtual void dispatch(const ACE_Time_Value& tv);
00538   };
00539   ParticipantLivelinessTimer participant_liveliness_timer_;
00540 
00541   ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00542   bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00543   void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00544 
00545   ACE_Time_Value last_liveliness_activity_;
00546 
00547   virtual int handle_exception(ACE_HANDLE fd);
00548 };
00549 
00550 } // namespace DCPS
00551 } // namespace OpenDDS
00552 
00553 #endif /* OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H  */

Generated on Fri Feb 12 20:05:22 2016 for OpenDDS by  doxygen 1.4.7