OpenDDS  Snapshot(2023/04/28-20:55)
FederatorManagerImpl.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef FEDERATORMANAGERIMPL_H
9 #define FEDERATORMANAGERIMPL_H
10 
11 #include "federator_export.h"
12 #include "FederatorS.h"
13 #include "FederatorTypeSupportC.h"
14 #include "FederatorConfig.h"
16 #include "UpdateProcessor_T.h"
17 #include "UpdateListener_T.h"
18 #include "Updater.h"
19 #include "dds/DdsDcpsInfrastructureC.h"
20 #include "dds/DdsDcpsDomainC.h"
21 #include "dds/DCPS/Definitions.h"
22 #include "dds/DCPS/PublisherImpl.h"
24 #include "ace/Synch_Traits.h"
25 
26 #include <list>
27 #include <map>
28 
30 
31 class TAO_DDS_DCPSInfo_i;
32 
33 namespace OpenDDS {
34 namespace Federator {
35 
37  : public virtual POA_OpenDDS::Federator::Manager,
38  public virtual Update::Updater,
39  public virtual UpdateProcessor<OwnerUpdate>,
40  public virtual UpdateProcessor<TopicUpdate>,
41  public virtual UpdateProcessor<ParticipantUpdate>,
42  public virtual UpdateProcessor<SubscriptionUpdate>,
43  public virtual UpdateProcessor<PublicationUpdate> {
44 public:
45  ManagerImpl(Config& config);
46 
47  virtual ~ManagerImpl();
48 
49  // IDL methods.
50 
51  virtual CORBA::Boolean discover_federation(
52  const char * ior);
53 
54  virtual Manager_ptr join_federation(
55  Manager_ptr peer,
56  FederationDomain federation);
57 
58  virtual void leave_federation(
59  RepoKey id);
60 
61  virtual RepoKey federation_id();
62 
63  virtual OpenDDS::DCPS::DCPSInfo_ptr repository();
64 
65  virtual void initializeOwner(
66  const OpenDDS::Federator::OwnerUpdate & data);
67 
68  virtual void initializeTopic(
69  const OpenDDS::Federator::TopicUpdate & data);
70 
71  virtual void initializeParticipant(
73 
74  virtual void initializePublication(
76 
77  virtual void initializeSubscription(
79 
80  virtual void leave_and_shutdown();
81 
82  virtual void shutdown();
83 
84  // Servant methods
85 
86  /// Establish the update publications and subscriptions.
87  void initialize();
88 
89  /// Release resources gracefully.
90  void finalize();
91 
92  /// Accessors for the DCPSInfo reference.
93  TAO_DDS_DCPSInfo_i*& info();
94  TAO_DDS_DCPSInfo_i* info() const;
95 
96  /// Capture a remote callable reference to the DCPSInfo.
97  void localRepo(::OpenDDS::DCPS::DCPSInfo_ptr repo);
98 
99  /// Accessors for the federation Id value.
100 // void id(RepoKey val);
101  const TAO_DDS_DCPSFederationId& id() const;
102 
103  /// Accessors for the ORB.
104  CORBA::ORB_ptr orb();
105  void orb(CORBA::ORB_ptr value);
106 
107  /// Push our current state to a remote repository.
108  void pushState(Manager_ptr peer);
109 
110  /// Handle any deferred updates that might have become processable.
111  void processDeferred();
112 
113  //
114  // Updater methods.
115  //
116 
117  virtual void unregisterCallback();
118 
119  virtual void requestImage();
120 
121  virtual void create(const Update::UTopic& topic);
122  virtual void create(const Update::UParticipant& participant);
123  virtual void create(const Update::URActor& reader);
124  virtual void create(const Update::UWActor& writer);
125  virtual void create(const Update::OwnershipData& data);
126 
127  virtual void update(const Update::IdPath& id, const DDS::DomainParticipantQos& qos);
128  virtual void update(const Update::IdPath& id, const DDS::TopicQos& qos);
129  virtual void update(const Update::IdPath& id, const DDS::DataWriterQos& qos);
130  virtual void update(const Update::IdPath& id, const DDS::PublisherQos& qos);
131  virtual void update(const Update::IdPath& id, const DDS::DataReaderQos& qos);
132  virtual void update(const Update::IdPath& id, const DDS::SubscriberQos& qos);
133  virtual void update(const Update::IdPath& id, const DDS::StringSeq& exprParams);
134 
135  virtual void destroy(const Update::IdPath& id, Update::ItemType type, Update::ActorType actor);
136 
137  //
138  // UpdateProcessor<> methods.
139  //
140  // "using" directive to fix "Hides the virtual function in virtual base" warning on
141  // various compilers.
149 
150  /// Null implementation for OwnerUpdate samples.
151  void processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* info);
152 
153  /// Create a proxy for a new publication.
154  void processCreate(const PublicationUpdate* sample, const DDS::SampleInfo* info);
155 
156  /// Create a proxy for a new subscription.
157  void processCreate(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
158 
159  /// Create a proxy for a new participant.
160  void processCreate(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
161 
162  /// Create a proxy for a new topic.
163  void processCreate(const TopicUpdate* sample, const DDS::SampleInfo* info);
164 
165  /// Process ownership changes.
166  void processUpdateQos1(const OwnerUpdate* sample, const DDS::SampleInfo* info);
167 
168  /// Update the proxy DataWriterQos for a publication.
169  void processUpdateQos1(const PublicationUpdate* sample, const DDS::SampleInfo* info);
170 
171  /// Update the proxy PublisherQos for a publication.
172  void processUpdateQos2(const PublicationUpdate* sample, const DDS::SampleInfo* info);
173 
174  /// Update the proxy DataReaderQos for a subscription.
175  void processUpdateQos1(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
176 
177  /// Update the proxy SubscriberQos for a subscription.
178  void processUpdateQos2(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
179 
180  /// Update the proxy filter expression params for a subscription.
181  void processUpdateFilterExpressionParams(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
182 
183  /// Update the proxy ParticipantQos for a participant.
184  void processUpdateQos1(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
185 
186  /// Update the proxy TopicQos for a topic.
187  void processUpdateQos1(const TopicUpdate* sample, const DDS::SampleInfo* info);
188 
189  /// Null implementation for OwnerUpdate samples.
190  void processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* info);
191 
192  /// Delete a proxy for a publication.
193  void processDelete(const PublicationUpdate* sample, const DDS::SampleInfo* info);
194 
195  /// Delete a proxy for a subscription.
196  void processDelete(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
197 
198  /// Delete a proxy for a participant.
199  void processDelete(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
200 
201  /// Delete a proxy for a topic.
202  void processDelete(const TopicUpdate* sample, const DDS::SampleInfo* info);
203 
204 private:
205  /// Critical section MUTEX.
207 
208  /// Condition used to gate joining activities.
210 
211  /// Simple recursion avoidance during the join operations.
213 
214  /// Repository to which we joined.
216 
217  /// Flag indicating that we are actively participating in a
218  /// federation of repositories.
220 
221  /// Map type to hold references to federated repository Managers.
222  typedef std::map<RepoKey, Manager_var> IdToManagerMap;
223 
224  /// The peer with which we have federated.
225  IdToManagerMap peers_;
226 
227  /// The packet sequence number for data that we publish.
229 
230  /// The configuration information for this manager.
232 
233  /// The Info object reference to update.
235 
236  /// Remotely callable reference to the local repository.
237  OpenDDS::DCPS::DCPSInfo_var localRepo_;
238 
239  /// The ORB in which we are activated.
241 
242  /// Multicast responder
244 
245  /// local DomainParticipant
246  DDS::DomainParticipant_var federationParticipant_;
247 
248  /// TopicUpdate listener
250 
251  /// TopicUpdate listener
253 
254  /// ParticipantUpdate listener
256 
257  /// PublicationUpdate listener
259 
260  /// SubscriptionUpdate listener
262 
263  /// TopicUpdate writer
264  OwnerUpdateDataWriter_var ownerWriter_;
265 
266  /// TopicUpdate writer
267  TopicUpdateDataWriter_var topicWriter_;
268 
269  /// ParticipantUpdate writer
270  ParticipantUpdateDataWriter_var participantWriter_;
271 
272  /// PublicationUpdate writer
273  PublicationUpdateDataWriter_var publicationWriter_;
274 
275  /// SubscriptionUpdate writer
276  SubscriptionUpdateDataWriter_var subscriptionWriter_;
277 
278  /// Deferred ownership updates
279  std::list<OwnerUpdate> deferredOwnerships_;
280 
281  /// Deferred topic updates
282  std::list<TopicUpdate> deferredTopics_;
283 
284  /// Deferred publication updates
285  std::list<PublicationUpdate> deferredPublications_;
286 
287  /// Deferred subscription updates
288  std::list<SubscriptionUpdate> deferredSubscriptions_;
289 
290  /// Is multicast enabled?
292 
293  /// Protect deferred updates.
295 };
296 
297 }
298 } // End namespace OpenDDS::Federator
299 
301 
302 #if defined (__ACE_INLINE__)
303 # include "FederatorManagerImpl.inl"
304 #endif /* __ACE_INLINE__ */
305 
306 #endif /* FEDERATORMANAGERIMPL_H */
bool multicastEnabled_
Is multicast enabled?
RepoKey joiner_
Simple recursion avoidance during the join operations.
std::list< TopicUpdate > deferredTopics_
Deferred topic updates.
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > subscriptionListener_
SubscriptionUpdate listener.
const LogLevel::Value value
Definition: debug.cpp:61
#define ACE_SYNCH_MUTEX
Interface for managing update publications.
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
InfoRepoMulticastResponder multicastResponder_
Multicast responder.
CORBA::ORB_var orb_
The ORB in which we are activated.
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
IdToManagerMap peers_
The peer with which we have federated.
Config & config_
The configuration information for this manager.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
OpenDDS::DCPS::SequenceNumber sequence_
The packet sequence number for data that we publish.
OwnerUpdateDataWriter_var ownerWriter_
TopicUpdate writer.
ACE_Condition< ACE_SYNCH_MUTEX > joining_
Condition used to gate joining activities.
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > ownerListener_
TopicUpdate listener.
RepoKey joinRepo_
Repository to which we joined.
ACE_CDR::Boolean Boolean
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > publicationListener_
PublicationUpdate listener.
DDS::DomainId_t FederationDomain
Definition: Federator.idl:21
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > participantListener_
ParticipantUpdate listener.
Implementation of the DCPSInfo.
Definition: DCPSInfo_i.h:53
ACE_SYNCH_MUTEX lock_
Critical section MUTEX.
#define OpenDDS_Federator_Export
std::map< RepoKey, Manager_var > IdToManagerMap
Map type to hold references to federated repository Managers.
Event Handler that services multicast requests for IOR of a bootstrappable service.
std::list< PublicationUpdate > deferredPublications_
Deferred publication updates.
::CORBA::Long RepoKey
Sequence number abstraction. Only allows positive 64 bit values.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
UpdateListener< TopicUpdate, TopicUpdateDataReader > topicListener_
TopicUpdate listener.
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.
std::list< OwnerUpdate > deferredOwnerships_
Deferred ownership updates.
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
DDS::DomainParticipant_var federationParticipant_
local DomainParticipant
int shutdown(ACE_HANDLE handle, int how)
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
std::list< SubscriptionUpdate > deferredSubscriptions_
Deferred subscription updates.
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
OpenDDS::DCPS::DCPSInfo_var localRepo_
Remotely callable reference to the local repository.