#include <FederatorManagerImpl.h>
Inheritance diagram for OpenDDS::Federator::ManagerImpl:
Public Member Functions | |
ManagerImpl (Config &config) | |
Constructs the manager from the supplied configuration. | |
virtual | ~ManagerImpl () |
Virtual destructor. | |
virtual CORBA::Boolean | discover_federation (const char *ior) |
virtual Manager_ptr | join_federation (Manager_ptr peer, FederationDomain federation) |
virtual void | leave_federation (RepoKey id) |
virtual RepoKey | federation_id () |
virtual OpenDDS::DCPS::DCPSInfo_ptr | repository () |
virtual void | initializeOwner (const OpenDDS::Federator::OwnerUpdate &data) |
virtual void | initializeTopic (const OpenDDS::Federator::TopicUpdate &data) |
virtual void | initializeParticipant (const OpenDDS::Federator::ParticipantUpdate &data) |
virtual void | initializePublication (const OpenDDS::Federator::PublicationUpdate &data) |
virtual void | initializeSubscription (const OpenDDS::Federator::SubscriptionUpdate &data) |
virtual void | leave_and_shutdown () |
virtual void | shutdown () |
void | initialize () |
Establish the update publications and subscriptions. | |
void | finalize () |
Release resources gracefully. | |
TAO_DDS_DCPSInfo_i *& | info () |
Accessors for the DCPSInfo reference. | |
TAO_DDS_DCPSInfo_i * | info () const |
void | localRepo (::OpenDDS::DCPS::DCPSInfo_ptr repo) |
Capture a remote callable reference to the DCPSInfo. | |
const TAO_DDS_DCPSFederationId & | id () const |
Accessor for the federation Id value. | |
CORBA::ORB_ptr | orb () |
Accessors for the ORB. | |
void | orb (CORBA::ORB_ptr value) |
void | pushState (Manager_ptr peer) |
Push our current state to a remote repository. | |
void | processDeferred () |
Handle any deferred updates that might have become processable. | |
virtual void | unregisterCallback () |
virtual void | requestImage () |
virtual void | create (const Update::UTopic &topic) |
virtual void | create (const Update::UParticipant &participant) |
virtual void | create (const Update::URActor &reader) |
virtual void | create (const Update::UWActor &writer) |
virtual void | create (const Update::OwnershipData &data) |
virtual void | update (const Update::IdPath &id, const DDS::DomainParticipantQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::TopicQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::DataWriterQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::PublisherQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::DataReaderQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::SubscriberQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::StringSeq &exprParams) |
virtual void | destroy (const Update::IdPath &id, Update::ItemType type, Update::ActorType actor) |
void | processCreate (const OwnerUpdate *sample, const DDS::SampleInfo *info) |
Null implementation for OwnerUpdate samples. | |
void | processCreate (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new publication. | |
void | processCreate (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new subscription. | |
void | processCreate (const ParticipantUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new participant. | |
void | processCreate (const TopicUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new topic. | |
void | processUpdateQos1 (const OwnerUpdate *sample, const DDS::SampleInfo *info) |
Process ownership changes. | |
void | processUpdateQos1 (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy DataWriterQos for a publication. | |
void | processUpdateQos2 (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy PublisherQos for a publication. | |
void | processUpdateQos1 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy DataReaderQos for a subscription. | |
void | processUpdateQos2 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy SubscriberQos for a subscription. | |
void | processUpdateFilterExpressionParams (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy filter expression params for a subscription. | |
void | processUpdateQos1 (const ParticipantUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy ParticipantQos for a participant. | |
void | processUpdateQos1 (const TopicUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy TopicQos for a topic. | |
void | processDelete (const OwnerUpdate *sample, const DDS::SampleInfo *info) |
Null implementation for OwnerUpdate samples. | |
void | processDelete (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a publication. | |
void | processDelete (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a subscription. | |
void | processDelete (const ParticipantUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a participant. | |
void | processDelete (const TopicUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a topic. | |
Private Types | |
typedef std::map< RepoKey, Manager_var > | IdToManagerMap |
Map type to hold references to federated repository Managers. | |
Private Attributes | |
ACE_SYNCH_MUTEX | lock_ |
Critical section MUTEX. | |
ACE_Condition< ACE_SYNCH_MUTEX > | joining_ |
Condition used to gate joining activities. | |
RepoKey | joiner_ |
Simple recursion avoidance during the join operations. | |
RepoKey | joinRepo_ |
Repository to which we joined. | |
bool | federated_ |
IdToManagerMap | peers_ |
The peers with which we have federated, keyed by repository id. | |
OpenDDS::DCPS::SequenceNumber | sequence_ |
The packet sequence number for data that we publish. | |
Config & | config_ |
The configuration information for this manager. | |
TAO_DDS_DCPSInfo_i * | info_ |
The Info object reference to update. | |
OpenDDS::DCPS::DCPSInfo_var | localRepo_ |
Remotely callable reference to the local repository. | |
CORBA::ORB_var | orb_ |
The ORB in which we are activated. | |
InfoRepoMulticastResponder | multicastResponder_ |
Multicast responder. | |
DDS::DomainParticipant_var | federationParticipant_ |
Local DomainParticipant for the federation domain. | |
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > | ownerListener_ |
OwnerUpdate listener. | |
UpdateListener< TopicUpdate, TopicUpdateDataReader > | topicListener_ |
TopicUpdate listener. | |
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > | participantListener_ |
ParticipantUpdate listener. | |
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > | publicationListener_ |
PublicationUpdate listener. | |
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > | subscriptionListener_ |
SubscriptionUpdate listener. | |
OwnerUpdateDataWriter_var | ownerWriter_ |
OwnerUpdate writer. | |
TopicUpdateDataWriter_var | topicWriter_ |
TopicUpdate writer. | |
ParticipantUpdateDataWriter_var | participantWriter_ |
ParticipantUpdate writer. | |
PublicationUpdateDataWriter_var | publicationWriter_ |
PublicationUpdate writer. | |
SubscriptionUpdateDataWriter_var | subscriptionWriter_ |
SubscriptionUpdate writer. | |
std::list< OwnerUpdate > | deferredOwnerships_ |
Deferred ownership updates. | |
std::list< TopicUpdate > | deferredTopics_ |
Deferred topic updates. | |
std::list< PublicationUpdate > | deferredPublications_ |
Deferred publication updates. | |
std::list< SubscriptionUpdate > | deferredSubscriptions_ |
Deferred subscription updates. | |
bool | multicastEnabled_ |
Is multicast enabled? | |
ACE_Thread_Mutex | deferred_lock_ |
Protect deferred updates. |
Definition at line 34 of file FederatorManagerImpl.h.
typedef std::map<RepoKey, Manager_var> OpenDDS::Federator::ManagerImpl::IdToManagerMap [private] |
Map type to hold references to federated repository Managers.
Definition at line 222 of file FederatorManagerImpl.h.
OpenDDS::Federator::ManagerImpl::ManagerImpl | ( | Config & | config | ) |
Constructs the manager from the supplied configuration.
Definition at line 37 of file FederatorManagerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and multicastEnabled_.
00038 : joining_(this->lock_), 00039 joiner_(NIL_REPOSITORY), 00040 joinRepo_(NIL_REPOSITORY), 00041 federated_(false), 00042 config_(config), 00043 info_(0), 00044 ownerListener_(*this), 00045 topicListener_(*this), 00046 participantListener_(*this), 00047 publicationListener_(*this), 00048 subscriptionListener_(*this), 00049 multicastEnabled_(false) 00050 { 00051 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00052 ACE_DEBUG((LM_DEBUG, 00053 ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n"))); 00054 } 00055 00056 char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled"); 00057 00058 if (mdec != 0) { 00059 std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled")); 00060 00061 if (mde != "0") { 00062 multicastEnabled_ = true; 00063 } 00064 } 00065 }
OpenDDS::Federator::ManagerImpl::~ManagerImpl | ( | ) | [virtual] |
Virtual destructor.
Definition at line 67 of file FederatorManagerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00068 { 00069 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00070 ACE_DEBUG((LM_DEBUG, 00071 ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n"))); 00072 } 00073 }
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::OwnershipData & | data | ) | [virtual] |
Implements Update::Updater.
Definition at line 177 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, Update::OwnershipData::domain, OpenDDS::Federator::OwnerUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), Update::OwnershipData::owner, OpenDDS::Federator::OwnerUpdate::owner, ownerWriter_, Update::OwnershipData::participant, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.
00178 { 00179 if (CORBA::is_nil(this->ownerWriter_.in())) { 00180 // Decline to publish data until we can. 00181 return; 00182 } 00183 00184 OwnerUpdate sample; 00185 sample.sender = this->id().id(); 00186 sample.action = CreateEntity; 00187 00188 sample.domain = data.domain; 00189 sample.participant = data.participant; 00190 sample.owner = data.owner; 00191 00192 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00193 OpenDDS::DCPS::RepoIdConverter converter(sample.participant); 00194 ACE_DEBUG((LM_DEBUG, 00195 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ") 00196 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00197 this->id().id(), 00198 sample.domain, 00199 std::string(converter).c_str(), 00200 sample.sender, 00201 sample.owner)); 00202 } 00203 00204 this->ownerWriter_->write(sample, DDS::HANDLE_NIL); 00205 }
virtual void OpenDDS::Federator::ManagerImpl::create | ( | const Update::UWActor & | writer | ) | [virtual] |
Implements Update::Updater.
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::URActor & | reader | ) | [virtual] |
Implements Update::Updater.
Definition at line 102 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::action, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, OpenDDS::Federator::SubscriptionUpdate::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, OpenDDS::Federator::SubscriptionUpdate::topic, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, OpenDDS::Federator::SubscriptionUpdate::transport_info, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.
00103 { 00104 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00105 // Decline to publish data until we can. 00106 return; 00107 } 00108 00109 SubscriptionUpdate sample; 00110 sample.sender = this->id().id(); 00111 sample.action = CreateEntity; 00112 00113 sample.domain = reader.domainId; 00114 sample.participant = reader.participantId; 00115 sample.topic = reader.topicId; 00116 sample.id = reader.actorId; 00117 sample.callback = reader.callback.c_str(); 00118 sample.datareader_qos = reader.drdwQos; 00119 sample.subscriber_qos = reader.pubsubQos; 00120 sample.transport_info = reader.transportInterfaceInfo; 00121 sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName; 00122 sample.filter_expression = reader.contentSubscriptionProfile.filterExpr; 00123 sample.expression_params = reader.contentSubscriptionProfile.exprParams; 00124 00125 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00126 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00127 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00128 ACE_DEBUG((LM_DEBUG, 00129 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ") 00130 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00131 this->id().id(), 00132 sample.domain, 00133 std::string(part_converter).c_str(), 00134 std::string(sub_converter).c_str())); 00135 } 00136 00137 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00138 }
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::UParticipant & | participant | ) | [virtual] |
Implements Update::Updater.
Definition at line 72 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, Update::ParticipantStrt< Q >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), Update::ParticipantStrt< Q >::owner, OpenDDS::Federator::ParticipantUpdate::owner, Update::ParticipantStrt< Q >::participantId, Update::ParticipantStrt< Q >::participantQos, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.
00073 { 00074 if (CORBA::is_nil(this->participantWriter_.in())) { 00075 // Decline to publish data until we can. 00076 return; 00077 } 00078 00079 ParticipantUpdate sample; 00080 sample.sender = this->id().id(); 00081 sample.action = CreateEntity; 00082 00083 sample.owner = participant.owner; 00084 sample.domain = participant.domainId; 00085 sample.id = participant.participantId; 00086 sample.qos = participant.participantQos; 00087 00088 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00089 OpenDDS::DCPS::RepoIdConverter converter(sample.id); 00090 ACE_DEBUG((LM_DEBUG, 00091 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ") 00092 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 00093 this->id().id(), 00094 sample.domain, 00095 std::string(converter).c_str())); 00096 } 00097 00098 this->participantWriter_->write(sample, DDS::HANDLE_NIL); 00099 }
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::UTopic & | topic | ) | [virtual] |
Implements Update::Updater.
Definition at line 38 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::CreateEntity, Update::TopicStrt< Q, S >::dataType, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, Update::TopicStrt< Q, S >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), Update::TopicStrt< Q, S >::name, OpenDDS::Federator::TopicUpdate::participant, Update::TopicStrt< Q, S >::participantId, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::TopicUpdate::topic, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, and topicWriter_.
00039 { 00040 if (CORBA::is_nil(this->topicWriter_.in())) { 00041 // Decline to publish data until we can. 00042 return; 00043 } 00044 00045 TopicUpdate sample; 00046 sample.sender = this->id().id(); 00047 sample.action = CreateEntity; 00048 00049 sample.id = topic.topicId; 00050 sample.domain = topic.domainId; 00051 sample.participant = topic.participantId; 00052 sample.topic = topic.name.c_str(); 00053 sample.datatype = topic.dataType.c_str(); 00054 sample.qos = topic.topicQos; 00055 00056 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00057 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00058 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id); 00059 ACE_DEBUG((LM_DEBUG, 00060 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ") 00061 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00062 this->id().id(), 00063 sample.domain, 00064 std::string(part_converter).c_str(), 00065 std::string(topic_converter).c_str())); 00066 } 00067 00068 this->topicWriter_->write(sample, DDS::HANDLE_NIL); 00069 }
void OpenDDS::Federator::ManagerImpl::destroy | ( | const Update::IdPath & | id, | |
Update::ItemType | type, | |||
Update::ActorType | actor | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 208 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::TopicUpdate::action, Update::Actor, Update::DataReader, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::DestroyEntity, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::TopicUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, Update::Participant, OpenDDS::Federator::TopicUpdate::participant, participantWriter_, publicationWriter_, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::TopicUpdate::sender, subscriptionWriter_, Update::Topic, and topicWriter_.
00212 { 00213 // 00214 // Do not propagate any destroy() messages within the FederationDomain. 00215 // This domain will be managed separately. 00216 // 00217 if (id.domain == this->config_.federationDomain()) { 00218 return; 00219 } 00220 00221 switch (type) { 00222 case Update::Topic: { 00223 if (CORBA::is_nil(this->topicWriter_.in())) { 00224 // Decline to publish data until we can. 00225 return; 00226 } 00227 00228 TopicUpdate sample; 00229 sample.sender = this->id().id(); 00230 sample.action = DestroyEntity; 00231 00232 sample.id = id.id; 00233 sample.domain = id.domain; 00234 sample.participant = id.participant; 00235 00236 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00237 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00238 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id); 00239 ACE_DEBUG((LM_DEBUG, 00240 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ") 00241 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00242 this->id().id(), 00243 sample.domain, 00244 std::string(part_converter).c_str(), 00245 std::string(topic_converter).c_str())); 00246 } 00247 00248 this->topicWriter_->write(sample, DDS::HANDLE_NIL); 00249 } 00250 break; 00251 00252 case Update::Participant: { 00253 if (CORBA::is_nil(this->participantWriter_.in())) { 00254 // Decline to publish data until we can. 
00255 return; 00256 } 00257 00258 ParticipantUpdate sample; 00259 sample.sender = this->id().id(); 00260 sample.action = DestroyEntity; 00261 00262 sample.domain = id.domain; 00263 sample.id = id.id; 00264 00265 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00266 OpenDDS::DCPS::RepoIdConverter converter(sample.id); 00267 ACE_DEBUG((LM_DEBUG, 00268 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ") 00269 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 00270 this->id().id(), 00271 sample.domain, 00272 std::string(converter).c_str())); 00273 } 00274 00275 this->participantWriter_->write(sample, DDS::HANDLE_NIL); 00276 } 00277 break; 00278 00279 case Update::Actor: 00280 00281 // This is VERY annoying. 00282 switch (actor) { 00283 case Update::DataWriter: { 00284 if (CORBA::is_nil(this->publicationWriter_.in())) { 00285 // Decline to publish data until we can. 00286 return; 00287 } 00288 00289 PublicationUpdate sample; 00290 sample.sender = this->id().id(); 00291 sample.action = DestroyEntity; 00292 00293 sample.domain = id.domain; 00294 sample.participant = id.participant; 00295 sample.id = id.id; 00296 00297 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00298 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00299 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id); 00300 ACE_DEBUG((LM_DEBUG, 00301 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ") 00302 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00303 this->id().id(), 00304 sample.domain, 00305 std::string(part_converter).c_str(), 00306 std::string(pub_converter).c_str())); 00307 } 00308 00309 this->publicationWriter_->write(sample, DDS::HANDLE_NIL); 00310 } 00311 break; 00312 00313 case Update::DataReader: { 00314 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00315 // Decline to publish data until we can. 
00316 return; 00317 } 00318 00319 SubscriptionUpdate sample; 00320 sample.sender = this->id().id(); 00321 sample.action = DestroyEntity; 00322 00323 sample.domain = id.domain; 00324 sample.participant = id.participant; 00325 sample.id = id.id; 00326 00327 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00328 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00329 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00330 ACE_DEBUG((LM_DEBUG, 00331 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ") 00332 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00333 this->id().id(), 00334 sample.domain, 00335 std::string(part_converter).c_str(), 00336 std::string(sub_converter).c_str())); 00337 } 00338 00339 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00340 } 00341 break; 00342 } 00343 00344 break; 00345 } 00346 }
CORBA::Boolean OpenDDS::Federator::ManagerImpl::discover_federation | ( | const char * | ior | ) | [virtual] |
Definition at line 902 of file FederatorManagerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00903 { 00904 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00905 ACE_DEBUG((LM_DEBUG, 00906 ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"), 00907 ior)); 00908 } 00909 00910 ///@TODO: Implement this. 00911 return false; 00912 }
RepoKey OpenDDS::Federator::ManagerImpl::federation_id | ( | ) | [virtual] |
Definition at line 865 of file FederatorManagerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, TAO_DDS_DCPSFederationId::id(), and id().
00866 { 00867 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00868 ACE_DEBUG((LM_DEBUG, 00869 ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n"))); 00870 } 00871 00872 return this->id().id(); 00873 }
void OpenDDS::Federator::ManagerImpl::finalize | ( | ) |
Release resources gracefully.
Definition at line 787 of file FederatorManagerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, federated_, orb_, ownerListener_, participantListener_, peers_, publicationListener_, DDS::RETCODE_PRECONDITION_NOT_MET, subscriptionListener_, TheParticipantFactory, and topicListener_.
Referenced by InfoRepo::finalize(), and InfoRepo::handle_exception().
00788 { 00789 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00790 ACE_DEBUG((LM_DEBUG, 00791 ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n"))); 00792 } 00793 00794 ownerListener_.stop(); 00795 topicListener_.stop(); 00796 participantListener_.stop(); 00797 publicationListener_.stop(); 00798 subscriptionListener_.stop(); 00799 ownerListener_.join(); 00800 topicListener_.join(); 00801 participantListener_.join(); 00802 publicationListener_.join(); 00803 subscriptionListener_.join(); 00804 00805 if (this->federated_) { 00806 try { 00807 IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_); 00808 00809 if (where == this->peers_.end()) { 00810 ACE_DEBUG((LM_DEBUG, 00811 ACE_TEXT("(%P|%t) Federator::Manager::finalize: ") 00812 ACE_TEXT("repository %d - all attachment to federation left.\n"), 00813 this->id().id())); 00814 00815 } else { 00816 if (CORBA::is_nil(where->second.in())) { 00817 ACE_ERROR((LM_ERROR, 00818 ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ") 00819 ACE_TEXT("repository %d not currently attached to a federation.\n"), 00820 this->id().id())); 00821 00822 } else { 00823 where->second->leave_federation(this->id().id()); 00824 this->federated_ = false; 00825 } 00826 } 00827 00828 } catch (const CORBA::Exception& ex) { 00829 ex._tao_print_exception( 00830 ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ") 00831 ACE_TEXT("unable to leave remote federation ")); 00832 throw Incomplete(); 00833 } 00834 } 00835 00836 if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) { 00837 this->orb_->orb_core()->reactor()->remove_handler( 00838 &this->multicastResponder_, 00839 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); 00840 } 00841 00842 // Remove our local participant and contained entities. 
00843 if (0 == CORBA::is_nil(this->federationParticipant_.in())) { 00844 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory; 00845 if (DDS::RETCODE_PRECONDITION_NOT_MET 00846 == this->federationParticipant_->delete_contained_entities()) { 00847 ACE_ERROR((LM_ERROR, 00848 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ") 00849 ACE_TEXT("unable to release resources for repository %d.\n"), 00850 this->id().id())); 00851 00852 } else if (DDS::RETCODE_PRECONDITION_NOT_MET 00853 == dpf->delete_participant(this->federationParticipant_.in())) { 00854 ACE_ERROR((LM_ERROR, 00855 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ") 00856 ACE_TEXT("unable to release the participant for repository %d.\n"), 00857 this->id().id())); 00858 } 00859 } 00860 }
ACE_INLINE const TAO_DDS_DCPSFederationId & OpenDDS::Federator::ManagerImpl::id | ( | ) | const |
Accessor for the federation Id value.
Definition at line 19 of file FederatorManagerImpl.inl.
References config_, and OpenDDS::Federator::Config::federationId().
Referenced by create(), destroy(), federation_id(), InfoRepo::init(), pushState(), and update().
00020 { 00021 return this->config_.federationId(); 00022 }
ACE_INLINE TAO_DDS_DCPSInfo_i * OpenDDS::Federator::ManagerImpl::info | ( | ) | const |
Definition at line 33 of file FederatorManagerImpl.inl.
References info_.
00034 { 00035 return this->info_; 00036 }
ACE_INLINE TAO_DDS_DCPSInfo_i *& OpenDDS::Federator::ManagerImpl::info | ( | ) |
Accessors for the DCPSInfo reference.
Definition at line 26 of file FederatorManagerImpl.inl.
References info_.
Referenced by InfoRepo::init().
00027 { 00028 return this->info_; 00029 }
void OpenDDS::Federator::ManagerImpl::initialize | ( | ) |
Establish the update publications and subscriptions.
Definition at line 76 of file FederatorManagerImpl.cpp.
References config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::DEFAULT_STATUS_MASK, OpenDDS::Federator::Defaults::DiscoveryRequestPort, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::Federator::Config::federationDomain(), federationParticipant_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataReaderImpl::get_subscription_id(), DDS::DataWriterQos::history, DDS::DataReaderQos::history, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportRegistry::instance(), DDS::KEEP_LAST_HISTORY_QOS, orb_, ownerListener_, OpenDDS::Federator::OWNERUPDATETOPICNAME, OpenDDS::Federator::OWNERUPDATETYPENAME, ownerWriter_, PARTICIPANT_QOS_DEFAULT, participantListener_, OpenDDS::Federator::PARTICIPANTUPDATETOPICNAME, OpenDDS::Federator::PARTICIPANTUPDATETYPENAME, participantWriter_, publicationListener_, OpenDDS::Federator::PUBLICATIONUPDATETOPICNAME, OpenDDS::Federator::PUBLICATIONUPDATETYPENAME, publicationWriter_, PUBLISHER_QOS_DEFAULT, DDS::DataWriterQos::reliability, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, SUBSCRIBER_QOS_DEFAULT, subscriptionListener_, OpenDDS::Federator::SUBSCRIPTIONUPDATETOPICNAME, OpenDDS::Federator::SUBSCRIPTIONUPDATETYPENAME, subscriptionWriter_, TheParticipantFactory, TOPIC_QOS_DEFAULT, topicListener_, OpenDDS::Federator::TOPICUPDATETOPICNAME, OpenDDS::Federator::TOPICUPDATETYPENAME, topicWriter_, and DDS::TRANSIENT_LOCAL_DURABILITY_QOS.
Referenced by join_federation().
00077 { 00078 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00079 ACE_DEBUG((LM_DEBUG, 00080 ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n"))); 00081 } 00082 00083 // Let the listeners know which repository we are to filter samples at 00084 // the earliest opportunity. 00085 this->ownerListener_.federationId(this->id()); 00086 this->topicListener_.federationId(this->id()); 00087 this->participantListener_.federationId(this->id()); 00088 this->publicationListener_.federationId(this->id()); 00089 this->subscriptionListener_.federationId(this->id()); 00090 00091 // Add participant for Federation domain 00092 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory; 00093 this->federationParticipant_ 00094 = dpf->create_participant( 00095 this->config_.federationDomain(), 00096 PARTICIPANT_QOS_DEFAULT, 00097 DDS::DomainParticipantListener::_nil(), 00098 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00099 00100 if (CORBA::is_nil(this->federationParticipant_.in())) { 00101 ACE_ERROR((LM_ERROR, 00102 ACE_TEXT("(%P|%t) ERROR: create_participant failed for ") 00103 ACE_TEXT("repository %d in federation domain %d.\n"), 00104 this->id().id(), 00105 this->config_.federationDomain())); 00106 throw Incomplete(); 00107 } 00108 // 00109 // Add type support for update topics 00110 // 00111 00112 OwnerUpdateTypeSupportImpl* ownerUpdate = new OwnerUpdateTypeSupportImpl(); 00113 00114 if (DDS::RETCODE_OK != ownerUpdate->register_type( 00115 this->federationParticipant_.in(), 00116 OWNERUPDATETYPENAME)) { 00117 ACE_ERROR((LM_ERROR, 00118 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00119 ACE_TEXT("OwnerUpdate type support for repository %d.\n"), 00120 this->id().id())); 00121 throw Incomplete(); 00122 } 00123 00124 ParticipantUpdateTypeSupportImpl* participantUpdate = new ParticipantUpdateTypeSupportImpl(); 00125 00126 if (DDS::RETCODE_OK != participantUpdate->register_type( 00127 this->federationParticipant_.in(), 00128 PARTICIPANTUPDATETYPENAME)) { 00129 ACE_ERROR((LM_ERROR, 
00130 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00131 ACE_TEXT("ParticipantUpdate type support for repository %d.\n"), 00132 this->id().id())); 00133 throw Incomplete(); 00134 } 00135 00136 TopicUpdateTypeSupportImpl* topicUpdate = new TopicUpdateTypeSupportImpl(); 00137 00138 if (DDS::RETCODE_OK != topicUpdate->register_type( 00139 this->federationParticipant_.in(), 00140 TOPICUPDATETYPENAME)) { 00141 ACE_ERROR((LM_ERROR, 00142 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00143 ACE_TEXT("TopicUpdate type support for repository %d.\n"), 00144 this->id().id())); 00145 throw Incomplete(); 00146 } 00147 00148 PublicationUpdateTypeSupportImpl* publicationUpdate = new PublicationUpdateTypeSupportImpl(); 00149 00150 if (DDS::RETCODE_OK != publicationUpdate->register_type( 00151 this->federationParticipant_.in(), 00152 PUBLICATIONUPDATETYPENAME)) { 00153 ACE_ERROR((LM_ERROR, 00154 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00155 ACE_TEXT("PublicationUpdate type support for repository %d.\n"), 00156 this->id().id())); 00157 throw Incomplete(); 00158 } 00159 00160 SubscriptionUpdateTypeSupportImpl* subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl(); 00161 00162 if (DDS::RETCODE_OK != subscriptionUpdate->register_type( 00163 this->federationParticipant_.in(), 00164 SUBSCRIPTIONUPDATETYPENAME)) { 00165 ACE_ERROR((LM_ERROR, 00166 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00167 ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"), 00168 this->id().id())); 00169 throw Incomplete(); 00170 } 00171 00172 // 00173 // Create a transport config for use with federation entities. 
00174 // 00175 std::string config_name = 00176 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 00177 + std::string("FederationBITTransportConfig"); 00178 OpenDDS::DCPS::TransportConfig_rch config = 00179 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name); 00180 00181 std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 00182 + std::string("FederationBITTCPTransportInst"); 00183 OpenDDS::DCPS::TransportInst_rch inst = 00184 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name, 00185 "tcp"); 00186 config->instances_.push_back(inst); 00187 00188 // 00189 // Create the subscriber for the update topics. 00190 // 00191 00192 DDS::Subscriber_var subscriber 00193 = this->federationParticipant_->create_subscriber( 00194 SUBSCRIBER_QOS_DEFAULT, 00195 DDS::SubscriberListener::_nil(), 00196 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00197 00198 if (CORBA::is_nil(subscriber.in())) { 00199 ACE_ERROR((LM_ERROR, 00200 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00201 ACE_TEXT("failed to create subscriber for repository %d\n"), 00202 this->id().id())); 00203 throw Incomplete(); 00204 00205 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00206 ACE_DEBUG((LM_DEBUG, 00207 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00208 ACE_TEXT("created federation subscriber for repository %d\n"), 00209 this->id().id())); 00210 00211 } 00212 00213 // Attach the transport to it. 00214 00215 try { 00216 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config, 00217 subscriber.in()); 00218 } catch (const OpenDDS::DCPS::Transport::Exception&) { 00219 ACE_ERROR((LM_ERROR, 00220 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00221 ACE_TEXT("failed to bind transport config to federation subscriber.\n"))); 00222 throw Incomplete(); 00223 } 00224 00225 // 00226 // Create the publisher for the update topics. 
00227 // 00228 00229 DDS::Publisher_var publisher 00230 = this->federationParticipant_->create_publisher( 00231 PUBLISHER_QOS_DEFAULT, 00232 DDS::PublisherListener::_nil(), 00233 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00234 00235 if (CORBA::is_nil(publisher.in())) { 00236 ACE_ERROR((LM_ERROR, 00237 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00238 ACE_TEXT("failed to create publisher for repository %d\n"), 00239 this->id().id())); 00240 throw Incomplete(); 00241 00242 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00243 ACE_DEBUG((LM_DEBUG, 00244 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00245 ACE_TEXT("created federation publisher for repository %d\n"), 00246 this->id().id())); 00247 00248 } 00249 00250 // Attach the transport to it. 00251 00252 try { 00253 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config, 00254 publisher.in()); 00255 } catch (const OpenDDS::DCPS::Transport::Exception&) { 00256 ACE_ERROR((LM_ERROR, 00257 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00258 ACE_TEXT("failed to bind transport config to federation publisher.\n"))); 00259 throw Incomplete(); 00260 } 00261 00262 // 00263 // Some useful items for adding the subscriptions. 
00264 // 00265 DDS::Topic_var topic; 00266 DDS::TopicDescription_var description; 00267 DDS::DataReader_var dataReader; 00268 DDS::DataWriter_var dataWriter; 00269 00270 DDS::DataReaderQos readerQos; 00271 subscriber->get_default_datareader_qos(readerQos); 00272 readerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; 00273 readerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS; 00274 readerQos.history.depth = 50; 00275 readerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; 00276 readerQos.reliability.max_blocking_time.sec = 0; 00277 readerQos.reliability.max_blocking_time.nanosec = 0; 00278 00279 DDS::DataWriterQos writerQos; 00280 publisher->get_default_datawriter_qos(writerQos); 00281 writerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; 00282 writerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS; 00283 writerQos.history.depth = 50; 00284 writerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; 00285 writerQos.reliability.max_blocking_time.sec = 0; 00286 writerQos.reliability.max_blocking_time.nanosec = 0; 00287 00288 // 00289 // Add update subscriptions 00290 // 00291 // NOTE: Its ok to lose the references to the objects here since they 00292 // are not needed after this point. The only thing we will do 00293 // with them is to destroy them, and that will be done via a 00294 // cascade delete from the participant. The listeners will 00295 // survive and can be used within other participants as well, 00296 // since the only state they retain is the manager, which is the 00297 // same for all. 
00298 // 00299 00300 topic = this->federationParticipant_->create_topic( 00301 OWNERUPDATETOPICNAME, 00302 OWNERUPDATETYPENAME, 00303 TOPIC_QOS_DEFAULT, 00304 DDS::TopicListener::_nil(), 00305 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00306 00307 dataWriter = publisher->create_datawriter( 00308 topic.in(), 00309 writerQos, 00310 DDS::DataWriterListener::_nil(), 00311 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00312 00313 if (CORBA::is_nil(dataWriter.in())) { 00314 ACE_ERROR((LM_ERROR, 00315 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00316 ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"), 00317 this->id().id())); 00318 throw Incomplete(); 00319 } 00320 00321 this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in()); 00322 00323 if (::CORBA::is_nil(this->ownerWriter_.in())) { 00324 ACE_ERROR((LM_ERROR, 00325 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00326 ACE_TEXT("failed to extract typed OwnerUpdate writer.\n"))); 00327 throw Incomplete(); 00328 00329 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00330 OpenDDS::DCPS::DataWriterImpl* servant 00331 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00332 00333 if (0 == servant) { 00334 ACE_DEBUG((LM_WARNING, 00335 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00336 ACE_TEXT("unable to extract typed OwnerUpdate writer.\n"))); 00337 00338 } else { 00339 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00340 ACE_DEBUG((LM_DEBUG, 00341 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00342 ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"), 00343 std::string(converter).c_str(), 00344 this->id().id())); 00345 } 00346 } 00347 00348 description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME); 00349 dataReader = subscriber->create_datareader( 00350 description.in(), 00351 readerQos, 00352 &this->ownerListener_, 00353 
OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00354 00355 if (CORBA::is_nil(dataReader.in())) { 00356 ACE_ERROR((LM_ERROR, 00357 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00358 ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"), 00359 this->id().id())); 00360 throw Incomplete(); 00361 00362 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00363 OpenDDS::DCPS::DataReaderImpl* servant 00364 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00365 00366 if (0 == servant) { 00367 ACE_DEBUG((LM_WARNING, 00368 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00369 ACE_TEXT("unable to extract typed OwnerUpdate reader.\n"))); 00370 00371 } else { 00372 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00373 ACE_DEBUG((LM_DEBUG, 00374 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00375 ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"), 00376 std::string(converter).c_str(), 00377 this->id().id())); 00378 } 00379 } 00380 00381 topic = this->federationParticipant_->create_topic( 00382 TOPICUPDATETOPICNAME, 00383 TOPICUPDATETYPENAME, 00384 TOPIC_QOS_DEFAULT, 00385 DDS::TopicListener::_nil(), 00386 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00387 dataWriter = publisher->create_datawriter( 00388 topic.in(), 00389 writerQos, 00390 DDS::DataWriterListener::_nil(), 00391 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00392 00393 if (CORBA::is_nil(dataWriter.in())) { 00394 ACE_ERROR((LM_ERROR, 00395 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00396 ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"), 00397 this->id().id())); 00398 throw Incomplete(); 00399 } 00400 00401 this->topicWriter_ 00402 = TopicUpdateDataWriter::_narrow(dataWriter.in()); 00403 00404 if (::CORBA::is_nil(this->topicWriter_.in())) { 00405 ACE_ERROR((LM_ERROR, 00406 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00407 ACE_TEXT("failed to extract 
typed TopicUpdate writer.\n"))); 00408 throw Incomplete(); 00409 00410 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00411 OpenDDS::DCPS::DataWriterImpl* servant 00412 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00413 00414 if (0 == servant) { 00415 ACE_DEBUG((LM_WARNING, 00416 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00417 ACE_TEXT("unable to extract typed TopicUpdate writer.\n"))); 00418 00419 } else { 00420 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00421 ACE_DEBUG((LM_DEBUG, 00422 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00423 ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"), 00424 std::string(converter).c_str(), 00425 this->id().id())); 00426 } 00427 } 00428 00429 description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME); 00430 dataReader = subscriber->create_datareader( 00431 description.in(), 00432 readerQos, 00433 &this->topicListener_, 00434 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00435 00436 if (CORBA::is_nil(dataReader.in())) { 00437 ACE_ERROR((LM_ERROR, 00438 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00439 ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"), 00440 this->id().id())); 00441 throw Incomplete(); 00442 00443 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00444 OpenDDS::DCPS::DataReaderImpl* servant 00445 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00446 00447 if (0 == servant) { 00448 ACE_DEBUG((LM_WARNING, 00449 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00450 ACE_TEXT("unable to extract typed TopicUpdate reader.\n"))); 00451 00452 } else { 00453 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00454 ACE_DEBUG((LM_DEBUG, 00455 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00456 ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"), 00457 
std::string(converter).c_str(), 00458 this->id().id())); 00459 } 00460 } 00461 00462 topic = this->federationParticipant_->create_topic( 00463 PARTICIPANTUPDATETOPICNAME, 00464 PARTICIPANTUPDATETYPENAME, 00465 TOPIC_QOS_DEFAULT, 00466 DDS::TopicListener::_nil(), 00467 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00468 dataWriter = publisher->create_datawriter( 00469 topic.in(), 00470 writerQos, 00471 DDS::DataWriterListener::_nil(), 00472 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00473 00474 if (CORBA::is_nil(dataWriter.in())) { 00475 ACE_ERROR((LM_ERROR, 00476 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00477 ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"), 00478 this->id().id())); 00479 throw Incomplete(); 00480 } 00481 00482 this->participantWriter_ 00483 = ParticipantUpdateDataWriter::_narrow(dataWriter.in()); 00484 00485 if (::CORBA::is_nil(this->participantWriter_.in())) { 00486 ACE_ERROR((LM_ERROR, 00487 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00488 ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n"))); 00489 throw Incomplete(); 00490 00491 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00492 OpenDDS::DCPS::DataWriterImpl* servant 00493 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00494 00495 if (0 == servant) { 00496 ACE_DEBUG((LM_WARNING, 00497 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00498 ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n"))); 00499 00500 } else { 00501 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00502 ACE_DEBUG((LM_DEBUG, 00503 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00504 ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"), 00505 std::string(converter).c_str(), 00506 this->id().id())); 00507 } 00508 } 00509 00510 description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME); 00511 dataReader = 
subscriber->create_datareader( 00512 description.in(), 00513 readerQos, 00514 &this->participantListener_, 00515 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00516 00517 if (CORBA::is_nil(dataReader.in())) { 00518 ACE_ERROR((LM_ERROR, 00519 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00520 ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"), 00521 this->id().id())); 00522 throw Incomplete(); 00523 00524 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00525 OpenDDS::DCPS::DataReaderImpl* servant 00526 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00527 00528 if (0 == servant) { 00529 ACE_DEBUG((LM_WARNING, 00530 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00531 ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n"))); 00532 00533 } else { 00534 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00535 ACE_DEBUG((LM_DEBUG, 00536 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00537 ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"), 00538 std::string(converter).c_str(), 00539 this->id().id())); 00540 } 00541 } 00542 00543 topic = this->federationParticipant_->create_topic( 00544 PUBLICATIONUPDATETOPICNAME, 00545 PUBLICATIONUPDATETYPENAME, 00546 TOPIC_QOS_DEFAULT, 00547 DDS::TopicListener::_nil(), 00548 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00549 dataWriter = publisher->create_datawriter( 00550 topic.in(), 00551 writerQos, 00552 DDS::DataWriterListener::_nil(), 00553 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00554 00555 if (CORBA::is_nil(dataWriter.in())) { 00556 ACE_ERROR((LM_ERROR, 00557 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00558 ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"), 00559 this->id().id())); 00560 throw Incomplete(); 00561 } 00562 00563 this->publicationWriter_ 00564 = PublicationUpdateDataWriter::_narrow(dataWriter.in()); 00565 00566 if 
(::CORBA::is_nil(this->publicationWriter_.in())) { 00567 ACE_ERROR((LM_ERROR, 00568 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00569 ACE_TEXT("failed to extract typed PublicationUpdate writer.\n"))); 00570 throw Incomplete(); 00571 00572 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00573 OpenDDS::DCPS::DataWriterImpl* servant 00574 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00575 00576 if (0 == servant) { 00577 ACE_DEBUG((LM_WARNING, 00578 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00579 ACE_TEXT("unable to extract typed PublicationUpdate writer.\n"))); 00580 00581 } else { 00582 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00583 ACE_DEBUG((LM_DEBUG, 00584 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00585 ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"), 00586 std::string(converter).c_str(), 00587 this->id().id())); 00588 } 00589 } 00590 00591 description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME); 00592 dataReader = subscriber->create_datareader( 00593 description.in(), 00594 readerQos, 00595 &this->publicationListener_, 00596 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00597 00598 if (CORBA::is_nil(dataReader.in())) { 00599 ACE_ERROR((LM_ERROR, 00600 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00601 ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"), 00602 this->id().id())); 00603 throw Incomplete(); 00604 00605 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00606 OpenDDS::DCPS::DataReaderImpl* servant 00607 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00608 00609 if (0 == servant) { 00610 ACE_DEBUG((LM_WARNING, 00611 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00612 ACE_TEXT("unable to extract typed PublicationUpdate reader.\n"))); 00613 00614 } else { 00615 OpenDDS::DCPS::RepoIdConverter 
converter(servant->get_subscription_id()); 00616 ACE_DEBUG((LM_DEBUG, 00617 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00618 ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"), 00619 std::string(converter).c_str(), 00620 this->id().id())); 00621 } 00622 } 00623 00624 topic = this->federationParticipant_->create_topic( 00625 SUBSCRIPTIONUPDATETOPICNAME, 00626 SUBSCRIPTIONUPDATETYPENAME, 00627 TOPIC_QOS_DEFAULT, 00628 DDS::TopicListener::_nil(), 00629 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00630 dataWriter = publisher->create_datawriter( 00631 topic.in(), 00632 writerQos, 00633 DDS::DataWriterListener::_nil(), 00634 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00635 00636 if (CORBA::is_nil(dataWriter.in())) { 00637 ACE_ERROR((LM_ERROR, 00638 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00639 ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"), 00640 this->id().id())); 00641 throw Incomplete(); 00642 } 00643 00644 this->subscriptionWriter_ 00645 = SubscriptionUpdateDataWriter::_narrow(dataWriter.in()); 00646 00647 if (::CORBA::is_nil(this->subscriptionWriter_.in())) { 00648 ACE_ERROR((LM_ERROR, 00649 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00650 ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n"))); 00651 throw Incomplete(); 00652 00653 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00654 OpenDDS::DCPS::DataWriterImpl* servant 00655 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00656 00657 if (0 == servant) { 00658 ACE_DEBUG((LM_WARNING, 00659 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00660 ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n"))); 00661 00662 } else { 00663 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00664 ACE_DEBUG((LM_DEBUG, 00665 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00666 ACE_TEXT("created federation SubscriptionUpdate 
writer %C for repository %d\n"), 00667 std::string(converter).c_str(), 00668 this->id().id())); 00669 } 00670 } 00671 00672 description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME); 00673 dataReader = subscriber->create_datareader( 00674 description.in(), 00675 readerQos, 00676 &this->subscriptionListener_, 00677 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00678 00679 if (CORBA::is_nil(dataReader.in())) { 00680 ACE_ERROR((LM_ERROR, 00681 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00682 ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"), 00683 this->id().id())); 00684 throw Incomplete(); 00685 00686 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00687 OpenDDS::DCPS::DataReaderImpl* servant 00688 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00689 00690 if (0 == servant) { 00691 ACE_DEBUG((LM_WARNING, 00692 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00693 ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n"))); 00694 00695 } else { 00696 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00697 ACE_DEBUG((LM_DEBUG, 00698 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00699 ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"), 00700 std::string(converter).c_str(), 00701 this->id().id())); 00702 } 00703 } 00704 00705 // JSP 00706 #if defined (ACE_HAS_IP_MULTICAST) 00707 00708 if (this->multicastEnabled_) { 00709 // 00710 // Install ior multicast handler. 00711 // 00712 // Get reactor instance from TAO. 00713 ACE_Reactor *reactor = this->orb_->orb_core()->reactor(); 00714 00715 // See if the -ORBMulticastDiscoveryEndpoint option was specified. 
00716 ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint()); 00717 00718 // First, see if the user has given us a multicast port number 00719 // on the command-line; 00720 u_short port = 0; 00721 00722 // Check environment var. for multicast port. 00723 const char *port_number = ACE_OS::getenv("OpenDDSFederationPort"); 00724 00725 if (port_number != 0) { 00726 port = static_cast<u_short>(ACE_OS::atoi(port_number)); 00727 } 00728 00729 // Port wasn't specified on the command-line - 00730 // use the default. 00731 if (port == 0) 00732 port = OpenDDS::Federator::Defaults::DiscoveryRequestPort; 00733 00734 // Initialize the handler 00735 if (mde.length() != 0) { 00736 if (this->multicastResponder_.init( 00737 this->orb_.in(), 00738 mde.c_str()) == -1) { 00739 ACE_ERROR((LM_ERROR, 00740 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ") 00741 ACE_TEXT("the multicast responder for repository %d.\n"), 00742 this->id().id())); 00743 throw Incomplete(); 00744 } 00745 00746 } else { 00747 if (this->multicastResponder_.init( 00748 this->orb_.in(), 00749 port, 00750 #if defined (ACE_HAS_IPV6) 00751 ACE_DEFAULT_MULTICASTV6_ADDR 00752 #else 00753 ACE_DEFAULT_MULTICAST_ADDR 00754 #endif /* ACE_HAS_IPV6 */ 00755 )) { 00756 ACE_ERROR((LM_ERROR, 00757 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ") 00758 ACE_TEXT("the multicast responder for repository %d.\n"), 00759 this->id().id())); 00760 throw Incomplete(); 00761 } 00762 } 00763 00764 // Register event handler for the ior multicast. 
00765 if (reactor->register_handler(&this->multicastResponder_, 00766 ACE_Event_Handler::READ_MASK) == -1) { 00767 ACE_ERROR((LM_ERROR, 00768 ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ") 00769 ACE_TEXT("for repository %d.\n"), 00770 this->id().id())); 00771 throw Incomplete(); 00772 } 00773 00774 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00775 ACE_DEBUG((LM_DEBUG, 00776 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00777 ACE_TEXT("multicast server setup is complete.\n"))); 00778 } 00779 } 00780 00781 #else 00782 ACE_UNUSED_ARG(this->multicastEnabled_); 00783 #endif /* ACE_HAS_IP_MULTICAST */ 00784 }
void OpenDDS::Federator::ManagerImpl::initializeOwner | ( | const OpenDDS::Federator::OwnerUpdate & | data | ) | [virtual] |
Definition at line 1127 of file FederatorManagerImpl.cpp.
References processCreate().
01129 { 01130 this->processCreate(&data, 0); 01131 }
void OpenDDS::Federator::ManagerImpl::initializeParticipant | ( | const OpenDDS::Federator::ParticipantUpdate & | data | ) | [virtual] |
Definition at line 1141 of file FederatorManagerImpl.cpp.
References processCreate().
01143 { 01144 this->processCreate(&data, 0); 01145 }
void OpenDDS::Federator::ManagerImpl::initializePublication | ( | const OpenDDS::Federator::PublicationUpdate & | data | ) | [virtual] |
Definition at line 1148 of file FederatorManagerImpl.cpp.
References processCreate().
01150 { 01151 this->processCreate(&data, 0); 01152 }
void OpenDDS::Federator::ManagerImpl::initializeSubscription | ( | const OpenDDS::Federator::SubscriptionUpdate & | data | ) | [virtual] |
Definition at line 1155 of file FederatorManagerImpl.cpp.
References processCreate().
01157 { 01158 this->processCreate(&data, 0); 01159 }
void OpenDDS::Federator::ManagerImpl::initializeTopic | ( | const OpenDDS::Federator::TopicUpdate & | data | ) | [virtual] |
Definition at line 1134 of file FederatorManagerImpl.cpp.
References processCreate().
01136 { 01137 this->processCreate(&data, 0); 01138 }
Manager_ptr OpenDDS::Federator::ManagerImpl::join_federation | ( | Manager_ptr | peer, | |
FederationDomain | federation | |||
) | [virtual] |
Definition at line 915 of file FederatorManagerImpl.cpp.
References config_, OpenDDS::DCPS::DCPS_debug_level, federated_, OpenDDS::Federator::Config::federationDomain(), initialize(), joiner_, joining_, joinRepo_, OpenDDS::Federator::NIL_REPOSITORY, orb(), peers_, pushState(), and TheServiceParticipant.
00919 { 00920 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00921 ACE_DEBUG((LM_DEBUG, 00922 ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"), 00923 federation)); 00924 } 00925 00926 RepoKey remote = NIL_REPOSITORY; 00927 00928 try { 00929 // Obtain the remote repository federator Id value. 00930 remote = peer->federation_id(); 00931 00932 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00933 ACE_DEBUG((LM_DEBUG, 00934 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 00935 ACE_TEXT("repo id %d entered from repository with id %d.\n"), 00936 this->id().id(), 00937 remote)); 00938 } 00939 00940 } catch (const CORBA::Exception& ex) { 00941 ex._tao_print_exception( 00942 ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ") 00943 ACE_TEXT("unable to obtain remote federation Id value: ")); 00944 throw Incomplete(); 00945 } 00946 00947 // If we are recursing, then we are done. 00948 if (this->joiner_ == remote) { 00949 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00950 ACE_DEBUG((LM_DEBUG, 00951 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 00952 ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"), 00953 this->id().id(), 00954 remote)); 00955 } 00956 00957 return this->_this(); 00958 00959 } else { 00960 // Block while any different repository is joining. 00961 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00962 00963 while (this->joiner_ != NIL_REPOSITORY) { 00964 // This releases the lock while we block. 00965 this->joining_.wait(); 00966 00967 // We are now recursing - curses! 00968 if (this->joiner_ == remote) { 00969 return this->_this(); 00970 } 00971 } 00972 00973 // Note that we are joining the remote repository now. 00974 this->joiner_ = remote; 00975 } 00976 00977 // 00978 // We only reach this point if: 00979 // 1) No other repository is processing past this point; 00980 // 2) We are not recursing. 00981 // 00982 00983 // Check if we already have Federation repository. 
00984 // Check if we are already federated. 00985 if (this->federated_ == false) { 00986 // Go ahead and add the joining repository as our Federation 00987 // repository. 00988 try { 00989 // Mark this repository as the point to which we are joined to 00990 // the federation. 00991 this->joinRepo_ = remote; 00992 00993 // Obtain a reference to the remote repository. 00994 OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository(); 00995 00996 CORBA::ORB_var orb = remoteRepo->_get_orb(); 00997 CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in()); 00998 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00999 ACE_DEBUG((LM_DEBUG, 01000 ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ") 01001 ACE_TEXT("id %d obtained reference to id %d:\n") 01002 ACE_TEXT("\t%C\n"), 01003 this->id().id(), 01004 remote, 01005 remoteRepoIor.in())); 01006 } 01007 01008 // Add remote repository to Service_Participant in the Federation domain 01009 std::ostringstream oss; 01010 oss << remote; 01011 std::string key_string = oss.str(); 01012 TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string); 01013 TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string); 01014 01015 } catch (const CORBA::Exception& ex) { 01016 ex._tao_print_exception( 01017 "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: "); 01018 throw Incomplete(); 01019 } 01020 } 01021 01022 // Symmetrical joining behavior. 01023 try { 01024 Manager_var remoteManager 01025 = peer->join_federation(this->_this(), this->config_.federationDomain()); 01026 01027 if (this->joinRepo_ == remote) { 01028 this->peers_[ this->joinRepo_] 01029 = OpenDDS::Federator::Manager::_duplicate(remoteManager.in()); 01030 } 01031 01032 // 01033 // Push our initial state out to the joining repository *after* we call 01034 // him back to join. 
This reduces the amount of duplicate data pushed 01035 // when a new (empty) repository is joining an existing federation. 01036 // 01037 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01038 ACE_DEBUG((LM_DEBUG, 01039 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 01040 ACE_TEXT("repo id %d pushing state to repository with id %d.\n"), 01041 this->id().id(), 01042 remote)); 01043 } 01044 01045 this->pushState(peer); 01046 01047 } catch (const CORBA::Exception& ex) { 01048 ex._tao_print_exception( 01049 "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: "); 01050 throw Incomplete(); 01051 } 01052 01053 if (CORBA::is_nil(this->participantWriter_.in())) { 01054 // 01055 // Establish our update publications and subscriptions *after* we 01056 // have exchanged internal state with the first joining repository. 01057 // 01058 this->initialize(); 01059 } 01060 01061 // Adjust our joining state and give others the opportunity to proceed. 01062 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01063 ACE_DEBUG((LM_DEBUG, 01064 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 01065 ACE_TEXT("repo id %d joined to repository with id %d.\n"), 01066 this->id().id(), 01067 remote)); 01068 } 01069 01070 this->federated_ = true; 01071 this->joiner_ = NIL_REPOSITORY; 01072 this->joining_.signal(); 01073 return this->_this(); 01074 }
void OpenDDS::Federator::ManagerImpl::leave_and_shutdown | ( | ) | [virtual] |
Definition at line 1107 of file FederatorManagerImpl.cpp.
References info_, and TAO_DDS_DCPSInfo_i::shutdown().
01109 { 01110 // Shutdown the process via the repository object. 01111 this->info_->shutdown(); 01112 }
void OpenDDS::Federator::ManagerImpl::leave_federation | ( | RepoKey | id | ) | [virtual] |
Definition at line 1077 of file FederatorManagerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and peers_.
01079 { 01080 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01081 ACE_DEBUG((LM_DEBUG, 01082 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"), 01083 this->id().id())); 01084 } 01085 01086 // Remove the leaving repository from our outbound mappings. 01087 IdToManagerMap::iterator where = this->peers_.find(id); 01088 01089 if (where != this->peers_.end()) { 01090 this->peers_.erase(where); 01091 } 01092 01093 // Remove all the internal Entities owned by the leaving repository. 01094 if (false 01095 == this->info_->remove_by_owner(this->config_.federationDomain(), id)) { 01096 throw Incomplete(); 01097 } 01098 01099 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01100 ACE_DEBUG((LM_DEBUG, 01101 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"), 01102 this->id().id())); 01103 } 01104 }
ACE_INLINE void OpenDDS::Federator::ManagerImpl::localRepo | ( | ::OpenDDS::DCPS::DCPSInfo_ptr | repo | ) |
Capture a remotely callable reference to the DCPSInfo; the reference passed in is duplicated and held in localRepo_.
Definition at line 40 of file FederatorManagerImpl.inl.
References localRepo_.
Referenced by InfoRepo::init().
00041 { 00042 this->localRepo_ = OpenDDS::DCPS::DCPSInfo::_duplicate(repo); 00043 }
ACE_INLINE void OpenDDS::Federator::ManagerImpl::orb | ( | CORBA::ORB_ptr | value | ) |
Definition at line 54 of file FederatorManagerImpl.inl.
References orb_.
00055 { 00056 this->orb_ = CORBA::ORB::_duplicate(value); 00057 }
ACE_INLINE CORBA::ORB_ptr OpenDDS::Federator::ManagerImpl::orb | ( | ) |
Accessors for the ORB.
Definition at line 47 of file FederatorManagerImpl.inl.
References orb_.
Referenced by InfoRepo::init(), join_federation(), and pushState().
00048 { 00049 return this->orb_.ptr(); 00050 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const TopicUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Create a proxy for a new topic.
Definition at line 729 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, deferredTopics_, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, OpenDDS::Federator::TopicUpdate::participant, processDeferred(), OpenDDS::Federator::TopicUpdate::qos, and OpenDDS::Federator::TopicUpdate::topic.
00730 { 00731 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00732 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00733 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id); 00734 ACE_DEBUG((LM_DEBUG, 00735 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ") 00736 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00737 this->id().id(), 00738 sample->domain, 00739 std::string(part_converter).c_str(), 00740 std::string(topic_converter).c_str())); 00741 } 00742 00743 if (false == this->info_->add_topic(sample->id, 00744 sample->domain, 00745 sample->participant, 00746 sample->topic, 00747 sample->datatype, 00748 sample->qos)) { 00749 { 00750 ACE_GUARD(ACE_Thread_Mutex, 00751 guard, 00752 this->deferred_lock_); 00753 this->deferredTopics_.push_back(*sample); 00754 } 00755 00756 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00757 ACE_DEBUG((LM_DEBUG, 00758 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ") 00759 ACE_TEXT("deferred update.\n"))); 00760 } 00761 } 00762 00763 this->processDeferred(); 00764 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const ParticipantUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Create a proxy for a new participant.
Definition at line 697 of file FederatorManagerImpl_updates.cpp.
References TAO_DDS_DCPSInfo_i::add_domain_participant(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, OpenDDS::Federator::ParticipantUpdate::owner, processDeferred(), OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.
00698 { 00699 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00700 OpenDDS::DCPS::RepoIdConverter converter(sample->id); 00701 ACE_DEBUG((LM_DEBUG, 00702 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ") 00703 ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"), 00704 this->id().id(), 00705 sample->domain, 00706 std::string(converter).c_str(), 00707 sample->owner)); 00708 } 00709 00710 this->info_->add_domain_participant( 00711 sample->domain, 00712 sample->id, 00713 sample->qos); 00714 bool ownershipChanged = this->info_->changeOwnership( 00715 sample->domain, 00716 sample->id, 00717 sample->sender, 00718 sample->owner); 00719 if (!ownershipChanged) { 00720 ACE_ERROR((LM_ERROR, 00721 ACE_TEXT("(%P|%t) ERROR: ") 00722 ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ") 00723 ACE_TEXT("Could not change ownership\n"))); 00724 } 00725 this->processDeferred(); 00726 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Create a proxy for a new subscription.
Definition at line 653 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::callback, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, deferredSubscriptions_, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::SubscriptionUpdate::participant, processDeferred(), OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::SubscriptionUpdate::topic, and OpenDDS::Federator::SubscriptionUpdate::transport_info.
00654 { 00655 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00656 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00657 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 00658 ACE_DEBUG((LM_DEBUG, 00659 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ") 00660 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00661 this->id().id(), 00662 sample->domain, 00663 std::string(part_converter).c_str(), 00664 std::string(sub_converter).c_str())); 00665 } 00666 00667 if (false == this->info_->add_subscription(sample->domain, 00668 sample->participant, 00669 sample->topic, 00670 sample->id, 00671 sample->callback, 00672 sample->datareader_qos, 00673 sample->transport_info, 00674 sample->subscriber_qos, 00675 sample->filter_class_name, 00676 sample->filter_expression, 00677 sample->expression_params, 00678 true)) { 00679 { 00680 ACE_GUARD(ACE_Thread_Mutex, 00681 guard, 00682 this->deferred_lock_); 00683 this->deferredSubscriptions_.push_back(*sample); 00684 } 00685 00686 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00687 ACE_DEBUG((LM_DEBUG, 00688 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ") 00689 ACE_TEXT("deferred update.\n"))); 00690 } 00691 } 00692 00693 this->processDeferred(); 00694 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Create a proxy for a new publication.
Definition at line 612 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::PublicationUpdate::callback, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, deferredPublications_, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::PublicationUpdate::participant, processDeferred(), OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::topic, and OpenDDS::Federator::PublicationUpdate::transport_info.
00613 { 00614 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00615 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00616 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 00617 ACE_DEBUG((LM_DEBUG, 00618 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ") 00619 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00620 this->id().id(), 00621 sample->domain, 00622 std::string(part_converter).c_str(), 00623 std::string(pub_converter).c_str())); 00624 } 00625 00626 if (false == this->info_->add_publication(sample->domain, 00627 sample->participant, 00628 sample->topic, 00629 sample->id, 00630 sample->callback, 00631 sample->datawriter_qos, 00632 sample->transport_info, 00633 sample->publisher_qos, 00634 true)) { 00635 { 00636 ACE_GUARD(ACE_Thread_Mutex, 00637 guard, 00638 this->deferred_lock_); 00639 this->deferredPublications_.push_back(*sample); 00640 } 00641 00642 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00643 ACE_DEBUG((LM_DEBUG, 00644 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ") 00645 ACE_TEXT("deferred update.\n"))); 00646 } 00647 } 00648 00649 this->processDeferred(); 00650 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const OwnerUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
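Process the ownership change carried by an OwnerUpdate sample. This is not a null implementation: the change is applied through changeOwnership(), the sample is queued in deferredOwnerships_ if that call fails, and processDeferred() is then run.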
Definition at line 575 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, processDeferred(), and OpenDDS::Federator::OwnerUpdate::sender.
Referenced by initializeOwner(), initializeParticipant(), initializePublication(), initializeSubscription(), and initializeTopic().
00576 { 00577 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00578 OpenDDS::DCPS::RepoIdConverter converter(sample->participant); 00579 ACE_DEBUG((LM_DEBUG, 00580 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ") 00581 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00582 this->id().id(), 00583 sample->domain, 00584 std::string(converter).c_str(), 00585 sample->sender, 00586 sample->owner)); 00587 } 00588 00589 // We could generate an error message here. Instead we let action be irrelevant. 00590 if (false == this->info_->changeOwnership(sample->domain, 00591 sample->participant, 00592 sample->sender, 00593 sample->owner)) { 00594 { 00595 ACE_GUARD(ACE_Thread_Mutex, 00596 guard, 00597 this->deferred_lock_); 00598 this->deferredOwnerships_.push_back(*sample); 00599 } 00600 00601 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00602 ACE_DEBUG((LM_DEBUG, 00603 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ") 00604 ACE_TEXT("deferred update.\n"))); 00605 } 00606 } 00607 00608 this->processDeferred(); 00609 }
void OpenDDS::Federator::ManagerImpl::processDeferred | ( | ) |
Handle any deferred updates that might have become processable.
Definition at line 767 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, deferredPublications_, deferredSubscriptions_, and deferredTopics_.
Referenced by processCreate().
00768 { 00769 ACE_GUARD(ACE_Thread_Mutex, 00770 guard, 00771 this->deferred_lock_); 00772 00773 { 00774 std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin(); 00775 00776 while (current != this->deferredOwnerships_.end()) { 00777 if (this->info_->changeOwnership(current->domain, 00778 current->participant, 00779 current->sender, 00780 current->owner)) { 00781 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00782 OpenDDS::DCPS::RepoIdConverter converter(current->participant); 00783 ACE_DEBUG((LM_DEBUG, 00784 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ") 00785 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00786 this->id().id(), 00787 current->domain, 00788 std::string(converter).c_str(), 00789 current->sender, 00790 current->owner)); 00791 } 00792 00793 current = this->deferredOwnerships_.erase(current); 00794 00795 } else { 00796 ++ current; 00797 } 00798 } 00799 } 00800 00801 { 00802 std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin(); 00803 00804 while (current != this->deferredTopics_.end()) { 00805 if (true == this->info_->add_topic(current->id, 00806 current->domain, 00807 current->participant, 00808 current->topic, 00809 current->datatype, 00810 current->qos)) { 00811 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00812 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant); 00813 OpenDDS::DCPS::RepoIdConverter topic_converter(current->id); 00814 ACE_DEBUG((LM_DEBUG, 00815 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ") 00816 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00817 this->id().id(), 00818 current->domain, 00819 std::string(part_converter).c_str(), 00820 std::string(topic_converter).c_str())); 00821 } 00822 00823 current = this->deferredTopics_.erase(current); 00824 00825 } else { 00826 ++ current; 00827 } 00828 } 00829 } 00830 00831 { 00832 std::list<PublicationUpdate>::iterator current = 
this->deferredPublications_.begin(); 00833 00834 while (current != this->deferredPublications_.end()) { 00835 00836 if (true == this->info_->add_publication(current->domain, 00837 current->participant, 00838 current->topic, 00839 current->id, 00840 current->callback, 00841 current->datawriter_qos, 00842 current->transport_info, 00843 current->publisher_qos, 00844 true)) { 00845 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00846 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant); 00847 OpenDDS::DCPS::RepoIdConverter pub_converter(current->id); 00848 ACE_DEBUG((LM_DEBUG, 00849 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ") 00850 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00851 this->id().id(), 00852 current->domain, 00853 std::string(part_converter).c_str(), 00854 std::string(pub_converter).c_str())); 00855 } 00856 00857 current = this->deferredPublications_.erase(current); 00858 00859 } else { 00860 ++ current; 00861 } 00862 } 00863 } 00864 00865 { 00866 std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin(); 00867 00868 while (current != this->deferredSubscriptions_.end()) { 00869 00870 if (true == this->info_->add_subscription(current->domain, 00871 current->participant, 00872 current->topic, 00873 current->id, 00874 current->callback, 00875 current->datareader_qos, 00876 current->transport_info, 00877 current->subscriber_qos, 00878 current->filter_class_name, 00879 current->filter_expression, 00880 current->expression_params, 00881 true)) { 00882 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00883 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant); 00884 OpenDDS::DCPS::RepoIdConverter sub_converter(current->id); 00885 ACE_DEBUG((LM_DEBUG, 00886 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ") 00887 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00888 this->id().id(), 00889 
current->domain, 00890 std::string(part_converter).c_str(), 00891 std::string(sub_converter).c_str())); 00892 } 00893 00894 current = this->deferredSubscriptions_.erase(current); 00895 00896 } else { 00897 ++ current; 00898 } 00899 } 00900 } 00901 00902 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const TopicUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Delete a proxy for a topic.
Definition at line 1206 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, OpenDDS::Federator::TopicUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_topic().
01207 { 01208 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01209 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01210 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id); 01211 ACE_DEBUG((LM_DEBUG, 01212 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ") 01213 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 01214 this->id().id(), 01215 sample->domain, 01216 std::string(part_converter).c_str(), 01217 std::string(topic_converter).c_str())); 01218 } 01219 01220 try { 01221 this->info_->remove_topic( 01222 sample->domain, 01223 sample->participant, 01224 sample->id); 01225 01226 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01227 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01228 ACE_DEBUG((LM_DEBUG, 01229 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ") 01230 ACE_TEXT("the participant was already removed.\n"))); 01231 } 01232 } 01233 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const ParticipantUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Delete a proxy for a participant.
Definition at line 1181 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, and TAO_DDS_DCPSInfo_i::remove_domain_participant().
01182 { 01183 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01184 OpenDDS::DCPS::RepoIdConverter converter(sample->id); 01185 ACE_DEBUG((LM_DEBUG, 01186 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ") 01187 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 01188 this->id().id(), 01189 sample->domain, 01190 std::string(converter).c_str())); 01191 } 01192 try { 01193 this->info_->remove_domain_participant( 01194 sample->domain, 01195 sample->id); 01196 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01197 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01198 ACE_DEBUG((LM_DEBUG, 01199 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ") 01200 ACE_TEXT("the participant was already removed.\n"))); 01201 } 01202 } 01203 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Delete a proxy for a subscription.
Definition at line 1151 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_subscription().
01152 { 01153 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01154 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01155 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 01156 ACE_DEBUG((LM_DEBUG, 01157 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ") 01158 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 01159 this->id().id(), 01160 sample->domain, 01161 std::string(part_converter).c_str(), 01162 std::string(sub_converter).c_str())); 01163 } 01164 01165 try { 01166 this->info_->remove_subscription( 01167 sample->domain, 01168 sample->participant, 01169 sample->id); 01170 01171 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01172 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01173 ACE_DEBUG((LM_DEBUG, 01174 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ") 01175 ACE_TEXT("the participant was already removed.\n"))); 01176 } 01177 } 01178 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Delete a proxy for a publication.
Definition at line 1121 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_publication().
01122 { 01123 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01124 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01125 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 01126 ACE_DEBUG((LM_DEBUG, 01127 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ") 01128 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 01129 this->id().id(), 01130 sample->domain, 01131 std::string(part_converter).c_str(), 01132 std::string(pub_converter).c_str())); 01133 } 01134 01135 try { 01136 this->info_->remove_publication( 01137 sample->domain, 01138 sample->participant, 01139 sample->id); 01140 01141 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01142 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01143 ACE_DEBUG((LM_DEBUG, 01144 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ") 01145 ACE_TEXT("the participant was already removed.\n"))); 01146 } 01147 } 01148 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const OwnerUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
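Handle a delete action for an OwnerUpdate sample. This is not a null implementation: the ownership values in the sample are applied through changeOwnership(), and the sample is queued in deferredOwnerships_ if that call fails.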
Definition at line 1089 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.
01090 { 01091 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01092 OpenDDS::DCPS::RepoIdConverter converter(sample->participant); 01093 ACE_DEBUG((LM_DEBUG, 01094 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ") 01095 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 01096 this->id().id(), 01097 sample->domain, 01098 std::string(converter).c_str(), 01099 sample->sender, 01100 sample->owner)); 01101 } 01102 01103 // We could generate an error message here. Instead we let action be irrelevant. 01104 if (false == this->info_->changeOwnership(sample->domain, 01105 sample->participant, 01106 sample->sender, 01107 sample->owner)) { 01108 { 01109 ACE_GUARD(ACE_Thread_Mutex, 01110 guard, 01111 this->deferred_lock_); 01112 this->deferredOwnerships_.push_back(*sample); 01113 } 01114 ACE_DEBUG((LM_DEBUG, 01115 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ") 01116 ACE_TEXT("deferred update.\n"))); 01117 } 01118 }
void OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Update the proxy filter expression params for a subscription.
Definition at line 1025 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_params().
01027 { 01028 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01029 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01030 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 01031 ACE_DEBUG((LM_DEBUG, 01032 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ") 01033 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 01034 this->id().id(), 01035 sample->domain, 01036 std::string(part_converter).c_str(), 01037 std::string(sub_converter).c_str())); 01038 } 01039 01040 this->info_->update_subscription_params( 01041 sample->domain, 01042 sample->participant, 01043 sample->id, 01044 sample->expression_params); 01045 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const TopicUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Update the proxy TopicQos for a topic.
Definition at line 1067 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, and TAO_DDS_DCPSInfo_i::update_topic_qos().
01068 { 01069 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01070 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01071 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id); 01072 ACE_DEBUG((LM_DEBUG, 01073 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ") 01074 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 01075 this->id().id(), 01076 sample->domain, 01077 std::string(part_converter).c_str(), 01078 std::string(topic_converter).c_str())); 01079 } 01080 01081 this->info_->update_topic_qos( 01082 sample->id, 01083 sample->domain, 01084 sample->participant, 01085 sample->qos); 01086 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const ParticipantUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Update the proxy ParticipantQos for a participant.
Definition at line 1048 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, OpenDDS::Federator::ParticipantUpdate::qos, and TAO_DDS_DCPSInfo_i::update_domain_participant_qos().
01049 { 01050 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01051 OpenDDS::DCPS::RepoIdConverter converter(sample->id); 01052 ACE_DEBUG((LM_DEBUG, 01053 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ") 01054 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 01055 this->id().id(), 01056 sample->domain, 01057 std::string(converter).c_str())); 01058 } 01059 01060 this->info_->update_domain_participant_qos( 01061 sample->domain, 01062 sample->id, 01063 sample->qos); 01064 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Update the proxy DataReaderQos for a subscription.
Definition at line 981 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_qos().
00982 { 00983 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00984 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00985 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 00986 ACE_DEBUG((LM_DEBUG, 00987 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ") 00988 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00989 this->id().id(), 00990 sample->domain, 00991 std::string(part_converter).c_str(), 00992 std::string(sub_converter).c_str())); 00993 } 00994 00995 this->info_->update_subscription_qos( 00996 sample->domain, 00997 sample->participant, 00998 sample->id, 00999 sample->datareader_qos); 01000 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Update the proxy DataWriterQos for a publication.
Definition at line 937 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::update_publication_qos().
00938 { 00939 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00940 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00941 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 00942 ACE_DEBUG((LM_DEBUG, 00943 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ") 00944 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00945 this->id().id(), 00946 sample->domain, 00947 std::string(part_converter).c_str(), 00948 std::string(pub_converter).c_str())); 00949 } 00950 00951 this->info_->update_publication_qos( 00952 sample->domain, 00953 sample->participant, 00954 sample->id, 00955 sample->datawriter_qos); 00956 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const OwnerUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Process ownership changes.
Definition at line 905 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.
00906 { 00907 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00908 OpenDDS::DCPS::RepoIdConverter converter(sample->participant); 00909 ACE_DEBUG((LM_DEBUG, 00910 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ") 00911 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00912 this->id().id(), 00913 sample->domain, 00914 std::string(converter).c_str(), 00915 sample->sender, 00916 sample->owner)); 00917 } 00918 00919 if (false == this->info_->changeOwnership(sample->domain, 00920 sample->participant, 00921 sample->sender, 00922 sample->owner)) { 00923 { 00924 ACE_GUARD(ACE_Thread_Mutex, 00925 guard, 00926 this->deferred_lock_); 00927 00928 this->deferredOwnerships_.push_back(*sample); 00929 } 00930 ACE_DEBUG((LM_DEBUG, 00931 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ") 00932 ACE_TEXT("deferred update.\n"))); 00933 } 00934 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos2 | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Update the proxy SubscriberQos for a subscription.
Definition at line 1003 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, and TAO_DDS_DCPSInfo_i::update_subscription_qos().
01004 { 01005 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01006 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01007 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 01008 ACE_DEBUG((LM_DEBUG, 01009 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ") 01010 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 01011 this->id().id(), 01012 sample->domain, 01013 std::string(part_converter).c_str(), 01014 std::string(sub_converter).c_str())); 01015 } 01016 01017 this->info_->update_subscription_qos( 01018 sample->domain, 01019 sample->participant, 01020 sample->id, 01021 sample->subscriber_qos); 01022 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos2 | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) |
Update the proxy PublisherQos for a publication.
Definition at line 959 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, and TAO_DDS_DCPSInfo_i::update_publication_qos().
00960 { 00961 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00962 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00963 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 00964 ACE_DEBUG((LM_DEBUG, 00965 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ") 00966 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00967 this->id().id(), 00968 sample->domain, 00969 std::string(part_converter).c_str(), 00970 std::string(pub_converter).c_str())); 00971 } 00972 00973 this->info_->update_publication_qos( 00974 sample->domain, 00975 sample->participant, 00976 sample->id, 00977 sample->publisher_qos); 00978 }
void OpenDDS::Federator::ManagerImpl::pushState | ( | Manager_ptr | peer | ) |
Push our current state to a remote repository.
Definition at line 1236 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, config_, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, TAO_DDS_DCPSInfo_i::domains(), OpenDDS::Federator::Config::federationDomain(), DCPS_IR_Subscription::get_datareader_qos(), DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Subscription::get_expr_params(), DCPS_IR_Subscription::get_filter_expression(), DCPS_IR_Subscription::get_id(), DCPS_IR_Publication::get_id(), DCPS_IR_Subscription::get_participant_id(), DCPS_IR_Publication::get_participant_id(), DCPS_IR_Publication::get_publisher_qos(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Subscription::get_topic_id(), DCPS_IR_Publication::get_topic_id(), DCPS_IR_Subscription::get_transportLocatorSeq(), DCPS_IR_Publication::get_transportLocatorSeq(), OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), info_, TAO_DDS_DCPSInfo_i::orb(), orb(), OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::ParticipantUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, OpenDDS::Federator::ParticipantUpdate::qos, DCPS_IR_Subscription::reader(), OpenDDS::Federator::OwnerUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, and DCPS_IR_Publication::writer().
Referenced by join_federation().
01237 { 01238 // foreach DCPS_IR_Domain 01239 // foreach DCPS_IR_Participant 01240 // peer->initializeParticipant(...) 01241 // peer->initializeOwner(...) 01242 // foreach DCPS_IR_Participant 01243 // foreach DCPS_IR_Topic 01244 // peer->initializeTopic(...) 01245 // foreach DCPS_IR_Publication 01246 // peer->initializePublication(...) 01247 // foreach DCPS_IR_Subscription 01248 // peer->initializeSubscription(...) 01249 01250 // Process each domain within the repository. 01251 for (DCPS_IR_Domain_Map::const_iterator currentDomain 01252 = this->info_->domains().begin(); 01253 currentDomain != this->info_->domains().end(); 01254 ++currentDomain) { 01255 01256 if (currentDomain->second->get_id() == this->config_.federationDomain()) { 01257 // Do not push the Federation domain publications. 01258 //continue; 01259 } 01260 01261 // Process each participant within the current domain. 01262 for (DCPS_IR_Participant_Map::const_iterator currentParticipant 01263 = currentDomain->second->participants().begin(); 01264 currentParticipant != currentDomain->second->participants().end(); 01265 ++currentParticipant) { 01266 01267 if (currentParticipant->second->isBitPublisher() == true) { 01268 // Do not push the built-in topic publications. 01269 continue; 01270 } 01271 01272 // Initialize the participant on the peer. 01273 ParticipantUpdate participantSample; 01274 participantSample.sender = this->id().id(); 01275 participantSample.action = CreateEntity; 01276 01277 participantSample.owner = currentParticipant->second->owner(); 01278 participantSample.domain = currentDomain->second->get_id(); 01279 participantSample.id = currentParticipant->second->get_id(); 01280 participantSample.qos = *currentParticipant->second->get_qos(); 01281 01282 peer->initializeParticipant(participantSample); 01283 01284 // Initialize the ownership of the participant on the peer. 
01285 OwnerUpdate ownerSample; 01286 ownerSample.sender = this->id().id(); 01287 ownerSample.action = CreateEntity; 01288 01289 ownerSample.domain = currentDomain->second->get_id(); 01290 ownerSample.participant = currentParticipant->second->get_id(); 01291 ownerSample.owner = currentParticipant->second->owner(); 01292 01293 peer->initializeOwner(ownerSample); 01294 } 01295 01296 // Process each participant within the current domain. 01297 for (DCPS_IR_Participant_Map::const_iterator currentParticipant 01298 = currentDomain->second->participants().begin(); 01299 currentParticipant != currentDomain->second->participants().end(); 01300 ++currentParticipant) { 01301 01302 if (currentParticipant->second->isBitPublisher() == true) { 01303 // Do not push the built-in topic publications. 01304 continue; 01305 } 01306 01307 // Process each topic within the current particpant. 01308 for (DCPS_IR_Topic_Map::const_iterator currentTopic 01309 = currentParticipant->second->topics().begin(); 01310 currentTopic != currentParticipant->second->topics().end(); 01311 ++currentTopic) { 01312 TopicUpdate topicSample; 01313 topicSample.sender = this->id().id(); 01314 topicSample.action = CreateEntity; 01315 01316 topicSample.id = currentTopic->second->get_id(); 01317 topicSample.domain = currentDomain->second->get_id(); 01318 topicSample.participant = currentTopic->second->get_participant_id(); 01319 topicSample.topic = currentTopic->second->get_topic_description()->get_name(); 01320 topicSample.datatype = currentTopic->second->get_topic_description()->get_dataTypeName(); 01321 topicSample.qos = *currentTopic->second->get_topic_qos(); 01322 01323 peer->initializeTopic(topicSample); 01324 } 01325 01326 // Process each publication within the current particpant. 
01327 for (DCPS_IR_Publication_Map::const_iterator currentPublication 01328 = currentParticipant->second->publications().begin(); 01329 currentPublication != currentParticipant->second->publications().end(); 01330 ++currentPublication) { 01331 PublicationUpdate publicationSample; 01332 publicationSample.sender = this->id().id(); 01333 publicationSample.action = CreateEntity; 01334 01335 DCPS_IR_Publication* p = currentPublication->second; 01336 CORBA::ORB_var orb = this->info_->orb(); 01337 CORBA::String_var callback = orb->object_to_string(p->writer()); 01338 01339 publicationSample.domain = currentDomain->second->get_id(); 01340 publicationSample.participant = p->get_participant_id(); 01341 publicationSample.topic = p->get_topic_id(); 01342 publicationSample.id = p->get_id(); 01343 publicationSample.callback = callback.in(); 01344 publicationSample.datawriter_qos = *p->get_datawriter_qos(); 01345 publicationSample.publisher_qos = *p->get_publisher_qos(); 01346 publicationSample.transport_info = p->get_transportLocatorSeq(); 01347 01348 peer->initializePublication(publicationSample); 01349 } 01350 01351 // Process each subscription within the current particpant. 
01352 for (DCPS_IR_Subscription_Map::const_iterator currentSubscription 01353 = currentParticipant->second->subscriptions().begin(); 01354 currentSubscription != currentParticipant->second->subscriptions().end(); 01355 ++currentSubscription) { 01356 SubscriptionUpdate subscriptionSample; 01357 subscriptionSample.sender = this->id().id(); 01358 subscriptionSample.action = CreateEntity; 01359 01360 DCPS_IR_Subscription* s = currentSubscription->second; 01361 CORBA::ORB_var orb = this->info_->orb(); 01362 CORBA::String_var callback = orb->object_to_string(s->reader()); 01363 01364 subscriptionSample.domain = currentDomain->second->get_id(); 01365 subscriptionSample.participant = s->get_participant_id(); 01366 subscriptionSample.topic = s->get_topic_id(); 01367 subscriptionSample.id = s->get_id(); 01368 subscriptionSample.callback = callback.in(); 01369 subscriptionSample.datareader_qos = *s->get_datareader_qos(); 01370 subscriptionSample.subscriber_qos = *s->get_subscriber_qos(); 01371 subscriptionSample.transport_info = s->get_transportLocatorSeq(); 01372 subscriptionSample.filter_expression = s->get_filter_expression().c_str(); 01373 subscriptionSample.expression_params = s->get_expr_params(); 01374 01375 peer->initializeSubscription(subscriptionSample); 01376 } 01377 } 01378 } 01379 }
OpenDDS::DCPS::DCPSInfo_ptr OpenDDS::Federator::ManagerImpl::repository | ( | ) | [virtual] |
Definition at line 876 of file FederatorManagerImpl.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::is_nil(), and TheServiceParticipant.
00877 { 00878 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00879 ACE_DEBUG((LM_DEBUG, 00880 ACE_TEXT("(%P|%t) ManagerImpl::repository()\n"))); 00881 } 00882 00883 OpenDDS::DCPS::Discovery_rch disco 00884 = TheServiceParticipant->get_discovery( 00885 this->config_.federationDomain()); 00886 OpenDDS::DCPS::DCPSInfo_var repo; 00887 if (!disco.is_nil()) { 00888 OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco = 00889 DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco); 00890 repo = irDisco->get_dcps_info(); 00891 } 00892 00893 if (CORBA::is_nil(repo.in())) { 00894 return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in()); 00895 00896 } else { 00897 return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in()); 00898 } 00899 }
void OpenDDS::Federator::ManagerImpl::requestImage | ( | ) | [virtual] |
void OpenDDS::Federator::ManagerImpl::shutdown | ( | ) | [virtual] |
Definition at line 1115 of file FederatorManagerImpl.cpp.
References federated_, info_, and TAO_DDS_DCPSInfo_i::shutdown().
01117 { 01118 // Prevent the removal of this repository from the federation during 01119 // shutdown processing. 01120 this->federated_ = false; 01121 01122 // Shutdown the process via the repository object. 01123 this->info_->shutdown(); 01124 }
void OpenDDS::Federator::ManagerImpl::unregisterCallback | ( | ) | [virtual] |
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 506 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateFilterExpressionParams.
00507 { 00508 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00509 // Decline to publish data until we can. 00510 return; 00511 } 00512 00513 SubscriptionUpdate sample; 00514 sample.sender = this->id().id(); 00515 sample.action = UpdateFilterExpressionParams; 00516 sample.domain = id.domain; 00517 sample.participant = id.participant; 00518 sample.id = id.id; 00519 sample.expression_params = params; 00520 00521 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00522 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00523 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00524 ACE_DEBUG((LM_DEBUG, 00525 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ") 00526 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00527 this->id().id(), 00528 sample.domain, 00529 std::string(part_converter).c_str(), 00530 std::string(sub_converter).c_str())); 00531 } 00532 00533 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00534 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::SubscriberQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 537 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue2.
00538 { 00539 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00540 // Decline to publish data until we can. 00541 return; 00542 } 00543 00544 SubscriptionUpdate sample; 00545 sample.sender = this->id().id(); 00546 sample.action = UpdateQosValue2; 00547 00548 sample.domain = id.domain; 00549 sample.participant = id.participant; 00550 sample.id = id.id; 00551 sample.subscriber_qos = qos; 00552 00553 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00554 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00555 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00556 ACE_DEBUG((LM_DEBUG, 00557 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ") 00558 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00559 this->id().id(), 00560 sample.domain, 00561 std::string(part_converter).c_str(), 00562 std::string(sub_converter).c_str())); 00563 } 00564 00565 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00566 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::DataReaderQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 474 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue1.
00475 { 00476 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00477 // Decline to publish data until we can. 00478 return; 00479 } 00480 00481 SubscriptionUpdate sample; 00482 sample.sender = this->id().id(); 00483 sample.action = UpdateQosValue1; 00484 00485 sample.domain = id.domain; 00486 sample.participant = id.participant; 00487 sample.id = id.id; 00488 sample.datareader_qos = qos; 00489 00490 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00491 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00492 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00493 ACE_DEBUG((LM_DEBUG, 00494 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ") 00495 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00496 this->id().id(), 00497 sample.domain, 00498 std::string(part_converter).c_str(), 00499 std::string(sub_converter).c_str())); 00500 } 00501 00502 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00503 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::PublisherQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 442 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::PublicationUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue2.
00443 { 00444 if (CORBA::is_nil(this->publicationWriter_.in())) { 00445 // Decline to publish data until we can. 00446 return; 00447 } 00448 00449 PublicationUpdate sample; 00450 sample.sender = this->id().id(); 00451 sample.action = UpdateQosValue2; 00452 00453 sample.domain = id.domain; 00454 sample.participant = id.participant; 00455 sample.id = id.id; 00456 sample.publisher_qos = qos; 00457 00458 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00459 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00460 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id); 00461 ACE_DEBUG((LM_DEBUG, 00462 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ") 00463 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00464 this->id().id(), 00465 sample.domain, 00466 std::string(part_converter).c_str(), 00467 std::string(pub_converter).c_str())); 00468 } 00469 00470 this->publicationWriter_->write(sample, DDS::HANDLE_NIL); 00471 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::DataWriterQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 410 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.
00411 { 00412 if (CORBA::is_nil(this->publicationWriter_.in())) { 00413 // Decline to publish data until we can. 00414 return; 00415 } 00416 00417 PublicationUpdate sample; 00418 sample.sender = this->id().id(); 00419 sample.action = UpdateQosValue1; 00420 00421 sample.domain = id.domain; 00422 sample.participant = id.participant; 00423 sample.id = id.id; 00424 sample.datawriter_qos = qos; 00425 00426 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00427 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00428 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id); 00429 ACE_DEBUG((LM_DEBUG, 00430 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ") 00431 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00432 this->id().id(), 00433 sample.domain, 00434 std::string(part_converter).c_str(), 00435 std::string(pub_converter).c_str())); 00436 } 00437 00438 this->publicationWriter_->write(sample, DDS::HANDLE_NIL); 00439 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::TopicQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 378 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::TopicUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, topicWriter_, and OpenDDS::Federator::UpdateQosValue1.
00379 { 00380 if (CORBA::is_nil(this->topicWriter_.in())) { 00381 // Decline to publish data until we can. 00382 return; 00383 } 00384 00385 TopicUpdate sample; 00386 sample.sender = this->id().id(); 00387 sample.action = UpdateQosValue1; 00388 00389 sample.id = id.id; 00390 sample.domain = id.domain; 00391 sample.participant = id.participant; 00392 sample.qos = qos; 00393 00394 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00395 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00396 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id); 00397 ACE_DEBUG((LM_DEBUG, 00398 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ") 00399 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00400 this->id().id(), 00401 sample.domain, 00402 std::string(part_converter).c_str(), 00403 std::string(topic_converter).c_str())); 00404 } 00405 00406 this->topicWriter_->write(sample, DDS::HANDLE_NIL); 00407 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Implements Update::Updater.
Definition at line 349 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, OpenDDS::Federator::ParticipantUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.
00350 { 00351 if (CORBA::is_nil(this->participantWriter_.in())) { 00352 // Decline to publish data until we can. 00353 return; 00354 } 00355 00356 ParticipantUpdate sample; 00357 sample.sender = this->id().id(); 00358 sample.action = UpdateQosValue1; 00359 00360 sample.domain = id.domain; 00361 sample.id = id.id; 00362 sample.qos = qos; 00363 00364 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00365 OpenDDS::DCPS::RepoIdConverter converter(sample.id); 00366 ACE_DEBUG((LM_DEBUG, 00367 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ") 00368 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 00369 this->id().id(), 00370 sample.domain, 00371 std::string(converter).c_str())); 00372 } 00373 00374 this->participantWriter_->write(sample, DDS::HANDLE_NIL); 00375 }
Config& OpenDDS::Federator::ManagerImpl::config_ [private] |
The configuration information for this manager.
Definition at line 231 of file FederatorManagerImpl.h.
Referenced by id(), initialize(), join_federation(), and pushState().
ACE_Thread_Mutex OpenDDS::Federator::ManagerImpl::deferred_lock_ [private] |
std::list<OwnerUpdate> OpenDDS::Federator::ManagerImpl::deferredOwnerships_ [private] |
Deferred ownership updates.
Definition at line 279 of file FederatorManagerImpl.h.
Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().
std::list<PublicationUpdate> OpenDDS::Federator::ManagerImpl::deferredPublications_ [private] |
Deferred publication updates.
Definition at line 285 of file FederatorManagerImpl.h.
Referenced by processCreate(), and processDeferred().
std::list<SubscriptionUpdate> OpenDDS::Federator::ManagerImpl::deferredSubscriptions_ [private] |
Deferred subscription updates.
Definition at line 288 of file FederatorManagerImpl.h.
Referenced by processCreate(), and processDeferred().
std::list<TopicUpdate> OpenDDS::Federator::ManagerImpl::deferredTopics_ [private] |
Deferred topic updates.
Definition at line 282 of file FederatorManagerImpl.h.
Referenced by processCreate(), and processDeferred().
bool OpenDDS::Federator::ManagerImpl::federated_ [private] |
Flag indicating that we are actively participating in a federation of repositories.
Definition at line 219 of file FederatorManagerImpl.h.
Referenced by finalize(), join_federation(), and shutdown().
DDS::DomainParticipant_var OpenDDS::Federator::ManagerImpl::federationParticipant_ [private] |
Local DomainParticipant.
Definition at line 246 of file FederatorManagerImpl.h.
Referenced by initialize().
The Info object reference to update.
Definition at line 234 of file FederatorManagerImpl.h.
Referenced by info(), leave_and_shutdown(), processCreate(), processDelete(), processUpdateFilterExpressionParams(), processUpdateQos1(), processUpdateQos2(), pushState(), and shutdown().
Simple recursion avoidance during the join operations.
Definition at line 212 of file FederatorManagerImpl.h.
Referenced by join_federation().
ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::ManagerImpl::joining_ [private] |
Condition used to gate joining activities.
Definition at line 209 of file FederatorManagerImpl.h.
Referenced by join_federation().
Repository to which we joined.
Definition at line 215 of file FederatorManagerImpl.h.
Referenced by join_federation().
OpenDDS::DCPS::DCPSInfo_var OpenDDS::Federator::ManagerImpl::localRepo_ [private] |
Remotely callable reference to the local repository.
Definition at line 237 of file FederatorManagerImpl.h.
Referenced by localRepo().
ACE_SYNCH_MUTEX OpenDDS::Federator::ManagerImpl::lock_ [private] |
bool OpenDDS::Federator::ManagerImpl::multicastEnabled_ [private] |
Is multicast enabled?
Definition at line 291 of file FederatorManagerImpl.h.
Referenced by ManagerImpl().
CORBA::ORB_var OpenDDS::Federator::ManagerImpl::orb_ [private] |
The ORB in which we are activated.
Definition at line 240 of file FederatorManagerImpl.h.
Referenced by finalize(), initialize(), and orb().
UpdateListener<OwnerUpdate, OwnerUpdateDataReader> OpenDDS::Federator::ManagerImpl::ownerListener_ [private] |
OwnerUpdate listener.
Definition at line 249 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
OwnerUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::ownerWriter_ [private] |
OwnerUpdate writer.
Definition at line 264 of file FederatorManagerImpl.h.
Referenced by create(), and initialize().
UpdateListener<ParticipantUpdate, ParticipantUpdateDataReader> OpenDDS::Federator::ManagerImpl::participantListener_ [private] |
ParticipantUpdate listener.
Definition at line 255 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
ParticipantUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::participantWriter_ [private] |
ParticipantUpdate writer.
Definition at line 270 of file FederatorManagerImpl.h.
Referenced by create(), destroy(), initialize(), and update().
The peer with which we have federated.
Definition at line 225 of file FederatorManagerImpl.h.
Referenced by finalize(), join_federation(), and leave_federation().
UpdateListener<PublicationUpdate, PublicationUpdateDataReader> OpenDDS::Federator::ManagerImpl::publicationListener_ [private] |
PublicationUpdate listener.
Definition at line 258 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
PublicationUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::publicationWriter_ [private] |
PublicationUpdate writer.
Definition at line 273 of file FederatorManagerImpl.h.
Referenced by destroy(), initialize(), and update().
The packet sequence number for data that we publish.
Definition at line 228 of file FederatorManagerImpl.h.
UpdateListener<SubscriptionUpdate, SubscriptionUpdateDataReader> OpenDDS::Federator::ManagerImpl::subscriptionListener_ [private] |
SubscriptionUpdate listener.
Definition at line 261 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
SubscriptionUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::subscriptionWriter_ [private] |
SubscriptionUpdate writer.
Definition at line 276 of file FederatorManagerImpl.h.
Referenced by create(), destroy(), initialize(), and update().
UpdateListener<TopicUpdate, TopicUpdateDataReader> OpenDDS::Federator::ManagerImpl::topicListener_ [private] |
TopicUpdate listener.
Definition at line 252 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
TopicUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::topicWriter_ [private] |
TopicUpdate writer.
Definition at line 267 of file FederatorManagerImpl.h.
Referenced by create(), destroy(), initialize(), and update().