00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "FederatorManagerImpl.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DCPS_IR_Domain.h"
00012 #include "DCPS_IR_Participant.h"
00013
00014 #include "dds/DCPS/RepoIdConverter.h"
00015
00016 namespace OpenDDS {
00017 namespace Federator {
00018
00019 void
00020 ManagerImpl::unregisterCallback()
00021 {
00022
00023 }
00024
00025 void
00026 ManagerImpl::requestImage()
00027 {
00028
00029 }
00030
00031
00032
00033
00034
00035
00036
00037 void
00038 ManagerImpl::create(const Update::UTopic& topic)
00039 {
00040 if (CORBA::is_nil(this->topicWriter_.in())) {
00041
00042 return;
00043 }
00044
00045 TopicUpdate sample;
00046 sample.sender = this->id().id();
00047 sample.action = CreateEntity;
00048
00049 sample.id = topic.topicId;
00050 sample.domain = topic.domainId;
00051 sample.participant = topic.participantId;
00052 sample.topic = topic.name.c_str();
00053 sample.datatype = topic.dataType.c_str();
00054 sample.qos = topic.topicQos;
00055
00056 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00057 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00058 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00059 ACE_DEBUG((LM_DEBUG,
00060 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
00061 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00062 this->id().id(),
00063 sample.domain,
00064 std::string(part_converter).c_str(),
00065 std::string(topic_converter).c_str()));
00066 }
00067
00068 this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00069 }
00070
00071 void
00072 ManagerImpl::create(const Update::UParticipant& participant)
00073 {
00074 if (CORBA::is_nil(this->participantWriter_.in())) {
00075
00076 return;
00077 }
00078
00079 ParticipantUpdate sample;
00080 sample.sender = this->id().id();
00081 sample.action = CreateEntity;
00082
00083 sample.owner = participant.owner;
00084 sample.domain = participant.domainId;
00085 sample.id = participant.participantId;
00086 sample.qos = participant.participantQos;
00087
00088 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00089 OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00090 ACE_DEBUG((LM_DEBUG,
00091 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
00092 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00093 this->id().id(),
00094 sample.domain,
00095 std::string(converter).c_str()));
00096 }
00097
00098 this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00099 }
00100
00101 void
00102 ManagerImpl::create(const Update::URActor& reader)
00103 {
00104 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00105
00106 return;
00107 }
00108
00109 SubscriptionUpdate sample;
00110 sample.sender = this->id().id();
00111 sample.action = CreateEntity;
00112
00113 sample.domain = reader.domainId;
00114 sample.participant = reader.participantId;
00115 sample.topic = reader.topicId;
00116 sample.id = reader.actorId;
00117 sample.callback = reader.callback.c_str();
00118 sample.datareader_qos = reader.drdwQos;
00119 sample.subscriber_qos = reader.pubsubQos;
00120 sample.transport_info = reader.transportInterfaceInfo;
00121 sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
00122 sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
00123 sample.expression_params = reader.contentSubscriptionProfile.exprParams;
00124
00125 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00126 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00127 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00128 ACE_DEBUG((LM_DEBUG,
00129 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
00130 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00131 this->id().id(),
00132 sample.domain,
00133 std::string(part_converter).c_str(),
00134 std::string(sub_converter).c_str()));
00135 }
00136
00137 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00138 }
00139
00140 void
00141 ManagerImpl::create(const Update::UWActor& writer)
00142 {
00143 if (CORBA::is_nil(this->publicationWriter_.in())) {
00144
00145 return;
00146 }
00147
00148 PublicationUpdate sample;
00149 sample.sender = this->id().id();
00150 sample.action = CreateEntity;
00151
00152 sample.domain = writer.domainId;
00153 sample.participant = writer.participantId;
00154 sample.topic = writer.topicId;
00155 sample.id = writer.actorId;
00156 sample.callback = writer.callback.c_str();
00157 sample.datawriter_qos = writer.drdwQos;
00158 sample.publisher_qos = writer.pubsubQos;
00159 sample.transport_info = writer.transportInterfaceInfo;
00160
00161 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00162 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00163 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00164 ACE_DEBUG((LM_DEBUG,
00165 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( PublicationUpdate): ")
00166 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00167 this->id().id(),
00168 sample.domain,
00169 std::string(part_converter).c_str(),
00170 std::string(pub_converter).c_str()));
00171 }
00172
00173 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00174 }
00175
00176 void
00177 ManagerImpl::create(const Update::OwnershipData& data)
00178 {
00179 if (CORBA::is_nil(this->ownerWriter_.in())) {
00180
00181 return;
00182 }
00183
00184 OwnerUpdate sample;
00185 sample.sender = this->id().id();
00186 sample.action = CreateEntity;
00187
00188 sample.domain = data.domain;
00189 sample.participant = data.participant;
00190 sample.owner = data.owner;
00191
00192 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00193 OpenDDS::DCPS::RepoIdConverter converter(sample.participant);
00194 ACE_DEBUG((LM_DEBUG,
00195 ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
00196 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00197 this->id().id(),
00198 sample.domain,
00199 std::string(converter).c_str(),
00200 sample.sender,
00201 sample.owner));
00202 }
00203
00204 this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
00205 }
00206
00207 void
00208 ManagerImpl::destroy(
00209 const Update::IdPath& id,
00210 Update::ItemType type,
00211 Update::ActorType actor)
00212 {
00213
00214
00215
00216
00217 if (id.domain == this->config_.federationDomain()) {
00218 return;
00219 }
00220
00221 switch (type) {
00222 case Update::Topic: {
00223 if (CORBA::is_nil(this->topicWriter_.in())) {
00224
00225 return;
00226 }
00227
00228 TopicUpdate sample;
00229 sample.sender = this->id().id();
00230 sample.action = DestroyEntity;
00231
00232 sample.id = id.id;
00233 sample.domain = id.domain;
00234 sample.participant = id.participant;
00235
00236 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00237 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00238 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00239 ACE_DEBUG((LM_DEBUG,
00240 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
00241 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00242 this->id().id(),
00243 sample.domain,
00244 std::string(part_converter).c_str(),
00245 std::string(topic_converter).c_str()));
00246 }
00247
00248 this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00249 }
00250 break;
00251
00252 case Update::Participant: {
00253 if (CORBA::is_nil(this->participantWriter_.in())) {
00254
00255 return;
00256 }
00257
00258 ParticipantUpdate sample;
00259 sample.sender = this->id().id();
00260 sample.action = DestroyEntity;
00261
00262 sample.domain = id.domain;
00263 sample.id = id.id;
00264
00265 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00266 OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00267 ACE_DEBUG((LM_DEBUG,
00268 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
00269 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00270 this->id().id(),
00271 sample.domain,
00272 std::string(converter).c_str()));
00273 }
00274
00275 this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00276 }
00277 break;
00278
00279 case Update::Actor:
00280
00281
00282 switch (actor) {
00283 case Update::DataWriter: {
00284 if (CORBA::is_nil(this->publicationWriter_.in())) {
00285
00286 return;
00287 }
00288
00289 PublicationUpdate sample;
00290 sample.sender = this->id().id();
00291 sample.action = DestroyEntity;
00292
00293 sample.domain = id.domain;
00294 sample.participant = id.participant;
00295 sample.id = id.id;
00296
00297 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00298 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00299 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00300 ACE_DEBUG((LM_DEBUG,
00301 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
00302 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00303 this->id().id(),
00304 sample.domain,
00305 std::string(part_converter).c_str(),
00306 std::string(pub_converter).c_str()));
00307 }
00308
00309 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00310 }
00311 break;
00312
00313 case Update::DataReader: {
00314 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00315
00316 return;
00317 }
00318
00319 SubscriptionUpdate sample;
00320 sample.sender = this->id().id();
00321 sample.action = DestroyEntity;
00322
00323 sample.domain = id.domain;
00324 sample.participant = id.participant;
00325 sample.id = id.id;
00326
00327 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00328 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00329 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00330 ACE_DEBUG((LM_DEBUG,
00331 ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
00332 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00333 this->id().id(),
00334 sample.domain,
00335 std::string(part_converter).c_str(),
00336 std::string(sub_converter).c_str()));
00337 }
00338
00339 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00340 }
00341 break;
00342 }
00343
00344 break;
00345 }
00346 }
00347
00348 void
00349 ManagerImpl::update(const Update::IdPath& id, const DDS::DomainParticipantQos& qos)
00350 {
00351 if (CORBA::is_nil(this->participantWriter_.in())) {
00352
00353 return;
00354 }
00355
00356 ParticipantUpdate sample;
00357 sample.sender = this->id().id();
00358 sample.action = UpdateQosValue1;
00359
00360 sample.domain = id.domain;
00361 sample.id = id.id;
00362 sample.qos = qos;
00363
00364 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00365 OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00366 ACE_DEBUG((LM_DEBUG,
00367 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
00368 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00369 this->id().id(),
00370 sample.domain,
00371 std::string(converter).c_str()));
00372 }
00373
00374 this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00375 }
00376
00377 void
00378 ManagerImpl::update(const Update::IdPath& id, const DDS::TopicQos& qos)
00379 {
00380 if (CORBA::is_nil(this->topicWriter_.in())) {
00381
00382 return;
00383 }
00384
00385 TopicUpdate sample;
00386 sample.sender = this->id().id();
00387 sample.action = UpdateQosValue1;
00388
00389 sample.id = id.id;
00390 sample.domain = id.domain;
00391 sample.participant = id.participant;
00392 sample.qos = qos;
00393
00394 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00395 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00396 OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00397 ACE_DEBUG((LM_DEBUG,
00398 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
00399 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00400 this->id().id(),
00401 sample.domain,
00402 std::string(part_converter).c_str(),
00403 std::string(topic_converter).c_str()));
00404 }
00405
00406 this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00407 }
00408
00409 void
00410 ManagerImpl::update(const Update::IdPath& id, const DDS::DataWriterQos& qos)
00411 {
00412 if (CORBA::is_nil(this->publicationWriter_.in())) {
00413
00414 return;
00415 }
00416
00417 PublicationUpdate sample;
00418 sample.sender = this->id().id();
00419 sample.action = UpdateQosValue1;
00420
00421 sample.domain = id.domain;
00422 sample.participant = id.participant;
00423 sample.id = id.id;
00424 sample.datawriter_qos = qos;
00425
00426 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00427 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00428 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00429 ACE_DEBUG((LM_DEBUG,
00430 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
00431 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00432 this->id().id(),
00433 sample.domain,
00434 std::string(part_converter).c_str(),
00435 std::string(pub_converter).c_str()));
00436 }
00437
00438 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00439 }
00440
00441 void
00442 ManagerImpl::update(const Update::IdPath& id, const DDS::PublisherQos& qos)
00443 {
00444 if (CORBA::is_nil(this->publicationWriter_.in())) {
00445
00446 return;
00447 }
00448
00449 PublicationUpdate sample;
00450 sample.sender = this->id().id();
00451 sample.action = UpdateQosValue2;
00452
00453 sample.domain = id.domain;
00454 sample.participant = id.participant;
00455 sample.id = id.id;
00456 sample.publisher_qos = qos;
00457
00458 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00459 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00460 OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00461 ACE_DEBUG((LM_DEBUG,
00462 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
00463 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00464 this->id().id(),
00465 sample.domain,
00466 std::string(part_converter).c_str(),
00467 std::string(pub_converter).c_str()));
00468 }
00469
00470 this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00471 }
00472
00473 void
00474 ManagerImpl::update(const Update::IdPath& id, const DDS::DataReaderQos& qos)
00475 {
00476 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00477
00478 return;
00479 }
00480
00481 SubscriptionUpdate sample;
00482 sample.sender = this->id().id();
00483 sample.action = UpdateQosValue1;
00484
00485 sample.domain = id.domain;
00486 sample.participant = id.participant;
00487 sample.id = id.id;
00488 sample.datareader_qos = qos;
00489
00490 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00491 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00492 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00493 ACE_DEBUG((LM_DEBUG,
00494 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
00495 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00496 this->id().id(),
00497 sample.domain,
00498 std::string(part_converter).c_str(),
00499 std::string(sub_converter).c_str()));
00500 }
00501
00502 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00503 }
00504
00505 void
00506 ManagerImpl::update(const Update::IdPath& id, const DDS::StringSeq& params)
00507 {
00508 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00509
00510 return;
00511 }
00512
00513 SubscriptionUpdate sample;
00514 sample.sender = this->id().id();
00515 sample.action = UpdateFilterExpressionParams;
00516 sample.domain = id.domain;
00517 sample.participant = id.participant;
00518 sample.id = id.id;
00519 sample.expression_params = params;
00520
00521 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00522 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00523 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00524 ACE_DEBUG((LM_DEBUG,
00525 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
00526 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00527 this->id().id(),
00528 sample.domain,
00529 std::string(part_converter).c_str(),
00530 std::string(sub_converter).c_str()));
00531 }
00532
00533 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00534 }
00535
00536 void
00537 ManagerImpl::update(const Update::IdPath& id, const DDS::SubscriberQos& qos)
00538 {
00539 if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00540
00541 return;
00542 }
00543
00544 SubscriptionUpdate sample;
00545 sample.sender = this->id().id();
00546 sample.action = UpdateQosValue2;
00547
00548 sample.domain = id.domain;
00549 sample.participant = id.participant;
00550 sample.id = id.id;
00551 sample.subscriber_qos = qos;
00552
00553 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00554 OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00555 OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00556 ACE_DEBUG((LM_DEBUG,
00557 ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
00558 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00559 this->id().id(),
00560 sample.domain,
00561 std::string(part_converter).c_str(),
00562 std::string(sub_converter).c_str()));
00563 }
00564
00565 this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00566 }
00567
00568
00569
00570
00571
00572
00573
00574 void
00575 ManagerImpl::processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* )
00576 {
00577 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00578 OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00579 ACE_DEBUG((LM_DEBUG,
00580 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00581 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00582 this->id().id(),
00583 sample->domain,
00584 std::string(converter).c_str(),
00585 sample->sender,
00586 sample->owner));
00587 }
00588
00589
00590 if (false == this->info_->changeOwnership(sample->domain,
00591 sample->participant,
00592 sample->sender,
00593 sample->owner)) {
00594 {
00595 ACE_GUARD(ACE_Thread_Mutex,
00596 guard,
00597 this->deferred_lock_);
00598 this->deferredOwnerships_.push_back(*sample);
00599 }
00600
00601 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00602 ACE_DEBUG((LM_DEBUG,
00603 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00604 ACE_TEXT("deferred update.\n")));
00605 }
00606 }
00607
00608 this->processDeferred();
00609 }
00610
00611 void
00612 ManagerImpl::processCreate(const PublicationUpdate* sample, const DDS::SampleInfo* )
00613 {
00614 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00615 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00616 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00617 ACE_DEBUG((LM_DEBUG,
00618 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00619 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00620 this->id().id(),
00621 sample->domain,
00622 std::string(part_converter).c_str(),
00623 std::string(pub_converter).c_str()));
00624 }
00625
00626 if (false == this->info_->add_publication(sample->domain,
00627 sample->participant,
00628 sample->topic,
00629 sample->id,
00630 sample->callback,
00631 sample->datawriter_qos,
00632 sample->transport_info,
00633 sample->publisher_qos,
00634 true)) {
00635 {
00636 ACE_GUARD(ACE_Thread_Mutex,
00637 guard,
00638 this->deferred_lock_);
00639 this->deferredPublications_.push_back(*sample);
00640 }
00641
00642 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00643 ACE_DEBUG((LM_DEBUG,
00644 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00645 ACE_TEXT("deferred update.\n")));
00646 }
00647 }
00648
00649 this->processDeferred();
00650 }
00651
00652 void
00653 ManagerImpl::processCreate(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
00654 {
00655 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00656 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00657 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00658 ACE_DEBUG((LM_DEBUG,
00659 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00660 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00661 this->id().id(),
00662 sample->domain,
00663 std::string(part_converter).c_str(),
00664 std::string(sub_converter).c_str()));
00665 }
00666
00667 if (false == this->info_->add_subscription(sample->domain,
00668 sample->participant,
00669 sample->topic,
00670 sample->id,
00671 sample->callback,
00672 sample->datareader_qos,
00673 sample->transport_info,
00674 sample->subscriber_qos,
00675 sample->filter_class_name,
00676 sample->filter_expression,
00677 sample->expression_params,
00678 true)) {
00679 {
00680 ACE_GUARD(ACE_Thread_Mutex,
00681 guard,
00682 this->deferred_lock_);
00683 this->deferredSubscriptions_.push_back(*sample);
00684 }
00685
00686 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00687 ACE_DEBUG((LM_DEBUG,
00688 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00689 ACE_TEXT("deferred update.\n")));
00690 }
00691 }
00692
00693 this->processDeferred();
00694 }
00695
00696 void
00697 ManagerImpl::processCreate(const ParticipantUpdate* sample, const DDS::SampleInfo* )
00698 {
00699 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00700 OpenDDS::DCPS::RepoIdConverter converter(sample->id);
00701 ACE_DEBUG((LM_DEBUG,
00702 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
00703 ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
00704 this->id().id(),
00705 sample->domain,
00706 std::string(converter).c_str(),
00707 sample->owner));
00708 }
00709
00710 this->info_->add_domain_participant(
00711 sample->domain,
00712 sample->id,
00713 sample->qos);
00714 bool ownershipChanged = this->info_->changeOwnership(
00715 sample->domain,
00716 sample->id,
00717 sample->sender,
00718 sample->owner);
00719 if (!ownershipChanged) {
00720 ACE_ERROR((LM_ERROR,
00721 ACE_TEXT("(%P|%t) ERROR: ")
00722 ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
00723 ACE_TEXT("Could not change ownership\n")));
00724 }
00725 this->processDeferred();
00726 }
00727
00728 void
00729 ManagerImpl::processCreate(const TopicUpdate* sample, const DDS::SampleInfo* )
00730 {
00731 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00732 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00733 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
00734 ACE_DEBUG((LM_DEBUG,
00735 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00736 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00737 this->id().id(),
00738 sample->domain,
00739 std::string(part_converter).c_str(),
00740 std::string(topic_converter).c_str()));
00741 }
00742
00743 if (false == this->info_->add_topic(sample->id,
00744 sample->domain,
00745 sample->participant,
00746 sample->topic,
00747 sample->datatype,
00748 sample->qos)) {
00749 {
00750 ACE_GUARD(ACE_Thread_Mutex,
00751 guard,
00752 this->deferred_lock_);
00753 this->deferredTopics_.push_back(*sample);
00754 }
00755
00756 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00757 ACE_DEBUG((LM_DEBUG,
00758 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00759 ACE_TEXT("deferred update.\n")));
00760 }
00761 }
00762
00763 this->processDeferred();
00764 }
00765
00766 void
00767 ManagerImpl::processDeferred()
00768 {
00769 ACE_GUARD(ACE_Thread_Mutex,
00770 guard,
00771 this->deferred_lock_);
00772
00773 {
00774 std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
00775
00776 while (current != this->deferredOwnerships_.end()) {
00777 if (this->info_->changeOwnership(current->domain,
00778 current->participant,
00779 current->sender,
00780 current->owner)) {
00781 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00782 OpenDDS::DCPS::RepoIdConverter converter(current->participant);
00783 ACE_DEBUG((LM_DEBUG,
00784 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
00785 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00786 this->id().id(),
00787 current->domain,
00788 std::string(converter).c_str(),
00789 current->sender,
00790 current->owner));
00791 }
00792
00793 current = this->deferredOwnerships_.erase(current);
00794
00795 } else {
00796 ++ current;
00797 }
00798 }
00799 }
00800
00801 {
00802 std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
00803
00804 while (current != this->deferredTopics_.end()) {
00805 if (true == this->info_->add_topic(current->id,
00806 current->domain,
00807 current->participant,
00808 current->topic,
00809 current->datatype,
00810 current->qos)) {
00811 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00812 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00813 OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
00814 ACE_DEBUG((LM_DEBUG,
00815 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
00816 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00817 this->id().id(),
00818 current->domain,
00819 std::string(part_converter).c_str(),
00820 std::string(topic_converter).c_str()));
00821 }
00822
00823 current = this->deferredTopics_.erase(current);
00824
00825 } else {
00826 ++ current;
00827 }
00828 }
00829 }
00830
00831 {
00832 std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
00833
00834 while (current != this->deferredPublications_.end()) {
00835
00836 if (true == this->info_->add_publication(current->domain,
00837 current->participant,
00838 current->topic,
00839 current->id,
00840 current->callback,
00841 current->datawriter_qos,
00842 current->transport_info,
00843 current->publisher_qos,
00844 true)) {
00845 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00846 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00847 OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
00848 ACE_DEBUG((LM_DEBUG,
00849 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
00850 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00851 this->id().id(),
00852 current->domain,
00853 std::string(part_converter).c_str(),
00854 std::string(pub_converter).c_str()));
00855 }
00856
00857 current = this->deferredPublications_.erase(current);
00858
00859 } else {
00860 ++ current;
00861 }
00862 }
00863 }
00864
00865 {
00866 std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
00867
00868 while (current != this->deferredSubscriptions_.end()) {
00869
00870 if (true == this->info_->add_subscription(current->domain,
00871 current->participant,
00872 current->topic,
00873 current->id,
00874 current->callback,
00875 current->datareader_qos,
00876 current->transport_info,
00877 current->subscriber_qos,
00878 current->filter_class_name,
00879 current->filter_expression,
00880 current->expression_params,
00881 true)) {
00882 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00883 OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00884 OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
00885 ACE_DEBUG((LM_DEBUG,
00886 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
00887 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00888 this->id().id(),
00889 current->domain,
00890 std::string(part_converter).c_str(),
00891 std::string(sub_converter).c_str()));
00892 }
00893
00894 current = this->deferredSubscriptions_.erase(current);
00895
00896 } else {
00897 ++ current;
00898 }
00899 }
00900 }
00901
00902 }
00903
00904 void
00905 ManagerImpl::processUpdateQos1(const OwnerUpdate* sample, const DDS::SampleInfo* )
00906 {
00907 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00908 OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00909 ACE_DEBUG((LM_DEBUG,
00910 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00911 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00912 this->id().id(),
00913 sample->domain,
00914 std::string(converter).c_str(),
00915 sample->sender,
00916 sample->owner));
00917 }
00918
00919 if (false == this->info_->changeOwnership(sample->domain,
00920 sample->participant,
00921 sample->sender,
00922 sample->owner)) {
00923 {
00924 ACE_GUARD(ACE_Thread_Mutex,
00925 guard,
00926 this->deferred_lock_);
00927
00928 this->deferredOwnerships_.push_back(*sample);
00929 }
00930 ACE_DEBUG((LM_DEBUG,
00931 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00932 ACE_TEXT("deferred update.\n")));
00933 }
00934 }
00935
00936 void
00937 ManagerImpl::processUpdateQos1(const PublicationUpdate* sample, const DDS::SampleInfo* )
00938 {
00939 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00940 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00941 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00942 ACE_DEBUG((LM_DEBUG,
00943 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
00944 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00945 this->id().id(),
00946 sample->domain,
00947 std::string(part_converter).c_str(),
00948 std::string(pub_converter).c_str()));
00949 }
00950
00951 this->info_->update_publication_qos(
00952 sample->domain,
00953 sample->participant,
00954 sample->id,
00955 sample->datawriter_qos);
00956 }
00957
00958 void
00959 ManagerImpl::processUpdateQos2(const PublicationUpdate* sample, const DDS::SampleInfo* )
00960 {
00961 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00962 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00963 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00964 ACE_DEBUG((LM_DEBUG,
00965 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
00966 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00967 this->id().id(),
00968 sample->domain,
00969 std::string(part_converter).c_str(),
00970 std::string(pub_converter).c_str()));
00971 }
00972
00973 this->info_->update_publication_qos(
00974 sample->domain,
00975 sample->participant,
00976 sample->id,
00977 sample->publisher_qos);
00978 }
00979
00980 void
00981 ManagerImpl::processUpdateQos1(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
00982 {
00983 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00984 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00985 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00986 ACE_DEBUG((LM_DEBUG,
00987 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
00988 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00989 this->id().id(),
00990 sample->domain,
00991 std::string(part_converter).c_str(),
00992 std::string(sub_converter).c_str()));
00993 }
00994
00995 this->info_->update_subscription_qos(
00996 sample->domain,
00997 sample->participant,
00998 sample->id,
00999 sample->datareader_qos);
01000 }
01001
01002 void
01003 ManagerImpl::processUpdateQos2(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
01004 {
01005 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01006 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01007 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01008 ACE_DEBUG((LM_DEBUG,
01009 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
01010 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01011 this->id().id(),
01012 sample->domain,
01013 std::string(part_converter).c_str(),
01014 std::string(sub_converter).c_str()));
01015 }
01016
01017 this->info_->update_subscription_qos(
01018 sample->domain,
01019 sample->participant,
01020 sample->id,
01021 sample->subscriber_qos);
01022 }
01023
01024 void
01025 ManagerImpl::processUpdateFilterExpressionParams(
01026 const SubscriptionUpdate* sample, const DDS::SampleInfo* )
01027 {
01028 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01029 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01030 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01031 ACE_DEBUG((LM_DEBUG,
01032 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
01033 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01034 this->id().id(),
01035 sample->domain,
01036 std::string(part_converter).c_str(),
01037 std::string(sub_converter).c_str()));
01038 }
01039
01040 this->info_->update_subscription_params(
01041 sample->domain,
01042 sample->participant,
01043 sample->id,
01044 sample->expression_params);
01045 }
01046
01047 void
01048 ManagerImpl::processUpdateQos1(const ParticipantUpdate* sample, const DDS::SampleInfo* )
01049 {
01050 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01051 OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01052 ACE_DEBUG((LM_DEBUG,
01053 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
01054 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01055 this->id().id(),
01056 sample->domain,
01057 std::string(converter).c_str()));
01058 }
01059
01060 this->info_->update_domain_participant_qos(
01061 sample->domain,
01062 sample->id,
01063 sample->qos);
01064 }
01065
01066 void
01067 ManagerImpl::processUpdateQos1(const TopicUpdate* sample, const DDS::SampleInfo* )
01068 {
01069 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01070 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01071 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01072 ACE_DEBUG((LM_DEBUG,
01073 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
01074 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01075 this->id().id(),
01076 sample->domain,
01077 std::string(part_converter).c_str(),
01078 std::string(topic_converter).c_str()));
01079 }
01080
01081 this->info_->update_topic_qos(
01082 sample->id,
01083 sample->domain,
01084 sample->participant,
01085 sample->qos);
01086 }
01087
01088 void
01089 ManagerImpl::processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* )
01090 {
01091 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01092 OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
01093 ACE_DEBUG((LM_DEBUG,
01094 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01095 ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
01096 this->id().id(),
01097 sample->domain,
01098 std::string(converter).c_str(),
01099 sample->sender,
01100 sample->owner));
01101 }
01102
01103
01104 if (false == this->info_->changeOwnership(sample->domain,
01105 sample->participant,
01106 sample->sender,
01107 sample->owner)) {
01108 {
01109 ACE_GUARD(ACE_Thread_Mutex,
01110 guard,
01111 this->deferred_lock_);
01112 this->deferredOwnerships_.push_back(*sample);
01113 }
01114 ACE_DEBUG((LM_DEBUG,
01115 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01116 ACE_TEXT("deferred update.\n")));
01117 }
01118 }
01119
01120 void
01121 ManagerImpl::processDelete(const PublicationUpdate* sample, const DDS::SampleInfo* )
01122 {
01123 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01124 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01125 OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
01126 ACE_DEBUG((LM_DEBUG,
01127 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01128 ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
01129 this->id().id(),
01130 sample->domain,
01131 std::string(part_converter).c_str(),
01132 std::string(pub_converter).c_str()));
01133 }
01134
01135 try {
01136 this->info_->remove_publication(
01137 sample->domain,
01138 sample->participant,
01139 sample->id);
01140
01141 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01142 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01143 ACE_DEBUG((LM_DEBUG,
01144 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01145 ACE_TEXT("the participant was already removed.\n")));
01146 }
01147 }
01148 }
01149
01150 void
01151 ManagerImpl::processDelete(const SubscriptionUpdate* sample, const DDS::SampleInfo* )
01152 {
01153 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01154 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01155 OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01156 ACE_DEBUG((LM_DEBUG,
01157 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01158 ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01159 this->id().id(),
01160 sample->domain,
01161 std::string(part_converter).c_str(),
01162 std::string(sub_converter).c_str()));
01163 }
01164
01165 try {
01166 this->info_->remove_subscription(
01167 sample->domain,
01168 sample->participant,
01169 sample->id);
01170
01171 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01172 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01173 ACE_DEBUG((LM_DEBUG,
01174 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01175 ACE_TEXT("the participant was already removed.\n")));
01176 }
01177 }
01178 }
01179
01180 void
01181 ManagerImpl::processDelete(const ParticipantUpdate* sample, const DDS::SampleInfo* )
01182 {
01183 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01184 OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01185 ACE_DEBUG((LM_DEBUG,
01186 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01187 ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01188 this->id().id(),
01189 sample->domain,
01190 std::string(converter).c_str()));
01191 }
01192 try {
01193 this->info_->remove_domain_participant(
01194 sample->domain,
01195 sample->id);
01196 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01197 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01198 ACE_DEBUG((LM_DEBUG,
01199 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01200 ACE_TEXT("the participant was already removed.\n")));
01201 }
01202 }
01203 }
01204
01205 void
01206 ManagerImpl::processDelete(const TopicUpdate* sample, const DDS::SampleInfo* )
01207 {
01208 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01209 OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01210 OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01211 ACE_DEBUG((LM_DEBUG,
01212 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01213 ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01214 this->id().id(),
01215 sample->domain,
01216 std::string(part_converter).c_str(),
01217 std::string(topic_converter).c_str()));
01218 }
01219
01220 try {
01221 this->info_->remove_topic(
01222 sample->domain,
01223 sample->participant,
01224 sample->id);
01225
01226 } catch (OpenDDS::DCPS::Invalid_Participant&) {
01227 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01228 ACE_DEBUG((LM_DEBUG,
01229 ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01230 ACE_TEXT("the participant was already removed.\n")));
01231 }
01232 }
01233 }
01234
01235 void
01236 ManagerImpl::pushState(Manager_ptr peer)
01237 {
01238
01239
01240
01241
01242
01243
01244
01245
01246
01247
01248
01249
01250
01251 for (DCPS_IR_Domain_Map::const_iterator currentDomain
01252 = this->info_->domains().begin();
01253 currentDomain != this->info_->domains().end();
01254 ++currentDomain) {
01255
01256 if (currentDomain->second->get_id() == this->config_.federationDomain()) {
01257
01258
01259 }
01260
01261
01262 for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01263 = currentDomain->second->participants().begin();
01264 currentParticipant != currentDomain->second->participants().end();
01265 ++currentParticipant) {
01266
01267 if (currentParticipant->second->isBitPublisher() == true) {
01268
01269 continue;
01270 }
01271
01272
01273 ParticipantUpdate participantSample;
01274 participantSample.sender = this->id().id();
01275 participantSample.action = CreateEntity;
01276
01277 participantSample.owner = currentParticipant->second->owner();
01278 participantSample.domain = currentDomain->second->get_id();
01279 participantSample.id = currentParticipant->second->get_id();
01280 participantSample.qos = *currentParticipant->second->get_qos();
01281
01282 peer->initializeParticipant(participantSample);
01283
01284
01285 OwnerUpdate ownerSample;
01286 ownerSample.sender = this->id().id();
01287 ownerSample.action = CreateEntity;
01288
01289 ownerSample.domain = currentDomain->second->get_id();
01290 ownerSample.participant = currentParticipant->second->get_id();
01291 ownerSample.owner = currentParticipant->second->owner();
01292
01293 peer->initializeOwner(ownerSample);
01294 }
01295
01296
01297 for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01298 = currentDomain->second->participants().begin();
01299 currentParticipant != currentDomain->second->participants().end();
01300 ++currentParticipant) {
01301
01302 if (currentParticipant->second->isBitPublisher() == true) {
01303
01304 continue;
01305 }
01306
01307
01308 for (DCPS_IR_Topic_Map::const_iterator currentTopic
01309 = currentParticipant->second->topics().begin();
01310 currentTopic != currentParticipant->second->topics().end();
01311 ++currentTopic) {
01312 TopicUpdate topicSample;
01313 topicSample.sender = this->id().id();
01314 topicSample.action = CreateEntity;
01315
01316 topicSample.id = currentTopic->second->get_id();
01317 topicSample.domain = currentDomain->second->get_id();
01318 topicSample.participant = currentTopic->second->get_participant_id();
01319 topicSample.topic = currentTopic->second->get_topic_description()->get_name();
01320 topicSample.datatype = currentTopic->second->get_topic_description()->get_dataTypeName();
01321 topicSample.qos = *currentTopic->second->get_topic_qos();
01322
01323 peer->initializeTopic(topicSample);
01324 }
01325
01326
01327 for (DCPS_IR_Publication_Map::const_iterator currentPublication
01328 = currentParticipant->second->publications().begin();
01329 currentPublication != currentParticipant->second->publications().end();
01330 ++currentPublication) {
01331 PublicationUpdate publicationSample;
01332 publicationSample.sender = this->id().id();
01333 publicationSample.action = CreateEntity;
01334
01335 DCPS_IR_Publication* p = currentPublication->second;
01336 CORBA::ORB_var orb = this->info_->orb();
01337 CORBA::String_var callback = orb->object_to_string(p->writer());
01338
01339 publicationSample.domain = currentDomain->second->get_id();
01340 publicationSample.participant = p->get_participant_id();
01341 publicationSample.topic = p->get_topic_id();
01342 publicationSample.id = p->get_id();
01343 publicationSample.callback = callback.in();
01344 publicationSample.datawriter_qos = *p->get_datawriter_qos();
01345 publicationSample.publisher_qos = *p->get_publisher_qos();
01346 publicationSample.transport_info = p->get_transportLocatorSeq();
01347
01348 peer->initializePublication(publicationSample);
01349 }
01350
01351
01352 for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
01353 = currentParticipant->second->subscriptions().begin();
01354 currentSubscription != currentParticipant->second->subscriptions().end();
01355 ++currentSubscription) {
01356 SubscriptionUpdate subscriptionSample;
01357 subscriptionSample.sender = this->id().id();
01358 subscriptionSample.action = CreateEntity;
01359
01360 DCPS_IR_Subscription* s = currentSubscription->second;
01361 CORBA::ORB_var orb = this->info_->orb();
01362 CORBA::String_var callback = orb->object_to_string(s->reader());
01363
01364 subscriptionSample.domain = currentDomain->second->get_id();
01365 subscriptionSample.participant = s->get_participant_id();
01366 subscriptionSample.topic = s->get_topic_id();
01367 subscriptionSample.id = s->get_id();
01368 subscriptionSample.callback = callback.in();
01369 subscriptionSample.datareader_qos = *s->get_datareader_qos();
01370 subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
01371 subscriptionSample.transport_info = s->get_transportLocatorSeq();
01372 subscriptionSample.filter_expression = s->get_filter_expression().c_str();
01373 subscriptionSample.expression_params = s->get_expr_params();
01374
01375 peer->initializeSubscription(subscriptionSample);
01376 }
01377 }
01378 }
01379 }
01380
01381 }
01382 }