FederatorManagerImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef FEDERATORMANAGERIMPL_H
00009 #define FEDERATORMANAGERIMPL_H
00010 
00011 #include "federator_export.h"
00012 #include "FederatorS.h"
00013 #include "FederatorTypeSupportC.h"
00014 #include "FederatorConfig.h"
00015 #include "InfoRepoMulticastResponder.h"
00016 #include "UpdateProcessor_T.h"
00017 #include "UpdateListener_T.h"
00018 #include "Updater.h"
00019 #include "dds/DdsDcpsInfrastructureC.h"
00020 #include "dds/DdsDcpsDomainC.h"
00021 #include "dds/DCPS/Definitions.h"
00022 #include "dds/DCPS/PublisherImpl.h"
00023 #include "dds/DCPS/transport/framework/TransportDefs.h"
00024 #include "ace/Condition_T.h"
00025 
00026 #include <list>
00027 #include <map>
00028 
00029 class TAO_DDS_DCPSInfo_i;
00030 
00031 namespace OpenDDS {
00032 namespace Federator {
00033 
00034 class OpenDDS_Federator_Export ManagerImpl
00035   : public virtual POA_OpenDDS::Federator::Manager,
00036     public virtual Update::Updater,
00037     public virtual UpdateProcessor<OwnerUpdate>,
00038     public virtual UpdateProcessor<TopicUpdate>,
00039     public virtual UpdateProcessor<ParticipantUpdate>,
00040     public virtual UpdateProcessor<SubscriptionUpdate>,
00041     public virtual UpdateProcessor<PublicationUpdate> {
00042 public:
00043   /// Default constructor.
00044   ManagerImpl(Config& config);
00045 
00046   /// Virtual destructor.
00047   virtual ~ManagerImpl();
00048 
00049   // IDL methods.
00050 
00051   virtual CORBA::Boolean discover_federation(
00052     const char * ior);
00053 
00054   virtual Manager_ptr join_federation(
00055     Manager_ptr peer,
00056     FederationDomain federation);
00057 
00058   virtual void leave_federation(
00059     RepoKey id);
00060 
00061   virtual RepoKey federation_id();
00062 
00063   virtual OpenDDS::DCPS::DCPSInfo_ptr repository();
00064 
00065   virtual void initializeOwner(
00066     const OpenDDS::Federator::OwnerUpdate & data);
00067 
00068   virtual void initializeTopic(
00069     const OpenDDS::Federator::TopicUpdate & data);
00070 
00071   virtual void initializeParticipant(
00072     const OpenDDS::Federator::ParticipantUpdate & data);
00073 
00074   virtual void initializePublication(
00075     const OpenDDS::Federator::PublicationUpdate & data);
00076 
00077   virtual void initializeSubscription(
00078     const OpenDDS::Federator::SubscriptionUpdate & data);
00079 
00080   virtual void leave_and_shutdown();
00081 
00082   virtual void shutdown();
00083 
00084   // Servant methods
00085 
00086   /// Establish the update publications and subscriptions.
00087   void initialize();
00088 
00089   /// Release resources gracefully.
00090   void finalize();
00091 
00092   /// Accessors for the DCPSInfo reference.
00093   TAO_DDS_DCPSInfo_i*& info();
00094   TAO_DDS_DCPSInfo_i*  info() const;
00095 
00096   /// Capture a remote callable reference to the DCPSInfo.
00097   void localRepo(::OpenDDS::DCPS::DCPSInfo_ptr repo);
00098 
00099   /// Accessors for the federation Id value.
00100 //  void id(RepoKey val);
00101   const TAO_DDS_DCPSFederationId&  id() const;
00102 
00103   /// Accessors for the ORB.
00104   CORBA::ORB_ptr orb();
00105   void orb(CORBA::ORB_ptr value);
00106 
00107   /// Push our current state to a remote repository.
00108   void pushState(Manager_ptr peer);
00109 
00110   /// Handle any deferred updates that might have become processable.
00111   void processDeferred();
00112 
00113   //
00114   // Updater methods.
00115   //
00116 
00117   virtual void unregisterCallback();
00118 
00119   virtual void requestImage();
00120 
00121   virtual void create(const Update::UTopic& topic);
00122   virtual void create(const Update::UParticipant& participant);
00123   virtual void create(const Update::URActor& reader);
00124   virtual void create(const Update::UWActor& writer);
00125   virtual void create(const Update::OwnershipData& data);
00126 
00127   virtual void update(const Update::IdPath& id, const DDS::DomainParticipantQos& qos);
00128   virtual void update(const Update::IdPath& id, const DDS::TopicQos&             qos);
00129   virtual void update(const Update::IdPath& id, const DDS::DataWriterQos&        qos);
00130   virtual void update(const Update::IdPath& id, const DDS::PublisherQos&         qos);
00131   virtual void update(const Update::IdPath& id, const DDS::DataReaderQos&        qos);
00132   virtual void update(const Update::IdPath& id, const DDS::SubscriberQos&        qos);
00133   virtual void update(const Update::IdPath& id, const DDS::StringSeq&     exprParams);
00134 
00135   virtual void destroy(const Update::IdPath& id, Update::ItemType type, Update::ActorType actor);
00136 
00137   //
00138   // UpdateProcessor<> methods.
00139   //
00140   // "using" directive to fix "Hides the virtual function in virtual base" warning on
00141   // SunOS compiler.
00142   using UpdateProcessor<OwnerUpdate>::processUpdateQos2;
00143   using UpdateProcessor<TopicUpdate>::processUpdateQos2;
00144   using UpdateProcessor<ParticipantUpdate>::processUpdateQos2;
00145   using UpdateProcessor<OwnerUpdate>::processUpdateFilterExpressionParams;
00146   using UpdateProcessor<TopicUpdate>::processUpdateFilterExpressionParams;
00147   using UpdateProcessor<ParticipantUpdate>::processUpdateFilterExpressionParams;
00148   using UpdateProcessor<PublicationUpdate>::processUpdateFilterExpressionParams;
00149 
00150   /// Null implementation for OwnerUpdate samples.
00151   void processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* info);
00152 
00153   /// Create a proxy for a new publication.
00154   void processCreate(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00155 
00156   /// Create a proxy for a new subscription.
00157   void processCreate(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00158 
00159   /// Create a proxy for a new participant.
00160   void processCreate(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
00161 
00162   /// Create a proxy for a new topic.
00163   void processCreate(const TopicUpdate* sample, const DDS::SampleInfo* info);
00164 
00165   /// Process ownership changes.
00166   void processUpdateQos1(const OwnerUpdate* sample, const DDS::SampleInfo* info);
00167 
00168   /// Update the proxy DataWriterQos for a publication.
00169   void processUpdateQos1(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00170 
00171   /// Update the proxy PublisherQos for a publication.
00172   void processUpdateQos2(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00173 
00174   /// Update the proxy DataReaderQos for a subscription.
00175   void processUpdateQos1(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00176 
00177   /// Update the proxy SubscriberQos for a subscription.
00178   void processUpdateQos2(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00179 
00180   /// Update the proxy filter expression params for a subscription.
00181   void processUpdateFilterExpressionParams(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00182 
00183   /// Update the proxy ParticipantQos for a participant.
00184   void processUpdateQos1(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
00185 
00186   /// Update the proxy TopicQos for a topic.
00187   void processUpdateQos1(const TopicUpdate* sample, const DDS::SampleInfo* info);
00188 
00189   /// Null implementation for OwnerUpdate samples.
00190   void processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* info);
00191 
00192   /// Delete a proxy for a publication.
00193   void processDelete(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00194 
00195   /// Delete a proxy for a subscription.
00196   void processDelete(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00197 
00198   /// Delete a proxy for a participant.
00199   void processDelete(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
00200 
00201   /// Delete a proxy for a topic.
00202   void processDelete(const TopicUpdate* sample, const DDS::SampleInfo* info);
00203 
00204 private:
00205   /// Critical section MUTEX.
00206   ACE_SYNCH_MUTEX lock_;
00207 
00208   /// Condition used to gate joining activities.
00209   ACE_Condition<ACE_SYNCH_MUTEX> joining_;
00210 
00211   /// Simple recursion avoidance during the join operations.
00212   RepoKey joiner_;
00213 
00214   /// Repository to which we joined.
00215   RepoKey joinRepo_;
00216 
00217   /// Flag indicating that we are actively participating in a
00218   /// federation of repositories.
00219   bool federated_;
00220 
00221   /// Map type to hold references to federated repository Managers.
00222   typedef std::map<RepoKey, Manager_var> IdToManagerMap;
00223 
00224   /// The peer with which we have federated.
00225   IdToManagerMap peers_;
00226 
00227   /// The packet sequence number for data that we publish.
00228   OpenDDS::DCPS::SequenceNumber sequence_;
00229 
00230   /// The configuration information for this manager.
00231   Config& config_;
00232 
00233   /// The Info object reference to update.
00234   TAO_DDS_DCPSInfo_i* info_;
00235 
00236   /// Remotely callable reference to the local repository.
00237   OpenDDS::DCPS::DCPSInfo_var localRepo_;
00238 
00239   /// The ORB in which we are activated.
00240   CORBA::ORB_var orb_;
00241 
00242   /// Multicast responder
00243   InfoRepoMulticastResponder multicastResponder_;
00244 
00245   /// local DomainParticipant
00246   DDS::DomainParticipant_var federationParticipant_;
00247 
00248   /// TopicUpdate listener
00249   UpdateListener<OwnerUpdate, OwnerUpdateDataReader> ownerListener_;
00250 
00251   /// TopicUpdate listener
00252   UpdateListener<TopicUpdate, TopicUpdateDataReader> topicListener_;
00253 
00254   /// ParticipantUpdate listener
00255   UpdateListener<ParticipantUpdate, ParticipantUpdateDataReader> participantListener_;
00256 
00257   /// PublicationUpdate listener
00258   UpdateListener<PublicationUpdate, PublicationUpdateDataReader> publicationListener_;
00259 
00260   /// SubscriptionUpdate listener
00261   UpdateListener<SubscriptionUpdate, SubscriptionUpdateDataReader> subscriptionListener_;
00262 
00263   /// TopicUpdate writer
00264   OwnerUpdateDataWriter_var ownerWriter_;
00265 
00266   /// TopicUpdate writer
00267   TopicUpdateDataWriter_var topicWriter_;
00268 
00269   /// ParticipantUpdate writer
00270   ParticipantUpdateDataWriter_var participantWriter_;
00271 
00272   /// PublicationUpdate writer
00273   PublicationUpdateDataWriter_var publicationWriter_;
00274 
00275   /// SubscriptionUpdate writer
00276   SubscriptionUpdateDataWriter_var subscriptionWriter_;
00277 
00278   /// Deferred ownership updates
00279   std::list<OwnerUpdate> deferredOwnerships_;
00280 
00281   /// Deferred topic updates
00282   std::list<TopicUpdate> deferredTopics_;
00283 
00284   /// Deferred publication updates
00285   std::list<PublicationUpdate> deferredPublications_;
00286 
00287   /// Deferred subscription updates
00288   std::list<SubscriptionUpdate> deferredSubscriptions_;
00289 
00290   /// Is multicast enabled?
00291   bool multicastEnabled_;
00292 
00293   /// Protect deferred updates.
00294   ACE_Thread_Mutex deferred_lock_;
00295 };
00296 
00297 }
00298 } // End namespace OpenDDS::Federator
00299 
00300 #if defined (__ACE_INLINE__)
00301 # include "FederatorManagerImpl.inl"
00302 #endif  /* __ACE_INLINE__ */
00303 
00304 #endif /* FEDERATORMANAGERIMPL_H */

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7