FederatorManagerImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include "FederatorManagerImpl.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DefaultValues.h"
00012 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00013 #include "dds/DCPS/SubscriberImpl.h"
00014 #include "dds/DCPS/Service_Participant.h"
00015 #include "dds/DCPS/Marked_Default_Qos.h"
00016 #include "dds/DCPS/RepoIdConverter.h"
00017 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00018 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00019 #include "dds/DCPS/transport/tcp/TcpInst.h"
00020 #include "dds/DCPS/transport/tcp/Tcp.h"
00021 #include "tao/ORB_Core.h"
00022 #include "ace/Log_Priority.h"
00023 #include "ace/Log_Msg.h"
00024 
00025 #include "FederatorTypeSupportC.h"
00026 #include "FederatorTypeSupportImpl.h"
00027 
00028 #include <sstream>
00029 
00030 #if !defined (__ACE_INLINE__)
00031 # include "FederatorManagerImpl.inl"
00032 #endif /* !__ACE_INLINE__ */
00033 
00034 namespace OpenDDS {
00035 namespace Federator {
00036 
00037 ManagerImpl::ManagerImpl(Config& config)
00038   : joining_(this->lock_),
00039     joiner_(NIL_REPOSITORY),
00040     joinRepo_(NIL_REPOSITORY),
00041     federated_(false),
00042     config_(config),
00043     info_(0),
00044     ownerListener_(*this),
00045     topicListener_(*this),
00046     participantListener_(*this),
00047     publicationListener_(*this),
00048     subscriptionListener_(*this),
00049     multicastEnabled_(false)
00050 {
00051   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00052     ACE_DEBUG((LM_DEBUG,
00053                ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
00054   }
00055 
00056   char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
00057 
00058   if (mdec != 0) {
00059     std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
00060 
00061     if (mde != "0") {
00062       multicastEnabled_ = true;
00063     }
00064   }
00065 }
00066 
00067 ManagerImpl::~ManagerImpl()
00068 {
00069   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00070     ACE_DEBUG((LM_DEBUG,
00071                ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
00072   }
00073 }
00074 
00075 void
00076 ManagerImpl::initialize()
00077 {
00078   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00079     ACE_DEBUG((LM_DEBUG,
00080                ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
00081   }
00082 
00083   // Let the listeners know which repository we are to filter samples at
00084   // the earliest opportunity.
00085   this->ownerListener_.federationId(this->id());
00086   this->topicListener_.federationId(this->id());
00087   this->participantListener_.federationId(this->id());
00088   this->publicationListener_.federationId(this->id());
00089   this->subscriptionListener_.federationId(this->id());
00090 
00091   // Add participant for Federation domain
00092   DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00093   this->federationParticipant_
00094   = dpf->create_participant(
00095       this->config_.federationDomain(),
00096       PARTICIPANT_QOS_DEFAULT,
00097       DDS::DomainParticipantListener::_nil(),
00098       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00099 
00100   if (CORBA::is_nil(this->federationParticipant_.in())) {
00101     ACE_ERROR((LM_ERROR,
00102                ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
00103                ACE_TEXT("repository %d in federation domain %d.\n"),
00104                this->id().id(),
00105                this->config_.federationDomain()));
00106     throw Incomplete();
00107   }
00108   //
00109   // Add type support for update topics
00110   //
00111 
00112   OwnerUpdateTypeSupportImpl* ownerUpdate = new OwnerUpdateTypeSupportImpl();
00113 
00114   if (DDS::RETCODE_OK != ownerUpdate->register_type(
00115         this->federationParticipant_.in(),
00116         OWNERUPDATETYPENAME)) {
00117     ACE_ERROR((LM_ERROR,
00118                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00119                ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
00120                this->id().id()));
00121     throw Incomplete();
00122   }
00123 
00124   ParticipantUpdateTypeSupportImpl* participantUpdate = new ParticipantUpdateTypeSupportImpl();
00125 
00126   if (DDS::RETCODE_OK != participantUpdate->register_type(
00127         this->federationParticipant_.in(),
00128         PARTICIPANTUPDATETYPENAME)) {
00129     ACE_ERROR((LM_ERROR,
00130                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00131                ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
00132                this->id().id()));
00133     throw Incomplete();
00134   }
00135 
00136   TopicUpdateTypeSupportImpl* topicUpdate = new TopicUpdateTypeSupportImpl();
00137 
00138   if (DDS::RETCODE_OK != topicUpdate->register_type(
00139         this->federationParticipant_.in(),
00140         TOPICUPDATETYPENAME)) {
00141     ACE_ERROR((LM_ERROR,
00142                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00143                ACE_TEXT("TopicUpdate type support for repository %d.\n"),
00144                this->id().id()));
00145     throw Incomplete();
00146   }
00147 
00148   PublicationUpdateTypeSupportImpl* publicationUpdate = new PublicationUpdateTypeSupportImpl();
00149 
00150   if (DDS::RETCODE_OK != publicationUpdate->register_type(
00151         this->federationParticipant_.in(),
00152         PUBLICATIONUPDATETYPENAME)) {
00153     ACE_ERROR((LM_ERROR,
00154                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00155                ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
00156                this->id().id()));
00157     throw Incomplete();
00158   }
00159 
00160   SubscriptionUpdateTypeSupportImpl* subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
00161 
00162   if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
00163         this->federationParticipant_.in(),
00164         SUBSCRIPTIONUPDATETYPENAME)) {
00165     ACE_ERROR((LM_ERROR,
00166                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00167                ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
00168                this->id().id()));
00169     throw Incomplete();
00170   }
00171 
00172   //
00173   // Create a transport config for use with federation entities.
00174   //
00175   std::string config_name =
00176     OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00177     + std::string("FederationBITTransportConfig");
00178   OpenDDS::DCPS::TransportConfig_rch config =
00179     OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
00180 
00181   std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00182     + std::string("FederationBITTCPTransportInst");
00183   OpenDDS::DCPS::TransportInst_rch inst =
00184     OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
00185                                                               "tcp");
00186   config->instances_.push_back(inst);
00187 
00188   //
00189   // Create the subscriber for the update topics.
00190   //
00191 
00192   DDS::Subscriber_var subscriber
00193   = this->federationParticipant_->create_subscriber(
00194       SUBSCRIBER_QOS_DEFAULT,
00195       DDS::SubscriberListener::_nil(),
00196       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00197 
00198   if (CORBA::is_nil(subscriber.in())) {
00199     ACE_ERROR((LM_ERROR,
00200                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00201                ACE_TEXT("failed to create subscriber for repository %d\n"),
00202                this->id().id()));
00203     throw Incomplete();
00204 
00205   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00206     ACE_DEBUG((LM_DEBUG,
00207                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00208                ACE_TEXT("created federation subscriber for repository %d\n"),
00209                this->id().id()));
00210 
00211   }
00212 
00213   // Attach the transport to it.
00214 
00215   try {
00216     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00217                                                               subscriber.in());
00218   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00219     ACE_ERROR((LM_ERROR,
00220                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00221                ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
00222     throw Incomplete();
00223   }
00224 
00225   //
00226   // Create the publisher for the update topics.
00227   //
00228 
00229   DDS::Publisher_var publisher
00230   = this->federationParticipant_->create_publisher(
00231       PUBLISHER_QOS_DEFAULT,
00232       DDS::PublisherListener::_nil(),
00233       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00234 
00235   if (CORBA::is_nil(publisher.in())) {
00236     ACE_ERROR((LM_ERROR,
00237                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00238                ACE_TEXT("failed to create publisher for repository %d\n"),
00239                this->id().id()));
00240     throw Incomplete();
00241 
00242   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00243     ACE_DEBUG((LM_DEBUG,
00244                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00245                ACE_TEXT("created federation publisher for repository %d\n"),
00246                this->id().id()));
00247 
00248   }
00249 
00250   // Attach the transport to it.
00251 
00252   try {
00253     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00254                                                               publisher.in());
00255   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00256     ACE_ERROR((LM_ERROR,
00257                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00258                ACE_TEXT("failed to bind transport config to federation publisher.\n")));
00259     throw Incomplete();
00260   }
00261 
00262   //
00263   // Some useful items for adding the subscriptions.
00264   //
00265   DDS::Topic_var            topic;
00266   DDS::TopicDescription_var description;
00267   DDS::DataReader_var       dataReader;
00268   DDS::DataWriter_var       dataWriter;
00269 
00270   DDS::DataReaderQos readerQos;
00271   subscriber->get_default_datareader_qos(readerQos);
00272   readerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00273   readerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00274   readerQos.history.depth                            = 50;
00275   readerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00276   readerQos.reliability.max_blocking_time.sec        = 0;
00277   readerQos.reliability.max_blocking_time.nanosec    = 0;
00278 
00279   DDS::DataWriterQos writerQos;
00280   publisher->get_default_datawriter_qos(writerQos);
00281   writerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00282   writerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00283   writerQos.history.depth                            = 50;
00284   writerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00285   writerQos.reliability.max_blocking_time.sec        = 0;
00286   writerQos.reliability.max_blocking_time.nanosec    = 0;
00287 
00288   //
00289   // Add update subscriptions
00290   //
00291   // NOTE: Its ok to lose the references to the objects here since they
00292   //       are not needed after this point.  The only thing we will do
00293   //       with them is to destroy them, and that will be done via a
00294   //       cascade delete from the participant.  The listeners will
00295   //       survive and can be used within other participants as well,
00296   //       since the only state they retain is the manager, which is the
00297   //       same for all.
00298   //
00299 
00300   topic = this->federationParticipant_->create_topic(
00301             OWNERUPDATETOPICNAME,
00302             OWNERUPDATETYPENAME,
00303             TOPIC_QOS_DEFAULT,
00304             DDS::TopicListener::_nil(),
00305             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00306 
00307   dataWriter = publisher->create_datawriter(
00308                  topic.in(),
00309                  writerQos,
00310                  DDS::DataWriterListener::_nil(),
00311                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00312 
00313   if (CORBA::is_nil(dataWriter.in())) {
00314     ACE_ERROR((LM_ERROR,
00315                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00316                ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
00317                this->id().id()));
00318     throw Incomplete();
00319   }
00320 
00321   this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
00322 
00323   if (::CORBA::is_nil(this->ownerWriter_.in())) {
00324     ACE_ERROR((LM_ERROR,
00325                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00326                ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
00327     throw Incomplete();
00328 
00329   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00330     OpenDDS::DCPS::DataWriterImpl* servant
00331     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00332 
00333     if (0 == servant) {
00334       ACE_DEBUG((LM_WARNING,
00335                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00336                  ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
00337 
00338     } else {
00339       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00340       ACE_DEBUG((LM_DEBUG,
00341                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00342                  ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
00343                  std::string(converter).c_str(),
00344                  this->id().id()));
00345     }
00346   }
00347 
00348   description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
00349   dataReader  = subscriber->create_datareader(
00350                   description.in(),
00351                   readerQos,
00352                   &this->ownerListener_,
00353                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00354 
00355   if (CORBA::is_nil(dataReader.in())) {
00356     ACE_ERROR((LM_ERROR,
00357                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00358                ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
00359                this->id().id()));
00360     throw Incomplete();
00361 
00362   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00363     OpenDDS::DCPS::DataReaderImpl* servant
00364     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00365 
00366     if (0 == servant) {
00367       ACE_DEBUG((LM_WARNING,
00368                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00369                  ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
00370 
00371     } else {
00372       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00373       ACE_DEBUG((LM_DEBUG,
00374                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00375                  ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
00376                  std::string(converter).c_str(),
00377                  this->id().id()));
00378     }
00379   }
00380 
00381   topic = this->federationParticipant_->create_topic(
00382             TOPICUPDATETOPICNAME,
00383             TOPICUPDATETYPENAME,
00384             TOPIC_QOS_DEFAULT,
00385             DDS::TopicListener::_nil(),
00386             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00387   dataWriter = publisher->create_datawriter(
00388                  topic.in(),
00389                  writerQos,
00390                  DDS::DataWriterListener::_nil(),
00391                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00392 
00393   if (CORBA::is_nil(dataWriter.in())) {
00394     ACE_ERROR((LM_ERROR,
00395                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00396                ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
00397                this->id().id()));
00398     throw Incomplete();
00399   }
00400 
00401   this->topicWriter_
00402   = TopicUpdateDataWriter::_narrow(dataWriter.in());
00403 
00404   if (::CORBA::is_nil(this->topicWriter_.in())) {
00405     ACE_ERROR((LM_ERROR,
00406                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00407                ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
00408     throw Incomplete();
00409 
00410   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00411     OpenDDS::DCPS::DataWriterImpl* servant
00412     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00413 
00414     if (0 == servant) {
00415       ACE_DEBUG((LM_WARNING,
00416                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00417                  ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
00418 
00419     } else {
00420       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00421       ACE_DEBUG((LM_DEBUG,
00422                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00423                  ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
00424                  std::string(converter).c_str(),
00425                  this->id().id()));
00426     }
00427   }
00428 
00429   description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
00430   dataReader  = subscriber->create_datareader(
00431                   description.in(),
00432                   readerQos,
00433                   &this->topicListener_,
00434                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00435 
00436   if (CORBA::is_nil(dataReader.in())) {
00437     ACE_ERROR((LM_ERROR,
00438                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00439                ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
00440                this->id().id()));
00441     throw Incomplete();
00442 
00443   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00444     OpenDDS::DCPS::DataReaderImpl* servant
00445     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00446 
00447     if (0 == servant) {
00448       ACE_DEBUG((LM_WARNING,
00449                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00450                  ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
00451 
00452     } else {
00453       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00454       ACE_DEBUG((LM_DEBUG,
00455                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00456                  ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
00457                  std::string(converter).c_str(),
00458                  this->id().id()));
00459     }
00460   }
00461 
00462   topic = this->federationParticipant_->create_topic(
00463             PARTICIPANTUPDATETOPICNAME,
00464             PARTICIPANTUPDATETYPENAME,
00465             TOPIC_QOS_DEFAULT,
00466             DDS::TopicListener::_nil(),
00467             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00468   dataWriter = publisher->create_datawriter(
00469                  topic.in(),
00470                  writerQos,
00471                  DDS::DataWriterListener::_nil(),
00472                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00473 
00474   if (CORBA::is_nil(dataWriter.in())) {
00475     ACE_ERROR((LM_ERROR,
00476                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00477                ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
00478                this->id().id()));
00479     throw Incomplete();
00480   }
00481 
00482   this->participantWriter_
00483   = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
00484 
00485   if (::CORBA::is_nil(this->participantWriter_.in())) {
00486     ACE_ERROR((LM_ERROR,
00487                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00488                ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
00489     throw Incomplete();
00490 
00491   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00492     OpenDDS::DCPS::DataWriterImpl* servant
00493     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00494 
00495     if (0 == servant) {
00496       ACE_DEBUG((LM_WARNING,
00497                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00498                  ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
00499 
00500     } else {
00501       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00502       ACE_DEBUG((LM_DEBUG,
00503                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00504                  ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
00505                  std::string(converter).c_str(),
00506                  this->id().id()));
00507     }
00508   }
00509 
00510   description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
00511   dataReader  = subscriber->create_datareader(
00512                   description.in(),
00513                   readerQos,
00514                   &this->participantListener_,
00515                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00516 
00517   if (CORBA::is_nil(dataReader.in())) {
00518     ACE_ERROR((LM_ERROR,
00519                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00520                ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
00521                this->id().id()));
00522     throw Incomplete();
00523 
00524   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00525     OpenDDS::DCPS::DataReaderImpl* servant
00526     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00527 
00528     if (0 == servant) {
00529       ACE_DEBUG((LM_WARNING,
00530                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00531                  ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
00532 
00533     } else {
00534       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00535       ACE_DEBUG((LM_DEBUG,
00536                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00537                  ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
00538                  std::string(converter).c_str(),
00539                  this->id().id()));
00540     }
00541   }
00542 
00543   topic = this->federationParticipant_->create_topic(
00544             PUBLICATIONUPDATETOPICNAME,
00545             PUBLICATIONUPDATETYPENAME,
00546             TOPIC_QOS_DEFAULT,
00547             DDS::TopicListener::_nil(),
00548             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00549   dataWriter = publisher->create_datawriter(
00550                  topic.in(),
00551                  writerQos,
00552                  DDS::DataWriterListener::_nil(),
00553                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00554 
00555   if (CORBA::is_nil(dataWriter.in())) {
00556     ACE_ERROR((LM_ERROR,
00557                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00558                ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
00559                this->id().id()));
00560     throw Incomplete();
00561   }
00562 
00563   this->publicationWriter_
00564   = PublicationUpdateDataWriter::_narrow(dataWriter.in());
00565 
00566   if (::CORBA::is_nil(this->publicationWriter_.in())) {
00567     ACE_ERROR((LM_ERROR,
00568                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00569                ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
00570     throw Incomplete();
00571 
00572   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00573     OpenDDS::DCPS::DataWriterImpl* servant
00574     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00575 
00576     if (0 == servant) {
00577       ACE_DEBUG((LM_WARNING,
00578                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00579                  ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
00580 
00581     } else {
00582       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00583       ACE_DEBUG((LM_DEBUG,
00584                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00585                  ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
00586                  std::string(converter).c_str(),
00587                  this->id().id()));
00588     }
00589   }
00590 
00591   description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
00592   dataReader  = subscriber->create_datareader(
00593                   description.in(),
00594                   readerQos,
00595                   &this->publicationListener_,
00596                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00597 
00598   if (CORBA::is_nil(dataReader.in())) {
00599     ACE_ERROR((LM_ERROR,
00600                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00601                ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
00602                this->id().id()));
00603     throw Incomplete();
00604 
00605   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00606     OpenDDS::DCPS::DataReaderImpl* servant
00607     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00608 
00609     if (0 == servant) {
00610       ACE_DEBUG((LM_WARNING,
00611                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00612                  ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
00613 
00614     } else {
00615       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00616       ACE_DEBUG((LM_DEBUG,
00617                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00618                  ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
00619                  std::string(converter).c_str(),
00620                  this->id().id()));
00621     }
00622   }
00623 
00624   topic = this->federationParticipant_->create_topic(
00625             SUBSCRIPTIONUPDATETOPICNAME,
00626             SUBSCRIPTIONUPDATETYPENAME,
00627             TOPIC_QOS_DEFAULT,
00628             DDS::TopicListener::_nil(),
00629             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00630   dataWriter = publisher->create_datawriter(
00631                  topic.in(),
00632                  writerQos,
00633                  DDS::DataWriterListener::_nil(),
00634                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00635 
00636   if (CORBA::is_nil(dataWriter.in())) {
00637     ACE_ERROR((LM_ERROR,
00638                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00639                ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
00640                this->id().id()));
00641     throw Incomplete();
00642   }
00643 
00644   this->subscriptionWriter_
00645   = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
00646 
00647   if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
00648     ACE_ERROR((LM_ERROR,
00649                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00650                ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
00651     throw Incomplete();
00652 
00653   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00654     OpenDDS::DCPS::DataWriterImpl* servant
00655     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00656 
00657     if (0 == servant) {
00658       ACE_DEBUG((LM_WARNING,
00659                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00660                  ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
00661 
00662     } else {
00663       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00664       ACE_DEBUG((LM_DEBUG,
00665                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00666                  ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
00667                  std::string(converter).c_str(),
00668                  this->id().id()));
00669     }
00670   }
00671 
00672   description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
00673   dataReader  = subscriber->create_datareader(
00674                   description.in(),
00675                   readerQos,
00676                   &this->subscriptionListener_,
00677                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00678 
00679   if (CORBA::is_nil(dataReader.in())) {
00680     ACE_ERROR((LM_ERROR,
00681                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00682                ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
00683                this->id().id()));
00684     throw Incomplete();
00685 
00686   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00687     OpenDDS::DCPS::DataReaderImpl* servant
00688     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00689 
00690     if (0 == servant) {
00691       ACE_DEBUG((LM_WARNING,
00692                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00693                  ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
00694 
00695     } else {
00696       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00697       ACE_DEBUG((LM_DEBUG,
00698                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00699                  ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
00700                  std::string(converter).c_str(),
00701                  this->id().id()));
00702     }
00703   }
00704 
00705   // JSP
00706 #if defined (ACE_HAS_IP_MULTICAST)
00707 
00708   if (this->multicastEnabled_) {
00709     //
00710     // Install ior multicast handler.
00711     //
00712     // Get reactor instance from TAO.
00713     ACE_Reactor *reactor = this->orb_->orb_core()->reactor();
00714 
00715     // See if the -ORBMulticastDiscoveryEndpoint option was specified.
00716     ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
00717 
00718     // First, see if the user has given us a multicast port number
00719     // on the command-line;
00720     u_short port = 0;
00721 
00722     // Check environment var. for multicast port.
00723     const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
00724 
00725     if (port_number != 0) {
00726       port = static_cast<u_short>(ACE_OS::atoi(port_number));
00727     }
00728 
00729     // Port wasn't specified on the command-line -
00730     // use the default.
00731     if (port == 0)
00732       port = OpenDDS::Federator::Defaults::DiscoveryRequestPort;
00733 
00734     // Initialize the handler
00735     if (mde.length() != 0) {
00736       if (this->multicastResponder_.init(
00737             this->orb_.in(),
00738             mde.c_str()) == -1) {
00739         ACE_ERROR((LM_ERROR,
00740                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00741                    ACE_TEXT("the multicast responder for repository %d.\n"),
00742                    this->id().id()));
00743         throw Incomplete();
00744       }
00745 
00746     } else {
00747       if (this->multicastResponder_.init(
00748             this->orb_.in(),
00749             port,
00750 #if defined (ACE_HAS_IPV6)
00751             ACE_DEFAULT_MULTICASTV6_ADDR
00752 #else
00753             ACE_DEFAULT_MULTICAST_ADDR
00754 #endif /* ACE_HAS_IPV6 */
00755           )) {
00756         ACE_ERROR((LM_ERROR,
00757                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00758                    ACE_TEXT("the multicast responder for repository %d.\n"),
00759                    this->id().id()));
00760         throw Incomplete();
00761       }
00762     }
00763 
00764     // Register event handler for the ior multicast.
00765     if (reactor->register_handler(&this->multicastResponder_,
00766                                   ACE_Event_Handler::READ_MASK) == -1) {
00767       ACE_ERROR((LM_ERROR,
00768                  ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
00769                  ACE_TEXT("for repository %d.\n"),
00770                  this->id().id()));
00771       throw Incomplete();
00772     }
00773 
00774     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00775       ACE_DEBUG((LM_DEBUG,
00776                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00777                  ACE_TEXT("multicast server setup is complete.\n")));
00778     }
00779   }
00780 
00781 #else
00782   ACE_UNUSED_ARG(this->multicastEnabled_);
00783 #endif /* ACE_HAS_IP_MULTICAST */
00784 }
00785 
00786 void
00787 ManagerImpl::finalize()
00788 {
00789   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00790     ACE_DEBUG((LM_DEBUG,
00791                ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
00792   }
00793 
00794   ownerListener_.stop();
00795   topicListener_.stop();
00796   participantListener_.stop();
00797   publicationListener_.stop();
00798   subscriptionListener_.stop();
00799   ownerListener_.join();
00800   topicListener_.join();
00801   participantListener_.join();
00802   publicationListener_.join();
00803   subscriptionListener_.join();
00804 
00805   if (this->federated_) {
00806     try {
00807       IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
00808 
00809       if (where == this->peers_.end()) {
00810         ACE_DEBUG((LM_DEBUG,
00811                    ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
00812                    ACE_TEXT("repository %d - all attachment to federation left.\n"),
00813                    this->id().id()));
00814 
00815       } else {
00816         if (CORBA::is_nil(where->second.in())) {
00817           ACE_ERROR((LM_ERROR,
00818                      ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
00819                      ACE_TEXT("repository %d not currently attached to a federation.\n"),
00820                      this->id().id()));
00821 
00822         } else {
00823           where->second->leave_federation(this->id().id());
00824           this->federated_ = false;
00825         }
00826       }
00827 
00828     } catch (const CORBA::Exception& ex) {
00829       ex._tao_print_exception(
00830         ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
00831         ACE_TEXT("unable to leave remote federation "));
00832       throw Incomplete();
00833     }
00834   }
00835 
00836   if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
00837     this->orb_->orb_core()->reactor()->remove_handler(
00838       &this->multicastResponder_,
00839       ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00840   }
00841 
00842   // Remove our local participant and contained entities.
00843   if (0 == CORBA::is_nil(this->federationParticipant_.in())) {
00844     DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00845     if (DDS::RETCODE_PRECONDITION_NOT_MET
00846         == this->federationParticipant_->delete_contained_entities()) {
00847       ACE_ERROR((LM_ERROR,
00848                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00849                  ACE_TEXT("unable to release resources for repository %d.\n"),
00850                  this->id().id()));
00851 
00852     } else if (DDS::RETCODE_PRECONDITION_NOT_MET
00853                == dpf->delete_participant(this->federationParticipant_.in())) {
00854       ACE_ERROR((LM_ERROR,
00855                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00856                  ACE_TEXT("unable to release the participant for repository %d.\n"),
00857                  this->id().id()));
00858     }
00859   }
00860 }
00861 
00862 // IDL methods.
00863 
00864 RepoKey
00865 ManagerImpl::federation_id()
00866 {
00867   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00868     ACE_DEBUG((LM_DEBUG,
00869                ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
00870   }
00871 
00872   return this->id().id();
00873 }
00874 
00875 OpenDDS::DCPS::DCPSInfo_ptr
00876 ManagerImpl::repository()
00877 {
00878   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00879     ACE_DEBUG((LM_DEBUG,
00880                ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
00881   }
00882 
00883   OpenDDS::DCPS::Discovery_rch disco
00884   = TheServiceParticipant->get_discovery(
00885       this->config_.federationDomain());
00886   OpenDDS::DCPS::DCPSInfo_var repo;
00887   if (!disco.is_nil()) {
00888     OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco =
00889       DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco);
00890     repo = irDisco->get_dcps_info();
00891   }
00892 
00893   if (CORBA::is_nil(repo.in())) {
00894     return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
00895 
00896   } else {
00897     return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
00898   }
00899 }
00900 
00901 CORBA::Boolean
00902 ManagerImpl::discover_federation(const char * ior)
00903 {
00904   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00905     ACE_DEBUG((LM_DEBUG,
00906                ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
00907                ior));
00908   }
00909 
00910   ///@TODO: Implement this.
00911   return false;
00912 }
00913 
00914 Manager_ptr
00915 ManagerImpl::join_federation(
00916   Manager_ptr peer,
00917   FederationDomain federation
00918 )
00919 {
00920   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00921     ACE_DEBUG((LM_DEBUG,
00922                ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
00923                federation));
00924   }
00925 
00926   RepoKey remote = NIL_REPOSITORY;
00927 
00928   try {
00929     // Obtain the remote repository federator Id value.
00930     remote = peer->federation_id();
00931 
00932     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00933       ACE_DEBUG((LM_DEBUG,
00934                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00935                  ACE_TEXT("repo id %d entered from repository with id %d.\n"),
00936                  this->id().id(),
00937                  remote));
00938     }
00939 
00940   } catch (const CORBA::Exception& ex) {
00941     ex._tao_print_exception(
00942       ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
00943       ACE_TEXT("unable to obtain remote federation Id value: "));
00944     throw Incomplete();
00945   }
00946 
00947   // If we are recursing, then we are done.
00948   if (this->joiner_ == remote) {
00949     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00950       ACE_DEBUG((LM_DEBUG,
00951                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00952                  ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
00953                  this->id().id(),
00954                  remote));
00955     }
00956 
00957     return this->_this();
00958 
00959   } else {
00960     // Block while any different repository is joining.
00961     ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00962 
00963     while (this->joiner_ != NIL_REPOSITORY) {
00964       // This releases the lock while we block.
00965       this->joining_.wait();
00966 
00967       // We are now recursing - curses!
00968       if (this->joiner_ == remote) {
00969         return this->_this();
00970       }
00971     }
00972 
00973     // Note that we are joining the remote repository now.
00974     this->joiner_ = remote;
00975   }
00976 
00977   //
00978   // We only reach this point if:
00979   //   1) No other repository is processing past this point;
00980   //   2) We are not recursing.
00981   //
00982 
00983   // Check if we already have Federation repository.
00984   // Check if we are already federated.
00985   if (this->federated_ == false) {
00986     // Go ahead and add the joining repository as our Federation
00987     // repository.
00988     try {
00989       // Mark this repository as the point to which we are joined to
00990       // the federation.
00991       this->joinRepo_ = remote;
00992 
00993       // Obtain a reference to the remote repository.
00994       OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
00995 
00996       CORBA::ORB_var orb = remoteRepo->_get_orb();
00997       CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
00998       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00999         ACE_DEBUG((LM_DEBUG,
01000                    ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
01001                    ACE_TEXT("id %d obtained reference to id %d:\n")
01002                    ACE_TEXT("\t%C\n"),
01003                    this->id().id(),
01004                    remote,
01005                    remoteRepoIor.in()));
01006       }
01007 
01008       // Add remote repository to Service_Participant in the Federation domain
01009       std::ostringstream oss;
01010       oss << remote;
01011       std::string key_string = oss.str();
01012       TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
01013       TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
01014 
01015     } catch (const CORBA::Exception& ex) {
01016       ex._tao_print_exception(
01017         "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
01018       throw Incomplete();
01019     }
01020   }
01021 
01022   // Symmetrical joining behavior.
01023   try {
01024     Manager_var remoteManager
01025     = peer->join_federation(this->_this(), this->config_.federationDomain());
01026 
01027     if (this->joinRepo_ == remote) {
01028       this->peers_[ this->joinRepo_]
01029       = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
01030     }
01031 
01032     //
01033     // Push our initial state out to the joining repository *after* we call
01034     // him back to join.  This reduces the amount of duplicate data pushed
01035     // when a new (empty) repository is joining an existing federation.
01036     //
01037     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01038       ACE_DEBUG((LM_DEBUG,
01039                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01040                  ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
01041                  this->id().id(),
01042                  remote));
01043     }
01044 
01045     this->pushState(peer);
01046 
01047   } catch (const CORBA::Exception& ex) {
01048     ex._tao_print_exception(
01049       "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
01050     throw Incomplete();
01051   }
01052 
01053   if (CORBA::is_nil(this->participantWriter_.in())) {
01054     //
01055     // Establish our update publications and subscriptions *after* we
01056     // have exchanged internal state with the first joining repository.
01057     //
01058     this->initialize();
01059   }
01060 
01061   // Adjust our joining state and give others the opportunity to proceed.
01062   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01063     ACE_DEBUG((LM_DEBUG,
01064                ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01065                ACE_TEXT("repo id %d joined to repository with id %d.\n"),
01066                this->id().id(),
01067                remote));
01068   }
01069 
01070   this->federated_ = true;
01071   this->joiner_    = NIL_REPOSITORY;
01072   this->joining_.signal();
01073   return this->_this();
01074 }
01075 
01076 void
01077 ManagerImpl::leave_federation(
01078   RepoKey id)
01079 {
01080   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01081     ACE_DEBUG((LM_DEBUG,
01082                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
01083                this->id().id()));
01084   }
01085 
01086   // Remove the leaving repository from our outbound mappings.
01087   IdToManagerMap::iterator where = this->peers_.find(id);
01088 
01089   if (where != this->peers_.end()) {
01090     this->peers_.erase(where);
01091   }
01092 
01093   // Remove all the internal Entities owned by the leaving repository.
01094   if (false
01095       == this->info_->remove_by_owner(this->config_.federationDomain(), id)) {
01096     throw Incomplete();
01097   }
01098 
01099   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01100     ACE_DEBUG((LM_DEBUG,
01101                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
01102                this->id().id()));
01103   }
01104 }
01105 
01106 void
01107 ManagerImpl::leave_and_shutdown(
01108   void)
01109 {
01110   // Shutdown the process via the repository object.
01111   this->info_->shutdown();
01112 }
01113 
01114 void
01115 ManagerImpl::shutdown(
01116   void)
01117 {
01118   // Prevent the removal of this repository from the federation during
01119   // shutdown processing.
01120   this->federated_ = false;
01121 
01122   // Shutdown the process via the repository object.
01123   this->info_->shutdown();
01124 }
01125 
01126 void
01127 ManagerImpl::initializeOwner(
01128   const OpenDDS::Federator::OwnerUpdate & data)
01129 {
01130   this->processCreate(&data, 0);
01131 }
01132 
01133 void
01134 ManagerImpl::initializeTopic(
01135   const OpenDDS::Federator::TopicUpdate & data)
01136 {
01137   this->processCreate(&data, 0);
01138 }
01139 
01140 void
01141 ManagerImpl::initializeParticipant(
01142   const OpenDDS::Federator::ParticipantUpdate & data)
01143 {
01144   this->processCreate(&data, 0);
01145 }
01146 
01147 void
01148 ManagerImpl::initializePublication(
01149   const OpenDDS::Federator::PublicationUpdate & data)
01150 {
01151   this->processCreate(&data, 0);
01152 }
01153 
01154 void
01155 ManagerImpl::initializeSubscription(
01156   const OpenDDS::Federator::SubscriptionUpdate & data)
01157 {
01158   this->processCreate(&data, 0);
01159 }
01160 
01161 } // namespace Federator
01162 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7