00001
00002
00003
00004
00005
00006
00007
00008 #ifndef FEDERATORMANAGERIMPL_H
00009 #define FEDERATORMANAGERIMPL_H
00010
00011 #include "federator_export.h"
00012 #include "FederatorS.h"
00013 #include "FederatorTypeSupportC.h"
00014 #include "FederatorConfig.h"
00015 #include "InfoRepoMulticastResponder.h"
00016 #include "UpdateProcessor_T.h"
00017 #include "UpdateListener_T.h"
00018 #include "Updater.h"
00019 #include "dds/DdsDcpsInfrastructureC.h"
00020 #include "dds/DdsDcpsDomainC.h"
00021 #include "dds/DCPS/Definitions.h"
00022 #include "dds/DCPS/PublisherImpl.h"
00023 #include "dds/DCPS/transport/framework/TransportDefs.h"
00024 #include "ace/Condition_T.h"
00025
00026 #include <list>
00027 #include <map>
00028
00029 class TAO_DDS_DCPSInfo_i;
00030
00031 namespace OpenDDS {
00032 namespace Federator {
00033
/// Federation manager class.
///
/// Inherits (virtually) from the generated skeleton
/// POA_OpenDDS::Federator::Manager, from Update::Updater, and from one
/// UpdateProcessor<> instantiation for each update sample type (Owner,
/// Topic, Participant, Subscription, Publication).
///
/// No member function is defined in this class body, so the notes below
/// describe only what the declarations show; anything inferred from names
/// is marked "presumably" / "TODO confirm".
00034 class OpenDDS_Federator_Export ManagerImpl
00035 : public virtual POA_OpenDDS::Federator::Manager,
00036 public virtual Update::Updater,
00037 public virtual UpdateProcessor<OwnerUpdate>,
00038 public virtual UpdateProcessor<TopicUpdate>,
00039 public virtual UpdateProcessor<ParticipantUpdate>,
00040 public virtual UpdateProcessor<SubscriptionUpdate>,
00041 public virtual UpdateProcessor<PublicationUpdate> {
00042 public:
00043
/// Construct a manager from a configuration object.
/// The class holds a Config& member (config_); presumably it is bound to
/// @p config, in which case @p config must outlive this object -- TODO confirm.
/// NOTE(review): single-argument constructor is not declared `explicit`.
00044 ManagerImpl(Config& config);
00045
00046
/// Virtual destructor.
00047 virtual ~ManagerImpl();
00048
00049
00050
// ---------------------------------------------------------------------
// Virtual operations using the Federator types (Manager_ptr, RepoKey,
// FederationDomain, the *Update structs). Presumably these implement the
// POA_OpenDDS::Federator::Manager skeleton -- TODO confirm against the IDL.
// ---------------------------------------------------------------------

/// @param ior  C string; presumably a stringified object reference of a
///             remote federation manager -- TODO confirm.
/// @return CORBA::Boolean; meaning of true/false is not visible here.
00051 virtual CORBA::Boolean discover_federation(
00052 const char * ior);
00053
/// @param peer        Manager reference of the other party.
/// @param federation  FederationDomain value supplied by the caller.
/// @return a Manager_ptr; ownership follows whatever the skeleton
///         prescribes (not visible in this header).
00054 virtual Manager_ptr join_federation(
00055 Manager_ptr peer,
00056 FederationDomain federation);
00057
/// @param id  RepoKey; presumably identifies the repository that is
///            leaving -- TODO confirm.
00058 virtual void leave_federation(
00059 RepoKey id);
00060
/// @return a RepoKey; presumably this manager's own federation id.
00061 virtual RepoKey federation_id();
00062
/// @return a DCPSInfo object reference; presumably the one set through
///         localRepo() -- TODO confirm.
00063 virtual OpenDDS::DCPS::DCPSInfo_ptr repository();
00064
/// initialize<Kind>(): one operation per update sample type, each taking
/// the sample by const reference.
00065 virtual void initializeOwner(
00066 const OpenDDS::Federator::OwnerUpdate & data);
00067
00068 virtual void initializeTopic(
00069 const OpenDDS::Federator::TopicUpdate & data);
00070
00071 virtual void initializeParticipant(
00072 const OpenDDS::Federator::ParticipantUpdate & data);
00073
00074 virtual void initializePublication(
00075 const OpenDDS::Federator::PublicationUpdate & data);
00076
00077 virtual void initializeSubscription(
00078 const OpenDDS::Federator::SubscriptionUpdate & data);
00079
/// Presumably leaves the federation and then shuts down -- TODO confirm.
00080 virtual void leave_and_shutdown();
00081
00082 virtual void shutdown();
00083
00084
00085
00086
// ---------------------------------------------------------------------
// Non-virtual local interface.
// ---------------------------------------------------------------------

/// Presumably the start-up counterpart of finalize(); the work performed
/// is defined in the implementation, not here.
00087 void initialize();
00088
00089
/// Presumably the tear-down counterpart of initialize().
00090 void finalize();
00091
00092
/// Accessors for a TAO_DDS_DCPSInfo_i pointer (presumably info_).
/// The non-const overload returns a *reference to the pointer*, so callers
/// can reseat it; the const overload returns the pointer by value.
00093 TAO_DDS_DCPSInfo_i*& info();
00094 TAO_DDS_DCPSInfo_i* info() const;
00095
00096
/// Supply the local repository object reference (presumably stored in
/// localRepo_). Whether the argument is duplicated or consumed is not
/// visible here -- TODO confirm.
00097 void localRepo(::OpenDDS::DCPS::DCPSInfo_ptr repo);
00098
00099
00100
/// @return const reference to a TAO_DDS_DCPSFederationId; which object owns
///         it is not visible in this header.
00101 const TAO_DDS_DCPSFederationId& id() const;
00102
00103
/// ORB getter / setter (presumably backed by orb_). Reference-count
/// handling of the CORBA::ORB_ptr is not visible here -- TODO confirm.
00104 CORBA::ORB_ptr orb();
00105 void orb(CORBA::ORB_ptr value);
00106
00107
/// @param peer  Manager reference; presumably the recipient of this
///              manager's current state -- TODO confirm.
00108 void pushState(Manager_ptr peer);
00109
00110
/// Presumably drains the deferred* lists declared in the private section.
00111 void processDeferred();
00112
00113
00114
00115
00116
// ---------------------------------------------------------------------
// Virtual operations using the Update:: types. Presumably overrides of
// Update::Updater (base class not visible here).
// ---------------------------------------------------------------------
00117 virtual void unregisterCallback();
00118
00119 virtual void requestImage();
00120
/// create(): one overload per Update:: entity description.
00121 virtual void create(const Update::UTopic& topic);
00122 virtual void create(const Update::UParticipant& participant);
00123 virtual void create(const Update::URActor& reader);
00124 virtual void create(const Update::UWActor& writer);
00125 virtual void create(const Update::OwnershipData& data);
00126
/// update(): one overload per QoS type, plus one taking a DDS::StringSeq
/// named exprParams. @p id is an Update::IdPath naming the entity.
00127 virtual void update(const Update::IdPath& id, const DDS::DomainParticipantQos& qos);
00128 virtual void update(const Update::IdPath& id, const DDS::TopicQos& qos);
00129 virtual void update(const Update::IdPath& id, const DDS::DataWriterQos& qos);
00130 virtual void update(const Update::IdPath& id, const DDS::PublisherQos& qos);
00131 virtual void update(const Update::IdPath& id, const DDS::DataReaderQos& qos);
00132 virtual void update(const Update::IdPath& id, const DDS::SubscriberQos& qos);
00133 virtual void update(const Update::IdPath& id, const DDS::StringSeq& exprParams);
00134
/// destroy(): @p type and @p actor qualify the entity named by @p id.
00135 virtual void destroy(const Update::IdPath& id, Update::ItemType type, Update::ActorType actor);
00136
00137
00138
00139
00140
00141
// ---------------------------------------------------------------------
// Per-sample-type processing functions.
//
// This class declares its own processUpdateQos2 (Publication,
// Subscription) and processUpdateFilterExpressionParams (Subscription)
// overloads further down. A member declared in a derived class hides all
// base-class members of the same name, so the using-declarations below
// re-expose the base-class versions for the sample types that this class
// does not declare itself.
// ---------------------------------------------------------------------
00142 using UpdateProcessor<OwnerUpdate>::processUpdateQos2;
00143 using UpdateProcessor<TopicUpdate>::processUpdateQos2;
00144 using UpdateProcessor<ParticipantUpdate>::processUpdateQos2;
00145 using UpdateProcessor<OwnerUpdate>::processUpdateFilterExpressionParams;
00146 using UpdateProcessor<TopicUpdate>::processUpdateFilterExpressionParams;
00147 using UpdateProcessor<ParticipantUpdate>::processUpdateFilterExpressionParams;
00148 using UpdateProcessor<PublicationUpdate>::processUpdateFilterExpressionParams;
00149
00150
// Every function below takes a pointer to the sample and a pointer to its
// DDS::SampleInfo. Presumably they override UpdateProcessor<T> virtuals
// called from the UpdateListener members -- TODO confirm in
// UpdateProcessor_T.h / UpdateListener_T.h. Null handling is not visible.

/// processCreate(): one overload per sample type.
00151 void processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* info);
00152
00153
00154 void processCreate(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00155
00156
00157 void processCreate(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00158
00159
00160 void processCreate(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
00161
00162
00163 void processCreate(const TopicUpdate* sample, const DDS::SampleInfo* info);
00164
00165
/// processUpdateQos1(): declared for all five sample types.
/// processUpdateQos2() and processUpdateFilterExpressionParams(): declared
/// here only for the types not covered by the using-declarations above.
00166 void processUpdateQos1(const OwnerUpdate* sample, const DDS::SampleInfo* info);
00167
00168
00169 void processUpdateQos1(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00170
00171
00172 void processUpdateQos2(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00173
00174
00175 void processUpdateQos1(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00176
00177
00178 void processUpdateQos2(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00179
00180
00181 void processUpdateFilterExpressionParams(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00182
00183
00184 void processUpdateQos1(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
00185
00186
00187 void processUpdateQos1(const TopicUpdate* sample, const DDS::SampleInfo* info);
00188
00189
/// processDelete(): one overload per sample type.
00190 void processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* info);
00191
00192
00193 void processDelete(const PublicationUpdate* sample, const DDS::SampleInfo* info);
00194
00195
00196 void processDelete(const SubscriptionUpdate* sample, const DDS::SampleInfo* info);
00197
00198
00199 void processDelete(const ParticipantUpdate* sample, const DDS::SampleInfo* info);
00200
00201
00202 void processDelete(const TopicUpdate* sample, const DDS::SampleInfo* info);
00203
// NOTE: non-static data members are initialized in the order they are
// declared below, regardless of the constructor's initializer-list order;
// do not reorder them without checking the constructor.
00204 private:
00205
/// Mutex; which state it guards is not visible in this header.
00206 ACE_SYNCH_MUTEX lock_;
00207
00208
/// Condition variable over ACE_SYNCH_MUTEX; presumably bound to lock_ in
/// the constructor -- TODO confirm.
00209 ACE_Condition<ACE_SYNCH_MUTEX> joining_;
00210
00211
/// RepoKey; presumably the repository currently joining -- TODO confirm.
00212 RepoKey joiner_;
00213
00214
/// RepoKey; presumably the repository being joined to -- TODO confirm.
00215 RepoKey joinRepo_;
00216
00217
00218
/// Flag; presumably true while this repository is part of a federation.
00219 bool federated_;
00220
00221
/// Map from a RepoKey to a Manager object reference (_var).
00222 typedef std::map<RepoKey, Manager_var> IdToManagerMap;
00223
00224
/// Manager references keyed by RepoKey; presumably the federation peers.
00225 IdToManagerMap peers_;
00226
00227
/// Sequence number value; what it numbers is not visible here.
00228 OpenDDS::DCPS::SequenceNumber sequence_;
00229
00230
/// Reference member: the referenced Config must outlive this object.
00231 Config& config_;
00232
00233
/// Raw pointer; ownership is not evident from this header. Presumably the
/// pointer exposed (and reseatable) through info().
00234 TAO_DDS_DCPSInfo_i* info_;
00235
00236
/// Object reference held in a _var; presumably set via localRepo().
00237 OpenDDS::DCPS::DCPSInfo_var localRepo_;
00238
00239
/// ORB reference held in a _var; presumably backs orb() / orb(value).
00240 CORBA::ORB_var orb_;
00241
00242
/// Multicast responder object, held by value.
00243 InfoRepoMulticastResponder multicastResponder_;
00244
00245
/// DDS DomainParticipant reference; presumably the participant used for
/// the federation's own update topics -- TODO confirm.
00246 DDS::DomainParticipant_var federationParticipant_;
00247
00248
/// One UpdateListener per sample type, held by value. Each is instantiated
/// with the sample type and its typed data reader.
00249 UpdateListener<OwnerUpdate, OwnerUpdateDataReader> ownerListener_;
00250
00251
00252 UpdateListener<TopicUpdate, TopicUpdateDataReader> topicListener_;
00253
00254
00255 UpdateListener<ParticipantUpdate, ParticipantUpdateDataReader> participantListener_;
00256
00257
00258 UpdateListener<PublicationUpdate, PublicationUpdateDataReader> publicationListener_;
00259
00260
00261 UpdateListener<SubscriptionUpdate, SubscriptionUpdateDataReader> subscriptionListener_;
00262
00263
/// One typed data writer reference (_var) per sample type.
00264 OwnerUpdateDataWriter_var ownerWriter_;
00265
00266
00267 TopicUpdateDataWriter_var topicWriter_;
00268
00269
00270 ParticipantUpdateDataWriter_var participantWriter_;
00271
00272
00273 PublicationUpdateDataWriter_var publicationWriter_;
00274
00275
00276 SubscriptionUpdateDataWriter_var subscriptionWriter_;
00277
00278
/// Lists of update samples stored by value. There is no list for
/// ParticipantUpdate. Presumably these hold samples that could not be
/// applied on arrival and are handled by processDeferred() -- TODO confirm.
00279 std::list<OwnerUpdate> deferredOwnerships_;
00280
00281
00282 std::list<TopicUpdate> deferredTopics_;
00283
00284
00285 std::list<PublicationUpdate> deferredPublications_;
00286
00287
00288 std::list<SubscriptionUpdate> deferredSubscriptions_;
00289
00290
/// Flag; presumably whether multicastResponder_ is in use -- TODO confirm.
00291 bool multicastEnabled_;
00292
00293
/// Mutex; presumably guards the deferred* lists above -- TODO confirm.
/// NOTE(review): declared as ACE_Thread_Mutex whereas lock_ is
/// ACE_SYNCH_MUTEX; check whether the difference is intentional.
00294 ACE_Thread_Mutex deferred_lock_;
00295 };
00296
00297 }
00298 }
00299
00300 #if defined (__ACE_INLINE__)
00301 # include "FederatorManagerImpl.inl"
00302 #endif
00303
00304 #endif