FederatorManagerImpl_updates.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include "FederatorManagerImpl.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DCPS_IR_Domain.h"
00012 #include "DCPS_IR_Participant.h"
00013 
00014 #include "dds/DCPS/RepoIdConverter.h"
00015 
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 namespace OpenDDS {
00019 namespace Federator {
00020 
00021 void
00022 ManagerImpl::unregisterCallback()
00023 {
00024   /* This method intentionally left unimplemented. */
00025 }
00026 
00027 void
00028 ManagerImpl::requestImage()
00029 {
00030   /* This method intentionally left unimplemented. */
00031 }
00032 
00033 ////////////////////////////////////////////////////////////////////////
00034 //
00035 // The following methods publish updates to the remainder of the
00036 // federation.
00037 //
00038 
00039 void
00040 ManagerImpl::create(const Update::UTopic& topic)
00041 {
00042   if (CORBA::is_nil(this->topicWriter_.in())) {
00043     // Decline to publish data until we can.
00044     return;
00045   }
00046 
00047   TopicUpdate sample;
00048   sample.sender      = this->id().id();
00049   sample.action      = CreateEntity;
00050 
00051   sample.id          = topic.topicId;
00052   sample.domain      = topic.domainId;
00053   sample.participant = topic.participantId;
00054   sample.topic       = topic.name.c_str();
00055   sample.datatype    = topic.dataType.c_str();
00056   sample.qos         = topic.topicQos;
00057 
00058   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00059     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00060     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00061     ACE_DEBUG((LM_DEBUG,
00062                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
00063                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00064                this->id().id(),
00065                sample.domain,
00066                std::string(part_converter).c_str(),
00067                std::string(topic_converter).c_str()));
00068   }
00069 
00070   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00071 }
00072 
00073 void
00074 ManagerImpl::create(const Update::UParticipant& participant)
00075 {
00076   if (CORBA::is_nil(this->participantWriter_.in())) {
00077     // Decline to publish data until we can.
00078     return;
00079   }
00080 
00081   ParticipantUpdate sample;
00082   sample.sender = this->id().id();
00083   sample.action = CreateEntity;
00084 
00085   sample.owner  = participant.owner;
00086   sample.domain = participant.domainId;
00087   sample.id     = participant.participantId;
00088   sample.qos    = participant.participantQos;
00089 
00090   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00091     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00092     ACE_DEBUG((LM_DEBUG,
00093                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
00094                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00095                this->id().id(),
00096                sample.domain,
00097                std::string(converter).c_str()));
00098   }
00099 
00100   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00101 }
00102 
00103 void
00104 ManagerImpl::create(const Update::URActor& reader)
00105 {
00106   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00107     // Decline to publish data until we can.
00108     return;
00109   }
00110 
00111   SubscriptionUpdate sample;
00112   sample.sender         = this->id().id();
00113   sample.action         = CreateEntity;
00114 
00115   sample.domain         = reader.domainId;
00116   sample.participant    = reader.participantId;
00117   sample.topic          = reader.topicId;
00118   sample.id             = reader.actorId;
00119   sample.callback       = reader.callback.c_str();
00120   sample.datareader_qos = reader.drdwQos;
00121   sample.subscriber_qos = reader.pubsubQos;
00122   sample.transport_info = reader.transportInterfaceInfo;
00123   sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
00124   sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
00125   sample.expression_params = reader.contentSubscriptionProfile.exprParams;
00126 
00127   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00128     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00129     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00130     ACE_DEBUG((LM_DEBUG,
00131                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
00132                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00133                this->id().id(),
00134                sample.domain,
00135                std::string(part_converter).c_str(),
00136                std::string(sub_converter).c_str()));
00137   }
00138 
00139   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00140 }
00141 
00142 void
00143 ManagerImpl::create(const Update::UWActor& writer)
00144 {
00145   if (CORBA::is_nil(this->publicationWriter_.in())) {
00146     // Decline to publish data until we can.
00147     return;
00148   }
00149 
00150   PublicationUpdate sample;
00151   sample.sender         = this->id().id();
00152   sample.action         = CreateEntity;
00153 
00154   sample.domain         = writer.domainId;
00155   sample.participant    = writer.participantId;
00156   sample.topic          = writer.topicId;
00157   sample.id             = writer.actorId;
00158   sample.callback       = writer.callback.c_str();
00159   sample.datawriter_qos = writer.drdwQos;
00160   sample.publisher_qos  = writer.pubsubQos;
00161   sample.transport_info = writer.transportInterfaceInfo;
00162 
00163   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00164     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00165     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00166     ACE_DEBUG((LM_DEBUG,
00167                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( PublicationUpdate): ")
00168                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00169                this->id().id(),
00170                sample.domain,
00171                std::string(part_converter).c_str(),
00172                std::string(pub_converter).c_str()));
00173   }
00174 
00175   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00176 }
00177 
00178 void
00179 ManagerImpl::create(const Update::OwnershipData& data)
00180 {
00181   if (CORBA::is_nil(this->ownerWriter_.in())) {
00182     // Decline to publish data until we can.
00183     return;
00184   }
00185 
00186   OwnerUpdate sample;
00187   sample.sender      = this->id().id();
00188   sample.action      = CreateEntity;
00189 
00190   sample.domain      = data.domain;
00191   sample.participant = data.participant;
00192   sample.owner       = data.owner;
00193 
00194   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00195     OpenDDS::DCPS::RepoIdConverter converter(sample.participant);
00196     ACE_DEBUG((LM_DEBUG,
00197                ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
00198                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00199                this->id().id(),
00200                sample.domain,
00201                std::string(converter).c_str(),
00202                sample.sender,
00203                sample.owner));
00204   }
00205 
00206   this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
00207 }
00208 
00209 void
00210 ManagerImpl::destroy(
00211   const Update::IdPath& id,
00212   Update::ItemType      type,
00213   Update::ActorType     actor)
00214 {
00215   //
00216   // Do not propagate any destroy() messages within the FederationDomain.
00217   // This domain will be managed separately.
00218   //
00219   if (id.domain == this->config_.federationDomain()) {
00220     return;
00221   }
00222 
00223   switch (type) {
00224   case Update::Topic: {
00225     if (CORBA::is_nil(this->topicWriter_.in())) {
00226       // Decline to publish data until we can.
00227       return;
00228     }
00229 
00230     TopicUpdate sample;
00231     sample.sender      = this->id().id();
00232     sample.action      = DestroyEntity;
00233 
00234     sample.id          = id.id;
00235     sample.domain      = id.domain;
00236     sample.participant = id.participant;
00237 
00238     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00239       OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00240       OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00241       ACE_DEBUG((LM_DEBUG,
00242                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
00243                  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00244                  this->id().id(),
00245                  sample.domain,
00246                  std::string(part_converter).c_str(),
00247                  std::string(topic_converter).c_str()));
00248     }
00249 
00250     this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00251   }
00252   break;
00253 
00254   case Update::Participant: {
00255     if (CORBA::is_nil(this->participantWriter_.in())) {
00256       // Decline to publish data until we can.
00257       return;
00258     }
00259 
00260     ParticipantUpdate sample;
00261     sample.sender = this->id().id();
00262     sample.action = DestroyEntity;
00263 
00264     sample.domain = id.domain;
00265     sample.id     = id.id;
00266 
00267     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00268       OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00269       ACE_DEBUG((LM_DEBUG,
00270                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
00271                  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00272                  this->id().id(),
00273                  sample.domain,
00274                  std::string(converter).c_str()));
00275     }
00276 
00277     this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00278   }
00279   break;
00280 
00281   case Update::Actor:
00282 
00283     // This is VERY annoying.
00284     switch (actor) {
00285     case Update::DataWriter: {
00286       if (CORBA::is_nil(this->publicationWriter_.in())) {
00287         // Decline to publish data until we can.
00288         return;
00289       }
00290 
00291       PublicationUpdate sample;
00292       sample.sender         = this->id().id();
00293       sample.action         = DestroyEntity;
00294 
00295       sample.domain         = id.domain;
00296       sample.participant    = id.participant;
00297       sample.id             = id.id;
00298 
00299       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00300         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00301         OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00302         ACE_DEBUG((LM_DEBUG,
00303                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
00304                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00305                    this->id().id(),
00306                    sample.domain,
00307                    std::string(part_converter).c_str(),
00308                    std::string(pub_converter).c_str()));
00309       }
00310 
00311       this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00312     }
00313     break;
00314 
00315     case Update::DataReader: {
00316       if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00317         // Decline to publish data until we can.
00318         return;
00319       }
00320 
00321       SubscriptionUpdate sample;
00322       sample.sender         = this->id().id();
00323       sample.action         = DestroyEntity;
00324 
00325       sample.domain         = id.domain;
00326       sample.participant    = id.participant;
00327       sample.id             = id.id;
00328 
00329       if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00330         OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00331         OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00332         ACE_DEBUG((LM_DEBUG,
00333                    ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
00334                    ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00335                    this->id().id(),
00336                    sample.domain,
00337                    std::string(part_converter).c_str(),
00338                    std::string(sub_converter).c_str()));
00339       }
00340 
00341       this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00342     }
00343     break;
00344     }
00345 
00346     break;
00347   }
00348 }
00349 
00350 void
00351 ManagerImpl::update(const Update::IdPath& id, const DDS::DomainParticipantQos& qos)
00352 {
00353   if (CORBA::is_nil(this->participantWriter_.in())) {
00354     // Decline to publish data until we can.
00355     return;
00356   }
00357 
00358   ParticipantUpdate sample;
00359   sample.sender = this->id().id();
00360   sample.action = UpdateQosValue1;
00361 
00362   sample.domain = id.domain;
00363   sample.id     = id.id;
00364   sample.qos    = qos;
00365 
00366   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00367     OpenDDS::DCPS::RepoIdConverter converter(sample.id);
00368     ACE_DEBUG((LM_DEBUG,
00369                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
00370                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
00371                this->id().id(),
00372                sample.domain,
00373                std::string(converter).c_str()));
00374   }
00375 
00376   this->participantWriter_->write(sample, DDS::HANDLE_NIL);
00377 }
00378 
00379 void
00380 ManagerImpl::update(const Update::IdPath& id, const DDS::TopicQos& qos)
00381 {
00382   if (CORBA::is_nil(this->topicWriter_.in())) {
00383     // Decline to publish data until we can.
00384     return;
00385   }
00386 
00387   TopicUpdate sample;
00388   sample.sender      = this->id().id();
00389   sample.action      = UpdateQosValue1;
00390 
00391   sample.id          = id.id;
00392   sample.domain      = id.domain;
00393   sample.participant = id.participant;
00394   sample.qos         = qos;
00395 
00396   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00397     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00398     OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
00399     ACE_DEBUG((LM_DEBUG,
00400                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
00401                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00402                this->id().id(),
00403                sample.domain,
00404                std::string(part_converter).c_str(),
00405                std::string(topic_converter).c_str()));
00406   }
00407 
00408   this->topicWriter_->write(sample, DDS::HANDLE_NIL);
00409 }
00410 
00411 void
00412 ManagerImpl::update(const Update::IdPath& id, const DDS::DataWriterQos& qos)
00413 {
00414   if (CORBA::is_nil(this->publicationWriter_.in())) {
00415     // Decline to publish data until we can.
00416     return;
00417   }
00418 
00419   PublicationUpdate sample;
00420   sample.sender         = this->id().id();
00421   sample.action         = UpdateQosValue1;
00422 
00423   sample.domain         = id.domain;
00424   sample.participant    = id.participant;
00425   sample.id             = id.id;
00426   sample.datawriter_qos = qos;
00427 
00428   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00429     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00430     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00431     ACE_DEBUG((LM_DEBUG,
00432                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
00433                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00434                this->id().id(),
00435                sample.domain,
00436                std::string(part_converter).c_str(),
00437                std::string(pub_converter).c_str()));
00438   }
00439 
00440   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00441 }
00442 
00443 void
00444 ManagerImpl::update(const Update::IdPath& id, const DDS::PublisherQos& qos)
00445 {
00446   if (CORBA::is_nil(this->publicationWriter_.in())) {
00447     // Decline to publish data until we can.
00448     return;
00449   }
00450 
00451   PublicationUpdate sample;
00452   sample.sender         = this->id().id();
00453   sample.action         = UpdateQosValue2;
00454 
00455   sample.domain         = id.domain;
00456   sample.participant    = id.participant;
00457   sample.id             = id.id;
00458   sample.publisher_qos  = qos;
00459 
00460   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00461     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00462     OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
00463     ACE_DEBUG((LM_DEBUG,
00464                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
00465                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00466                this->id().id(),
00467                sample.domain,
00468                std::string(part_converter).c_str(),
00469                std::string(pub_converter).c_str()));
00470   }
00471 
00472   this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
00473 }
00474 
00475 void
00476 ManagerImpl::update(const Update::IdPath& id, const DDS::DataReaderQos& qos)
00477 {
00478   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00479     // Decline to publish data until we can.
00480     return;
00481   }
00482 
00483   SubscriptionUpdate sample;
00484   sample.sender         = this->id().id();
00485   sample.action         = UpdateQosValue1;
00486 
00487   sample.domain         = id.domain;
00488   sample.participant    = id.participant;
00489   sample.id             = id.id;
00490   sample.datareader_qos = qos;
00491 
00492   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00493     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00494     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00495     ACE_DEBUG((LM_DEBUG,
00496                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
00497                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00498                this->id().id(),
00499                sample.domain,
00500                std::string(part_converter).c_str(),
00501                std::string(sub_converter).c_str()));
00502   }
00503 
00504   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00505 }
00506 
00507 void
00508 ManagerImpl::update(const Update::IdPath& id, const DDS::StringSeq& params)
00509 {
00510   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00511     // Decline to publish data until we can.
00512     return;
00513   }
00514 
00515   SubscriptionUpdate sample;
00516   sample.sender            = this->id().id();
00517   sample.action            = UpdateFilterExpressionParams;
00518   sample.domain            = id.domain;
00519   sample.participant       = id.participant;
00520   sample.id                = id.id;
00521   sample.expression_params = params;
00522 
00523   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00524     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00525     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00526     ACE_DEBUG((LM_DEBUG,
00527                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
00528                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00529                this->id().id(),
00530                sample.domain,
00531                std::string(part_converter).c_str(),
00532                std::string(sub_converter).c_str()));
00533   }
00534 
00535   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00536 }
00537 
00538 void
00539 ManagerImpl::update(const Update::IdPath& id, const DDS::SubscriberQos& qos)
00540 {
00541   if (CORBA::is_nil(this->subscriptionWriter_.in())) {
00542     // Decline to publish data until we can.
00543     return;
00544   }
00545 
00546   SubscriptionUpdate sample;
00547   sample.sender         = this->id().id();
00548   sample.action         = UpdateQosValue2;
00549 
00550   sample.domain         = id.domain;
00551   sample.participant    = id.participant;
00552   sample.id             = id.id;
00553   sample.subscriber_qos = qos;
00554 
00555   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00556     OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
00557     OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
00558     ACE_DEBUG((LM_DEBUG,
00559                ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
00560                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00561                this->id().id(),
00562                sample.domain,
00563                std::string(part_converter).c_str(),
00564                std::string(sub_converter).c_str()));
00565   }
00566 
00567   this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
00568 }
00569 
00570 ////////////////////////////////////////////////////////////////////////
00571 //
00572 // The following methods process updates received from the remainder
00573 // of the federation.
00574 //
00575 
00576 void
00577 ManagerImpl::processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
00578 {
00579   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00580     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00581     ACE_DEBUG((LM_DEBUG,
00582                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00583                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00584                this->id().id(),
00585                sample->domain,
00586                std::string(converter).c_str(),
00587                sample->sender,
00588                sample->owner));
00589   }
00590 
00591   // We could generate an error message here.  Instead we let action be irrelevant.
00592   if (false == this->info_->changeOwnership(sample->domain,
00593                                             sample->participant,
00594                                             sample->sender,
00595                                             sample->owner)) {
00596     {
00597       ACE_GUARD(ACE_Thread_Mutex,
00598                 guard,
00599                 this->deferred_lock_);
00600       this->deferredOwnerships_.push_back(*sample);
00601     }
00602 
00603     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00604       ACE_DEBUG((LM_DEBUG,
00605                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
00606                  ACE_TEXT("deferred update.\n")));
00607     }
00608   }
00609 
00610   this->processDeferred();
00611 }
00612 
00613 void
00614 ManagerImpl::processCreate(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
00615 {
00616   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00617     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00618     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00619     ACE_DEBUG((LM_DEBUG,
00620                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00621                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00622                this->id().id(),
00623                sample->domain,
00624                std::string(part_converter).c_str(),
00625                std::string(pub_converter).c_str()));
00626   }
00627 
00628   if (false == this->info_->add_publication(sample->domain,
00629                                             sample->participant,
00630                                             sample->topic,
00631                                             sample->id,
00632                                             sample->callback,
00633                                             sample->datawriter_qos,
00634                                             sample->transport_info,
00635                                             sample->publisher_qos,
00636                                             true)) {
00637     {
00638       ACE_GUARD(ACE_Thread_Mutex,
00639                 guard,
00640                 this->deferred_lock_);
00641       this->deferredPublications_.push_back(*sample);
00642     }
00643 
00644     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00645       ACE_DEBUG((LM_DEBUG,
00646                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
00647                  ACE_TEXT("deferred update.\n")));
00648     }
00649   }
00650 
00651   this->processDeferred();
00652 }
00653 
00654 void
00655 ManagerImpl::processCreate(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
00656 {
00657   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00658     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00659     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00660     ACE_DEBUG((LM_DEBUG,
00661                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00662                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00663                this->id().id(),
00664                sample->domain,
00665                std::string(part_converter).c_str(),
00666                std::string(sub_converter).c_str()));
00667   }
00668 
00669   if (false == this->info_->add_subscription(sample->domain,
00670                                              sample->participant,
00671                                              sample->topic,
00672                                              sample->id,
00673                                              sample->callback,
00674                                              sample->datareader_qos,
00675                                              sample->transport_info,
00676                                              sample->subscriber_qos,
00677                                              sample->filter_class_name,
00678                                              sample->filter_expression,
00679                                              sample->expression_params,
00680                                              true)) {
00681     {
00682       ACE_GUARD(ACE_Thread_Mutex,
00683                 guard,
00684                 this->deferred_lock_);
00685       this->deferredSubscriptions_.push_back(*sample);
00686     }
00687 
00688     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00689       ACE_DEBUG((LM_DEBUG,
00690                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
00691                  ACE_TEXT("deferred update.\n")));
00692     }
00693   }
00694 
00695   this->processDeferred();
00696 }
00697 
00698 void
00699 ManagerImpl::processCreate(const ParticipantUpdate* sample, const DDS::SampleInfo* /* info */)
00700 {
00701   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00702     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
00703     ACE_DEBUG((LM_DEBUG,
00704                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
00705                ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
00706                this->id().id(),
00707                sample->domain,
00708                std::string(converter).c_str(),
00709                sample->owner));
00710   }
00711 
00712   this->info_->add_domain_participant(
00713     sample->domain,
00714     sample->id,
00715     sample->qos);
00716   bool ownershipChanged = this->info_->changeOwnership(
00717     sample->domain,
00718     sample->id,
00719     sample->sender,
00720     sample->owner);
00721   if (!ownershipChanged) {
00722     ACE_ERROR((LM_ERROR,
00723                ACE_TEXT("(%P|%t) ERROR: ")
00724                ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
00725                ACE_TEXT("Could not change ownership\n")));
00726   }
00727   this->processDeferred();
00728 }
00729 
00730 void
00731 ManagerImpl::processCreate(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
00732 {
00733   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00734     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00735     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
00736     ACE_DEBUG((LM_DEBUG,
00737                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00738                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00739                this->id().id(),
00740                sample->domain,
00741                std::string(part_converter).c_str(),
00742                std::string(topic_converter).c_str()));
00743   }
00744 
00745   if (false == this->info_->add_topic(sample->id,
00746                                       sample->domain,
00747                                       sample->participant,
00748                                       sample->topic,
00749                                       sample->datatype,
00750                                       sample->qos)) {
00751     {
00752       ACE_GUARD(ACE_Thread_Mutex,
00753                 guard,
00754                 this->deferred_lock_);
00755       this->deferredTopics_.push_back(*sample);
00756     }
00757 
00758     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00759       ACE_DEBUG((LM_DEBUG,
00760                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
00761                  ACE_TEXT("deferred update.\n")));
00762     }
00763   }
00764 
00765   this->processDeferred();
00766 }
00767 
00768 void
00769 ManagerImpl::processDeferred()
00770 {
00771   ACE_GUARD(ACE_Thread_Mutex,
00772             guard,
00773             this->deferred_lock_);
00774 
00775   {
00776     std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
00777 
00778     while (current != this->deferredOwnerships_.end()) {
00779       if (this->info_->changeOwnership(current->domain,
00780                                        current->participant,
00781                                        current->sender,
00782                                        current->owner)) {
00783         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00784           OpenDDS::DCPS::RepoIdConverter converter(current->participant);
00785           ACE_DEBUG((LM_DEBUG,
00786                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
00787                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00788                      this->id().id(),
00789                      current->domain,
00790                      std::string(converter).c_str(),
00791                      current->sender,
00792                      current->owner));
00793         }
00794 
00795         current = this->deferredOwnerships_.erase(current);
00796 
00797       } else {
00798         ++ current;
00799       }
00800     }
00801   }
00802 
00803   {
00804     std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
00805 
00806     while (current != this->deferredTopics_.end()) {
00807       if (true == this->info_->add_topic(current->id,
00808                                          current->domain,
00809                                          current->participant,
00810                                          current->topic,
00811                                          current->datatype,
00812                                          current->qos)) {
00813         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00814           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00815           OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
00816           ACE_DEBUG((LM_DEBUG,
00817                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
00818                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
00819                      this->id().id(),
00820                      current->domain,
00821                      std::string(part_converter).c_str(),
00822                      std::string(topic_converter).c_str()));
00823         }
00824 
00825         current = this->deferredTopics_.erase(current);
00826 
00827       } else {
00828         ++ current;
00829       }
00830     }
00831   }
00832 
00833   {
00834     std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
00835 
00836     while (current != this->deferredPublications_.end()) {
00837 
00838       if (true == this->info_->add_publication(current->domain,
00839                                                current->participant,
00840                                                current->topic,
00841                                                current->id,
00842                                                current->callback,
00843                                                current->datawriter_qos,
00844                                                current->transport_info,
00845                                                current->publisher_qos,
00846                                                true)) {
00847         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00848           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00849           OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
00850           ACE_DEBUG((LM_DEBUG,
00851                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
00852                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00853                      this->id().id(),
00854                      current->domain,
00855                      std::string(part_converter).c_str(),
00856                      std::string(pub_converter).c_str()));
00857         }
00858 
00859         current = this->deferredPublications_.erase(current);
00860 
00861       } else {
00862         ++ current;
00863       }
00864     }
00865   }
00866 
00867   {
00868     std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
00869 
00870     while (current != this->deferredSubscriptions_.end()) {
00871 
00872       if (true == this->info_->add_subscription(current->domain,
00873                                                 current->participant,
00874                                                 current->topic,
00875                                                 current->id,
00876                                                 current->callback,
00877                                                 current->datareader_qos,
00878                                                 current->transport_info,
00879                                                 current->subscriber_qos,
00880                                                 current->filter_class_name,
00881                                                 current->filter_expression,
00882                                                 current->expression_params,
00883                                                 true)) {
00884         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00885           OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
00886           OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
00887           ACE_DEBUG((LM_DEBUG,
00888                      ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
00889                      ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00890                      this->id().id(),
00891                      current->domain,
00892                      std::string(part_converter).c_str(),
00893                      std::string(sub_converter).c_str()));
00894         }
00895 
00896         current = this->deferredSubscriptions_.erase(current);
00897 
00898       } else {
00899         ++ current;
00900       }
00901     }
00902   }
00903 
00904 }
00905 
00906 void
00907 ManagerImpl::processUpdateQos1(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
00908 {
00909   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00910     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
00911     ACE_DEBUG((LM_DEBUG,
00912                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00913                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
00914                this->id().id(),
00915                sample->domain,
00916                std::string(converter).c_str(),
00917                sample->sender,
00918                sample->owner));
00919   }
00920 
00921   if (false == this->info_->changeOwnership(sample->domain,
00922                                             sample->participant,
00923                                             sample->sender,
00924                                             sample->owner)) {
00925     {
00926       ACE_GUARD(ACE_Thread_Mutex,
00927                 guard,
00928                 this->deferred_lock_);
00929 
00930       this->deferredOwnerships_.push_back(*sample);
00931     }
00932     ACE_DEBUG((LM_DEBUG,
00933                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
00934                ACE_TEXT("deferred update.\n")));
00935   }
00936 }
00937 
00938 void
00939 ManagerImpl::processUpdateQos1(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
00940 {
00941   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00942     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00943     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00944     ACE_DEBUG((LM_DEBUG,
00945                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
00946                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00947                this->id().id(),
00948                sample->domain,
00949                std::string(part_converter).c_str(),
00950                std::string(pub_converter).c_str()));
00951   }
00952 
00953   this->info_->update_publication_qos(
00954     sample->domain,
00955     sample->participant,
00956     sample->id,
00957     sample->datawriter_qos);
00958 }
00959 
00960 void
00961 ManagerImpl::processUpdateQos2(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
00962 {
00963   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00964     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00965     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
00966     ACE_DEBUG((LM_DEBUG,
00967                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
00968                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
00969                this->id().id(),
00970                sample->domain,
00971                std::string(part_converter).c_str(),
00972                std::string(pub_converter).c_str()));
00973   }
00974 
00975   this->info_->update_publication_qos(
00976     sample->domain,
00977     sample->participant,
00978     sample->id,
00979     sample->publisher_qos);
00980 }
00981 
00982 void
00983 ManagerImpl::processUpdateQos1(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
00984 {
00985   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00986     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
00987     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
00988     ACE_DEBUG((LM_DEBUG,
00989                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
00990                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
00991                this->id().id(),
00992                sample->domain,
00993                std::string(part_converter).c_str(),
00994                std::string(sub_converter).c_str()));
00995   }
00996 
00997   this->info_->update_subscription_qos(
00998     sample->domain,
00999     sample->participant,
01000     sample->id,
01001     sample->datareader_qos);
01002 }
01003 
01004 void
01005 ManagerImpl::processUpdateQos2(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
01006 {
01007   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01008     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01009     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01010     ACE_DEBUG((LM_DEBUG,
01011                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
01012                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01013                this->id().id(),
01014                sample->domain,
01015                std::string(part_converter).c_str(),
01016                std::string(sub_converter).c_str()));
01017   }
01018 
01019   this->info_->update_subscription_qos(
01020     sample->domain,
01021     sample->participant,
01022     sample->id,
01023     sample->subscriber_qos);
01024 }
01025 
01026 void
01027 ManagerImpl::processUpdateFilterExpressionParams(
01028   const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
01029 {
01030   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01031     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01032     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01033     ACE_DEBUG((LM_DEBUG,
01034                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
01035                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01036                this->id().id(),
01037                sample->domain,
01038                std::string(part_converter).c_str(),
01039                std::string(sub_converter).c_str()));
01040   }
01041 
01042   this->info_->update_subscription_params(
01043     sample->domain,
01044     sample->participant,
01045     sample->id,
01046     sample->expression_params);
01047 }
01048 
01049 void
01050 ManagerImpl::processUpdateQos1(const ParticipantUpdate* sample, const DDS::SampleInfo* /* info */)
01051 {
01052   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01053     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01054     ACE_DEBUG((LM_DEBUG,
01055                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
01056                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01057                this->id().id(),
01058                sample->domain,
01059                std::string(converter).c_str()));
01060   }
01061 
01062   this->info_->update_domain_participant_qos(
01063     sample->domain,
01064     sample->id,
01065     sample->qos);
01066 }
01067 
01068 void
01069 ManagerImpl::processUpdateQos1(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
01070 {
01071   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01072     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01073     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01074     ACE_DEBUG((LM_DEBUG,
01075                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
01076                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01077                this->id().id(),
01078                sample->domain,
01079                std::string(part_converter).c_str(),
01080                std::string(topic_converter).c_str()));
01081   }
01082 
01083   this->info_->update_topic_qos(
01084     sample->id,
01085     sample->domain,
01086     sample->participant,
01087     sample->qos);
01088 }
01089 
01090 void
01091 ManagerImpl::processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
01092 {
01093   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01094     OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
01095     ACE_DEBUG((LM_DEBUG,
01096                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01097                ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
01098                this->id().id(),
01099                sample->domain,
01100                std::string(converter).c_str(),
01101                sample->sender,
01102                sample->owner));
01103   }
01104 
01105   // We could generate an error message here.  Instead we let action be irrelevant.
01106   if (false == this->info_->changeOwnership(sample->domain,
01107                                             sample->participant,
01108                                             sample->sender,
01109                                             sample->owner)) {
01110     {
01111       ACE_GUARD(ACE_Thread_Mutex,
01112                 guard,
01113                 this->deferred_lock_);
01114       this->deferredOwnerships_.push_back(*sample);
01115     }
01116     ACE_DEBUG((LM_DEBUG,
01117                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
01118                ACE_TEXT("deferred update.\n")));
01119   }
01120 }
01121 
01122 void
01123 ManagerImpl::processDelete(const PublicationUpdate* sample, const DDS::SampleInfo* /* info */)
01124 {
01125   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01126     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01127     OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
01128     ACE_DEBUG((LM_DEBUG,
01129                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01130                ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
01131                this->id().id(),
01132                sample->domain,
01133                std::string(part_converter).c_str(),
01134                std::string(pub_converter).c_str()));
01135   }
01136 
01137   try {
01138     this->info_->remove_publication(
01139       sample->domain,
01140       sample->participant,
01141       sample->id);
01142 
01143   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01144     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01145       ACE_DEBUG((LM_DEBUG,
01146                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
01147                  ACE_TEXT("the participant was already removed.\n")));
01148     }
01149   }
01150 }
01151 
01152 void
01153 ManagerImpl::processDelete(const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
01154 {
01155   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01156     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01157     OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
01158     ACE_DEBUG((LM_DEBUG,
01159                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01160                ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
01161                this->id().id(),
01162                sample->domain,
01163                std::string(part_converter).c_str(),
01164                std::string(sub_converter).c_str()));
01165   }
01166 
01167   try {
01168     this->info_->remove_subscription(
01169       sample->domain,
01170       sample->participant,
01171       sample->id);
01172 
01173   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01174     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01175       ACE_DEBUG((LM_DEBUG,
01176                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
01177                  ACE_TEXT("the participant was already removed.\n")));
01178     }
01179   }
01180 }
01181 
01182 void
01183 ManagerImpl::processDelete(const ParticipantUpdate* sample, const DDS::SampleInfo* /* info */)
01184 {
01185   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01186     OpenDDS::DCPS::RepoIdConverter converter(sample->id);
01187     ACE_DEBUG((LM_DEBUG,
01188                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01189                ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
01190                this->id().id(),
01191                sample->domain,
01192                std::string(converter).c_str()));
01193   }
01194   try {
01195     this->info_->remove_domain_participant(
01196       sample->domain,
01197       sample->id);
01198   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01199     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01200       ACE_DEBUG((LM_DEBUG,
01201                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
01202                  ACE_TEXT("the participant was already removed.\n")));
01203     }
01204   }
01205 }
01206 
01207 void
01208 ManagerImpl::processDelete(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
01209 {
01210   if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01211     OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
01212     OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
01213     ACE_DEBUG((LM_DEBUG,
01214                ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01215                ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
01216                this->id().id(),
01217                sample->domain,
01218                std::string(part_converter).c_str(),
01219                std::string(topic_converter).c_str()));
01220   }
01221 
01222   try {
01223     this->info_->remove_topic(
01224       sample->domain,
01225       sample->participant,
01226       sample->id);
01227 
01228   } catch (OpenDDS::DCPS::Invalid_Participant&) {
01229     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
01230       ACE_DEBUG((LM_DEBUG,
01231                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01232                  ACE_TEXT("the participant was already removed.\n")));
01233     }
01234   } catch (OpenDDS::DCPS::Invalid_Domain&) {
01235     if (OpenDDS::DCPS::DCPS_debug_level > 1) {
01236       ACE_DEBUG((LM_DEBUG,
01237                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
01238                  ACE_TEXT("the domain %d no longer exists.\n"),sample->domain));
01239     }
01240   }
01241 }
01242 
01243 void
01244 ManagerImpl::pushState(Manager_ptr peer)
01245 {
01246   // foreach DCPS_IR_Domain
01247   //   foreach DCPS_IR_Participant
01248   //     peer->initializeParticipant(...)
01249   //     peer->initializeOwner(...)
01250   //   foreach DCPS_IR_Participant
01251   //     foreach DCPS_IR_Topic
01252   //       peer->initializeTopic(...)
01253   //     foreach DCPS_IR_Publication
01254   //       peer->initializePublication(...)
01255   //     foreach DCPS_IR_Subscription
01256   //       peer->initializeSubscription(...)
01257 
01258   // Process each domain within the repository.
01259   for (DCPS_IR_Domain_Map::const_iterator currentDomain
01260        = this->info_->domains().begin();
01261        currentDomain != this->info_->domains().end();
01262        ++currentDomain) {
01263 
01264     if (currentDomain->second->get_id() == this->config_.federationDomain()) {
01265       // Do not push the Federation domain publications.
01266       //continue;
01267     }
01268 
01269     // Process each participant within the current domain.
01270     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01271          = currentDomain->second->participants().begin();
01272          currentParticipant != currentDomain->second->participants().end();
01273          ++currentParticipant) {
01274 
01275       if (currentParticipant->second->isBitPublisher() == true) {
01276         // Do not push the built-in topic publications.
01277         continue;
01278       }
01279 
01280       // Initialize the participant on the peer.
01281       ParticipantUpdate participantSample;
01282       participantSample.sender = this->id().id();
01283       participantSample.action = CreateEntity;
01284 
01285       participantSample.owner  =  currentParticipant->second->owner();
01286       participantSample.domain =  currentDomain->second->get_id();
01287       participantSample.id     =  currentParticipant->second->get_id();
01288       participantSample.qos    = *currentParticipant->second->get_qos();
01289 
01290       peer->initializeParticipant(participantSample);
01291 
01292       // Initialize the ownership of the participant on the peer.
01293       OwnerUpdate ownerSample;
01294       ownerSample.sender      = this->id().id();
01295       ownerSample.action      = CreateEntity;
01296 
01297       ownerSample.domain      = currentDomain->second->get_id();
01298       ownerSample.participant = currentParticipant->second->get_id();
01299       ownerSample.owner       = currentParticipant->second->owner();
01300 
01301       peer->initializeOwner(ownerSample);
01302     }
01303 
01304     // Process each participant within the current domain.
01305     for (DCPS_IR_Participant_Map::const_iterator currentParticipant
01306          = currentDomain->second->participants().begin();
01307          currentParticipant != currentDomain->second->participants().end();
01308          ++currentParticipant) {
01309 
01310       if (currentParticipant->second->isBitPublisher() == true) {
01311         // Do not push the built-in topic publications.
01312         continue;
01313       }
01314 
01315       // Process each topic within the current particpant.
01316       for (DCPS_IR_Topic_Map::const_iterator currentTopic
01317            = currentParticipant->second->topics().begin();
01318            currentTopic != currentParticipant->second->topics().end();
01319            ++currentTopic) {
01320         TopicUpdate topicSample;
01321         topicSample.sender      = this->id().id();
01322         topicSample.action      = CreateEntity;
01323 
01324         topicSample.id          = currentTopic->second->get_id();
01325         topicSample.domain      = currentDomain->second->get_id();
01326         topicSample.participant = currentTopic->second->get_participant_id();
01327         topicSample.topic       = currentTopic->second->get_topic_description()->get_name();
01328         topicSample.datatype    = currentTopic->second->get_topic_description()->get_dataTypeName();
01329         topicSample.qos         = *currentTopic->second->get_topic_qos();
01330 
01331         peer->initializeTopic(topicSample);
01332       }
01333 
01334       // Process each publication within the current particpant.
01335       for (DCPS_IR_Publication_Map::const_iterator currentPublication
01336            = currentParticipant->second->publications().begin();
01337            currentPublication != currentParticipant->second->publications().end();
01338            ++currentPublication) {
01339         PublicationUpdate publicationSample;
01340         publicationSample.sender         = this->id().id();
01341         publicationSample.action         = CreateEntity;
01342 
01343         DCPS_IR_Publication* p = currentPublication->second.get();
01344         CORBA::ORB_var orb = this->info_->orb();
01345         CORBA::String_var callback = orb->object_to_string(p->writer());
01346 
01347         publicationSample.domain         = currentDomain->second->get_id();
01348         publicationSample.participant    = p->get_participant_id();
01349         publicationSample.topic          = p->get_topic_id();
01350         publicationSample.id             = p->get_id();
01351         publicationSample.callback       = callback.in();
01352         publicationSample.datawriter_qos = *p->get_datawriter_qos();
01353         publicationSample.publisher_qos  = *p->get_publisher_qos();
01354         publicationSample.transport_info = p->get_transportLocatorSeq();
01355 
01356         peer->initializePublication(publicationSample);
01357       }
01358 
01359       // Process each subscription within the current particpant.
01360       for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
01361            = currentParticipant->second->subscriptions().begin();
01362            currentSubscription != currentParticipant->second->subscriptions().end();
01363            ++currentSubscription) {
01364         SubscriptionUpdate subscriptionSample;
01365         subscriptionSample.sender         = this->id().id();
01366         subscriptionSample.action         = CreateEntity;
01367 
01368         DCPS_IR_Subscription* s = currentSubscription->second.get();
01369         CORBA::ORB_var orb = this->info_->orb();
01370         CORBA::String_var callback = orb->object_to_string(s->reader());
01371 
01372         subscriptionSample.domain         = currentDomain->second->get_id();
01373         subscriptionSample.participant    = s->get_participant_id();
01374         subscriptionSample.topic          = s->get_topic_id();
01375         subscriptionSample.id             = s->get_id();
01376         subscriptionSample.callback       = callback.in();
01377         subscriptionSample.datareader_qos = *s->get_datareader_qos();
01378         subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
01379         subscriptionSample.transport_info = s->get_transportLocatorSeq();
01380         subscriptionSample.filter_expression = s->get_filter_expression().c_str();
01381         subscriptionSample.expression_params = s->get_expr_params();
01382 
01383         peer->initializeSubscription(subscriptionSample);
01384       }
01385     }
01386   }
01387 }
01388 
01389 } // namespace Federator
01390 } // namespace OpenDDS
01391 
01392 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1