00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "FederatorManagerImpl.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DCPS_IR_Domain.h"
00012 #include "DCPS_IR_Participant.h"
00013
00014 #include "dds/DCPS/RepoIdConverter.h"
00015
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 namespace OpenDDS {
00019 namespace Federator {
00020
00021 void
00022 ManagerImpl::unregisterCallback()
00023 {
00024
00025 }
00026
00027 void
00028 ManagerImpl::requestImage()
00029 {
00030
00031 }
00032
00033
00034
00035
00036
00037
00038
00039 void
00040 ManagerImpl::create(const Update::UTopic& topic)
00041 {
00042 if (CORBA::is_nil(this->topicWriter_.in())) {
00043
00044 return;
00045 }
00046
00047 TopicUpdate sample;
00048 sample.sender = this->id().id();
00049 sample.action = CreateEntity;
00050
00051 sample.id = topic.topicId;
00052 sample.domain = topic.domainId;
00053 sample.participant = topic.participantId;
00054 sample.topic = topic.name.c_str();
00055 sample.datatype = topic.dataType.c_str();
00056 sample.qos = topic.topicQos;
00057
00058 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00059 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00060 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00061 ACE_DEBUG((LM_DEBUG,
00062 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
00063 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00064 this->id().id(),
00065 sample.domain,
00066 std::string(part_converter).c_str(),
00067 std::string(topic_converter).c_str()));
00068 }
00069
00070 this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00071 }
00072
00073 void
00074 ManagerImpl::create(const Update::UParticipant& participant)
00075 {
00076 if (CORBA::is_nil(this->participantWriter_.in())) {
00077
00078 return;
00079 }
00080
00081 ParticipantUpdate sample;
00082 sample.sender = this->id().id();
00083 sample.action = CreateEntity;
00084
00085 sample.owner = participant.owner;
00086 sample.domain = participant.domainId;
00087 sample.id = participant.participantId;
00088 sample.qos = participant.participantQos;
00089
00090 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00091 OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00092 ACE_DEBUG((LM_DEBUG,
00093 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
00094 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00095 this->id().id(),
00096 sample.domain,
00097 std::string(converter).c_str()));
00098 }
00099
00100 this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00101 }
00102
00103 void
00104 ManagerImpl::create(const Update::URActor& reader)
00105 {
00106 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00107
00108 return;
00109 }
00110
00111 SubscriptionUpdate sample;
00112 sample.sender = this->id().id();
00113 sample.action = CreateEntity;
00114
00115 sample.domain = reader.domainId;
00116 sample.participant = reader.participantId;
00117 sample.topic = reader.topicId;
00118 sample.id = reader.actorId;
00119 sample.callback = reader.callback.c_str();
00120 sample.datareader_qos = reader.drdwQos;
00121 sample.subscriber_qos = reader.pubsubQos;
00122 sample.transport_info = reader.transportInterfaceInfo;
00123 sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
00124 sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
00125 sample.expression_params = reader.contentSubscriptionProfile.exprParams;
00126
00127 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00128 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00129 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00130 ACE_DEBUG((LM_DEBUG,
00131 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
00132 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00133 this->id().id(),
00134 sample.domain,
00135 std::string(part_converter).c_str(),
00136 std::string(sub_converter).c_str()));
00137 }
00138
00139 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00140 }
00141
00142 void
00143 ManagerImpl::create(const Update::UWActor& writer)
00144 {
00145 if (CORBA::is_nil(this->publicationWriter_.in())) {
00146
00147 return;
00148 }
00149
00150 PublicationUpdate sample;
00151 sample.sender = this->id().id();
00152 sample.action = CreateEntity;
00153
00154 sample.domain = writer.domainId;
00155 sample.participant = writer.participantId;
00156 sample.topic = writer.topicId;
00157 sample.id = writer.actorId;
00158 sample.callback = writer.callback.c_str();
00159 sample.datawriter_qos = writer.drdwQos;
00160 sample.publisher_qos = writer.pubsubQos;
00161 sample.transport_info = writer.transportInterfaceInfo;
00162
00163 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00164 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00165 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00166 ACE_DEBUG((LM_DEBUG,
00167 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( PublicationUpdate): ")
00168 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00169 this->id().id(),
00170 sample.domain,
00171 std::string(part_converter).c_str(),
00172 std::string(pub_converter).c_str()));
00173 }
00174
00175 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00176 }
00177
00178 void
00179 ManagerImpl::create(const Update::OwnershipData& data)
00180 {
00181 if (CORBA::is_nil(this->ownerWriter_.in())) {
00182
00183 return;
00184 }
00185
00186 OwnerUpdate sample;
00187 sample.sender = this->id().id();
00188 sample.action = CreateEntity;
00189
00190 sample.domain = data.domain;
00191 sample.participant = data.participant;
00192 sample.owner = data.owner;
00193
00194 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00195 OpenDDS::DCPS::RepoIdConverter converter(sample.participant);
00196 ACE_DEBUG((LM_DEBUG,
00197 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
00198 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00199 this->id().id(),
00200 sample.domain,
00201 std::string(converter).c_str(),
00202 sample.sender,
00203 sample.owner));
00204 }
00205
00206 this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
00207 }
00208
00209 void
00210 ManagerImpl::destroy(
00211 const Update::IdPath& id,
00212 Update::ItemType type,
00213 Update::ActorType actor)
00214 {
00215
00216
00217
00218
00219 if (id.domain == this->config_.federationDomain()) {
00220 return;
00221 }
00222
00223 switch (type) {
00224 case Update::Topic: {
00225 if (CORBA::is_nil(this->topicWriter_.in())) {
00226
00227 return;
00228 }
00229
00230 TopicUpdate sample;
00231 sample.sender = this->id().id();
00232 sample.action = DestroyEntity;
00233
00234 sample.id = id.id;
00235 sample.domain = id.domain;
00236 sample.participant = id.participant;
00237
00238 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00239 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00240 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00241 ACE_DEBUG((LM_DEBUG,
00242 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
00243 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00244 this->id().id(),
00245 sample.domain,
00246 std::string(part_converter).c_str(),
00247 std::string(topic_converter).c_str()));
00248 }
00249
00250 this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00251 }
00252 break;
00253
00254 case Update::Participant: {
00255 if (CORBA::is_nil(this->participantWriter_.in())) {
00256
00257 return;
00258 }
00259
00260 ParticipantUpdate sample;
00261 sample.sender = this->id().id();
00262 sample.action = DestroyEntity;
00263
00264 sample.domain = id.domain;
00265 sample.id = id.id;
00266
00267 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00268 OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00269 ACE_DEBUG((LM_DEBUG,
00270 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
00271 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00272 this->id().id(),
00273 sample.domain,
00274 std::string(converter).c_str()));
00275 }
00276
00277 this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00278 }
00279 break;
00280
00281 case Update::Actor:
00282
00283
00284 switch (actor) {
00285 case Update::DataWriter: {
00286 if (CORBA::is_nil(this->publicationWriter_.in())) {
00287
00288 return;
00289 }
00290
00291 PublicationUpdate sample;
00292 sample.sender = this->id().id();
00293 sample.action = DestroyEntity;
00294
00295 sample.domain = id.domain;
00296 sample.participant = id.participant;
00297 sample.id = id.id;
00298
00299 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00300 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00301 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00302 ACE_DEBUG((LM_DEBUG,
00303 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
00304 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00305 this->id().id(),
00306 sample.domain,
00307 std::string(part_converter).c_str(),
00308 std::string(pub_converter).c_str()));
00309 }
00310
00311 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00312 }
00313 break;
00314
00315 case Update::DataReader: {
00316 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00317
00318 return;
00319 }
00320
00321 SubscriptionUpdate sample;
00322 sample.sender = this->id().id();
00323 sample.action = DestroyEntity;
00324
00325 sample.domain = id.domain;
00326 sample.participant = id.participant;
00327 sample.id = id.id;
00328
00329 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00330 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00331 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00332 ACE_DEBUG((LM_DEBUG,
00333 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
00334 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00335 this->id().id(),
00336 sample.domain,
00337 std::string(part_converter).c_str(),
00338 std::string(sub_converter).c_str()));
00339 }
00340
00341 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00342 }
00343 break;
00344 }
00345
00346 break;
00347 }
00348 }
00349
00350 void
00351 ManagerImpl::update(const Update::IdPath& id, const DDS::DomainParticipantQos& qos)
00352 {
00353 if (CORBA::is_nil(this->participantWriter_.in())) {
00354
00355 return;
00356 }
00357
00358 ParticipantUpdate sample;
00359 sample.sender = this->id().id();
00360 sample.action = UpdateQosValue1;
00361
00362 sample.domain = id.domain;
00363 sample.id = id.id;
00364 sample.qos = qos;
00365
00366 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00367 OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00368 ACE_DEBUG((LM_DEBUG,
00369 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
00370 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00371 this->id().id(),
00372 sample.domain,
00373 std::string(converter).c_str()));
00374 }
00375
00376 this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00377 }
00378
00379 void
00380 ManagerImpl::update(const Update::IdPath& id, const DDS::TopicQos& qos)
00381 {
00382 if (CORBA::is_nil(this->topicWriter_.in())) {
00383
00384 return;
00385 }
00386
00387 TopicUpdate sample;
00388 sample.sender = this->id().id();
00389 sample.action = UpdateQosValue1;
00390
00391 sample.id = id.id;
00392 sample.domain = id.domain;
00393 sample.participant = id.participant;
00394 sample.qos = qos;
00395
00396 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00397 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00398 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00399 ACE_DEBUG((LM_DEBUG,
00400 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
00401 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00402 this->id().id(),
00403 sample.domain,
00404 std::string(part_converter).c_str(),
00405 std::string(topic_converter).c_str()));
00406 }
00407
00408 this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00409 }
00410
00411 void
00412 ManagerImpl::update(const Update::IdPath& id, const DDS::DataWriterQos& qos)
00413 {
00414 if (CORBA::is_nil(this->publicationWriter_.in())) {
00415
00416 return;
00417 }
00418
00419 PublicationUpdate sample;
00420 sample.sender = this->id().id();
00421 sample.action = UpdateQosValue1;
00422
00423 sample.domain = id.domain;
00424 sample.participant = id.participant;
00425 sample.id = id.id;
00426 sample.datawriter_qos = qos;
00427
00428 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00429 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00430 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00431 ACE_DEBUG((LM_DEBUG,
00432 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
00433 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00434 this->id().id(),
00435 sample.domain,
00436 std::string(part_converter).c_str(),
00437 std::string(pub_converter).c_str()));
00438 }
00439
00440 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00441 }
00442
00443 void
00444 ManagerImpl::update(const Update::IdPath& id, const DDS::PublisherQos& qos)
00445 {
00446 if (CORBA::is_nil(this->publicationWriter_.in())) {
00447
00448 return;
00449 }
00450
00451 PublicationUpdate sample;
00452 sample.sender = this->id().id();
00453 sample.action = UpdateQosValue2;
00454
00455 sample.domain = id.domain;
00456 sample.participant = id.participant;
00457 sample.id = id.id;
00458 sample.publisher_qos = qos;
00459
00460 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00461 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00462 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00463 ACE_DEBUG((LM_DEBUG,
00464 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
00465 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00466 this->id().id(),
00467 sample.domain,
00468 std::string(part_converter).c_str(),
00469 std::string(pub_converter).c_str()));
00470 }
00471
00472 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00473 }
00474
00475 void
00476 ManagerImpl::update(const Update::IdPath& id, const DDS::DataReaderQos& qos)
00477 {
00478 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00479
00480 return;
00481 }
00482
00483 SubscriptionUpdate sample;
00484 sample.sender = this->id().id();
00485 sample.action = UpdateQosValue1;
00486
00487 sample.domain = id.domain;
00488 sample.participant = id.participant;
00489 sample.id = id.id;
00490 sample.datareader_qos = qos;
00491
00492 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00493 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00494 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00495 ACE_DEBUG((LM_DEBUG,
00496 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
00497 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00498 this->id().id(),
00499 sample.domain,
00500 std::string(part_converter).c_str(),
00501 std::string(sub_converter).c_str()));
00502 }
00503
00504 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00505 }
00506
00507 void
00508 ManagerImpl::update(const Update::IdPath& id, const DDS::StringSeq& params)
00509 {
00510 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00511
00512 return;
00513 }
00514
00515 SubscriptionUpdate sample;
00516 sample.sender = this->id().id();
00517 sample.action = UpdateFilterExpressionParams;
00518 sample.domain = id.domain;
00519 sample.participant = id.participant;
00520 sample.id = id.id;
00521 sample.expression_params = params;
00522
00523 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00524 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00525 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00526 ACE_DEBUG((LM_DEBUG,
00527 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
00528 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00529 this->id().id(),
00530 sample.domain,
00531 std::string(part_converter).c_str(),
00532 std::string(sub_converter).c_str()));
00533 }
00534
00535 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00536 }
00537
00538 void
00539 ManagerImpl::update(const Update::IdPath& id, const DDS::SubscriberQos& qos)
00540 {
00541 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00542
00543 return;
00544 }
00545
00546 SubscriptionUpdate sample;
00547 sample.sender = this->id().id();
00548 sample.action = UpdateQosValue2;
00549
00550 sample.domain = id.domain;
00551 sample.participant = id.participant;
00552 sample.id = id.id;
00553 sample.subscriber_qos = qos;
00554
00555 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00556 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00557 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00558 ACE_DEBUG((LM_DEBUG,
00559 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
00560 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00561 this->id().id(),
00562 sample.domain,
00563 std::string(part_converter).c_str(),
00564 std::string(sub_converter).c_str()));
00565 }
00566
00567 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00568 }
00569
00570
00571
00572
00573
00574
00575
00576 void
00577 ManagerImpl::processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* )
00578 {
00579 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00580 OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00581 ACE_DEBUG((LM_DEBUG,
00582 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00583 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00584 this->id().id(),
00585 sample->domain,
00586 std::string(converter).c_str(),
00587 sample->sender,
00588 sample->owner));
00589 }
00590
00591
00592 if (false == this->info_->changeOwnership(sample->domain,
00593 sample->participant,
00594 sample->sender,
00595 sample->owner)) {
00596 {
00597 ACE_GUARD(ACE_Thread_Mutex,
00598 guard,
00599 this->deferred_lock_);
00600 this->deferredOwnerships_.push_back(*sample);
00601 }
00602
00603 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00604 ACE_DEBUG((LM_DEBUG,
00605 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00606 ACE_TEXT("deferred update.\n")));
00607 }
00608 }
00609
00610 this->processDeferred();
00611 }
00612
00613 void
00614 ManagerImpl::processCreate(const PublicationUpdate* sample, const DDS::SampleInfo* )
00615 {
00616 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00617 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00618 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00619 ACE_DEBUG((LM_DEBUG,
00620 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00621 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00622 this->id().id(),
00623 sample->domain,
00624 std::string(part_converter).c_str(),
00625 std::string(pub_converter).c_str()));
00626 }
00627
00628 if (false == this->info_->add_publication(sample->domain,
00629 sample->participant,
00630 sample->topic,
00631 sample->id,
00632 sample->callback,
00633 sample->datawriter_qos,
00634 sample->transport_info,
00635 sample->publisher_qos,
00636 true)) {
00637 {
00638 ACE_GUARD(ACE_Thread_Mutex,
00639 guard,
00640 this->deferred_lock_);
00641 this->deferredPublications_.push_back(*sample);
00642 }
00643
00644 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00645 ACE_DEBUG((LM_DEBUG,
00646 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00647 ACE_TEXT("deferred update.\n")));
00648 }
00649 }
00650
00651 this->processDeferred();
00652 }
00653
00654 void
00655 ManagerImpl::processCreate(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
00656 {
00657 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00658 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00659 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00660 ACE_DEBUG((LM_DEBUG,
00661 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00662 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00663 this->id().id(),
00664 sample->domain,
00665 std::string(part_converter).c_str(),
00666 std::string(sub_converter).c_str()));
00667 }
00668
00669 if (false == this->info_->add_subscription(sample->domain,
00670 sample->participant,
00671 sample->topic,
00672 sample->id,
00673 sample->callback,
00674 sample->datareader_qos,
00675 sample->transport_info,
00676 sample->subscriber_qos,
00677 sample->filter_class_name,
00678 sample->filter_expression,
00679 sample->expression_params,
00680 true)) {
00681 {
00682 ACE_GUARD(ACE_Thread_Mutex,
00683 guard,
00684 this->deferred_lock_);
00685 this->deferredSubscriptions_.push_back(*sample);
00686 }
00687
00688 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00689 ACE_DEBUG((LM_DEBUG,
00690 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00691 ACE_TEXT("deferred update.\n")));
00692 }
00693 }
00694
00695 this->processDeferred();
00696 }
00697
00698 void
00699 ManagerImpl::processCreate(const ParticipantUpdate* sample, const DDS::SampleInfo* )
00700 {
00701 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00702 OpenDDS::DCPS::RepoIdConverter converter(sample->id);
00703 ACE_DEBUG((LM_DEBUG,
00704 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
00705 ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
00706 this->id().id(),
00707 sample->domain,
00708 std::string(converter).c_str(),
00709 sample->owner));
00710 }
00711
00712 this->info_->add_domain_participant(
00713 sample->domain,
00714 sample->id,
00715 sample->qos);
00716 bool ownershipChanged = this->info_->changeOwnership(
00717 sample->domain,
00718 sample->id,
00719 sample->sender,
00720 sample->owner);
00721 if (!ownershipChanged) {
00722 ACE_ERROR((LM_ERROR,
00723 ACE_TEXT("(%P|%t) ERROR: ")
00724 ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
00725 ACE_TEXT("Could not change ownership\n")));
00726 }
00727 this->processDeferred();
00728 }
00729
00730 void
00731 ManagerImpl::processCreate(const TopicUpdate* sample, const DDS::SampleInfo* )
00732 {
00733 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00734 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00735 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
00736 ACE_DEBUG((LM_DEBUG,
00737 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00738 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00739 this->id().id(),
00740 sample->domain,
00741 std::string(part_converter).c_str(),
00742 std::string(topic_converter).c_str()));
00743 }
00744
00745 if (false == this->info_->add_topic(sample->id,
00746 sample->domain,
00747 sample->participant,
00748 sample->topic,
00749 sample->datatype,
00750 sample->qos)) {
00751 {
00752 ACE_GUARD(ACE_Thread_Mutex,
00753 guard,
00754 this->deferred_lock_);
00755 this->deferredTopics_.push_back(*sample);
00756 }
00757
00758 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00759 ACE_DEBUG((LM_DEBUG,
00760 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00761 ACE_TEXT("deferred update.\n")));
00762 }
00763 }
00764
00765 this->processDeferred();
00766 }
00767
00768 void
00769 ManagerImpl::processDeferred()
00770 {
00771 ACE_GUARD(ACE_Thread_Mutex,
00772 guard,
00773 this->deferred_lock_);
00774
00775 {
00776 std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
00777
00778 while (current != this->deferredOwnerships_.end()) {
00779 if (this->info_->changeOwnership(current->domain,
00780 current->participant,
00781 current->sender,
00782 current->owner)) {
00783 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00784 OpenDDS::DCPS::RepoIdConverter converter(current->participant);
00785 ACE_DEBUG((LM_DEBUG,
00786 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
00787 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00788 this->id().id(),
00789 current->domain,
00790 std::string(converter).c_str(),
00791 current->sender,
00792 current->owner));
00793 }
00794
00795 current = this->deferredOwnerships_.erase(current);
00796
00797 } else {
00798 ++ current;
00799 }
00800 }
00801 }
00802
00803 {
00804 std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
00805
00806 while (current != this->deferredTopics_.end()) {
00807 if (true == this->info_->add_topic(current->id,
00808 current->domain,
00809 current->participant,
00810 current->topic,
00811 current->datatype,
00812 current->qos)) {
00813 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00814 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00815 OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
00816 ACE_DEBUG((LM_DEBUG,
00817 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
00818 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00819 this->id().id(),
00820 current->domain,
00821 std::string(part_converter).c_str(),
00822 std::string(topic_converter).c_str()));
00823 }
00824
00825 current = this->deferredTopics_.erase(current);
00826
00827 } else {
00828 ++ current;
00829 }
00830 }
00831 }
00832
00833 {
00834 std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
00835
00836 while (current != this->deferredPublications_.end()) {
00837
00838 if (true == this->info_->add_publication(current->domain,
00839 current->participant,
00840 current->topic,
00841 current->id,
00842 current->callback,
00843 current->datawriter_qos,
00844 current->transport_info,
00845 current->publisher_qos,
00846 true)) {
00847 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00848 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00849 OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
00850 ACE_DEBUG((LM_DEBUG,
00851 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
00852 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00853 this->id().id(),
00854 current->domain,
00855 std::string(part_converter).c_str(),
00856 std::string(pub_converter).c_str()));
00857 }
00858
00859 current = this->deferredPublications_.erase(current);
00860
00861 } else {
00862 ++ current;
00863 }
00864 }
00865 }
00866
00867 {
00868 std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
00869
00870 while (current != this->deferredSubscriptions_.end()) {
00871
00872 if (true == this->info_->add_subscription(current->domain,
00873 current->participant,
00874 current->topic,
00875 current->id,
00876 current->callback,
00877 current->datareader_qos,
00878 current->transport_info,
00879 current->subscriber_qos,
00880 current->filter_class_name,
00881 current->filter_expression,
00882 current->expression_params,
00883 true)) {
00884 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00885 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00886 OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
00887 ACE_DEBUG((LM_DEBUG,
00888 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
00889 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00890 this->id().id(),
00891 current->domain,
00892 std::string(part_converter).c_str(),
00893 std::string(sub_converter).c_str()));
00894 }
00895
00896 current = this->deferredSubscriptions_.erase(current);
00897
00898 } else {
00899 ++ current;
00900 }
00901 }
00902 }
00903
00904 }
00905
00906 void
00907 ManagerImpl::processUpdateQos1(const OwnerUpdate* sample, const DDS::SampleInfo* )
00908 {
00909 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00910 OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00911 ACE_DEBUG((LM_DEBUG,
00912 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00913 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00914 this->id().id(),
00915 sample->domain,
00916 std::string(converter).c_str(),
00917 sample->sender,
00918 sample->owner));
00919 }
00920
00921 if (false == this->info_->changeOwnership(sample->domain,
00922 sample->participant,
00923 sample->sender,
00924 sample->owner)) {
00925 {
00926 ACE_GUARD(ACE_Thread_Mutex,
00927 guard,
00928 this->deferred_lock_);
00929
00930 this->deferredOwnerships_.push_back(*sample);
00931 }
00932 ACE_DEBUG((LM_DEBUG,
00933 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00934 ACE_TEXT("deferred update.\n")));
00935 }
00936 }
00937
00938 void
00939 ManagerImpl::processUpdateQos1(const PublicationUpdate* sample, const DDS::SampleInfo* )
00940 {
00941 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00942 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00943 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00944 ACE_DEBUG((LM_DEBUG,
00945 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
00946 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00947 this->id().id(),
00948 sample->domain,
00949 std::string(part_converter).c_str(),
00950 std::string(pub_converter).c_str()));
00951 }
00952
00953 this->info_->update_publication_qos(
00954 sample->domain,
00955 sample->participant,
00956 sample->id,
00957 sample->datawriter_qos);
00958 }
00959
00960 void
00961 ManagerImpl::processUpdateQos2(const PublicationUpdate* sample, const DDS::SampleInfo* )
00962 {
00963 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00964 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00965 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00966 ACE_DEBUG((LM_DEBUG,
00967 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
00968 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00969 this->id().id(),
00970 sample->domain,
00971 std::string(part_converter).c_str(),
00972 std::string(pub_converter).c_str()));
00973 }
00974
00975 this->info_->update_publication_qos(
00976 sample->domain,
00977 sample->participant,
00978 sample->id,
00979 sample->publisher_qos);
00980 }
00981
00982 void
00983 ManagerImpl::processUpdateQos1(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
00984 {
00985 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00986 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00987 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00988 ACE_DEBUG((LM_DEBUG,
00989 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
00990 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00991 this->id().id(),
00992 sample->domain,
00993 std::string(part_converter).c_str(),
00994 std::string(sub_converter).c_str()));
00995 }
00996
00997 this->info_->update_subscription_qos(
00998 sample->domain,
00999 sample->participant,
01000 sample->id,
01001 sample->datareader_qos);
01002 }
01003
01004 void
01005 ManagerImpl::processUpdateQos2(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
01006 {
01007 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01008 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01009 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01010 ACE_DEBUG((LM_DEBUG,
01011 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
01012 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01013 this->id().id(),
01014 sample->domain,
01015 std::string(part_converter).c_str(),
01016 std::string(sub_converter).c_str()));
01017 }
01018
01019 this->info_->update_subscription_qos(
01020 sample->domain,
01021 sample->participant,
01022 sample->id,
01023 sample->subscriber_qos);
01024 }
01025
01026 void
01027 ManagerImpl::processUpdateFilterExpressionParams(
01028 const SubscriptionUpdate* sample, const DDS::SampleInfo* )
01029 {
01030 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01031 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01032 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01033 ACE_DEBUG((LM_DEBUG,
01034 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
01035 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01036 this->id().id(),
01037 sample->domain,
01038 std::string(part_converter).c_str(),
01039 std::string(sub_converter).c_str()));
01040 }
01041
01042 this->info_->update_subscription_params(
01043 sample->domain,
01044 sample->participant,
01045 sample->id,
01046 sample->expression_params);
01047 }
01048
01049 void
01050 ManagerImpl::processUpdateQos1(const ParticipantUpdate* sample, const DDS::SampleInfo* )
01051 {
01052 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01053 OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01054 ACE_DEBUG((LM_DEBUG,
01055 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
01056 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01057 this->id().id(),
01058 sample->domain,
01059 std::string(converter).c_str()));
01060 }
01061
01062 this->info_->update_domain_participant_qos(
01063 sample->domain,
01064 sample->id,
01065 sample->qos);
01066 }
01067
01068 void
01069 ManagerImpl::processUpdateQos1(const TopicUpdate* sample, const DDS::SampleInfo* )
01070 {
01071 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01072 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01073 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01074 ACE_DEBUG((LM_DEBUG,
01075 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
01076 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01077 this->id().id(),
01078 sample->domain,
01079 std::string(part_converter).c_str(),
01080 std::string(topic_converter).c_str()));
01081 }
01082
01083 this->info_->update_topic_qos(
01084 sample->id,
01085 sample->domain,
01086 sample->participant,
01087 sample->qos);
01088 }
01089
01090 void
01091 ManagerImpl::processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* )
01092 {
01093 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01094 OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
01095 ACE_DEBUG((LM_DEBUG,
01096 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01097 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
01098 this->id().id(),
01099 sample->domain,
01100 std::string(converter).c_str(),
01101 sample->sender,
01102 sample->owner));
01103 }
01104
01105
01106 if (false == this->info_->changeOwnership(sample->domain,
01107 sample->participant,
01108 sample->sender,
01109 sample->owner)) {
01110 {
01111 ACE_GUARD(ACE_Thread_Mutex,
01112 guard,
01113 this->deferred_lock_);
01114 this->deferredOwnerships_.push_back(*sample);
01115 }
01116 ACE_DEBUG((LM_DEBUG,
01117 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01118 ACE_TEXT("deferred update.\n")));
01119 }
01120 }
01121
01122 void
01123 ManagerImpl::processDelete(const PublicationUpdate* sample, const DDS::SampleInfo* )
01124 {
01125 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01126 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01127 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
01128 ACE_DEBUG((LM_DEBUG,
01129 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01130 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
01131 this->id().id(),
01132 sample->domain,
01133 std::string(part_converter).c_str(),
01134 std::string(pub_converter).c_str()));
01135 }
01136
01137 try {
01138 this->info_->remove_publication(
01139 sample->domain,
01140 sample->participant,
01141 sample->id);
01142
01143 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01144 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01145 ACE_DEBUG((LM_DEBUG,
01146 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01147 ACE_TEXT("the participant was already removed.\n")));
01148 }
01149 }
01150 }
01151
01152 void
01153 ManagerImpl::processDelete(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
01154 {
01155 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01156 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01157 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01158 ACE_DEBUG((LM_DEBUG,
01159 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01160 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01161 this->id().id(),
01162 sample->domain,
01163 std::string(part_converter).c_str(),
01164 std::string(sub_converter).c_str()));
01165 }
01166
01167 try {
01168 this->info_->remove_subscription(
01169 sample->domain,
01170 sample->participant,
01171 sample->id);
01172
01173 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01174 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01175 ACE_DEBUG((LM_DEBUG,
01176 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01177 ACE_TEXT("the participant was already removed.\n")));
01178 }
01179 }
01180 }
01181
01182 void
01183 ManagerImpl::processDelete(const ParticipantUpdate* sample, const DDS::SampleInfo* )
01184 {
01185 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01186 OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01187 ACE_DEBUG((LM_DEBUG,
01188 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01189 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01190 this->id().id(),
01191 sample->domain,
01192 std::string(converter).c_str()));
01193 }
01194 try {
01195 this->info_->remove_domain_participant(
01196 sample->domain,
01197 sample->id);
01198 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01199 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01200 ACE_DEBUG((LM_DEBUG,
01201 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01202 ACE_TEXT("the participant was already removed.\n")));
01203 }
01204 }
01205 }
01206
01207 void
01208 ManagerImpl::processDelete(const TopicUpdate* sample, const DDS::SampleInfo* )
01209 {
01210 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01211 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01212 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01213 ACE_DEBUG((LM_DEBUG,
01214 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01215 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01216 this->id().id(),
01217 sample->domain,
01218 std::string(part_converter).c_str(),
01219 std::string(topic_converter).c_str()));
01220 }
01221
01222 try {
01223 this->info_->remove_topic(
01224 sample->domain,
01225 sample->participant,
01226 sample->id);
01227
01228 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01229 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01230 ACE_DEBUG((LM_DEBUG,
01231 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01232 ACE_TEXT("the participant was already removed.\n")));
01233 }
01234 } catch (OpenDDS::DCPS::Invalid_Domain&) {
01235 if (OpenDDS::DCPS::DCPS_debug_level > 1) {
01236 ACE_DEBUG((LM_DEBUG,
01237 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01238 ACE_TEXT("the domain %d no longer exists.\n"),sample->domain));
01239 }
01240 }
01241 }
01242
01243 void
01244 ManagerImpl::pushState(Manager_ptr peer)
01245 {
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256
01257
01258
01259 for (DCPS_IR_Domain_Map::const_iterator currentDomain
01260 = this->info_->domains().begin();
01261 currentDomain != this->info_->domains().end();
01262 ++currentDomain) {
01263
01264 if (currentDomain->second->get_id() == this->config_.federationDomain()) {
01265
01266
01267 }
01268
01269
01270 for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01271 = currentDomain->second->participants().begin();
01272 currentParticipant != currentDomain->second->participants().end();
01273 ++currentParticipant) {
01274
01275 if (currentParticipant->second->isBitPublisher() == true) {
01276
01277 continue;
01278 }
01279
01280
01281 ParticipantUpdate participantSample;
01282 participantSample.sender = this->id().id();
01283 participantSample.action = CreateEntity;
01284
01285 participantSample.owner = currentParticipant->second->owner();
01286 participantSample.domain = currentDomain->second->get_id();
01287 participantSample.id = currentParticipant->second->get_id();
01288 participantSample.qos = *currentParticipant->second->get_qos();
01289
01290 peer->initializeParticipant(participantSample);
01291
01292
01293 OwnerUpdate ownerSample;
01294 ownerSample.sender = this->id().id();
01295 ownerSample.action = CreateEntity;
01296
01297 ownerSample.domain = currentDomain->second->get_id();
01298 ownerSample.participant = currentParticipant->second->get_id();
01299 ownerSample.owner = currentParticipant->second->owner();
01300
01301 peer->initializeOwner(ownerSample);
01302 }
01303
01304
01305 for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01306 = currentDomain->second->participants().begin();
01307 currentParticipant != currentDomain->second->participants().end();
01308 ++currentParticipant) {
01309
01310 if (currentParticipant->second->isBitPublisher() == true) {
01311
01312 continue;
01313 }
01314
01315
01316 for (DCPS_IR_Topic_Map::const_iterator currentTopic
01317 = currentParticipant->second->topics().begin();
01318 currentTopic != currentParticipant->second->topics().end();
01319 ++currentTopic) {
01320 TopicUpdate topicSample;
01321 topicSample.sender = this->id().id();
01322 topicSample.action = CreateEntity;
01323
01324 topicSample.id = currentTopic->second->get_id();
01325 topicSample.domain = currentDomain->second->get_id();
01326 topicSample.participant = currentTopic->second->get_participant_id();
01327 topicSample.topic = currentTopic->second->get_topic_description()->get_name();
01328 topicSample.datatype = currentTopic->second->get_topic_description()->get_dataTypeName();
01329 topicSample.qos = *currentTopic->second->get_topic_qos();
01330
01331 peer->initializeTopic(topicSample);
01332 }
01333
01334
01335 for (DCPS_IR_Publication_Map::const_iterator currentPublication
01336 = currentParticipant->second->publications().begin();
01337 currentPublication != currentParticipant->second->publications().end();
01338 ++currentPublication) {
01339 PublicationUpdate publicationSample;
01340 publicationSample.sender = this->id().id();
01341 publicationSample.action = CreateEntity;
01342
01343 DCPS_IR_Publication* p = currentPublication->second.get();
01344 CORBA::ORB_var orb = this->info_->orb();
01345 CORBA::String_var callback = orb->object_to_string(p->writer());
01346
01347 publicationSample.domain = currentDomain->second->get_id();
01348 publicationSample.participant = p->get_participant_id();
01349 publicationSample.topic = p->get_topic_id();
01350 publicationSample.id = p->get_id();
01351 publicationSample.callback = callback.in();
01352 publicationSample.datawriter_qos = *p->get_datawriter_qos();
01353 publicationSample.publisher_qos = *p->get_publisher_qos();
01354 publicationSample.transport_info = p->get_transportLocatorSeq();
01355
01356 peer->initializePublication(publicationSample);
01357 }
01358
01359
01360 for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
01361 = currentParticipant->second->subscriptions().begin();
01362 currentSubscription != currentParticipant->second->subscriptions().end();
01363 ++currentSubscription) {
01364 SubscriptionUpdate subscriptionSample;
01365 subscriptionSample.sender = this->id().id();
01366 subscriptionSample.action = CreateEntity;
01367
01368 DCPS_IR_Subscription* s = currentSubscription->second.get();
01369 CORBA::ORB_var orb = this->info_->orb();
01370 CORBA::String_var callback = orb->object_to_string(s->reader());
01371
01372 subscriptionSample.domain = currentDomain->second->get_id();
01373 subscriptionSample.participant = s->get_participant_id();
01374 subscriptionSample.topic = s->get_topic_id();
01375 subscriptionSample.id = s->get_id();
01376 subscriptionSample.callback = callback.in();
01377 subscriptionSample.datareader_qos = *s->get_datareader_qos();
01378 subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
01379 subscriptionSample.transport_info = s->get_transportLocatorSeq();
01380 subscriptionSample.filter_expression = s->get_filter_expression().c_str();
01381 subscriptionSample.expression_params = s->get_expr_params();
01382
01383 peer->initializeSubscription(subscriptionSample);
01384 }
01385 }
01386 }
01387 }
01388
01389 }
01390 }
01391
01392 OPENDDS_END_VERSIONED_NAMESPACE_DECL