FederatorManagerImpl_updates.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include "FederatorManagerImpl.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DCPS_IR_Domain.h"
00012 #include "DCPS_IR_Participant.h"
00013 
00014 #include "dds/DCPS/RepoIdConverter.h"
00015 
00016 namespace OpenDDS {
00017 namespace Federator {
00018 
00019 void
00020 ManagerImpl::unregisterCallback()
00021 {
00022   /* This method intentionally left unimplemented. */
00023 }
00024 
00025 void
00026 ManagerImpl::requestImage()
00027 {
00028   /* This method intentionally left unimplemented. */
00029 }
00030 
00031 ////////////////////////////////////////////////////////////////////////
00032 //
00033 // The following methods publish updates to the remainder of the
00034 // federation.
00035 //
00036 
00037 void
00038 ManagerImpl::create(const Update::UTopic& topic)
00039 {
00040   if (CORBA::is_nil(this->topicWriter_.in())) {
00041     // Decline to publish data until we can.
00042     return;
00043   }
00044 
00045   TopicUpdate sample;
00046   sample.sender      = this->id().id();
00047   sample.action      = CreateEntity;
00048 
00049   sample.id          = topic.topicId;
00050   sample.domain      = topic.domainId;
00051   sample.participant = topic.participantId;
00052   sample.topic       = topic.name.c_str();
00053   sample.datatype    = topic.dataType.c_str();
00054   sample.qos         = topic.topicQos;
00055 
00056   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00057     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00058     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00059     ACE_DEBUG((LM_DEBUG,
00060                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
00061                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00062                this->id().id(),
00063                sample.domain,
00064                std::string(part_converter).c_str(),
00065                std::string(topic_converter).c_str()));
00066   }
00067 
00068   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00069 }
00070 
00071 void
00072 ManagerImpl::create(const Update::UParticipant& participant)
00073 {
00074   if (CORBA::is_nil(this->participantWriter_.in())) {
00075     // Decline to publish data until we can.
00076     return;
00077   }
00078 
00079   ParticipantUpdate sample;
00080   sample.sender = this->id().id();
00081   sample.action = CreateEntity;
00082 
00083   sample.owner  = participant.owner;
00084   sample.domain = participant.domainId;
00085   sample.id     = participant.participantId;
00086   sample.qos    = participant.participantQos;
00087 
00088   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00089     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00090     ACE_DEBUG((LM_DEBUG,
00091                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
00092                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00093                this->id().id(),
00094                sample.domain,
00095                std::string(converter).c_str()));
00096   }
00097 
00098   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00099 }
00100 
00101 void
00102 ManagerImpl::create(const Update::URActor& reader)
00103 {
00104   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00105     // Decline to publish data until we can.
00106     return;
00107   }
00108 
00109   SubscriptionUpdate sample;
00110   sample.sender         = this->id().id();
00111   sample.action         = CreateEntity;
00112 
00113   sample.domain         = reader.domainId;
00114   sample.participant    = reader.participantId;
00115   sample.topic          = reader.topicId;
00116   sample.id             = reader.actorId;
00117   sample.callback       = reader.callback.c_str();
00118   sample.datareader_qos = reader.drdwQos;
00119   sample.subscriber_qos = reader.pubsubQos;
00120   sample.transport_info = reader.transportInterfaceInfo;
00121   sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
00122   sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
00123   sample.expression_params = reader.contentSubscriptionProfile.exprParams;
00124 
00125   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00126     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00127     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00128     ACE_DEBUG((LM_DEBUG,
00129                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
00130                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00131                this->id().id(),
00132                sample.domain,
00133                std::string(part_converter).c_str(),
00134                std::string(sub_converter).c_str()));
00135   }
00136 
00137   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00138 }
00139 
00140 void
00141 ManagerImpl::create(const Update::UWActor& writer)
00142 {
00143   if (CORBA::is_nil(this->publicationWriter_.in())) {
00144     // Decline to publish data until we can.
00145     return;
00146   }
00147 
00148   PublicationUpdate sample;
00149   sample.sender         = this->id().id();
00150   sample.action         = CreateEntity;
00151 
00152   sample.domain         = writer.domainId;
00153   sample.participant    = writer.participantId;
00154   sample.topic          = writer.topicId;
00155   sample.id             = writer.actorId;
00156   sample.callback       = writer.callback.c_str();
00157   sample.datawriter_qos = writer.drdwQos;
00158   sample.publisher_qos  = writer.pubsubQos;
00159   sample.transport_info = writer.transportInterfaceInfo;
00160 
00161   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00162     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00163     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00164     ACE_DEBUG((LM_DEBUG,
00165                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( PublicationUpdate): ")
00166                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00167                this->id().id(),
00168                sample.domain,
00169                std::string(part_converter).c_str(),
00170                std::string(pub_converter).c_str()));
00171   }
00172 
00173   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00174 }
00175 
00176 void
00177 ManagerImpl::create(const Update::OwnershipData& data)
00178 {
00179   if (CORBA::is_nil(this->ownerWriter_.in())) {
00180     // Decline to publish data until we can.
00181     return;
00182   }
00183 
00184   OwnerUpdate sample;
00185   sample.sender      = this->id().id();
00186   sample.action      = CreateEntity;
00187 
00188   sample.domain      = data.domain;
00189   sample.participant = data.participant;
00190   sample.owner       = data.owner;
00191 
00192   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00193     OpenDDS::DCPS::RepoIdConverter converter(sample.participant);
00194     ACE_DEBUG((LM_DEBUG,
00195                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
00196                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00197                this->id().id(),
00198                sample.domain,
00199                std::string(converter).c_str(),
00200                sample.sender,
00201                sample.owner));
00202   }
00203 
00204   this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
00205 }
00206 
00207 void
00208 ManagerImpl::destroy(
00209   const Update::IdPath& id,
00210   Update::ItemType      type,
00211   Update::ActorType     actor)
00212 {
00213   //
00214   // Do not propagate any destroy() messages within the FederationDomain.
00215   // This domain will be managed separately.
00216   //
00217   if (id.domain == this->config_.federationDomain()) {
00218     return;
00219   }
00220 
00221   switch (type) {
00222   case Update::Topic: {
00223     if (CORBA::is_nil(this->topicWriter_.in())) {
00224       // Decline to publish data until we can.
00225       return;
00226     }
00227 
00228     TopicUpdate sample;
00229     sample.sender      = this->id().id();
00230     sample.action      = DestroyEntity;
00231 
00232     sample.id          = id.id;
00233     sample.domain      = id.domain;
00234     sample.participant = id.participant;
00235 
00236     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00237       OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00238       OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00239       ACE_DEBUG((LM_DEBUG,
00240                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
00241                  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00242                  this->id().id(),
00243                  sample.domain,
00244                  std::string(part_converter).c_str(),
00245                  std::string(topic_converter).c_str()));
00246     }
00247 
00248     this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00249   }
00250   break;
00251 
00252   case Update::Participant: {
00253     if (CORBA::is_nil(this->participantWriter_.in())) {
00254       // Decline to publish data until we can.
00255       return;
00256     }
00257 
00258     ParticipantUpdate sample;
00259     sample.sender = this->id().id();
00260     sample.action = DestroyEntity;
00261 
00262     sample.domain = id.domain;
00263     sample.id     = id.id;
00264 
00265     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00266       OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00267       ACE_DEBUG((LM_DEBUG,
00268                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
00269                  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00270                  this->id().id(),
00271                  sample.domain,
00272                  std::string(converter).c_str()));
00273     }
00274 
00275     this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00276   }
00277   break;
00278 
00279   case Update::Actor:
00280 
00281     // This is VERY annoying.
00282     switch (actor) {
00283     case Update::DataWriter: {
00284       if (CORBA::is_nil(this->publicationWriter_.in())) {
00285         // Decline to publish data until we can.
00286         return;
00287       }
00288 
00289       PublicationUpdate sample;
00290       sample.sender         = this->id().id();
00291       sample.action         = DestroyEntity;
00292 
00293       sample.domain         = id.domain;
00294       sample.participant    = id.participant;
00295       sample.id             = id.id;
00296 
00297       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00298         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00299         OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00300         ACE_DEBUG((LM_DEBUG,
00301                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
00302                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00303                    this->id().id(),
00304                    sample.domain,
00305                    std::string(part_converter).c_str(),
00306                    std::string(pub_converter).c_str()));
00307       }
00308 
00309       this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00310     }
00311     break;
00312 
00313     case Update::DataReader: {
00314       if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00315         // Decline to publish data until we can.
00316         return;
00317       }
00318 
00319       SubscriptionUpdate sample;
00320       sample.sender         = this->id().id();
00321       sample.action         = DestroyEntity;
00322 
00323       sample.domain         = id.domain;
00324       sample.participant    = id.participant;
00325       sample.id             = id.id;
00326 
00327       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00328         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00329         OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00330         ACE_DEBUG((LM_DEBUG,
00331                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
00332                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00333                    this->id().id(),
00334                    sample.domain,
00335                    std::string(part_converter).c_str(),
00336                    std::string(sub_converter).c_str()));
00337       }
00338 
00339       this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00340     }
00341     break;
00342     }
00343 
00344     break;
00345   }
00346 }
00347 
00348 void
00349 ManagerImpl::update(const Update::IdPath& id, const DDS::DomainParticipantQos& qos)
00350 {
00351   if (CORBA::is_nil(this->participantWriter_.in())) {
00352     // Decline to publish data until we can.
00353     return;
00354   }
00355 
00356   ParticipantUpdate sample;
00357   sample.sender = this->id().id();
00358   sample.action = UpdateQosValue1;
00359 
00360   sample.domain = id.domain;
00361   sample.id     = id.id;
00362   sample.qos    = qos;
00363 
00364   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00365     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00366     ACE_DEBUG((LM_DEBUG,
00367                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
00368                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00369                this->id().id(),
00370                sample.domain,
00371                std::string(converter).c_str()));
00372   }
00373 
00374   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00375 }
00376 
00377 void
00378 ManagerImpl::update(const Update::IdPath& id, const DDS::TopicQos& qos)
00379 {
00380   if (CORBA::is_nil(this->topicWriter_.in())) {
00381     // Decline to publish data until we can.
00382     return;
00383   }
00384 
00385   TopicUpdate sample;
00386   sample.sender      = this->id().id();
00387   sample.action      = UpdateQosValue1;
00388 
00389   sample.id          = id.id;
00390   sample.domain      = id.domain;
00391   sample.participant = id.participant;
00392   sample.qos         = qos;
00393 
00394   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00395     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00396     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00397     ACE_DEBUG((LM_DEBUG,
00398                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
00399                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00400                this->id().id(),
00401                sample.domain,
00402                std::string(part_converter).c_str(),
00403                std::string(topic_converter).c_str()));
00404   }
00405 
00406   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00407 }
00408 
00409 void
00410 ManagerImpl::update(const Update::IdPath& id, const DDS::DataWriterQos& qos)
00411 {
00412   if (CORBA::is_nil(this->publicationWriter_.in())) {
00413     // Decline to publish data until we can.
00414     return;
00415   }
00416 
00417   PublicationUpdate sample;
00418   sample.sender         = this->id().id();
00419   sample.action         = UpdateQosValue1;
00420 
00421   sample.domain         = id.domain;
00422   sample.participant    = id.participant;
00423   sample.id             = id.id;
00424   sample.datawriter_qos = qos;
00425 
00426   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00427     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00428     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00429     ACE_DEBUG((LM_DEBUG,
00430                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
00431                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00432                this->id().id(),
00433                sample.domain,
00434                std::string(part_converter).c_str(),
00435                std::string(pub_converter).c_str()));
00436   }
00437 
00438   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00439 }
00440 
00441 void
00442 ManagerImpl::update(const Update::IdPath& id, const DDS::PublisherQos& qos)
00443 {
00444   if (CORBA::is_nil(this->publicationWriter_.in())) {
00445     // Decline to publish data until we can.
00446     return;
00447   }
00448 
00449   PublicationUpdate sample;
00450   sample.sender         = this->id().id();
00451   sample.action         = UpdateQosValue2;
00452 
00453   sample.domain         = id.domain;
00454   sample.participant    = id.participant;
00455   sample.id             = id.id;
00456   sample.publisher_qos  = qos;
00457 
00458   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00459     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00460     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00461     ACE_DEBUG((LM_DEBUG,
00462                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
00463                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00464                this->id().id(),
00465                sample.domain,
00466                std::string(part_converter).c_str(),
00467                std::string(pub_converter).c_str()));
00468   }
00469 
00470   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00471 }
00472 
00473 void
00474 ManagerImpl::update(const Update::IdPath& id, const DDS::DataReaderQos& qos)
00475 {
00476   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00477     // Decline to publish data until we can.
00478     return;
00479   }
00480 
00481   SubscriptionUpdate sample;
00482   sample.sender         = this->id().id();
00483   sample.action         = UpdateQosValue1;
00484 
00485   sample.domain         = id.domain;
00486   sample.participant    = id.participant;
00487   sample.id             = id.id;
00488   sample.datareader_qos = qos;
00489 
00490   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00491     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00492     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00493     ACE_DEBUG((LM_DEBUG,
00494                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
00495                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00496                this->id().id(),
00497                sample.domain,
00498                std::string(part_converter).c_str(),
00499                std::string(sub_converter).c_str()));
00500   }
00501 
00502   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00503 }
00504 
00505 void
00506 ManagerImpl::update(const Update::IdPath& id, const DDS::StringSeq& params)
00507 {
00508   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00509     // Decline to publish data until we can.
00510     return;
00511   }
00512 
00513   SubscriptionUpdate sample;
00514   sample.sender            = this->id().id();
00515   sample.action            = UpdateFilterExpressionParams;
00516   sample.domain            = id.domain;
00517   sample.participant       = id.participant;
00518   sample.id                = id.id;
00519   sample.expression_params = params;
00520 
00521   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00522     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00523     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00524     ACE_DEBUG((LM_DEBUG,
00525                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
00526                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00527                this->id().id(),
00528                sample.domain,
00529                std::string(part_converter).c_str(),
00530                std::string(sub_converter).c_str()));
00531   }
00532 
00533   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00534 }
00535 
00536 void
00537 ManagerImpl::update(const Update::IdPath& id, const DDS::SubscriberQos& qos)
00538 {
00539   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00540     // Decline to publish data until we can.
00541     return;
00542   }
00543 
00544   SubscriptionUpdate sample;
00545   sample.sender         = this->id().id();
00546   sample.action         = UpdateQosValue2;
00547 
00548   sample.domain         = id.domain;
00549   sample.participant    = id.participant;
00550   sample.id             = id.id;
00551   sample.subscriber_qos = qos;
00552 
00553   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00554     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00555     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00556     ACE_DEBUG((LM_DEBUG,
00557                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
00558                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00559                this->id().id(),
00560                sample.domain,
00561                std::string(part_converter).c_str(),
00562                std::string(sub_converter).c_str()));
00563   }
00564 
00565   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00566 }
00567 
00568 ////////////////////////////////////////////////////////////////////////
00569 //
00570 // The following methods process updates received from the remainder
00571 // of the federation.
00572 //
00573 
00574 void
00575 ManagerImpl::processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
00576 {
00577   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00578     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00579     ACE_DEBUG((LM_DEBUG,
00580                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00581                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00582                this->id().id(),
00583                sample->domain,
00584                std::string(converter).c_str(),
00585                sample->sender,
00586                sample->owner));
00587   }
00588 
00589   // We could generate an error message here.  Instead we let action be irrelevant.
00590   if (false == this->info_->changeOwnership(sample->domain,
00591                                             sample->participant,
00592                                             sample->sender,
00593                                             sample->owner)) {
00594     {
00595       ACE_GUARD(ACE_Thread_Mutex,
00596                 guard,
00597                 this->deferred_lock_);
00598       this->deferredOwnerships_.push_back(*sample);
00599     }
00600 
00601     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00602       ACE_DEBUG((LM_DEBUG,
00603                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00604                  ACE_TEXT("deferred update.\n")));
00605     }
00606   }
00607 
00608   this->processDeferred();
00609 }
00610 
00611 void
00612 ManagerImpl::processCreate(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
00613 {
00614   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00615     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00616     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00617     ACE_DEBUG((LM_DEBUG,
00618                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00619                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00620                this->id().id(),
00621                sample->domain,
00622                std::string(part_converter).c_str(),
00623                std::string(pub_converter).c_str()));
00624   }
00625 
00626   if (false == this->info_->add_publication(sample->domain,
00627                                             sample->participant,
00628                                             sample->topic,
00629                                             sample->id,
00630                                             sample->callback,
00631                                             sample->datawriter_qos,
00632                                             sample->transport_info,
00633                                             sample->publisher_qos,
00634                                             true)) {
00635     {
00636       ACE_GUARD(ACE_Thread_Mutex,
00637                 guard,
00638                 this->deferred_lock_);
00639       this->deferredPublications_.push_back(*sample);
00640     }
00641 
00642     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00643       ACE_DEBUG((LM_DEBUG,
00644                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00645                  ACE_TEXT("deferred update.\n")));
00646     }
00647   }
00648 
00649   this->processDeferred();
00650 }
00651 
00652 void
00653 ManagerImpl::processCreate(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
00654 {
00655   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00656     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00657     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00658     ACE_DEBUG((LM_DEBUG,
00659                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00660                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00661                this->id().id(),
00662                sample->domain,
00663                std::string(part_converter).c_str(),
00664                std::string(sub_converter).c_str()));
00665   }
00666 
00667   if (false == this->info_->add_subscription(sample->domain,
00668                                              sample->participant,
00669                                              sample->topic,
00670                                              sample->id,
00671                                              sample->callback,
00672                                              sample->datareader_qos,
00673                                              sample->transport_info,
00674                                              sample->subscriber_qos,
00675                                              sample->filter_class_name,
00676                                              sample->filter_expression,
00677                                              sample->expression_params,
00678                                              true)) {
00679     {
00680       ACE_GUARD(ACE_Thread_Mutex,
00681                 guard,
00682                 this->deferred_lock_);
00683       this->deferredSubscriptions_.push_back(*sample);
00684     }
00685 
00686     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00687       ACE_DEBUG((LM_DEBUG,
00688                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00689                  ACE_TEXT("deferred update.\n")));
00690     }
00691   }
00692 
00693   this->processDeferred();
00694 }
00695 
00696 void
00697 ManagerImpl::processCreate(const ParticipantUpdate* sample, const DDS::SampleInfo* /* info */)
00698 {
00699   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00700     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
00701     ACE_DEBUG((LM_DEBUG,
00702                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
00703                ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
00704                this->id().id(),
00705                sample->domain,
00706                std::string(converter).c_str(),
00707                sample->owner));
00708   }
00709 
00710   this->info_->add_domain_participant(
00711     sample->domain,
00712     sample->id,
00713     sample->qos);
00714   bool ownershipChanged = this->info_->changeOwnership(
00715     sample->domain,
00716     sample->id,
00717     sample->sender,
00718     sample->owner);
00719   if (!ownershipChanged) {
00720     ACE_ERROR((LM_ERROR,
00721                ACE_TEXT("(%P|%t) ERROR: ")
00722                ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
00723                ACE_TEXT("Could not change ownership\n")));
00724   }
00725   this->processDeferred();
00726 }
00727 
00728 void
00729 ManagerImpl::processCreate(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
00730 {
00731   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00732     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00733     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
00734     ACE_DEBUG((LM_DEBUG,
00735                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00736                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00737                this->id().id(),
00738                sample->domain,
00739                std::string(part_converter).c_str(),
00740                std::string(topic_converter).c_str()));
00741   }
00742 
00743   if (false == this->info_->add_topic(sample->id,
00744                                       sample->domain,
00745                                       sample->participant,
00746                                       sample->topic,
00747                                       sample->datatype,
00748                                       sample->qos)) {
00749     {
00750       ACE_GUARD(ACE_Thread_Mutex,
00751                 guard,
00752                 this->deferred_lock_);
00753       this->deferredTopics_.push_back(*sample);
00754     }
00755 
00756     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00757       ACE_DEBUG((LM_DEBUG,
00758                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00759                  ACE_TEXT("deferred update.\n")));
00760     }
00761   }
00762 
00763   this->processDeferred();
00764 }
00765 
00766 void
00767 ManagerImpl::processDeferred()
00768 {
00769   ACE_GUARD(ACE_Thread_Mutex,
00770             guard,
00771             this->deferred_lock_);
00772 
00773   {
00774     std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
00775 
00776     while (current != this->deferredOwnerships_.end()) {
00777       if (this->info_->changeOwnership(current->domain,
00778                                        current->participant,
00779                                        current->sender,
00780                                        current->owner)) {
00781         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00782           OpenDDS::DCPS::RepoIdConverter converter(current->participant);
00783           ACE_DEBUG((LM_DEBUG,
00784                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
00785                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00786                      this->id().id(),
00787                      current->domain,
00788                      std::string(converter).c_str(),
00789                      current->sender,
00790                      current->owner));
00791         }
00792 
00793         current = this->deferredOwnerships_.erase(current);
00794 
00795       } else {
00796         ++ current;
00797       }
00798     }
00799   }
00800 
00801   {
00802     std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
00803 
00804     while (current != this->deferredTopics_.end()) {
00805       if (true == this->info_->add_topic(current->id,
00806                                          current->domain,
00807                                          current->participant,
00808                                          current->topic,
00809                                          current->datatype,
00810                                          current->qos)) {
00811         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00812           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00813           OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
00814           ACE_DEBUG((LM_DEBUG,
00815                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
00816                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00817                      this->id().id(),
00818                      current->domain,
00819                      std::string(part_converter).c_str(),
00820                      std::string(topic_converter).c_str()));
00821         }
00822 
00823         current = this->deferredTopics_.erase(current);
00824 
00825       } else {
00826         ++ current;
00827       }
00828     }
00829   }
00830 
00831   {
00832     std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
00833 
00834     while (current != this->deferredPublications_.end()) {
00835 
00836       if (true == this->info_->add_publication(current->domain,
00837                                                current->participant,
00838                                                current->topic,
00839                                                current->id,
00840                                                current->callback,
00841                                                current->datawriter_qos,
00842                                                current->transport_info,
00843                                                current->publisher_qos,
00844                                                true)) {
00845         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00846           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00847           OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
00848           ACE_DEBUG((LM_DEBUG,
00849                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
00850                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00851                      this->id().id(),
00852                      current->domain,
00853                      std::string(part_converter).c_str(),
00854                      std::string(pub_converter).c_str()));
00855         }
00856 
00857         current = this->deferredPublications_.erase(current);
00858 
00859       } else {
00860         ++ current;
00861       }
00862     }
00863   }
00864 
00865   {
00866     std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
00867 
00868     while (current != this->deferredSubscriptions_.end()) {
00869 
00870       if (true == this->info_->add_subscription(current->domain,
00871                                                 current->participant,
00872                                                 current->topic,
00873                                                 current->id,
00874                                                 current->callback,
00875                                                 current->datareader_qos,
00876                                                 current->transport_info,
00877                                                 current->subscriber_qos,
00878                                                 current->filter_class_name,
00879                                                 current->filter_expression,
00880                                                 current->expression_params,
00881                                                 true)) {
00882         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00883           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00884           OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
00885           ACE_DEBUG((LM_DEBUG,
00886                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
00887                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00888                      this->id().id(),
00889                      current->domain,
00890                      std::string(part_converter).c_str(),
00891                      std::string(sub_converter).c_str()));
00892         }
00893 
00894         current = this->deferredSubscriptions_.erase(current);
00895 
00896       } else {
00897         ++ current;
00898       }
00899     }
00900   }
00901 
00902 }
00903 
00904 void
00905 ManagerImpl::processUpdateQos1(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
00906 {
00907   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00908     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00909     ACE_DEBUG((LM_DEBUG,
00910                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00911                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00912                this->id().id(),
00913                sample->domain,
00914                std::string(converter).c_str(),
00915                sample->sender,
00916                sample->owner));
00917   }
00918 
00919   if (false == this->info_->changeOwnership(sample->domain,
00920                                             sample->participant,
00921                                             sample->sender,
00922                                             sample->owner)) {
00923     {
00924       ACE_GUARD(ACE_Thread_Mutex,
00925                 guard,
00926                 this->deferred_lock_);
00927 
00928       this->deferredOwnerships_.push_back(*sample);
00929     }
00930     ACE_DEBUG((LM_DEBUG,
00931                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00932                ACE_TEXT("deferred update.\n")));
00933   }
00934 }
00935 
00936 void
00937 ManagerImpl::processUpdateQos1(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
00938 {
00939   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00940     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00941     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00942     ACE_DEBUG((LM_DEBUG,
00943                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
00944                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00945                this->id().id(),
00946                sample->domain,
00947                std::string(part_converter).c_str(),
00948                std::string(pub_converter).c_str()));
00949   }
00950 
00951   this->info_->update_publication_qos(
00952     sample->domain,
00953     sample->participant,
00954     sample->id,
00955     sample->datawriter_qos);
00956 }
00957 
00958 void
00959 ManagerImpl::processUpdateQos2(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
00960 {
00961   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00962     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00963     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00964     ACE_DEBUG((LM_DEBUG,
00965                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
00966                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00967                this->id().id(),
00968                sample->domain,
00969                std::string(part_converter).c_str(),
00970                std::string(pub_converter).c_str()));
00971   }
00972 
00973   this->info_->update_publication_qos(
00974     sample->domain,
00975     sample->participant,
00976     sample->id,
00977     sample->publisher_qos);
00978 }
00979 
00980 void
00981 ManagerImpl::processUpdateQos1(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
00982 {
00983   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00984     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00985     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00986     ACE_DEBUG((LM_DEBUG,
00987                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
00988                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00989                this->id().id(),
00990                sample->domain,
00991                std::string(part_converter).c_str(),
00992                std::string(sub_converter).c_str()));
00993   }
00994 
00995   this->info_->update_subscription_qos(
00996     sample->domain,
00997     sample->participant,
00998     sample->id,
00999     sample->datareader_qos);
01000 }
01001 
01002 void
01003 ManagerImpl::processUpdateQos2(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
01004 {
01005   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01006     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01007     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01008     ACE_DEBUG((LM_DEBUG,
01009                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
01010                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01011                this->id().id(),
01012                sample->domain,
01013                std::string(part_converter).c_str(),
01014                std::string(sub_converter).c_str()));
01015   }
01016 
01017   this->info_->update_subscription_qos(
01018     sample->domain,
01019     sample->participant,
01020     sample->id,
01021     sample->subscriber_qos);
01022 }
01023 
01024 void
01025 ManagerImpl::processUpdateFilterExpressionParams(
01026   const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
01027 {
01028   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01029     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01030     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01031     ACE_DEBUG((LM_DEBUG,
01032                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
01033                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01034                this->id().id(),
01035                sample->domain,
01036                std::string(part_converter).c_str(),
01037                std::string(sub_converter).c_str()));
01038   }
01039 
01040   this->info_->update_subscription_params(
01041     sample->domain,
01042     sample->participant,
01043     sample->id,
01044     sample->expression_params);
01045 }
01046 
01047 void
01048 ManagerImpl::processUpdateQos1(const ParticipantUpdate* sample, const DDS::SampleInfo* /* info */)
01049 {
01050   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01051     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01052     ACE_DEBUG((LM_DEBUG,
01053                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
01054                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01055                this->id().id(),
01056                sample->domain,
01057                std::string(converter).c_str()));
01058   }
01059 
01060   this->info_->update_domain_participant_qos(
01061     sample->domain,
01062     sample->id,
01063     sample->qos);
01064 }
01065 
01066 void
01067 ManagerImpl::processUpdateQos1(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
01068 {
01069   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01070     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01071     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01072     ACE_DEBUG((LM_DEBUG,
01073                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
01074                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01075                this->id().id(),
01076                sample->domain,
01077                std::string(part_converter).c_str(),
01078                std::string(topic_converter).c_str()));
01079   }
01080 
01081   this->info_->update_topic_qos(
01082     sample->id,
01083     sample->domain,
01084     sample->participant,
01085     sample->qos);
01086 }
01087 
01088 void
01089 ManagerImpl::processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
01090 {
01091   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01092     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
01093     ACE_DEBUG((LM_DEBUG,
01094                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01095                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
01096                this->id().id(),
01097                sample->domain,
01098                std::string(converter).c_str(),
01099                sample->sender,
01100                sample->owner));
01101   }
01102 
01103   // We could generate an error message here.  Instead we let action be irrelevant.
01104   if (false == this->info_->changeOwnership(sample->domain,
01105                                             sample->participant,
01106                                             sample->sender,
01107                                             sample->owner)) {
01108     {
01109       ACE_GUARD(ACE_Thread_Mutex,
01110                 guard,
01111                 this->deferred_lock_);
01112       this->deferredOwnerships_.push_back(*sample);
01113     }
01114     ACE_DEBUG((LM_DEBUG,
01115                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01116                ACE_TEXT("deferred update.\n")));
01117   }
01118 }
01119 
01120 void
01121 ManagerImpl::processDelete(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
01122 {
01123   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01124     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01125     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
01126     ACE_DEBUG((LM_DEBUG,
01127                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01128                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
01129                this->id().id(),
01130                sample->domain,
01131                std::string(part_converter).c_str(),
01132                std::string(pub_converter).c_str()));
01133   }
01134 
01135   try {
01136     this->info_->remove_publication(
01137       sample->domain,
01138       sample->participant,
01139       sample->id);
01140 
01141   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01142     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01143       ACE_DEBUG((LM_DEBUG,
01144                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01145                  ACE_TEXT("the participant was already removed.\n")));
01146     }
01147   }
01148 }
01149 
01150 void
01151 ManagerImpl::processDelete(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
01152 {
01153   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01154     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01155     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01156     ACE_DEBUG((LM_DEBUG,
01157                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01158                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01159                this->id().id(),
01160                sample->domain,
01161                std::string(part_converter).c_str(),
01162                std::string(sub_converter).c_str()));
01163   }
01164 
01165   try {
01166     this->info_->remove_subscription(
01167       sample->domain,
01168       sample->participant,
01169       sample->id);
01170 
01171   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01172     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01173       ACE_DEBUG((LM_DEBUG,
01174                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01175                  ACE_TEXT("the participant was already removed.\n")));
01176     }
01177   }
01178 }
01179 
01180 void
01181 ManagerImpl::processDelete(const ParticipantUpdate* sample, const DDS::SampleInfo* /* info */)
01182 {
01183   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01184     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01185     ACE_DEBUG((LM_DEBUG,
01186                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01187                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01188                this->id().id(),
01189                sample->domain,
01190                std::string(converter).c_str()));
01191   }
01192   try {
01193     this->info_->remove_domain_participant(
01194       sample->domain,
01195       sample->id);
01196   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01197     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01198       ACE_DEBUG((LM_DEBUG,
01199                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01200                  ACE_TEXT("the participant was already removed.\n")));
01201     }
01202   }
01203 }
01204 
01205 void
01206 ManagerImpl::processDelete(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
01207 {
01208   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01209     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01210     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01211     ACE_DEBUG((LM_DEBUG,
01212                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01213                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01214                this->id().id(),
01215                sample->domain,
01216                std::string(part_converter).c_str(),
01217                std::string(topic_converter).c_str()));
01218   }
01219 
01220   try {
01221     this->info_->remove_topic(
01222       sample->domain,
01223       sample->participant,
01224       sample->id);
01225 
01226   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01227     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01228       ACE_DEBUG((LM_DEBUG,
01229                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01230                  ACE_TEXT("the participant was already removed.\n")));
01231     }
01232   }
01233 }
01234 
01235 void
01236 ManagerImpl::pushState(Manager_ptr peer)
01237 {
01238   // foreach DCPS_IR_Domain
01239   //   foreach DCPS_IR_Participant
01240   //     peer->initializeParticipant(...)
01241   //     peer->initializeOwner(...)
01242   //   foreach DCPS_IR_Participant
01243   //     foreach DCPS_IR_Topic
01244   //       peer->initializeTopic(...)
01245   //     foreach DCPS_IR_Publication
01246   //       peer->initializePublication(...)
01247   //     foreach DCPS_IR_Subscription
01248   //       peer->initializeSubscription(...)
01249 
01250   // Process each domain within the repository.
01251   for (DCPS_IR_Domain_Map::const_iterator currentDomain
01252        = this->info_->domains().begin();
01253        currentDomain != this->info_->domains().end();
01254        ++currentDomain) {
01255 
01256     if (currentDomain->second->get_id() == this->config_.federationDomain()) {
01257       // Do not push the Federation domain publications.
01258       //continue;
01259     }
01260 
01261     // Process each participant within the current domain.
01262     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01263          = currentDomain->second->participants().begin();
01264          currentParticipant != currentDomain->second->participants().end();
01265          ++currentParticipant) {
01266 
01267       if (currentParticipant->second->isBitPublisher() == true) {
01268         // Do not push the built-in topic publications.
01269         continue;
01270       }
01271 
01272       // Initialize the participant on the peer.
01273       ParticipantUpdate participantSample;
01274       participantSample.sender = this->id().id();
01275       participantSample.action = CreateEntity;
01276 
01277       participantSample.owner  =  currentParticipant->second->owner();
01278       participantSample.domain =  currentDomain->second->get_id();
01279       participantSample.id     =  currentParticipant->second->get_id();
01280       participantSample.qos    = *currentParticipant->second->get_qos();
01281 
01282       peer->initializeParticipant(participantSample);
01283 
01284       // Initialize the ownership of the participant on the peer.
01285       OwnerUpdate ownerSample;
01286       ownerSample.sender      = this->id().id();
01287       ownerSample.action      = CreateEntity;
01288 
01289       ownerSample.domain      = currentDomain->second->get_id();
01290       ownerSample.participant = currentParticipant->second->get_id();
01291       ownerSample.owner       = currentParticipant->second->owner();
01292 
01293       peer->initializeOwner(ownerSample);
01294     }
01295 
01296     // Process each participant within the current domain.
01297     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01298          = currentDomain->second->participants().begin();
01299          currentParticipant != currentDomain->second->participants().end();
01300          ++currentParticipant) {
01301 
01302       if (currentParticipant->second->isBitPublisher() == true) {
01303         // Do not push the built-in topic publications.
01304         continue;
01305       }
01306 
01307       // Process each topic within the current particpant.
01308       for (DCPS_IR_Topic_Map::const_iterator currentTopic
01309            = currentParticipant->second->topics().begin();
01310            currentTopic != currentParticipant->second->topics().end();
01311            ++currentTopic) {
01312         TopicUpdate topicSample;
01313         topicSample.sender      = this->id().id();
01314         topicSample.action      = CreateEntity;
01315 
01316         topicSample.id          = currentTopic->second->get_id();
01317         topicSample.domain      = currentDomain->second->get_id();
01318         topicSample.participant = currentTopic->second->get_participant_id();
01319         topicSample.topic       = currentTopic->second->get_topic_description()->get_name();
01320         topicSample.datatype    = currentTopic->second->get_topic_description()->get_dataTypeName();
01321         topicSample.qos         = *currentTopic->second->get_topic_qos();
01322 
01323         peer->initializeTopic(topicSample);
01324       }
01325 
01326       // Process each publication within the current particpant.
01327       for (DCPS_IR_Publication_Map::const_iterator currentPublication
01328            = currentParticipant->second->publications().begin();
01329            currentPublication != currentParticipant->second->publications().end();
01330            ++currentPublication) {
01331         PublicationUpdate publicationSample;
01332         publicationSample.sender         = this->id().id();
01333         publicationSample.action         = CreateEntity;
01334 
01335         DCPS_IR_Publication* p = currentPublication->second;
01336         CORBA::ORB_var orb = this->info_->orb();
01337         CORBA::String_var callback = orb->object_to_string(p->writer());
01338 
01339         publicationSample.domain         = currentDomain->second->get_id();
01340         publicationSample.participant    = p->get_participant_id();
01341         publicationSample.topic          = p->get_topic_id();
01342         publicationSample.id             = p->get_id();
01343         publicationSample.callback       = callback.in();
01344         publicationSample.datawriter_qos = *p->get_datawriter_qos();
01345         publicationSample.publisher_qos  = *p->get_publisher_qos();
01346         publicationSample.transport_info = p->get_transportLocatorSeq();
01347 
01348         peer->initializePublication(publicationSample);
01349       }
01350 
01351       // Process each subscription within the current particpant.
01352       for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
01353            = currentParticipant->second->subscriptions().begin();
01354            currentSubscription != currentParticipant->second->subscriptions().end();
01355            ++currentSubscription) {
01356         SubscriptionUpdate subscriptionSample;
01357         subscriptionSample.sender         = this->id().id();
01358         subscriptionSample.action         = CreateEntity;
01359 
01360         DCPS_IR_Subscription* s = currentSubscription->second;
01361         CORBA::ORB_var orb = this->info_->orb();
01362         CORBA::String_var callback = orb->object_to_string(s->reader());
01363 
01364         subscriptionSample.domain         = currentDomain->second->get_id();
01365         subscriptionSample.participant    = s->get_participant_id();
01366         subscriptionSample.topic          = s->get_topic_id();
01367         subscriptionSample.id             = s->get_id();
01368         subscriptionSample.callback       = callback.in();
01369         subscriptionSample.datareader_qos = *s->get_datareader_qos();
01370         subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
01371         subscriptionSample.transport_info = s->get_transportLocatorSeq();
01372         subscriptionSample.filter_expression = s->get_filter_expression().c_str();
01373         subscriptionSample.expression_params = s->get_expr_params();
01374 
01375         peer->initializeSubscription(subscriptionSample);
01376       }
01377     }
01378   }
01379 }
01380 
01381 } // namespace Federator
01382 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7