#include <FederatorManagerImpl.h>
Public Member Functions | |
ManagerImpl (Config &config) | |
virtual | ~ManagerImpl () |
virtual CORBA::Boolean | discover_federation (const char *ior) |
virtual Manager_ptr | join_federation (Manager_ptr peer, FederationDomain federation) |
virtual void | leave_federation (RepoKey id) |
virtual RepoKey | federation_id () |
virtual OpenDDS::DCPS::DCPSInfo_ptr | repository () |
virtual void | initializeOwner (const OpenDDS::Federator::OwnerUpdate &data) |
virtual void | initializeTopic (const OpenDDS::Federator::TopicUpdate &data) |
virtual void | initializeParticipant (const OpenDDS::Federator::ParticipantUpdate &data) |
virtual void | initializePublication (const OpenDDS::Federator::PublicationUpdate &data) |
virtual void | initializeSubscription (const OpenDDS::Federator::SubscriptionUpdate &data) |
virtual void | leave_and_shutdown () |
virtual void | shutdown () |
void | initialize () |
Establish the update publications and subscriptions. | |
void | finalize () |
Release resources gracefully. | |
TAO_DDS_DCPSInfo_i *& | info () |
Accessors for the DCPSInfo reference. | |
TAO_DDS_DCPSInfo_i * | info () const |
void | localRepo (::OpenDDS::DCPS::DCPSInfo_ptr repo) |
Capture a remote callable reference to the DCPSInfo. | |
const TAO_DDS_DCPSFederationId & | id () const |
Accessors for the federation Id value. | |
CORBA::ORB_ptr | orb () |
Accessors for the ORB. | |
void | orb (CORBA::ORB_ptr value) |
void | pushState (Manager_ptr peer) |
Push our current state to a remote repository. | |
void | processDeferred () |
Handle any deferred updates that might have become processable. | |
virtual void | unregisterCallback () |
virtual void | requestImage () |
virtual void | create (const Update::UTopic &topic) |
virtual void | create (const Update::UParticipant &participant) |
virtual void | create (const Update::URActor &reader) |
virtual void | create (const Update::UWActor &writer) |
virtual void | create (const Update::OwnershipData &data) |
virtual void | update (const Update::IdPath &id, const DDS::DomainParticipantQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::TopicQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::DataWriterQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::PublisherQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::DataReaderQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::SubscriberQos &qos) |
virtual void | update (const Update::IdPath &id, const DDS::StringSeq &exprParams) |
virtual void | destroy (const Update::IdPath &id, Update::ItemType type, Update::ActorType actor) |
Propagate that an entity has been destroyed. | |
void | processCreate (const OwnerUpdate *sample, const DDS::SampleInfo *info) |
Null implementation for OwnerUpdate samples. | |
void | processCreate (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new publication. | |
void | processCreate (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new subscription. | |
void | processCreate (const ParticipantUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new participant. | |
void | processCreate (const TopicUpdate *sample, const DDS::SampleInfo *info) |
Create a proxy for a new topic. | |
void | processUpdateQos1 (const OwnerUpdate *sample, const DDS::SampleInfo *info) |
Process ownership changes. | |
void | processUpdateQos1 (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy DataWriterQos for a publication. | |
void | processUpdateQos2 (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy PublisherQos for a publication. | |
void | processUpdateQos1 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy DataReaderQos for a subscription. | |
void | processUpdateQos2 (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy SubscriberQos for a subscription. | |
void | processUpdateFilterExpressionParams (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy filter expression params for a subscription. | |
void | processUpdateQos1 (const ParticipantUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy ParticipantQos for a participant. | |
void | processUpdateQos1 (const TopicUpdate *sample, const DDS::SampleInfo *info) |
Update the proxy TopicQos for a topic. | |
void | processDelete (const OwnerUpdate *sample, const DDS::SampleInfo *info) |
Null implementation for OwnerUpdate samples. | |
void | processDelete (const PublicationUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a publication. | |
void | processDelete (const SubscriptionUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a subscription. | |
void | processDelete (const ParticipantUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a participant. | |
void | processDelete (const TopicUpdate *sample, const DDS::SampleInfo *info) |
Delete a proxy for a topic. | |
Private Types | |
typedef std::map< RepoKey, Manager_var > | IdToManagerMap |
Map type to hold references to federated repository Managers. | |
Private Attributes | |
ACE_SYNCH_MUTEX | lock_ |
Critical section MUTEX. | |
ACE_Condition< ACE_SYNCH_MUTEX > | joining_ |
Condition used to gate joining activities. | |
RepoKey | joiner_ |
Simple recursion avoidance during the join operations. | |
RepoKey | joinRepo_ |
Repository to which we joined. | |
bool | federated_ |
IdToManagerMap | peers_ |
The peers with which we have federated, keyed by RepoKey. | |
OpenDDS::DCPS::SequenceNumber | sequence_ |
The packet sequence number for data that we publish. | |
Config & | config_ |
The configuration information for this manager. | |
TAO_DDS_DCPSInfo_i * | info_ |
The Info object reference to update. | |
OpenDDS::DCPS::DCPSInfo_var | localRepo_ |
Remotely callable reference to the local repository. | |
CORBA::ORB_var | orb_ |
The ORB in which we are activated. | |
InfoRepoMulticastResponder | multicastResponder_ |
Multicast responder. | |
DDS::DomainParticipant_var | federationParticipant_ |
Local DomainParticipant. | |
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > | ownerListener_ |
OwnerUpdate listener. | |
UpdateListener< TopicUpdate, TopicUpdateDataReader > | topicListener_ |
TopicUpdate listener. | |
UpdateListener < ParticipantUpdate, ParticipantUpdateDataReader > | participantListener_ |
ParticipantUpdate listener. | |
UpdateListener < PublicationUpdate, PublicationUpdateDataReader > | publicationListener_ |
PublicationUpdate listener. | |
UpdateListener < SubscriptionUpdate, SubscriptionUpdateDataReader > | subscriptionListener_ |
SubscriptionUpdate listener. | |
OwnerUpdateDataWriter_var | ownerWriter_ |
OwnerUpdate writer. | |
TopicUpdateDataWriter_var | topicWriter_ |
TopicUpdate writer. | |
ParticipantUpdateDataWriter_var | participantWriter_ |
ParticipantUpdate writer. | |
PublicationUpdateDataWriter_var | publicationWriter_ |
PublicationUpdate writer. | |
SubscriptionUpdateDataWriter_var | subscriptionWriter_ |
SubscriptionUpdate writer. | |
std::list< OwnerUpdate > | deferredOwnerships_ |
Deferred ownership updates. | |
std::list< TopicUpdate > | deferredTopics_ |
Deferred topic updates. | |
std::list< PublicationUpdate > | deferredPublications_ |
Deferred publication updates. | |
std::list< SubscriptionUpdate > | deferredSubscriptions_ |
Deferred subscription updates. | |
bool | multicastEnabled_ |
Is multicast enabled? | |
ACE_Thread_Mutex | deferred_lock_ |
Protect deferred updates. |
Definition at line 36 of file FederatorManagerImpl.h.
typedef std::map<RepoKey, Manager_var> OpenDDS::Federator::ManagerImpl::IdToManagerMap [private] |
Map type to hold references to federated repository Managers.
Definition at line 222 of file FederatorManagerImpl.h.
OpenDDS::Federator::ManagerImpl::ManagerImpl | ( | Config & | config | ) |
Definition at line 39 of file FederatorManagerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_OS::getenv(), LM_DEBUG, and multicastEnabled_.
00040 : joining_(this->lock_), 00041 joiner_(NIL_REPOSITORY), 00042 joinRepo_(NIL_REPOSITORY), 00043 federated_(false), 00044 config_(config), 00045 info_(0), 00046 ownerListener_(*this), 00047 topicListener_(*this), 00048 participantListener_(*this), 00049 publicationListener_(*this), 00050 subscriptionListener_(*this), 00051 multicastEnabled_(false) 00052 { 00053 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00054 ACE_DEBUG((LM_DEBUG, 00055 ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n"))); 00056 } 00057 00058 char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled"); 00059 00060 if (mdec != 0) { 00061 std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled")); 00062 00063 if (mde != "0") { 00064 multicastEnabled_ = true; 00065 } 00066 } 00067 }
OpenDDS::Federator::ManagerImpl::~ManagerImpl | ( | ) | [virtual] |
Definition at line 69 of file FederatorManagerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.
00070 { 00071 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00072 ACE_DEBUG((LM_DEBUG, 00073 ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n"))); 00074 } 00075 }
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::OwnershipData & | data | ) | [virtual] |
Propagate that an entity has been created.
Implements Update::Updater.
Definition at line 179 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, Update::OwnershipData::domain, OpenDDS::Federator::OwnerUpdate::domain, DDS::HANDLE_NIL, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, Update::OwnershipData::owner, OpenDDS::Federator::OwnerUpdate::owner, ownerWriter_, Update::OwnershipData::participant, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.
00180 { 00181 if (CORBA::is_nil(this->ownerWriter_.in())) { 00182 // Decline to publish data until we can. 00183 return; 00184 } 00185 00186 OwnerUpdate sample; 00187 sample.sender = this->id().id(); 00188 sample.action = CreateEntity; 00189 00190 sample.domain = data.domain; 00191 sample.participant = data.participant; 00192 sample.owner = data.owner; 00193 00194 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00195 OpenDDS::DCPS::RepoIdConverter converter(sample.participant); 00196 ACE_DEBUG((LM_DEBUG, 00197 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ") 00198 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00199 this->id().id(), 00200 sample.domain, 00201 std::string(converter).c_str(), 00202 sample.sender, 00203 sample.owner)); 00204 } 00205 00206 this->ownerWriter_->write(sample, DDS::HANDLE_NIL); 00207 }
virtual void OpenDDS::Federator::ManagerImpl::create | ( | const Update::UWActor & | actor | ) | [virtual] |
Propagate that an entity has been created.
Implements Update::Updater.
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::URActor & | actor | ) | [virtual] |
Propagate that an entity has been created.
Implements Update::Updater.
Definition at line 104 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, OpenDDS::Federator::SubscriptionUpdate::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, OpenDDS::Federator::SubscriptionUpdate::topic, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, OpenDDS::Federator::SubscriptionUpdate::transport_info, and Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo.
00105 { 00106 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00107 // Decline to publish data until we can. 00108 return; 00109 } 00110 00111 SubscriptionUpdate sample; 00112 sample.sender = this->id().id(); 00113 sample.action = CreateEntity; 00114 00115 sample.domain = reader.domainId; 00116 sample.participant = reader.participantId; 00117 sample.topic = reader.topicId; 00118 sample.id = reader.actorId; 00119 sample.callback = reader.callback.c_str(); 00120 sample.datareader_qos = reader.drdwQos; 00121 sample.subscriber_qos = reader.pubsubQos; 00122 sample.transport_info = reader.transportInterfaceInfo; 00123 sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName; 00124 sample.filter_expression = reader.contentSubscriptionProfile.filterExpr; 00125 sample.expression_params = reader.contentSubscriptionProfile.exprParams; 00126 00127 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00128 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00129 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00130 ACE_DEBUG((LM_DEBUG, 00131 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ") 00132 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00133 this->id().id(), 00134 sample.domain, 00135 std::string(part_converter).c_str(), 00136 std::string(sub_converter).c_str())); 00137 } 00138 00139 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00140 }
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::UParticipant & | participant | ) | [virtual] |
Propagate that an entity has been created.
Implements Update::Updater.
Definition at line 74 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::CreateEntity, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, Update::ParticipantStrt< Q >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, Update::ParticipantStrt< Q >::owner, OpenDDS::Federator::ParticipantUpdate::owner, Update::ParticipantStrt< Q >::participantId, Update::ParticipantStrt< Q >::participantQos, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.
00075 { 00076 if (CORBA::is_nil(this->participantWriter_.in())) { 00077 // Decline to publish data until we can. 00078 return; 00079 } 00080 00081 ParticipantUpdate sample; 00082 sample.sender = this->id().id(); 00083 sample.action = CreateEntity; 00084 00085 sample.owner = participant.owner; 00086 sample.domain = participant.domainId; 00087 sample.id = participant.participantId; 00088 sample.qos = participant.participantQos; 00089 00090 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00091 OpenDDS::DCPS::RepoIdConverter converter(sample.id); 00092 ACE_DEBUG((LM_DEBUG, 00093 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ") 00094 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 00095 this->id().id(), 00096 sample.domain, 00097 std::string(converter).c_str())); 00098 } 00099 00100 this->participantWriter_->write(sample, DDS::HANDLE_NIL); 00101 }
void OpenDDS::Federator::ManagerImpl::create | ( | const Update::UTopic & | topic | ) | [virtual] |
Propagate that an entity has been created.
Implements Update::Updater.
Definition at line 40 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::CreateEntity, Update::TopicStrt< Q, S >::dataType, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, Update::TopicStrt< Q, S >::domainId, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, Update::TopicStrt< Q, S >::name, OpenDDS::Federator::TopicUpdate::participant, Update::TopicStrt< Q, S >::participantId, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::TopicUpdate::topic, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, and topicWriter_.
00041 { 00042 if (CORBA::is_nil(this->topicWriter_.in())) { 00043 // Decline to publish data until we can. 00044 return; 00045 } 00046 00047 TopicUpdate sample; 00048 sample.sender = this->id().id(); 00049 sample.action = CreateEntity; 00050 00051 sample.id = topic.topicId; 00052 sample.domain = topic.domainId; 00053 sample.participant = topic.participantId; 00054 sample.topic = topic.name.c_str(); 00055 sample.datatype = topic.dataType.c_str(); 00056 sample.qos = topic.topicQos; 00057 00058 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00059 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00060 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id); 00061 ACE_DEBUG((LM_DEBUG, 00062 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ") 00063 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00064 this->id().id(), 00065 sample.domain, 00066 std::string(part_converter).c_str(), 00067 std::string(topic_converter).c_str())); 00068 } 00069 00070 this->topicWriter_->write(sample, DDS::HANDLE_NIL); 00071 }
void OpenDDS::Federator::ManagerImpl::destroy | ( | const Update::IdPath & | id, | |
Update::ItemType | type, | |||
Update::ActorType | actor | |||
) | [virtual] |
Propagate that an entity has been destroyed.
Implements Update::Updater.
Definition at line 210 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::TopicUpdate::action, Update::Actor, config_, Update::DataReader, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::DestroyEntity, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::Config::federationDomain(), DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, Update::Participant, OpenDDS::Federator::TopicUpdate::participant, participantWriter_, publicationWriter_, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::TopicUpdate::sender, subscriptionWriter_, Update::Topic, and topicWriter_.
00214 { 00215 // 00216 // Do not propagate any destroy() messages within the FederationDomain. 00217 // This domain will be managed separately. 00218 // 00219 if (id.domain == this->config_.federationDomain()) { 00220 return; 00221 } 00222 00223 switch (type) { 00224 case Update::Topic: { 00225 if (CORBA::is_nil(this->topicWriter_.in())) { 00226 // Decline to publish data until we can. 00227 return; 00228 } 00229 00230 TopicUpdate sample; 00231 sample.sender = this->id().id(); 00232 sample.action = DestroyEntity; 00233 00234 sample.id = id.id; 00235 sample.domain = id.domain; 00236 sample.participant = id.participant; 00237 00238 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00239 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00240 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id); 00241 ACE_DEBUG((LM_DEBUG, 00242 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ") 00243 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00244 this->id().id(), 00245 sample.domain, 00246 std::string(part_converter).c_str(), 00247 std::string(topic_converter).c_str())); 00248 } 00249 00250 this->topicWriter_->write(sample, DDS::HANDLE_NIL); 00251 } 00252 break; 00253 00254 case Update::Participant: { 00255 if (CORBA::is_nil(this->participantWriter_.in())) { 00256 // Decline to publish data until we can. 
00257 return; 00258 } 00259 00260 ParticipantUpdate sample; 00261 sample.sender = this->id().id(); 00262 sample.action = DestroyEntity; 00263 00264 sample.domain = id.domain; 00265 sample.id = id.id; 00266 00267 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00268 OpenDDS::DCPS::RepoIdConverter converter(sample.id); 00269 ACE_DEBUG((LM_DEBUG, 00270 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ") 00271 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 00272 this->id().id(), 00273 sample.domain, 00274 std::string(converter).c_str())); 00275 } 00276 00277 this->participantWriter_->write(sample, DDS::HANDLE_NIL); 00278 } 00279 break; 00280 00281 case Update::Actor: 00282 00283 // This is VERY annoying. 00284 switch (actor) { 00285 case Update::DataWriter: { 00286 if (CORBA::is_nil(this->publicationWriter_.in())) { 00287 // Decline to publish data until we can. 00288 return; 00289 } 00290 00291 PublicationUpdate sample; 00292 sample.sender = this->id().id(); 00293 sample.action = DestroyEntity; 00294 00295 sample.domain = id.domain; 00296 sample.participant = id.participant; 00297 sample.id = id.id; 00298 00299 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00300 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00301 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id); 00302 ACE_DEBUG((LM_DEBUG, 00303 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ") 00304 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00305 this->id().id(), 00306 sample.domain, 00307 std::string(part_converter).c_str(), 00308 std::string(pub_converter).c_str())); 00309 } 00310 00311 this->publicationWriter_->write(sample, DDS::HANDLE_NIL); 00312 } 00313 break; 00314 00315 case Update::DataReader: { 00316 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00317 // Decline to publish data until we can. 
00318 return; 00319 } 00320 00321 SubscriptionUpdate sample; 00322 sample.sender = this->id().id(); 00323 sample.action = DestroyEntity; 00324 00325 sample.domain = id.domain; 00326 sample.participant = id.participant; 00327 sample.id = id.id; 00328 00329 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00330 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00331 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00332 ACE_DEBUG((LM_DEBUG, 00333 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ") 00334 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00335 this->id().id(), 00336 sample.domain, 00337 std::string(part_converter).c_str(), 00338 std::string(sub_converter).c_str())); 00339 } 00340 00341 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00342 } 00343 break; 00344 } 00345 00346 break; 00347 } 00348 }
CORBA::Boolean OpenDDS::Federator::ManagerImpl::discover_federation | ( | const char * | ior | ) | [virtual] |
TODO: Implement this.
Definition at line 904 of file FederatorManagerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.
00905 { 00906 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00907 ACE_DEBUG((LM_DEBUG, 00908 ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"), 00909 ior)); 00910 } 00911 00912 ///@TODO: Implement this. 00913 return false; 00914 }
RepoKey OpenDDS::Federator::ManagerImpl::federation_id | ( | ) | [virtual] |
Definition at line 867 of file FederatorManagerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, TAO_DDS_DCPSFederationId::id(), id(), and LM_DEBUG.
00868 { 00869 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00870 ACE_DEBUG((LM_DEBUG, 00871 ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n"))); 00872 } 00873 00874 return this->id().id(); 00875 }
void OpenDDS::Federator::ManagerImpl::finalize | ( | void | ) |
Release resources gracefully.
Definition at line 789 of file FederatorManagerImpl.cpp.
References CORBA::Exception::_tao_print_exception(), ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_Event_Handler::DONT_CALL, federated_, federationParticipant_, TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), OpenDDS::Federator::UpdateListener< DataType, ReaderType >::join(), joinRepo_, LM_DEBUG, LM_ERROR, multicastResponder_, orb_, ownerListener_, participantListener_, peers_, publicationListener_, ACE_Event_Handler::READ_MASK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::Federator::UpdateListener< DataType, ReaderType >::stop(), subscriptionListener_, TheParticipantFactory, and topicListener_.
Referenced by InfoRepo::finalize(), and InfoRepo::handle_exception().
00790 { 00791 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00792 ACE_DEBUG((LM_DEBUG, 00793 ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n"))); 00794 } 00795 00796 ownerListener_.stop(); 00797 topicListener_.stop(); 00798 participantListener_.stop(); 00799 publicationListener_.stop(); 00800 subscriptionListener_.stop(); 00801 ownerListener_.join(); 00802 topicListener_.join(); 00803 participantListener_.join(); 00804 publicationListener_.join(); 00805 subscriptionListener_.join(); 00806 00807 if (this->federated_) { 00808 try { 00809 IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_); 00810 00811 if (where == this->peers_.end()) { 00812 ACE_DEBUG((LM_DEBUG, 00813 ACE_TEXT("(%P|%t) Federator::Manager::finalize: ") 00814 ACE_TEXT("repository %d - all attachment to federation left.\n"), 00815 this->id().id())); 00816 00817 } else { 00818 if (CORBA::is_nil(where->second.in())) { 00819 ACE_ERROR((LM_ERROR, 00820 ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ") 00821 ACE_TEXT("repository %d not currently attached to a federation.\n"), 00822 this->id().id())); 00823 00824 } else { 00825 where->second->leave_federation(this->id().id()); 00826 this->federated_ = false; 00827 } 00828 } 00829 00830 } catch (const CORBA::Exception& ex) { 00831 ex._tao_print_exception( 00832 ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ") 00833 ACE_TEXT("unable to leave remote federation ")); 00834 throw Incomplete(); 00835 } 00836 } 00837 00838 if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) { 00839 this->orb_->orb_core()->reactor()->remove_handler( 00840 &this->multicastResponder_, 00841 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); 00842 } 00843 00844 // Remove our local participant and contained entities. 
00845 if (0 == CORBA::is_nil(this->federationParticipant_.in())) { 00846 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory; 00847 if (DDS::RETCODE_PRECONDITION_NOT_MET 00848 == this->federationParticipant_->delete_contained_entities()) { 00849 ACE_ERROR((LM_ERROR, 00850 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ") 00851 ACE_TEXT("unable to release resources for repository %d.\n"), 00852 this->id().id())); 00853 00854 } else if (DDS::RETCODE_PRECONDITION_NOT_MET 00855 == dpf->delete_participant(this->federationParticipant_.in())) { 00856 ACE_ERROR((LM_ERROR, 00857 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ") 00858 ACE_TEXT("unable to release the participant for repository %d.\n"), 00859 this->id().id())); 00860 } 00861 } 00862 }
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE const TAO_DDS_DCPSFederationId & OpenDDS::Federator::ManagerImpl::id | ( | void | ) | const |
Accessors for the federation Id value.
Definition at line 12 of file FederatorManagerImpl.inl.
References config_, and OpenDDS::Federator::Config::federationId().
Referenced by create(), destroy(), federation_id(), InfoRepo::init(), pushState(), and update().
00013 { 00014 return this->config_.federationId(); 00015 }
ACE_INLINE TAO_DDS_DCPSInfo_i * OpenDDS::Federator::ManagerImpl::info | ( | void | ) | const |
Definition at line 26 of file FederatorManagerImpl.inl.
References info_.
00027 { 00028 return this->info_; 00029 }
ACE_INLINE TAO_DDS_DCPSInfo_i *& OpenDDS::Federator::ManagerImpl::info | ( | void | ) |
Accessors for the DCPSInfo reference.
Definition at line 19 of file FederatorManagerImpl.inl.
References info_.
Referenced by InfoRepo::init().
00020 { 00021 return this->info_; 00022 }
void OpenDDS::Federator::ManagerImpl::initialize | ( | void | ) |
Establish the update publications and subscriptions.
Definition at line 78 of file FederatorManagerImpl.cpp.
References ACE_TEXT(), ACE_OS::atoi(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::DEFAULT_STATUS_MASK, OpenDDS::Federator::Defaults::DiscoveryRequestPort, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::Federator::Config::federationDomain(), OpenDDS::Federator::UpdateListener< DataType, ReaderType >::federationId(), federationParticipant_, OpenDDS::DCPS::DataWriterImpl::get_publication_id(), OpenDDS::DCPS::DataReaderImpl::get_subscription_id(), ACE_OS::getenv(), DDS::DataWriterQos::history, DDS::DataReaderQos::history, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::Federator::InfoRepoMulticastResponder::init(), OpenDDS::DCPS::TransportRegistry::instance(), CORBA::is_nil(), DDS::KEEP_LAST_HISTORY_QOS, LM_DEBUG, LM_ERROR, LM_WARNING, multicastEnabled_, multicastResponder_, orb_, ownerListener_, OpenDDS::Federator::OWNERUPDATETOPICNAME, OpenDDS::Federator::OWNERUPDATETYPENAME, ownerWriter_, PARTICIPANT_QOS_DEFAULT, participantListener_, OpenDDS::Federator::PARTICIPANTUPDATETOPICNAME, OpenDDS::Federator::PARTICIPANTUPDATETYPENAME, participantWriter_, publicationListener_, OpenDDS::Federator::PUBLICATIONUPDATETOPICNAME, OpenDDS::Federator::PUBLICATIONUPDATETYPENAME, publicationWriter_, PUBLISHER_QOS_DEFAULT, ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), DDS::DataWriterQos::reliability, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, SUBSCRIBER_QOS_DEFAULT, subscriptionListener_, OpenDDS::Federator::SUBSCRIPTIONUPDATETOPICNAME, OpenDDS::Federator::SUBSCRIPTIONUPDATETYPENAME, subscriptionWriter_, TheParticipantFactory, TOPIC_QOS_DEFAULT, topicListener_, OpenDDS::Federator::TOPICUPDATETOPICNAME, OpenDDS::Federator::TOPICUPDATETYPENAME, topicWriter_, and DDS::TRANSIENT_LOCAL_DURABILITY_QOS.
Referenced by join_federation().
00079 { 00080 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00081 ACE_DEBUG((LM_DEBUG, 00082 ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n"))); 00083 } 00084 00085 // Let the listeners know which repository we are to filter samples at 00086 // the earliest opportunity. 00087 this->ownerListener_.federationId(this->id()); 00088 this->topicListener_.federationId(this->id()); 00089 this->participantListener_.federationId(this->id()); 00090 this->publicationListener_.federationId(this->id()); 00091 this->subscriptionListener_.federationId(this->id()); 00092 00093 // Add participant for Federation domain 00094 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory; 00095 this->federationParticipant_ 00096 = dpf->create_participant( 00097 this->config_.federationDomain(), 00098 PARTICIPANT_QOS_DEFAULT, 00099 DDS::DomainParticipantListener::_nil(), 00100 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00101 00102 if (CORBA::is_nil(this->federationParticipant_.in())) { 00103 ACE_ERROR((LM_ERROR, 00104 ACE_TEXT("(%P|%t) ERROR: create_participant failed for ") 00105 ACE_TEXT("repository %d in federation domain %d.\n"), 00106 this->id().id(), 00107 this->config_.federationDomain())); 00108 throw Incomplete(); 00109 } 00110 // 00111 // Add type support for update topics 00112 // 00113 00114 OwnerUpdateTypeSupportImpl::_var_type ownerUpdate = new OwnerUpdateTypeSupportImpl(); 00115 00116 if (DDS::RETCODE_OK != ownerUpdate->register_type( 00117 this->federationParticipant_.in(), 00118 OWNERUPDATETYPENAME)) { 00119 ACE_ERROR((LM_ERROR, 00120 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00121 ACE_TEXT("OwnerUpdate type support for repository %d.\n"), 00122 this->id().id())); 00123 throw Incomplete(); 00124 } 00125 00126 ParticipantUpdateTypeSupportImpl::_var_type participantUpdate = new ParticipantUpdateTypeSupportImpl(); 00127 00128 if (DDS::RETCODE_OK != participantUpdate->register_type( 00129 this->federationParticipant_.in(), 00130 PARTICIPANTUPDATETYPENAME)) { 00131 
ACE_ERROR((LM_ERROR, 00132 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00133 ACE_TEXT("ParticipantUpdate type support for repository %d.\n"), 00134 this->id().id())); 00135 throw Incomplete(); 00136 } 00137 00138 TopicUpdateTypeSupportImpl::_var_type topicUpdate = new TopicUpdateTypeSupportImpl(); 00139 00140 if (DDS::RETCODE_OK != topicUpdate->register_type( 00141 this->federationParticipant_.in(), 00142 TOPICUPDATETYPENAME)) { 00143 ACE_ERROR((LM_ERROR, 00144 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00145 ACE_TEXT("TopicUpdate type support for repository %d.\n"), 00146 this->id().id())); 00147 throw Incomplete(); 00148 } 00149 00150 PublicationUpdateTypeSupportImpl::_var_type publicationUpdate = new PublicationUpdateTypeSupportImpl(); 00151 00152 if (DDS::RETCODE_OK != publicationUpdate->register_type( 00153 this->federationParticipant_.in(), 00154 PUBLICATIONUPDATETYPENAME)) { 00155 ACE_ERROR((LM_ERROR, 00156 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00157 ACE_TEXT("PublicationUpdate type support for repository %d.\n"), 00158 this->id().id())); 00159 throw Incomplete(); 00160 } 00161 00162 SubscriptionUpdateTypeSupportImpl::_var_type subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl(); 00163 00164 if (DDS::RETCODE_OK != subscriptionUpdate->register_type( 00165 this->federationParticipant_.in(), 00166 SUBSCRIPTIONUPDATETYPENAME)) { 00167 ACE_ERROR((LM_ERROR, 00168 ACE_TEXT("(%P|%t) ERROR: Unable to install ") 00169 ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"), 00170 this->id().id())); 00171 throw Incomplete(); 00172 } 00173 00174 // 00175 // Create a transport config for use with federation entities. 
00176 // 00177 std::string config_name = 00178 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 00179 + std::string("FederationBITTransportConfig"); 00180 OpenDDS::DCPS::TransportConfig_rch config = 00181 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name); 00182 00183 std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 00184 + std::string("FederationBITTCPTransportInst"); 00185 OpenDDS::DCPS::TransportInst_rch inst = 00186 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name, 00187 "tcp"); 00188 config->instances_.push_back(inst); 00189 00190 // 00191 // Create the subscriber for the update topics. 00192 // 00193 00194 DDS::Subscriber_var subscriber 00195 = this->federationParticipant_->create_subscriber( 00196 SUBSCRIBER_QOS_DEFAULT, 00197 DDS::SubscriberListener::_nil(), 00198 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00199 00200 if (CORBA::is_nil(subscriber.in())) { 00201 ACE_ERROR((LM_ERROR, 00202 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00203 ACE_TEXT("failed to create subscriber for repository %d\n"), 00204 this->id().id())); 00205 throw Incomplete(); 00206 00207 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00208 ACE_DEBUG((LM_DEBUG, 00209 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00210 ACE_TEXT("created federation subscriber for repository %d\n"), 00211 this->id().id())); 00212 00213 } 00214 00215 // Attach the transport to it. 00216 00217 try { 00218 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config, 00219 subscriber.in()); 00220 } catch (const OpenDDS::DCPS::Transport::Exception&) { 00221 ACE_ERROR((LM_ERROR, 00222 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00223 ACE_TEXT("failed to bind transport config to federation subscriber.\n"))); 00224 throw Incomplete(); 00225 } 00226 00227 // 00228 // Create the publisher for the update topics. 
00229 // 00230 00231 DDS::Publisher_var publisher 00232 = this->federationParticipant_->create_publisher( 00233 PUBLISHER_QOS_DEFAULT, 00234 DDS::PublisherListener::_nil(), 00235 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00236 00237 if (CORBA::is_nil(publisher.in())) { 00238 ACE_ERROR((LM_ERROR, 00239 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00240 ACE_TEXT("failed to create publisher for repository %d\n"), 00241 this->id().id())); 00242 throw Incomplete(); 00243 00244 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00245 ACE_DEBUG((LM_DEBUG, 00246 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00247 ACE_TEXT("created federation publisher for repository %d\n"), 00248 this->id().id())); 00249 00250 } 00251 00252 // Attach the transport to it. 00253 00254 try { 00255 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config, 00256 publisher.in()); 00257 } catch (const OpenDDS::DCPS::Transport::Exception&) { 00258 ACE_ERROR((LM_ERROR, 00259 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00260 ACE_TEXT("failed to bind transport config to federation publisher.\n"))); 00261 throw Incomplete(); 00262 } 00263 00264 // 00265 // Some useful items for adding the subscriptions. 
00266 // 00267 DDS::Topic_var topic; 00268 DDS::TopicDescription_var description; 00269 DDS::DataReader_var dataReader; 00270 DDS::DataWriter_var dataWriter; 00271 00272 DDS::DataReaderQos readerQos; 00273 subscriber->get_default_datareader_qos(readerQos); 00274 readerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; 00275 readerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS; 00276 readerQos.history.depth = 50; 00277 readerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; 00278 readerQos.reliability.max_blocking_time.sec = 0; 00279 readerQos.reliability.max_blocking_time.nanosec = 0; 00280 00281 DDS::DataWriterQos writerQos; 00282 publisher->get_default_datawriter_qos(writerQos); 00283 writerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; 00284 writerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS; 00285 writerQos.history.depth = 50; 00286 writerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; 00287 writerQos.reliability.max_blocking_time.sec = 0; 00288 writerQos.reliability.max_blocking_time.nanosec = 0; 00289 00290 // 00291 // Add update subscriptions 00292 // 00293 // NOTE: Its ok to lose the references to the objects here since they 00294 // are not needed after this point. The only thing we will do 00295 // with them is to destroy them, and that will be done via a 00296 // cascade delete from the participant. The listeners will 00297 // survive and can be used within other participants as well, 00298 // since the only state they retain is the manager, which is the 00299 // same for all. 
00300 // 00301 00302 topic = this->federationParticipant_->create_topic( 00303 OWNERUPDATETOPICNAME, 00304 OWNERUPDATETYPENAME, 00305 TOPIC_QOS_DEFAULT, 00306 DDS::TopicListener::_nil(), 00307 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00308 00309 dataWriter = publisher->create_datawriter( 00310 topic.in(), 00311 writerQos, 00312 DDS::DataWriterListener::_nil(), 00313 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00314 00315 if (CORBA::is_nil(dataWriter.in())) { 00316 ACE_ERROR((LM_ERROR, 00317 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00318 ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"), 00319 this->id().id())); 00320 throw Incomplete(); 00321 } 00322 00323 this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in()); 00324 00325 if (::CORBA::is_nil(this->ownerWriter_.in())) { 00326 ACE_ERROR((LM_ERROR, 00327 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00328 ACE_TEXT("failed to extract typed OwnerUpdate writer.\n"))); 00329 throw Incomplete(); 00330 00331 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00332 OpenDDS::DCPS::DataWriterImpl* servant 00333 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00334 00335 if (0 == servant) { 00336 ACE_DEBUG((LM_WARNING, 00337 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00338 ACE_TEXT("unable to extract typed OwnerUpdate writer.\n"))); 00339 00340 } else { 00341 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00342 ACE_DEBUG((LM_DEBUG, 00343 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00344 ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"), 00345 std::string(converter).c_str(), 00346 this->id().id())); 00347 } 00348 } 00349 00350 description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME); 00351 dataReader = subscriber->create_datareader( 00352 description.in(), 00353 readerQos, 00354 &this->ownerListener_, 00355 
OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00356 00357 if (CORBA::is_nil(dataReader.in())) { 00358 ACE_ERROR((LM_ERROR, 00359 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00360 ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"), 00361 this->id().id())); 00362 throw Incomplete(); 00363 00364 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00365 OpenDDS::DCPS::DataReaderImpl* servant 00366 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00367 00368 if (0 == servant) { 00369 ACE_DEBUG((LM_WARNING, 00370 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00371 ACE_TEXT("unable to extract typed OwnerUpdate reader.\n"))); 00372 00373 } else { 00374 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00375 ACE_DEBUG((LM_DEBUG, 00376 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00377 ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"), 00378 std::string(converter).c_str(), 00379 this->id().id())); 00380 } 00381 } 00382 00383 topic = this->federationParticipant_->create_topic( 00384 TOPICUPDATETOPICNAME, 00385 TOPICUPDATETYPENAME, 00386 TOPIC_QOS_DEFAULT, 00387 DDS::TopicListener::_nil(), 00388 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00389 dataWriter = publisher->create_datawriter( 00390 topic.in(), 00391 writerQos, 00392 DDS::DataWriterListener::_nil(), 00393 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00394 00395 if (CORBA::is_nil(dataWriter.in())) { 00396 ACE_ERROR((LM_ERROR, 00397 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00398 ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"), 00399 this->id().id())); 00400 throw Incomplete(); 00401 } 00402 00403 this->topicWriter_ 00404 = TopicUpdateDataWriter::_narrow(dataWriter.in()); 00405 00406 if (::CORBA::is_nil(this->topicWriter_.in())) { 00407 ACE_ERROR((LM_ERROR, 00408 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00409 ACE_TEXT("failed to extract 
typed TopicUpdate writer.\n"))); 00410 throw Incomplete(); 00411 00412 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00413 OpenDDS::DCPS::DataWriterImpl* servant 00414 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00415 00416 if (0 == servant) { 00417 ACE_DEBUG((LM_WARNING, 00418 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00419 ACE_TEXT("unable to extract typed TopicUpdate writer.\n"))); 00420 00421 } else { 00422 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00423 ACE_DEBUG((LM_DEBUG, 00424 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00425 ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"), 00426 std::string(converter).c_str(), 00427 this->id().id())); 00428 } 00429 } 00430 00431 description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME); 00432 dataReader = subscriber->create_datareader( 00433 description.in(), 00434 readerQos, 00435 &this->topicListener_, 00436 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00437 00438 if (CORBA::is_nil(dataReader.in())) { 00439 ACE_ERROR((LM_ERROR, 00440 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00441 ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"), 00442 this->id().id())); 00443 throw Incomplete(); 00444 00445 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00446 OpenDDS::DCPS::DataReaderImpl* servant 00447 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00448 00449 if (0 == servant) { 00450 ACE_DEBUG((LM_WARNING, 00451 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00452 ACE_TEXT("unable to extract typed TopicUpdate reader.\n"))); 00453 00454 } else { 00455 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00456 ACE_DEBUG((LM_DEBUG, 00457 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00458 ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"), 00459 
std::string(converter).c_str(), 00460 this->id().id())); 00461 } 00462 } 00463 00464 topic = this->federationParticipant_->create_topic( 00465 PARTICIPANTUPDATETOPICNAME, 00466 PARTICIPANTUPDATETYPENAME, 00467 TOPIC_QOS_DEFAULT, 00468 DDS::TopicListener::_nil(), 00469 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00470 dataWriter = publisher->create_datawriter( 00471 topic.in(), 00472 writerQos, 00473 DDS::DataWriterListener::_nil(), 00474 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00475 00476 if (CORBA::is_nil(dataWriter.in())) { 00477 ACE_ERROR((LM_ERROR, 00478 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00479 ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"), 00480 this->id().id())); 00481 throw Incomplete(); 00482 } 00483 00484 this->participantWriter_ 00485 = ParticipantUpdateDataWriter::_narrow(dataWriter.in()); 00486 00487 if (::CORBA::is_nil(this->participantWriter_.in())) { 00488 ACE_ERROR((LM_ERROR, 00489 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00490 ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n"))); 00491 throw Incomplete(); 00492 00493 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00494 OpenDDS::DCPS::DataWriterImpl* servant 00495 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00496 00497 if (0 == servant) { 00498 ACE_DEBUG((LM_WARNING, 00499 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00500 ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n"))); 00501 00502 } else { 00503 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00504 ACE_DEBUG((LM_DEBUG, 00505 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00506 ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"), 00507 std::string(converter).c_str(), 00508 this->id().id())); 00509 } 00510 } 00511 00512 description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME); 00513 dataReader = 
subscriber->create_datareader( 00514 description.in(), 00515 readerQos, 00516 &this->participantListener_, 00517 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00518 00519 if (CORBA::is_nil(dataReader.in())) { 00520 ACE_ERROR((LM_ERROR, 00521 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00522 ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"), 00523 this->id().id())); 00524 throw Incomplete(); 00525 00526 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00527 OpenDDS::DCPS::DataReaderImpl* servant 00528 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00529 00530 if (0 == servant) { 00531 ACE_DEBUG((LM_WARNING, 00532 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00533 ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n"))); 00534 00535 } else { 00536 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00537 ACE_DEBUG((LM_DEBUG, 00538 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00539 ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"), 00540 std::string(converter).c_str(), 00541 this->id().id())); 00542 } 00543 } 00544 00545 topic = this->federationParticipant_->create_topic( 00546 PUBLICATIONUPDATETOPICNAME, 00547 PUBLICATIONUPDATETYPENAME, 00548 TOPIC_QOS_DEFAULT, 00549 DDS::TopicListener::_nil(), 00550 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00551 dataWriter = publisher->create_datawriter( 00552 topic.in(), 00553 writerQos, 00554 DDS::DataWriterListener::_nil(), 00555 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00556 00557 if (CORBA::is_nil(dataWriter.in())) { 00558 ACE_ERROR((LM_ERROR, 00559 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00560 ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"), 00561 this->id().id())); 00562 throw Incomplete(); 00563 } 00564 00565 this->publicationWriter_ 00566 = PublicationUpdateDataWriter::_narrow(dataWriter.in()); 00567 00568 if 
(::CORBA::is_nil(this->publicationWriter_.in())) { 00569 ACE_ERROR((LM_ERROR, 00570 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00571 ACE_TEXT("failed to extract typed PublicationUpdate writer.\n"))); 00572 throw Incomplete(); 00573 00574 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00575 OpenDDS::DCPS::DataWriterImpl* servant 00576 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00577 00578 if (0 == servant) { 00579 ACE_DEBUG((LM_WARNING, 00580 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00581 ACE_TEXT("unable to extract typed PublicationUpdate writer.\n"))); 00582 00583 } else { 00584 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00585 ACE_DEBUG((LM_DEBUG, 00586 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00587 ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"), 00588 std::string(converter).c_str(), 00589 this->id().id())); 00590 } 00591 } 00592 00593 description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME); 00594 dataReader = subscriber->create_datareader( 00595 description.in(), 00596 readerQos, 00597 &this->publicationListener_, 00598 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00599 00600 if (CORBA::is_nil(dataReader.in())) { 00601 ACE_ERROR((LM_ERROR, 00602 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00603 ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"), 00604 this->id().id())); 00605 throw Incomplete(); 00606 00607 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00608 OpenDDS::DCPS::DataReaderImpl* servant 00609 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00610 00611 if (0 == servant) { 00612 ACE_DEBUG((LM_WARNING, 00613 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00614 ACE_TEXT("unable to extract typed PublicationUpdate reader.\n"))); 00615 00616 } else { 00617 OpenDDS::DCPS::RepoIdConverter 
converter(servant->get_subscription_id()); 00618 ACE_DEBUG((LM_DEBUG, 00619 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00620 ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"), 00621 std::string(converter).c_str(), 00622 this->id().id())); 00623 } 00624 } 00625 00626 topic = this->federationParticipant_->create_topic( 00627 SUBSCRIPTIONUPDATETOPICNAME, 00628 SUBSCRIPTIONUPDATETYPENAME, 00629 TOPIC_QOS_DEFAULT, 00630 DDS::TopicListener::_nil(), 00631 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00632 dataWriter = publisher->create_datawriter( 00633 topic.in(), 00634 writerQos, 00635 DDS::DataWriterListener::_nil(), 00636 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00637 00638 if (CORBA::is_nil(dataWriter.in())) { 00639 ACE_ERROR((LM_ERROR, 00640 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00641 ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"), 00642 this->id().id())); 00643 throw Incomplete(); 00644 } 00645 00646 this->subscriptionWriter_ 00647 = SubscriptionUpdateDataWriter::_narrow(dataWriter.in()); 00648 00649 if (::CORBA::is_nil(this->subscriptionWriter_.in())) { 00650 ACE_ERROR((LM_ERROR, 00651 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00652 ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n"))); 00653 throw Incomplete(); 00654 00655 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00656 OpenDDS::DCPS::DataWriterImpl* servant 00657 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in()); 00658 00659 if (0 == servant) { 00660 ACE_DEBUG((LM_WARNING, 00661 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00662 ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n"))); 00663 00664 } else { 00665 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id()); 00666 ACE_DEBUG((LM_DEBUG, 00667 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00668 ACE_TEXT("created federation SubscriptionUpdate 
writer %C for repository %d\n"), 00669 std::string(converter).c_str(), 00670 this->id().id())); 00671 } 00672 } 00673 00674 description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME); 00675 dataReader = subscriber->create_datareader( 00676 description.in(), 00677 readerQos, 00678 &this->subscriptionListener_, 00679 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00680 00681 if (CORBA::is_nil(dataReader.in())) { 00682 ACE_ERROR((LM_ERROR, 00683 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ") 00684 ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"), 00685 this->id().id())); 00686 throw Incomplete(); 00687 00688 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00689 OpenDDS::DCPS::DataReaderImpl* servant 00690 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in()); 00691 00692 if (0 == servant) { 00693 ACE_DEBUG((LM_WARNING, 00694 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ") 00695 ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n"))); 00696 00697 } else { 00698 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id()); 00699 ACE_DEBUG((LM_DEBUG, 00700 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00701 ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"), 00702 std::string(converter).c_str(), 00703 this->id().id())); 00704 } 00705 } 00706 00707 // JSP 00708 #if defined (ACE_HAS_IP_MULTICAST) 00709 00710 if (this->multicastEnabled_) { 00711 // 00712 // Install ior multicast handler. 00713 // 00714 // Get reactor instance from TAO. 00715 ACE_Reactor *reactor = this->orb_->orb_core()->reactor(); 00716 00717 // See if the -ORBMulticastDiscoveryEndpoint option was specified. 
00718 ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint()); 00719 00720 // First, see if the user has given us a multicast port number 00721 // on the command-line; 00722 u_short port = 0; 00723 00724 // Check environment var. for multicast port. 00725 const char *port_number = ACE_OS::getenv("OpenDDSFederationPort"); 00726 00727 if (port_number != 0) { 00728 port = static_cast<u_short>(ACE_OS::atoi(port_number)); 00729 } 00730 00731 // Port wasn't specified on the command-line - 00732 // use the default. 00733 if (port == 0) 00734 port = OpenDDS::Federator::Defaults::DiscoveryRequestPort; 00735 00736 // Initialize the handler 00737 if (mde.length() != 0) { 00738 if (this->multicastResponder_.init( 00739 this->orb_.in(), 00740 mde.c_str()) == -1) { 00741 ACE_ERROR((LM_ERROR, 00742 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ") 00743 ACE_TEXT("the multicast responder for repository %d.\n"), 00744 this->id().id())); 00745 throw Incomplete(); 00746 } 00747 00748 } else { 00749 if (this->multicastResponder_.init( 00750 this->orb_.in(), 00751 port, 00752 #if defined (ACE_HAS_IPV6) 00753 ACE_DEFAULT_MULTICASTV6_ADDR 00754 #else 00755 ACE_DEFAULT_MULTICAST_ADDR 00756 #endif /* ACE_HAS_IPV6 */ 00757 )) { 00758 ACE_ERROR((LM_ERROR, 00759 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ") 00760 ACE_TEXT("the multicast responder for repository %d.\n"), 00761 this->id().id())); 00762 throw Incomplete(); 00763 } 00764 } 00765 00766 // Register event handler for the ior multicast. 
00767 if (reactor->register_handler(&this->multicastResponder_, 00768 ACE_Event_Handler::READ_MASK) == -1) { 00769 ACE_ERROR((LM_ERROR, 00770 ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ") 00771 ACE_TEXT("for repository %d.\n"), 00772 this->id().id())); 00773 throw Incomplete(); 00774 } 00775 00776 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00777 ACE_DEBUG((LM_DEBUG, 00778 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ") 00779 ACE_TEXT("multicast server setup is complete.\n"))); 00780 } 00781 } 00782 00783 #else 00784 ACE_UNUSED_ARG(this->multicastEnabled_); 00785 #endif /* ACE_HAS_IP_MULTICAST */ 00786 }
void OpenDDS::Federator::ManagerImpl::initializeOwner | ( | const OpenDDS::Federator::OwnerUpdate & | data | ) | [virtual] |
Definition at line 1130 of file FederatorManagerImpl.cpp.
References processCreate().
01132 { 01133 this->processCreate(&data, 0); 01134 }
void OpenDDS::Federator::ManagerImpl::initializeParticipant | ( | const OpenDDS::Federator::ParticipantUpdate & | data | ) | [virtual] |
Definition at line 1144 of file FederatorManagerImpl.cpp.
References processCreate().
01146 { 01147 this->processCreate(&data, 0); 01148 }
void OpenDDS::Federator::ManagerImpl::initializePublication | ( | const OpenDDS::Federator::PublicationUpdate & | data | ) | [virtual] |
Definition at line 1151 of file FederatorManagerImpl.cpp.
References processCreate().
01153 { 01154 this->processCreate(&data, 0); 01155 }
void OpenDDS::Federator::ManagerImpl::initializeSubscription | ( | const OpenDDS::Federator::SubscriptionUpdate & | data | ) | [virtual] |
Definition at line 1158 of file FederatorManagerImpl.cpp.
References processCreate().
01160 { 01161 this->processCreate(&data, 0); 01162 }
void OpenDDS::Federator::ManagerImpl::initializeTopic | ( | const OpenDDS::Federator::TopicUpdate & | data | ) | [virtual] |
Definition at line 1137 of file FederatorManagerImpl.cpp.
References processCreate().
01139 { 01140 this->processCreate(&data, 0); 01141 }
Manager_ptr OpenDDS::Federator::ManagerImpl::join_federation | ( | Manager_ptr | peer, | |
FederationDomain | federation | |||
) | [virtual] |
Definition at line 917 of file FederatorManagerImpl.cpp.
References CORBA::Exception::_tao_print_exception(), ACE_TEXT(), config_, OpenDDS::DCPS::DCPS_debug_level, federated_, OpenDDS::Federator::Config::federationDomain(), initialize(), CORBA::is_nil(), joiner_, joining_, joinRepo_, LM_DEBUG, lock_, OpenDDS::Federator::NIL_REPOSITORY, orb(), participantWriter_, peers_, pushState(), ACE_Condition< MUTEX >::signal(), TheServiceParticipant, and ACE_Condition< MUTEX >::wait().
00921 { 00922 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00923 ACE_DEBUG((LM_DEBUG, 00924 ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"), 00925 federation)); 00926 } 00927 00928 RepoKey remote = NIL_REPOSITORY; 00929 00930 try { 00931 // Obtain the remote repository federator Id value. 00932 remote = peer->federation_id(); 00933 00934 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00935 ACE_DEBUG((LM_DEBUG, 00936 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 00937 ACE_TEXT("repo id %d entered from repository with id %d.\n"), 00938 this->id().id(), 00939 remote)); 00940 } 00941 00942 } catch (const CORBA::Exception& ex) { 00943 ex._tao_print_exception( 00944 ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ") 00945 ACE_TEXT("unable to obtain remote federation Id value: ")); 00946 throw Incomplete(); 00947 } 00948 00949 // If we are recursing, then we are done. 00950 if (this->joiner_ == remote) { 00951 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00952 ACE_DEBUG((LM_DEBUG, 00953 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 00954 ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"), 00955 this->id().id(), 00956 remote)); 00957 } 00958 00959 return this->_this(); 00960 00961 } else { 00962 // Block while any different repository is joining. 00963 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00964 00965 while (this->joiner_ != NIL_REPOSITORY) { 00966 // This releases the lock while we block. 00967 this->joining_.wait(); 00968 00969 // We are now recursing - curses! 00970 if (this->joiner_ == remote) { 00971 return this->_this(); 00972 } 00973 } 00974 00975 // Note that we are joining the remote repository now. 00976 this->joiner_ = remote; 00977 } 00978 00979 // 00980 // We only reach this point if: 00981 // 1) No other repository is processing past this point; 00982 // 2) We are not recursing. 00983 // 00984 00985 // Check if we already have Federation repository. 
00986 // Check if we are already federated. 00987 if (this->federated_ == false) { 00988 // Go ahead and add the joining repository as our Federation 00989 // repository. 00990 try { 00991 // Mark this repository as the point to which we are joined to 00992 // the federation. 00993 this->joinRepo_ = remote; 00994 00995 // Obtain a reference to the remote repository. 00996 OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository(); 00997 00998 CORBA::ORB_var orb = remoteRepo->_get_orb(); 00999 CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in()); 01000 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01001 ACE_DEBUG((LM_DEBUG, 01002 ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ") 01003 ACE_TEXT("id %d obtained reference to id %d:\n") 01004 ACE_TEXT("\t%C\n"), 01005 this->id().id(), 01006 remote, 01007 remoteRepoIor.in())); 01008 } 01009 01010 // Add remote repository to Service_Participant in the Federation domain 01011 std::ostringstream oss; 01012 oss << remote; 01013 std::string key_string = oss.str(); 01014 TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string); 01015 TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string); 01016 01017 } catch (const CORBA::Exception& ex) { 01018 ex._tao_print_exception( 01019 "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: "); 01020 throw Incomplete(); 01021 } 01022 } 01023 01024 // Symmetrical joining behavior. 01025 try { 01026 Manager_var self = this->_this(); 01027 Manager_var remoteManager 01028 = peer->join_federation(self, this->config_.federationDomain()); 01029 01030 if (this->joinRepo_ == remote) { 01031 this->peers_[ this->joinRepo_] 01032 = OpenDDS::Federator::Manager::_duplicate(remoteManager.in()); 01033 } 01034 01035 // 01036 // Push our initial state out to the joining repository *after* we call 01037 // him back to join. 
This reduces the amount of duplicate data pushed 01038 // when a new (empty) repository is joining an existing federation. 01039 // 01040 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01041 ACE_DEBUG((LM_DEBUG, 01042 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 01043 ACE_TEXT("repo id %d pushing state to repository with id %d.\n"), 01044 this->id().id(), 01045 remote)); 01046 } 01047 01048 this->pushState(peer); 01049 01050 } catch (const CORBA::Exception& ex) { 01051 ex._tao_print_exception( 01052 "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: "); 01053 throw Incomplete(); 01054 } 01055 01056 if (CORBA::is_nil(this->participantWriter_.in())) { 01057 // 01058 // Establish our update publications and subscriptions *after* we 01059 // have exchanged internal state with the first joining repository. 01060 // 01061 this->initialize(); 01062 } 01063 01064 // Adjust our joining state and give others the opportunity to proceed. 01065 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01066 ACE_DEBUG((LM_DEBUG, 01067 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ") 01068 ACE_TEXT("repo id %d joined to repository with id %d.\n"), 01069 this->id().id(), 01070 remote)); 01071 } 01072 01073 this->federated_ = true; 01074 this->joiner_ = NIL_REPOSITORY; 01075 this->joining_.signal(); 01076 return this->_this(); 01077 }
void OpenDDS::Federator::ManagerImpl::leave_and_shutdown | ( | void | ) | [virtual] |
Definition at line 1110 of file FederatorManagerImpl.cpp.
References info_, and TAO_DDS_DCPSInfo_i::shutdown().
01112 { 01113 // Shutdown the process via the repository object. 01114 this->info_->shutdown(); 01115 }
void OpenDDS::Federator::ManagerImpl::leave_federation | ( | RepoKey | id | ) | [virtual] |
Definition at line 1080 of file FederatorManagerImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, info_, LM_DEBUG, peers_, and TAO_DDS_DCPSInfo_i::remove_by_owner().
01082 { 01083 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01084 ACE_DEBUG((LM_DEBUG, 01085 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"), 01086 this->id().id())); 01087 } 01088 01089 // Remove the leaving repository from our outbound mappings. 01090 IdToManagerMap::iterator where = this->peers_.find(id); 01091 01092 if (where != this->peers_.end()) { 01093 this->peers_.erase(where); 01094 } 01095 01096 // Remove all the internal Entities owned by the leaving repository. 01097 if (false 01098 == this->info_->remove_by_owner(this->config_.federationDomain(), id)) { 01099 throw Incomplete(); 01100 } 01101 01102 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01103 ACE_DEBUG((LM_DEBUG, 01104 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"), 01105 this->id().id())); 01106 } 01107 }
ACE_INLINE void OpenDDS::Federator::ManagerImpl::localRepo | ( | ::OpenDDS::DCPS::DCPSInfo_ptr | repo | ) |
Capture a remote callable reference to the DCPSInfo.
Definition at line 33 of file FederatorManagerImpl.inl.
References localRepo_.
Referenced by InfoRepo::init().
00034 { 00035 this->localRepo_ = OpenDDS::DCPS::DCPSInfo::_duplicate(repo); 00036 }
ACE_INLINE void OpenDDS::Federator::ManagerImpl::orb | ( | CORBA::ORB_ptr | value | ) |
Definition at line 47 of file FederatorManagerImpl.inl.
References CORBA::ORB::_duplicate(), and orb_.
00048 { 00049 this->orb_ = CORBA::ORB::_duplicate(value); 00050 }
ACE_INLINE CORBA::ORB_ptr OpenDDS::Federator::ManagerImpl::orb | ( | void | ) |
Accessors for the ORB.
Definition at line 40 of file FederatorManagerImpl.inl.
References orb_, and TAO_Pseudo_Var_T< T >::ptr().
Referenced by InfoRepo::init(), join_federation(), and pushState().
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const TopicUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Create a proxy for a new topic.
Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.
Definition at line 731 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_topic(), OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredTopics_, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, processDeferred(), OpenDDS::Federator::TopicUpdate::qos, and OpenDDS::Federator::TopicUpdate::topic.
00732 { 00733 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00734 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00735 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id); 00736 ACE_DEBUG((LM_DEBUG, 00737 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ") 00738 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00739 this->id().id(), 00740 sample->domain, 00741 std::string(part_converter).c_str(), 00742 std::string(topic_converter).c_str())); 00743 } 00744 00745 if (false == this->info_->add_topic(sample->id, 00746 sample->domain, 00747 sample->participant, 00748 sample->topic, 00749 sample->datatype, 00750 sample->qos)) { 00751 { 00752 ACE_GUARD(ACE_Thread_Mutex, 00753 guard, 00754 this->deferred_lock_); 00755 this->deferredTopics_.push_back(*sample); 00756 } 00757 00758 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00759 ACE_DEBUG((LM_DEBUG, 00760 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ") 00761 ACE_TEXT("deferred update.\n"))); 00762 } 00763 } 00764 00765 this->processDeferred(); 00766 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const ParticipantUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Create a proxy for a new participant.
Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.
Definition at line 699 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_domain_participant(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, LM_ERROR, OpenDDS::Federator::ParticipantUpdate::owner, processDeferred(), OpenDDS::Federator::ParticipantUpdate::qos, and OpenDDS::Federator::ParticipantUpdate::sender.
00700 { 00701 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00702 OpenDDS::DCPS::RepoIdConverter converter(sample->id); 00703 ACE_DEBUG((LM_DEBUG, 00704 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ") 00705 ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"), 00706 this->id().id(), 00707 sample->domain, 00708 std::string(converter).c_str(), 00709 sample->owner)); 00710 } 00711 00712 this->info_->add_domain_participant( 00713 sample->domain, 00714 sample->id, 00715 sample->qos); 00716 bool ownershipChanged = this->info_->changeOwnership( 00717 sample->domain, 00718 sample->id, 00719 sample->sender, 00720 sample->owner); 00721 if (!ownershipChanged) { 00722 ACE_ERROR((LM_ERROR, 00723 ACE_TEXT("(%P|%t) ERROR: ") 00724 ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ") 00725 ACE_TEXT("Could not change ownership\n"))); 00726 } 00727 this->processDeferred(); 00728 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Create a proxy for a new subscription.
Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.
Definition at line 655 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_subscription(), OpenDDS::Federator::SubscriptionUpdate::callback, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredSubscriptions_, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::filter_class_name, OpenDDS::Federator::SubscriptionUpdate::filter_expression, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, processDeferred(), OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::SubscriptionUpdate::topic, and OpenDDS::Federator::SubscriptionUpdate::transport_info.
00656 { 00657 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00658 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00659 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 00660 ACE_DEBUG((LM_DEBUG, 00661 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ") 00662 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00663 this->id().id(), 00664 sample->domain, 00665 std::string(part_converter).c_str(), 00666 std::string(sub_converter).c_str())); 00667 } 00668 00669 if (false == this->info_->add_subscription(sample->domain, 00670 sample->participant, 00671 sample->topic, 00672 sample->id, 00673 sample->callback, 00674 sample->datareader_qos, 00675 sample->transport_info, 00676 sample->subscriber_qos, 00677 sample->filter_class_name, 00678 sample->filter_expression, 00679 sample->expression_params, 00680 true)) { 00681 { 00682 ACE_GUARD(ACE_Thread_Mutex, 00683 guard, 00684 this->deferred_lock_); 00685 this->deferredSubscriptions_.push_back(*sample); 00686 } 00687 00688 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00689 ACE_DEBUG((LM_DEBUG, 00690 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ") 00691 ACE_TEXT("deferred update.\n"))); 00692 } 00693 } 00694 00695 this->processDeferred(); 00696 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Create a proxy for a new publication.
Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.
Definition at line 614 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_publication(), OpenDDS::Federator::PublicationUpdate::callback, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredPublications_, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, processDeferred(), OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::topic, and OpenDDS::Federator::PublicationUpdate::transport_info.
00615 { 00616 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00617 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00618 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 00619 ACE_DEBUG((LM_DEBUG, 00620 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ") 00621 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00622 this->id().id(), 00623 sample->domain, 00624 std::string(part_converter).c_str(), 00625 std::string(pub_converter).c_str())); 00626 } 00627 00628 if (false == this->info_->add_publication(sample->domain, 00629 sample->participant, 00630 sample->topic, 00631 sample->id, 00632 sample->callback, 00633 sample->datawriter_qos, 00634 sample->transport_info, 00635 sample->publisher_qos, 00636 true)) { 00637 { 00638 ACE_GUARD(ACE_Thread_Mutex, 00639 guard, 00640 this->deferred_lock_); 00641 this->deferredPublications_.push_back(*sample); 00642 } 00643 00644 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00645 ACE_DEBUG((LM_DEBUG, 00646 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ") 00647 ACE_TEXT("deferred update.\n"))); 00648 } 00649 } 00650 00651 this->processDeferred(); 00652 }
void OpenDDS::Federator::ManagerImpl::processCreate | ( | const OwnerUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
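Apply the ownership change carried by an OwnerUpdate sample. If the change cannot be applied yet, the sample is queued as a deferred update; any deferred updates are then reprocessed.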
Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.
Definition at line 577 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, processDeferred(), and OpenDDS::Federator::OwnerUpdate::sender.
Referenced by initializeOwner(), initializeParticipant(), initializePublication(), initializeSubscription(), and initializeTopic().
00578 { 00579 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00580 OpenDDS::DCPS::RepoIdConverter converter(sample->participant); 00581 ACE_DEBUG((LM_DEBUG, 00582 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ") 00583 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00584 this->id().id(), 00585 sample->domain, 00586 std::string(converter).c_str(), 00587 sample->sender, 00588 sample->owner)); 00589 } 00590 00591 // We could generate an error message here. Instead we let action be irrelevant. 00592 if (false == this->info_->changeOwnership(sample->domain, 00593 sample->participant, 00594 sample->sender, 00595 sample->owner)) { 00596 { 00597 ACE_GUARD(ACE_Thread_Mutex, 00598 guard, 00599 this->deferred_lock_); 00600 this->deferredOwnerships_.push_back(*sample); 00601 } 00602 00603 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00604 ACE_DEBUG((LM_DEBUG, 00605 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ") 00606 ACE_TEXT("deferred update.\n"))); 00607 } 00608 } 00609 00610 this->processDeferred(); 00611 }
void OpenDDS::Federator::ManagerImpl::processDeferred | ( | ) |
Handle any deferred updates that might have become processable.
Definition at line 769 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::add_publication(), TAO_DDS_DCPSInfo_i::add_subscription(), TAO_DDS_DCPSInfo_i::add_topic(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, deferredPublications_, deferredSubscriptions_, deferredTopics_, info_, and LM_DEBUG.
Referenced by processCreate().
00770 { 00771 ACE_GUARD(ACE_Thread_Mutex, 00772 guard, 00773 this->deferred_lock_); 00774 00775 { 00776 std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin(); 00777 00778 while (current != this->deferredOwnerships_.end()) { 00779 if (this->info_->changeOwnership(current->domain, 00780 current->participant, 00781 current->sender, 00782 current->owner)) { 00783 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00784 OpenDDS::DCPS::RepoIdConverter converter(current->participant); 00785 ACE_DEBUG((LM_DEBUG, 00786 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ") 00787 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00788 this->id().id(), 00789 current->domain, 00790 std::string(converter).c_str(), 00791 current->sender, 00792 current->owner)); 00793 } 00794 00795 current = this->deferredOwnerships_.erase(current); 00796 00797 } else { 00798 ++ current; 00799 } 00800 } 00801 } 00802 00803 { 00804 std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin(); 00805 00806 while (current != this->deferredTopics_.end()) { 00807 if (true == this->info_->add_topic(current->id, 00808 current->domain, 00809 current->participant, 00810 current->topic, 00811 current->datatype, 00812 current->qos)) { 00813 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00814 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant); 00815 OpenDDS::DCPS::RepoIdConverter topic_converter(current->id); 00816 ACE_DEBUG((LM_DEBUG, 00817 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ") 00818 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00819 this->id().id(), 00820 current->domain, 00821 std::string(part_converter).c_str(), 00822 std::string(topic_converter).c_str())); 00823 } 00824 00825 current = this->deferredTopics_.erase(current); 00826 00827 } else { 00828 ++ current; 00829 } 00830 } 00831 } 00832 00833 { 00834 std::list<PublicationUpdate>::iterator current = 
this->deferredPublications_.begin(); 00835 00836 while (current != this->deferredPublications_.end()) { 00837 00838 if (true == this->info_->add_publication(current->domain, 00839 current->participant, 00840 current->topic, 00841 current->id, 00842 current->callback, 00843 current->datawriter_qos, 00844 current->transport_info, 00845 current->publisher_qos, 00846 true)) { 00847 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00848 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant); 00849 OpenDDS::DCPS::RepoIdConverter pub_converter(current->id); 00850 ACE_DEBUG((LM_DEBUG, 00851 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ") 00852 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00853 this->id().id(), 00854 current->domain, 00855 std::string(part_converter).c_str(), 00856 std::string(pub_converter).c_str())); 00857 } 00858 00859 current = this->deferredPublications_.erase(current); 00860 00861 } else { 00862 ++ current; 00863 } 00864 } 00865 } 00866 00867 { 00868 std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin(); 00869 00870 while (current != this->deferredSubscriptions_.end()) { 00871 00872 if (true == this->info_->add_subscription(current->domain, 00873 current->participant, 00874 current->topic, 00875 current->id, 00876 current->callback, 00877 current->datareader_qos, 00878 current->transport_info, 00879 current->subscriber_qos, 00880 current->filter_class_name, 00881 current->filter_expression, 00882 current->expression_params, 00883 true)) { 00884 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00885 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant); 00886 OpenDDS::DCPS::RepoIdConverter sub_converter(current->id); 00887 ACE_DEBUG((LM_DEBUG, 00888 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ") 00889 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00890 this->id().id(), 00891 
current->domain, 00892 std::string(part_converter).c_str(), 00893 std::string(sub_converter).c_str())); 00894 } 00895 00896 current = this->deferredSubscriptions_.erase(current); 00897 00898 } else { 00899 ++ current; 00900 } 00901 } 00902 } 00903 00904 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const TopicUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Delete a proxy for a topic.
Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.
Definition at line 1208 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_topic().
01209 { 01210 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01211 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01212 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id); 01213 ACE_DEBUG((LM_DEBUG, 01214 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ") 01215 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 01216 this->id().id(), 01217 sample->domain, 01218 std::string(part_converter).c_str(), 01219 std::string(topic_converter).c_str())); 01220 } 01221 01222 try { 01223 this->info_->remove_topic( 01224 sample->domain, 01225 sample->participant, 01226 sample->id); 01227 01228 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01229 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01230 ACE_DEBUG((LM_DEBUG, 01231 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ") 01232 ACE_TEXT("the participant was already removed.\n"))); 01233 } 01234 } catch (OpenDDS::DCPS::Invalid_Domain&) { 01235 if (OpenDDS::DCPS::DCPS_debug_level > 1) { 01236 ACE_DEBUG((LM_DEBUG, 01237 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ") 01238 ACE_TEXT("the domain %d no longer exists.\n"),sample->domain)); 01239 } 01240 } 01241 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const ParticipantUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Delete a proxy for a participant.
Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.
Definition at line 1183 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, and TAO_DDS_DCPSInfo_i::remove_domain_participant().
01184 { 01185 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01186 OpenDDS::DCPS::RepoIdConverter converter(sample->id); 01187 ACE_DEBUG((LM_DEBUG, 01188 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ") 01189 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 01190 this->id().id(), 01191 sample->domain, 01192 std::string(converter).c_str())); 01193 } 01194 try { 01195 this->info_->remove_domain_participant( 01196 sample->domain, 01197 sample->id); 01198 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01199 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01200 ACE_DEBUG((LM_DEBUG, 01201 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ") 01202 ACE_TEXT("the participant was already removed.\n"))); 01203 } 01204 } 01205 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Delete a proxy for a subscription.
Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.
Definition at line 1153 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_subscription().
01154 { 01155 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01156 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01157 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 01158 ACE_DEBUG((LM_DEBUG, 01159 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ") 01160 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 01161 this->id().id(), 01162 sample->domain, 01163 std::string(part_converter).c_str(), 01164 std::string(sub_converter).c_str())); 01165 } 01166 01167 try { 01168 this->info_->remove_subscription( 01169 sample->domain, 01170 sample->participant, 01171 sample->id); 01172 01173 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01174 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01175 ACE_DEBUG((LM_DEBUG, 01176 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ") 01177 ACE_TEXT("the participant was already removed.\n"))); 01178 } 01179 } 01180 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Delete a proxy for a publication.
Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.
Definition at line 1123 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::remove_publication().
01124 { 01125 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01126 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01127 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 01128 ACE_DEBUG((LM_DEBUG, 01129 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ") 01130 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 01131 this->id().id(), 01132 sample->domain, 01133 std::string(part_converter).c_str(), 01134 std::string(pub_converter).c_str())); 01135 } 01136 01137 try { 01138 this->info_->remove_publication( 01139 sample->domain, 01140 sample->participant, 01141 sample->id); 01142 01143 } catch (OpenDDS::DCPS::Invalid_Participant&) { 01144 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01145 ACE_DEBUG((LM_DEBUG, 01146 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ") 01147 ACE_TEXT("the participant was already removed.\n"))); 01148 } 01149 } 01150 }
void OpenDDS::Federator::ManagerImpl::processDelete | ( | const OwnerUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Apply the ownership change carried by an OwnerUpdate sample. If the change cannot be applied yet, the sample is queued as a deferred update.
Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.
Definition at line 1091 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.
01092 { 01093 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01094 OpenDDS::DCPS::RepoIdConverter converter(sample->participant); 01095 ACE_DEBUG((LM_DEBUG, 01096 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ") 01097 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 01098 this->id().id(), 01099 sample->domain, 01100 std::string(converter).c_str(), 01101 sample->sender, 01102 sample->owner)); 01103 } 01104 01105 // We could generate an error message here. Instead we let action be irrelevant. 01106 if (false == this->info_->changeOwnership(sample->domain, 01107 sample->participant, 01108 sample->sender, 01109 sample->owner)) { 01110 { 01111 ACE_GUARD(ACE_Thread_Mutex, 01112 guard, 01113 this->deferred_lock_); 01114 this->deferredOwnerships_.push_back(*sample); 01115 } 01116 ACE_DEBUG((LM_DEBUG, 01117 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ") 01118 ACE_TEXT("deferred update.\n"))); 01119 } 01120 }
void OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Update the proxy filter expression params for a subscription.
Reimplemented from OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.
Definition at line 1027 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_params().
01029 { 01030 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01031 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01032 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 01033 ACE_DEBUG((LM_DEBUG, 01034 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ") 01035 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 01036 this->id().id(), 01037 sample->domain, 01038 std::string(part_converter).c_str(), 01039 std::string(sub_converter).c_str())); 01040 } 01041 01042 this->info_->update_subscription_params( 01043 sample->domain, 01044 sample->participant, 01045 sample->id, 01046 sample->expression_params); 01047 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const TopicUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Update the proxy TopicQos for a topic.
Implements OpenDDS::Federator::UpdateProcessor< TopicUpdate >.
Definition at line 1069 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::TopicUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, and TAO_DDS_DCPSInfo_i::update_topic_qos().
01070 { 01071 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01072 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01073 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id); 01074 ACE_DEBUG((LM_DEBUG, 01075 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ") 01076 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 01077 this->id().id(), 01078 sample->domain, 01079 std::string(part_converter).c_str(), 01080 std::string(topic_converter).c_str())); 01081 } 01082 01083 this->info_->update_topic_qos( 01084 sample->id, 01085 sample->domain, 01086 sample->participant, 01087 sample->qos); 01088 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const ParticipantUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Update the proxy ParticipantQos for a participant.
Implements OpenDDS::Federator::UpdateProcessor< ParticipantUpdate >.
Definition at line 1050 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, OpenDDS::Federator::ParticipantUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::ParticipantUpdate::qos, and TAO_DDS_DCPSInfo_i::update_domain_participant_qos().
01051 { 01052 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01053 OpenDDS::DCPS::RepoIdConverter converter(sample->id); 01054 ACE_DEBUG((LM_DEBUG, 01055 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ") 01056 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 01057 this->id().id(), 01058 sample->domain, 01059 std::string(converter).c_str())); 01060 } 01061 01062 this->info_->update_domain_participant_qos( 01063 sample->domain, 01064 sample->id, 01065 sample->qos); 01066 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Update the proxy DataReaderQos for a subscription.
Implements OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.
Definition at line 983 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, and TAO_DDS_DCPSInfo_i::update_subscription_qos().
00984 { 00985 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00986 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00987 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 00988 ACE_DEBUG((LM_DEBUG, 00989 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ") 00990 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00991 this->id().id(), 00992 sample->domain, 00993 std::string(part_converter).c_str(), 00994 std::string(sub_converter).c_str())); 00995 } 00996 00997 this->info_->update_subscription_qos( 00998 sample->domain, 00999 sample->participant, 01000 sample->id, 01001 sample->datareader_qos); 01002 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Update the proxy DataWriterQos for a publication.
Implements OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.
Definition at line 939 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, and TAO_DDS_DCPSInfo_i::update_publication_qos().
00940 { 00941 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00942 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00943 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 00944 ACE_DEBUG((LM_DEBUG, 00945 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ") 00946 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00947 this->id().id(), 00948 sample->domain, 00949 std::string(part_converter).c_str(), 00950 std::string(pub_converter).c_str())); 00951 } 00952 00953 this->info_->update_publication_qos( 00954 sample->domain, 00955 sample->participant, 00956 sample->id, 00957 sample->datawriter_qos); 00958 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos1 | ( | const OwnerUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Process ownership changes.
Implements OpenDDS::Federator::UpdateProcessor< OwnerUpdate >.
Definition at line 907 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), TAO_DDS_DCPSInfo_i::changeOwnership(), OpenDDS::DCPS::DCPS_debug_level, deferred_lock_, deferredOwnerships_, OpenDDS::Federator::OwnerUpdate::domain, info_, LM_DEBUG, OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::OwnerUpdate::participant, and OpenDDS::Federator::OwnerUpdate::sender.
00908 { 00909 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00910 OpenDDS::DCPS::RepoIdConverter converter(sample->participant); 00911 ACE_DEBUG((LM_DEBUG, 00912 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ") 00913 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"), 00914 this->id().id(), 00915 sample->domain, 00916 std::string(converter).c_str(), 00917 sample->sender, 00918 sample->owner)); 00919 } 00920 00921 if (false == this->info_->changeOwnership(sample->domain, 00922 sample->participant, 00923 sample->sender, 00924 sample->owner)) { 00925 { 00926 ACE_GUARD(ACE_Thread_Mutex, 00927 guard, 00928 this->deferred_lock_); 00929 00930 this->deferredOwnerships_.push_back(*sample); 00931 } 00932 ACE_DEBUG((LM_DEBUG, 00933 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ") 00934 ACE_TEXT("deferred update.\n"))); 00935 } 00936 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos2 | ( | const SubscriptionUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Update the proxy SubscriberQos for a subscription.
Reimplemented from OpenDDS::Federator::UpdateProcessor< SubscriptionUpdate >.
Definition at line 1005 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, and TAO_DDS_DCPSInfo_i::update_subscription_qos().
01006 { 01007 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 01008 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 01009 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id); 01010 ACE_DEBUG((LM_DEBUG, 01011 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ") 01012 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 01013 this->id().id(), 01014 sample->domain, 01015 std::string(part_converter).c_str(), 01016 std::string(sub_converter).c_str())); 01017 } 01018 01019 this->info_->update_subscription_qos( 01020 sample->domain, 01021 sample->participant, 01022 sample->id, 01023 sample->subscriber_qos); 01024 }
void OpenDDS::Federator::ManagerImpl::processUpdateQos2 | ( | const PublicationUpdate * | sample, | |
const DDS::SampleInfo * | info | |||
) | [virtual] |
Update the proxy PublisherQos for a publication.
Reimplemented from OpenDDS::Federator::UpdateProcessor< PublicationUpdate >.
Definition at line 961 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::PublicationUpdate::id, info_, LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, and TAO_DDS_DCPSInfo_i::update_publication_qos().
00962 { 00963 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00964 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant); 00965 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id); 00966 ACE_DEBUG((LM_DEBUG, 00967 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ") 00968 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00969 this->id().id(), 00970 sample->domain, 00971 std::string(part_converter).c_str(), 00972 std::string(pub_converter).c_str())); 00973 } 00974 00975 this->info_->update_publication_qos( 00976 sample->domain, 00977 sample->participant, 00978 sample->id, 00979 sample->publisher_qos); 00980 }
void OpenDDS::Federator::ManagerImpl::pushState | ( | Manager_ptr | peer | ) |
Push our current state to a remote repository.
Definition at line 1244 of file FederatorManagerImpl_updates.cpp.
References OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::TopicUpdate::action, OpenDDS::Federator::OwnerUpdate::action, OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::Federator::SubscriptionUpdate::callback, OpenDDS::Federator::PublicationUpdate::callback, config_, OpenDDS::Federator::CreateEntity, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::Federator::TopicUpdate::datatype, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::PublicationUpdate::domain, OpenDDS::Federator::TopicUpdate::domain, OpenDDS::Federator::OwnerUpdate::domain, OpenDDS::Federator::ParticipantUpdate::domain, TAO_DDS_DCPSInfo_i::domains(), OpenDDS::Federator::SubscriptionUpdate::expression_params, OpenDDS::Federator::Config::federationDomain(), OpenDDS::Federator::SubscriptionUpdate::filter_expression, DCPS_IR_Subscription::get_datareader_qos(), DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Subscription::get_expr_params(), DCPS_IR_Subscription::get_filter_expression(), DCPS_IR_Subscription::get_id(), DCPS_IR_Publication::get_id(), DCPS_IR_Subscription::get_participant_id(), DCPS_IR_Publication::get_participant_id(), DCPS_IR_Publication::get_publisher_qos(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Subscription::get_topic_id(), DCPS_IR_Publication::get_topic_id(), DCPS_IR_Subscription::get_transportLocatorSeq(), DCPS_IR_Publication::get_transportLocatorSeq(), OpenDDS::Federator::SubscriptionUpdate::id, OpenDDS::Federator::PublicationUpdate::id, OpenDDS::Federator::TopicUpdate::id, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), info_, TAO_DDS_DCPSInfo_i::orb(), orb(), OpenDDS::Federator::OwnerUpdate::owner, OpenDDS::Federator::ParticipantUpdate::owner, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::PublicationUpdate::participant, 
OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::OwnerUpdate::participant, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::ParticipantUpdate::qos, DCPS_IR_Subscription::reader(), OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::PublicationUpdate::sender, OpenDDS::Federator::TopicUpdate::sender, OpenDDS::Federator::OwnerUpdate::sender, OpenDDS::Federator::ParticipantUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, OpenDDS::Federator::SubscriptionUpdate::topic, OpenDDS::Federator::PublicationUpdate::topic, OpenDDS::Federator::TopicUpdate::topic, OpenDDS::Federator::SubscriptionUpdate::transport_info, OpenDDS::Federator::PublicationUpdate::transport_info, and DCPS_IR_Publication::writer().
Referenced by join_federation().
01245 { 01246 // foreach DCPS_IR_Domain 01247 // foreach DCPS_IR_Participant 01248 // peer->initializeParticipant(...) 01249 // peer->initializeOwner(...) 01250 // foreach DCPS_IR_Participant 01251 // foreach DCPS_IR_Topic 01252 // peer->initializeTopic(...) 01253 // foreach DCPS_IR_Publication 01254 // peer->initializePublication(...) 01255 // foreach DCPS_IR_Subscription 01256 // peer->initializeSubscription(...) 01257 01258 // Process each domain within the repository. 01259 for (DCPS_IR_Domain_Map::const_iterator currentDomain 01260 = this->info_->domains().begin(); 01261 currentDomain != this->info_->domains().end(); 01262 ++currentDomain) { 01263 01264 if (currentDomain->second->get_id() == this->config_.federationDomain()) { 01265 // Do not push the Federation domain publications. 01266 //continue; 01267 } 01268 01269 // Process each participant within the current domain. 01270 for (DCPS_IR_Participant_Map::const_iterator currentParticipant 01271 = currentDomain->second->participants().begin(); 01272 currentParticipant != currentDomain->second->participants().end(); 01273 ++currentParticipant) { 01274 01275 if (currentParticipant->second->isBitPublisher() == true) { 01276 // Do not push the built-in topic publications. 01277 continue; 01278 } 01279 01280 // Initialize the participant on the peer. 01281 ParticipantUpdate participantSample; 01282 participantSample.sender = this->id().id(); 01283 participantSample.action = CreateEntity; 01284 01285 participantSample.owner = currentParticipant->second->owner(); 01286 participantSample.domain = currentDomain->second->get_id(); 01287 participantSample.id = currentParticipant->second->get_id(); 01288 participantSample.qos = *currentParticipant->second->get_qos(); 01289 01290 peer->initializeParticipant(participantSample); 01291 01292 // Initialize the ownership of the participant on the peer. 
01293 OwnerUpdate ownerSample; 01294 ownerSample.sender = this->id().id(); 01295 ownerSample.action = CreateEntity; 01296 01297 ownerSample.domain = currentDomain->second->get_id(); 01298 ownerSample.participant = currentParticipant->second->get_id(); 01299 ownerSample.owner = currentParticipant->second->owner(); 01300 01301 peer->initializeOwner(ownerSample); 01302 } 01303 01304 // Process each participant within the current domain. 01305 for (DCPS_IR_Participant_Map::const_iterator currentParticipant 01306 = currentDomain->second->participants().begin(); 01307 currentParticipant != currentDomain->second->participants().end(); 01308 ++currentParticipant) { 01309 01310 if (currentParticipant->second->isBitPublisher() == true) { 01311 // Do not push the built-in topic publications. 01312 continue; 01313 } 01314 01315 // Process each topic within the current particpant. 01316 for (DCPS_IR_Topic_Map::const_iterator currentTopic 01317 = currentParticipant->second->topics().begin(); 01318 currentTopic != currentParticipant->second->topics().end(); 01319 ++currentTopic) { 01320 TopicUpdate topicSample; 01321 topicSample.sender = this->id().id(); 01322 topicSample.action = CreateEntity; 01323 01324 topicSample.id = currentTopic->second->get_id(); 01325 topicSample.domain = currentDomain->second->get_id(); 01326 topicSample.participant = currentTopic->second->get_participant_id(); 01327 topicSample.topic = currentTopic->second->get_topic_description()->get_name(); 01328 topicSample.datatype = currentTopic->second->get_topic_description()->get_dataTypeName(); 01329 topicSample.qos = *currentTopic->second->get_topic_qos(); 01330 01331 peer->initializeTopic(topicSample); 01332 } 01333 01334 // Process each publication within the current particpant. 
01335 for (DCPS_IR_Publication_Map::const_iterator currentPublication 01336 = currentParticipant->second->publications().begin(); 01337 currentPublication != currentParticipant->second->publications().end(); 01338 ++currentPublication) { 01339 PublicationUpdate publicationSample; 01340 publicationSample.sender = this->id().id(); 01341 publicationSample.action = CreateEntity; 01342 01343 DCPS_IR_Publication* p = currentPublication->second.get(); 01344 CORBA::ORB_var orb = this->info_->orb(); 01345 CORBA::String_var callback = orb->object_to_string(p->writer()); 01346 01347 publicationSample.domain = currentDomain->second->get_id(); 01348 publicationSample.participant = p->get_participant_id(); 01349 publicationSample.topic = p->get_topic_id(); 01350 publicationSample.id = p->get_id(); 01351 publicationSample.callback = callback.in(); 01352 publicationSample.datawriter_qos = *p->get_datawriter_qos(); 01353 publicationSample.publisher_qos = *p->get_publisher_qos(); 01354 publicationSample.transport_info = p->get_transportLocatorSeq(); 01355 01356 peer->initializePublication(publicationSample); 01357 } 01358 01359 // Process each subscription within the current particpant. 
01360 for (DCPS_IR_Subscription_Map::const_iterator currentSubscription 01361 = currentParticipant->second->subscriptions().begin(); 01362 currentSubscription != currentParticipant->second->subscriptions().end(); 01363 ++currentSubscription) { 01364 SubscriptionUpdate subscriptionSample; 01365 subscriptionSample.sender = this->id().id(); 01366 subscriptionSample.action = CreateEntity; 01367 01368 DCPS_IR_Subscription* s = currentSubscription->second.get(); 01369 CORBA::ORB_var orb = this->info_->orb(); 01370 CORBA::String_var callback = orb->object_to_string(s->reader()); 01371 01372 subscriptionSample.domain = currentDomain->second->get_id(); 01373 subscriptionSample.participant = s->get_participant_id(); 01374 subscriptionSample.topic = s->get_topic_id(); 01375 subscriptionSample.id = s->get_id(); 01376 subscriptionSample.callback = callback.in(); 01377 subscriptionSample.datareader_qos = *s->get_datareader_qos(); 01378 subscriptionSample.subscriber_qos = *s->get_subscriber_qos(); 01379 subscriptionSample.transport_info = s->get_transportLocatorSeq(); 01380 subscriptionSample.filter_expression = s->get_filter_expression().c_str(); 01381 subscriptionSample.expression_params = s->get_expr_params(); 01382 01383 peer->initializeSubscription(subscriptionSample); 01384 } 01385 } 01386 } 01387 }
OpenDDS::DCPS::DCPSInfo_ptr OpenDDS::Federator::ManagerImpl::repository | ( | void | ) | [virtual] |
Definition at line 878 of file FederatorManagerImpl.cpp.
References ACE_TEXT(), config_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::Config::federationDomain(), CORBA::is_nil(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, localRepo_, OpenDDS::DCPS::static_rchandle_cast(), and TheServiceParticipant.
00879 { 00880 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00881 ACE_DEBUG((LM_DEBUG, 00882 ACE_TEXT("(%P|%t) ManagerImpl::repository()\n"))); 00883 } 00884 00885 OpenDDS::DCPS::Discovery_rch disco 00886 = TheServiceParticipant->get_discovery( 00887 this->config_.federationDomain()); 00888 OpenDDS::DCPS::DCPSInfo_var repo; 00889 if (!disco.is_nil()) { 00890 OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco = 00891 DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco); 00892 repo = irDisco->get_dcps_info(); 00893 } 00894 00895 if (CORBA::is_nil(repo.in())) { 00896 return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in()); 00897 00898 } else { 00899 return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in()); 00900 } 00901 }
void OpenDDS::Federator::ManagerImpl::requestImage | ( | ) | [virtual] |
Request an image refresh to be sent to the specified callback (asynchronously).
Implements Update::Updater.
Definition at line 28 of file FederatorManagerImpl_updates.cpp.
void OpenDDS::Federator::ManagerImpl::shutdown | ( | void | ) | [virtual] |
Definition at line 1118 of file FederatorManagerImpl.cpp.
References federated_, info_, and TAO_DDS_DCPSInfo_i::shutdown().
01120 { 01121 // Prevent the removal of this repository from the federation during 01122 // shutdown processing. 01123 this->federated_ = false; 01124 01125 // Shutdown the process via the repository object. 01126 this->info_->shutdown(); 01127 }
void OpenDDS::Federator::ManagerImpl::unregisterCallback | ( | ) | [virtual] |
Definition at line 22 of file FederatorManagerImpl_updates.cpp.
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
Propagate updated filter expression parameters for a subscription.
Implements Update::Updater.
Definition at line 508 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, OpenDDS::Federator::SubscriptionUpdate::expression_params, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateFilterExpressionParams.
00509 { 00510 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00511 // Decline to publish data until we can. 00512 return; 00513 } 00514 00515 SubscriptionUpdate sample; 00516 sample.sender = this->id().id(); 00517 sample.action = UpdateFilterExpressionParams; 00518 sample.domain = id.domain; 00519 sample.participant = id.participant; 00520 sample.id = id.id; 00521 sample.expression_params = params; 00522 00523 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00524 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00525 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00526 ACE_DEBUG((LM_DEBUG, 00527 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ") 00528 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00529 this->id().id(), 00530 sample.domain, 00531 std::string(part_converter).c_str(), 00532 std::string(sub_converter).c_str())); 00533 } 00534 00535 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00536 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::SubscriberQos & | qos | |||
) | [virtual] |
Propagate an updated SubscriberQos for a subscription.
Implements Update::Updater.
Definition at line 539 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, OpenDDS::Federator::SubscriptionUpdate::subscriber_qos, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue2.
00540 { 00541 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00542 // Decline to publish data until we can. 00543 return; 00544 } 00545 00546 SubscriptionUpdate sample; 00547 sample.sender = this->id().id(); 00548 sample.action = UpdateQosValue2; 00549 00550 sample.domain = id.domain; 00551 sample.participant = id.participant; 00552 sample.id = id.id; 00553 sample.subscriber_qos = qos; 00554 00555 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00556 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00557 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00558 ACE_DEBUG((LM_DEBUG, 00559 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ") 00560 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00561 this->id().id(), 00562 sample.domain, 00563 std::string(part_converter).c_str(), 00564 std::string(sub_converter).c_str())); 00565 } 00566 00567 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00568 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::DataReaderQos & | qos | |||
) | [virtual] |
Propagate an updated DataReaderQos for a subscription.
Implements Update::Updater.
Definition at line 476 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::SubscriptionUpdate::action, OpenDDS::Federator::SubscriptionUpdate::datareader_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::SubscriptionUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::SubscriptionUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::SubscriptionUpdate::participant, OpenDDS::Federator::SubscriptionUpdate::sender, subscriptionWriter_, and OpenDDS::Federator::UpdateQosValue1.
00477 { 00478 if (CORBA::is_nil(this->subscriptionWriter_.in())) { 00479 // Decline to publish data until we can. 00480 return; 00481 } 00482 00483 SubscriptionUpdate sample; 00484 sample.sender = this->id().id(); 00485 sample.action = UpdateQosValue1; 00486 00487 sample.domain = id.domain; 00488 sample.participant = id.participant; 00489 sample.id = id.id; 00490 sample.datareader_qos = qos; 00491 00492 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00493 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00494 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id); 00495 ACE_DEBUG((LM_DEBUG, 00496 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ") 00497 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"), 00498 this->id().id(), 00499 sample.domain, 00500 std::string(part_converter).c_str(), 00501 std::string(sub_converter).c_str())); 00502 } 00503 00504 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL); 00505 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::PublisherQos & | qos | |||
) | [virtual] |
Propagate an updated PublisherQos for a publication.
Implements Update::Updater.
Definition at line 444 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::publisher_qos, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue2.
00445 { 00446 if (CORBA::is_nil(this->publicationWriter_.in())) { 00447 // Decline to publish data until we can. 00448 return; 00449 } 00450 00451 PublicationUpdate sample; 00452 sample.sender = this->id().id(); 00453 sample.action = UpdateQosValue2; 00454 00455 sample.domain = id.domain; 00456 sample.participant = id.participant; 00457 sample.id = id.id; 00458 sample.publisher_qos = qos; 00459 00460 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00461 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00462 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id); 00463 ACE_DEBUG((LM_DEBUG, 00464 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ") 00465 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00466 this->id().id(), 00467 sample.domain, 00468 std::string(part_converter).c_str(), 00469 std::string(pub_converter).c_str())); 00470 } 00471 00472 this->publicationWriter_->write(sample, DDS::HANDLE_NIL); 00473 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::DataWriterQos & | qos | |||
) | [virtual] |
Propagate an updated DataWriterQos for a publication.
Implements Update::Updater.
Definition at line 412 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::PublicationUpdate::action, OpenDDS::Federator::PublicationUpdate::datawriter_qos, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::PublicationUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::PublicationUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::PublicationUpdate::participant, publicationWriter_, OpenDDS::Federator::PublicationUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.
00413 { 00414 if (CORBA::is_nil(this->publicationWriter_.in())) { 00415 // Decline to publish data until we can. 00416 return; 00417 } 00418 00419 PublicationUpdate sample; 00420 sample.sender = this->id().id(); 00421 sample.action = UpdateQosValue1; 00422 00423 sample.domain = id.domain; 00424 sample.participant = id.participant; 00425 sample.id = id.id; 00426 sample.datawriter_qos = qos; 00427 00428 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00429 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00430 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id); 00431 ACE_DEBUG((LM_DEBUG, 00432 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ") 00433 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"), 00434 this->id().id(), 00435 sample.domain, 00436 std::string(part_converter).c_str(), 00437 std::string(pub_converter).c_str())); 00438 } 00439 00440 this->publicationWriter_->write(sample, DDS::HANDLE_NIL); 00441 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::TopicQos & | qos | |||
) | [virtual] |
Propagate an updated TopicQos for a topic.
Implements Update::Updater.
Definition at line 380 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::TopicUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::TopicUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::TopicUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, OpenDDS::Federator::TopicUpdate::participant, OpenDDS::Federator::TopicUpdate::qos, OpenDDS::Federator::TopicUpdate::sender, topicWriter_, and OpenDDS::Federator::UpdateQosValue1.
00381 { 00382 if (CORBA::is_nil(this->topicWriter_.in())) { 00383 // Decline to publish data until we can. 00384 return; 00385 } 00386 00387 TopicUpdate sample; 00388 sample.sender = this->id().id(); 00389 sample.action = UpdateQosValue1; 00390 00391 sample.id = id.id; 00392 sample.domain = id.domain; 00393 sample.participant = id.participant; 00394 sample.qos = qos; 00395 00396 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00397 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant); 00398 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id); 00399 ACE_DEBUG((LM_DEBUG, 00400 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ") 00401 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"), 00402 this->id().id(), 00403 sample.domain, 00404 std::string(part_converter).c_str(), 00405 std::string(topic_converter).c_str())); 00406 } 00407 00408 this->topicWriter_->write(sample, DDS::HANDLE_NIL); 00409 }
void OpenDDS::Federator::ManagerImpl::update | ( | const Update::IdPath & | id, | |
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Propagate updated Qos parameters for an entity.
Implements Update::Updater.
Definition at line 351 of file FederatorManagerImpl_updates.cpp.
References ACE_TEXT(), OpenDDS::Federator::ParticipantUpdate::action, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::ParticipantUpdate::domain, DDS::HANDLE_NIL, OpenDDS::Federator::ParticipantUpdate::id, TAO_DDS_DCPSFederationId::id(), id(), CORBA::is_nil(), LM_DEBUG, participantWriter_, OpenDDS::Federator::ParticipantUpdate::qos, OpenDDS::Federator::ParticipantUpdate::sender, and OpenDDS::Federator::UpdateQosValue1.
00352 { 00353 if (CORBA::is_nil(this->participantWriter_.in())) { 00354 // Decline to publish data until we can. 00355 return; 00356 } 00357 00358 ParticipantUpdate sample; 00359 sample.sender = this->id().id(); 00360 sample.action = UpdateQosValue1; 00361 00362 sample.domain = id.domain; 00363 sample.id = id.id; 00364 sample.qos = qos; 00365 00366 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00367 OpenDDS::DCPS::RepoIdConverter converter(sample.id); 00368 ACE_DEBUG((LM_DEBUG, 00369 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ") 00370 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"), 00371 this->id().id(), 00372 sample.domain, 00373 std::string(converter).c_str())); 00374 } 00375 00376 this->participantWriter_->write(sample, DDS::HANDLE_NIL); 00377 }
Config& OpenDDS::Federator::ManagerImpl::config_ [private] |
The configuration information for this manager.
Definition at line 231 of file FederatorManagerImpl.h.
Referenced by destroy(), id(), initialize(), join_federation(), pushState(), and repository().
Protect deferred updates.
Definition at line 294 of file FederatorManagerImpl.h.
Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().
std::list<OwnerUpdate> OpenDDS::Federator::ManagerImpl::deferredOwnerships_ [private] |
Deferred ownership updates.
Definition at line 279 of file FederatorManagerImpl.h.
Referenced by processCreate(), processDeferred(), processDelete(), and processUpdateQos1().
std::list<PublicationUpdate> OpenDDS::Federator::ManagerImpl::deferredPublications_ [private] |
Deferred publication updates.
Definition at line 285 of file FederatorManagerImpl.h.
Referenced by processCreate(), and processDeferred().
std::list<SubscriptionUpdate> OpenDDS::Federator::ManagerImpl::deferredSubscriptions_ [private] |
Deferred subscription updates.
Definition at line 288 of file FederatorManagerImpl.h.
Referenced by processCreate(), and processDeferred().
std::list<TopicUpdate> OpenDDS::Federator::ManagerImpl::deferredTopics_ [private] |
Deferred topic updates.
Definition at line 282 of file FederatorManagerImpl.h.
Referenced by processCreate(), and processDeferred().
bool OpenDDS::Federator::ManagerImpl::federated_ [private] |
Flag indicating that we are actively participating in a federation of repositories.
Definition at line 219 of file FederatorManagerImpl.h.
Referenced by finalize(), join_federation(), and shutdown().
DDS::DomainParticipant_var OpenDDS::Federator::ManagerImpl::federationParticipant_ [private] |
Local DomainParticipant.
Definition at line 246 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
The Info object reference to update.
Definition at line 234 of file FederatorManagerImpl.h.
Referenced by info(), leave_and_shutdown(), leave_federation(), processCreate(), processDeferred(), processDelete(), processUpdateFilterExpressionParams(), processUpdateQos1(), processUpdateQos2(), pushState(), and shutdown().
Simple recursion avoidance during the join operations.
Definition at line 212 of file FederatorManagerImpl.h.
Referenced by join_federation().
ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::ManagerImpl::joining_ [private] |
Condition used to gate joining activities.
Definition at line 209 of file FederatorManagerImpl.h.
Referenced by join_federation().
Repository to which we joined.
Definition at line 215 of file FederatorManagerImpl.h.
Referenced by finalize(), and join_federation().
OpenDDS::DCPS::DCPSInfo_var OpenDDS::Federator::ManagerImpl::localRepo_ [private] |
Remotely callable reference to the local repository.
Definition at line 237 of file FederatorManagerImpl.h.
Referenced by localRepo(), and repository().
ACE_SYNCH_MUTEX OpenDDS::Federator::ManagerImpl::lock_ [private] |
Critical section MUTEX.
Definition at line 206 of file FederatorManagerImpl.h.
Referenced by join_federation().
bool OpenDDS::Federator::ManagerImpl::multicastEnabled_ [private] |
Is multicast enabled?
Definition at line 291 of file FederatorManagerImpl.h.
Referenced by initialize(), and ManagerImpl().
Multicast responder.
Definition at line 243 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
The ORB in which we are activated.
Definition at line 240 of file FederatorManagerImpl.h.
Referenced by finalize(), initialize(), and orb().
UpdateListener<OwnerUpdate, OwnerUpdateDataReader> OpenDDS::Federator::ManagerImpl::ownerListener_ [private] |
OwnerUpdate listener.
Definition at line 249 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
OwnerUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::ownerWriter_ [private] |
OwnerUpdate writer.
Definition at line 264 of file FederatorManagerImpl.h.
Referenced by create(), and initialize().
UpdateListener<ParticipantUpdate, ParticipantUpdateDataReader> OpenDDS::Federator::ManagerImpl::participantListener_ [private] |
ParticipantUpdate listener.
Definition at line 255 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
ParticipantUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::participantWriter_ [private] |
ParticipantUpdate writer.
Definition at line 270 of file FederatorManagerImpl.h.
Referenced by create(), destroy(), initialize(), join_federation(), and update().
The peer with which we have federated.
Definition at line 225 of file FederatorManagerImpl.h.
Referenced by finalize(), join_federation(), and leave_federation().
UpdateListener<PublicationUpdate, PublicationUpdateDataReader> OpenDDS::Federator::ManagerImpl::publicationListener_ [private] |
PublicationUpdate listener.
Definition at line 258 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
PublicationUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::publicationWriter_ [private] |
PublicationUpdate writer.
Definition at line 273 of file FederatorManagerImpl.h.
Referenced by destroy(), initialize(), and update().
The packet sequence number for data that we publish.
Definition at line 228 of file FederatorManagerImpl.h.
UpdateListener<SubscriptionUpdate, SubscriptionUpdateDataReader> OpenDDS::Federator::ManagerImpl::subscriptionListener_ [private] |
SubscriptionUpdate listener.
Definition at line 261 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
SubscriptionUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::subscriptionWriter_ [private] |
SubscriptionUpdate writer.
Definition at line 276 of file FederatorManagerImpl.h.
Referenced by create(), destroy(), initialize(), and update().
UpdateListener<TopicUpdate, TopicUpdateDataReader> OpenDDS::Federator::ManagerImpl::topicListener_ [private] |
TopicUpdate listener.
Definition at line 252 of file FederatorManagerImpl.h.
Referenced by finalize(), and initialize().
TopicUpdateDataWriter_var OpenDDS::Federator::ManagerImpl::topicWriter_ [private] |
TopicUpdate writer.
Definition at line 267 of file FederatorManagerImpl.h.
Referenced by create(), destroy(), initialize(), and update().