FederatorManagerImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include "tao/ORB_Core.h"
00010 #include "FederatorManagerImpl.h"
00011 #include "DCPSInfo_i.h"
00012 #include "DefaultValues.h"
00013 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00014 #include "dds/DCPS/SubscriberImpl.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/Marked_Default_Qos.h"
00017 #include "dds/DCPS/RepoIdConverter.h"
00018 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00019 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00020 #include "dds/DCPS/transport/tcp/TcpInst.h"
00021 #include "dds/DCPS/transport/tcp/Tcp.h"
00022 #include "ace/Log_Priority.h"
00023 #include "ace/Log_Msg.h"
00024 
00025 #include "FederatorTypeSupportC.h"
00026 #include "FederatorTypeSupportImpl.h"
00027 
00028 #include <sstream>
00029 
00030 #if !defined (__ACE_INLINE__)
00031 # include "FederatorManagerImpl.inl"
00032 #endif /* !__ACE_INLINE__ */
00033 
00034 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00035 
00036 namespace OpenDDS {
00037 namespace Federator {
00038 
00039 ManagerImpl::ManagerImpl(Config& config)
00040   : joining_(this->lock_),
00041     joiner_(NIL_REPOSITORY),
00042     joinRepo_(NIL_REPOSITORY),
00043     federated_(false),
00044     config_(config),
00045     info_(0),
00046     ownerListener_(*this),
00047     topicListener_(*this),
00048     participantListener_(*this),
00049     publicationListener_(*this),
00050     subscriptionListener_(*this),
00051     multicastEnabled_(false)
00052 {
00053   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00054     ACE_DEBUG((LM_DEBUG,
00055                ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
00056   }
00057 
00058   char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
00059 
00060   if (mdec != 0) {
00061     std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
00062 
00063     if (mde != "0") {
00064       multicastEnabled_ = true;
00065     }
00066   }
00067 }
00068 
00069 ManagerImpl::~ManagerImpl()
00070 {
00071   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00072     ACE_DEBUG((LM_DEBUG,
00073                ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
00074   }
00075 }
00076 
00077 void
00078 ManagerImpl::initialize()
00079 {
00080   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00081     ACE_DEBUG((LM_DEBUG,
00082                ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
00083   }
00084 
00085   // Let the listeners know which repository we are to filter samples at
00086   // the earliest opportunity.
00087   this->ownerListener_.federationId(this->id());
00088   this->topicListener_.federationId(this->id());
00089   this->participantListener_.federationId(this->id());
00090   this->publicationListener_.federationId(this->id());
00091   this->subscriptionListener_.federationId(this->id());
00092 
00093   // Add participant for Federation domain
00094   DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00095   this->federationParticipant_
00096   = dpf->create_participant(
00097       this->config_.federationDomain(),
00098       PARTICIPANT_QOS_DEFAULT,
00099       DDS::DomainParticipantListener::_nil(),
00100       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00101 
00102   if (CORBA::is_nil(this->federationParticipant_.in())) {
00103     ACE_ERROR((LM_ERROR,
00104                ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
00105                ACE_TEXT("repository %d in federation domain %d.\n"),
00106                this->id().id(),
00107                this->config_.federationDomain()));
00108     throw Incomplete();
00109   }
00110   //
00111   // Add type support for update topics
00112   //
00113 
00114   OwnerUpdateTypeSupportImpl::_var_type ownerUpdate = new OwnerUpdateTypeSupportImpl();
00115 
00116   if (DDS::RETCODE_OK != ownerUpdate->register_type(
00117         this->federationParticipant_.in(),
00118         OWNERUPDATETYPENAME)) {
00119     ACE_ERROR((LM_ERROR,
00120                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00121                ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
00122                this->id().id()));
00123     throw Incomplete();
00124   }
00125 
00126   ParticipantUpdateTypeSupportImpl::_var_type participantUpdate = new ParticipantUpdateTypeSupportImpl();
00127 
00128   if (DDS::RETCODE_OK != participantUpdate->register_type(
00129         this->federationParticipant_.in(),
00130         PARTICIPANTUPDATETYPENAME)) {
00131     ACE_ERROR((LM_ERROR,
00132                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00133                ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
00134                this->id().id()));
00135     throw Incomplete();
00136   }
00137 
00138   TopicUpdateTypeSupportImpl::_var_type topicUpdate = new TopicUpdateTypeSupportImpl();
00139 
00140   if (DDS::RETCODE_OK != topicUpdate->register_type(
00141         this->federationParticipant_.in(),
00142         TOPICUPDATETYPENAME)) {
00143     ACE_ERROR((LM_ERROR,
00144                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00145                ACE_TEXT("TopicUpdate type support for repository %d.\n"),
00146                this->id().id()));
00147     throw Incomplete();
00148   }
00149 
00150   PublicationUpdateTypeSupportImpl::_var_type publicationUpdate = new PublicationUpdateTypeSupportImpl();
00151 
00152   if (DDS::RETCODE_OK != publicationUpdate->register_type(
00153         this->federationParticipant_.in(),
00154         PUBLICATIONUPDATETYPENAME)) {
00155     ACE_ERROR((LM_ERROR,
00156                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00157                ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
00158                this->id().id()));
00159     throw Incomplete();
00160   }
00161 
00162   SubscriptionUpdateTypeSupportImpl::_var_type subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
00163 
00164   if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
00165         this->federationParticipant_.in(),
00166         SUBSCRIPTIONUPDATETYPENAME)) {
00167     ACE_ERROR((LM_ERROR,
00168                ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00169                ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
00170                this->id().id()));
00171     throw Incomplete();
00172   }
00173 
00174   //
00175   // Create a transport config for use with federation entities.
00176   //
00177   std::string config_name =
00178     OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00179     + std::string("FederationBITTransportConfig");
00180   OpenDDS::DCPS::TransportConfig_rch config =
00181     OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
00182 
00183   std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00184     + std::string("FederationBITTCPTransportInst");
00185   OpenDDS::DCPS::TransportInst_rch inst =
00186     OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
00187                                                               "tcp");
00188   config->instances_.push_back(inst);
00189 
00190   //
00191   // Create the subscriber for the update topics.
00192   //
00193 
00194   DDS::Subscriber_var subscriber
00195   = this->federationParticipant_->create_subscriber(
00196       SUBSCRIBER_QOS_DEFAULT,
00197       DDS::SubscriberListener::_nil(),
00198       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00199 
00200   if (CORBA::is_nil(subscriber.in())) {
00201     ACE_ERROR((LM_ERROR,
00202                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00203                ACE_TEXT("failed to create subscriber for repository %d\n"),
00204                this->id().id()));
00205     throw Incomplete();
00206 
00207   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00208     ACE_DEBUG((LM_DEBUG,
00209                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00210                ACE_TEXT("created federation subscriber for repository %d\n"),
00211                this->id().id()));
00212 
00213   }
00214 
00215   // Attach the transport to it.
00216 
00217   try {
00218     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00219                                                               subscriber.in());
00220   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00221     ACE_ERROR((LM_ERROR,
00222                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00223                ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
00224     throw Incomplete();
00225   }
00226 
00227   //
00228   // Create the publisher for the update topics.
00229   //
00230 
00231   DDS::Publisher_var publisher
00232   = this->federationParticipant_->create_publisher(
00233       PUBLISHER_QOS_DEFAULT,
00234       DDS::PublisherListener::_nil(),
00235       OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00236 
00237   if (CORBA::is_nil(publisher.in())) {
00238     ACE_ERROR((LM_ERROR,
00239                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00240                ACE_TEXT("failed to create publisher for repository %d\n"),
00241                this->id().id()));
00242     throw Incomplete();
00243 
00244   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00245     ACE_DEBUG((LM_DEBUG,
00246                ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00247                ACE_TEXT("created federation publisher for repository %d\n"),
00248                this->id().id()));
00249 
00250   }
00251 
00252   // Attach the transport to it.
00253 
00254   try {
00255     OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00256                                                               publisher.in());
00257   } catch (const OpenDDS::DCPS::Transport::Exception&) {
00258     ACE_ERROR((LM_ERROR,
00259                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00260                ACE_TEXT("failed to bind transport config to federation publisher.\n")));
00261     throw Incomplete();
00262   }
00263 
00264   //
00265   // Some useful items for adding the subscriptions.
00266   //
00267   DDS::Topic_var            topic;
00268   DDS::TopicDescription_var description;
00269   DDS::DataReader_var       dataReader;
00270   DDS::DataWriter_var       dataWriter;
00271 
00272   DDS::DataReaderQos readerQos;
00273   subscriber->get_default_datareader_qos(readerQos);
00274   readerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00275   readerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00276   readerQos.history.depth                            = 50;
00277   readerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00278   readerQos.reliability.max_blocking_time.sec        = 0;
00279   readerQos.reliability.max_blocking_time.nanosec    = 0;
00280 
00281   DDS::DataWriterQos writerQos;
00282   publisher->get_default_datawriter_qos(writerQos);
00283   writerQos.durability.kind                          = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00284   writerQos.history.kind                             = DDS::KEEP_LAST_HISTORY_QOS;
00285   writerQos.history.depth                            = 50;
00286   writerQos.reliability.kind                         = DDS::RELIABLE_RELIABILITY_QOS;
00287   writerQos.reliability.max_blocking_time.sec        = 0;
00288   writerQos.reliability.max_blocking_time.nanosec    = 0;
00289 
00290   //
00291   // Add update subscriptions
00292   //
00293   // NOTE: Its ok to lose the references to the objects here since they
00294   //       are not needed after this point.  The only thing we will do
00295   //       with them is to destroy them, and that will be done via a
00296   //       cascade delete from the participant.  The listeners will
00297   //       survive and can be used within other participants as well,
00298   //       since the only state they retain is the manager, which is the
00299   //       same for all.
00300   //
00301 
00302   topic = this->federationParticipant_->create_topic(
00303             OWNERUPDATETOPICNAME,
00304             OWNERUPDATETYPENAME,
00305             TOPIC_QOS_DEFAULT,
00306             DDS::TopicListener::_nil(),
00307             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00308 
00309   dataWriter = publisher->create_datawriter(
00310                  topic.in(),
00311                  writerQos,
00312                  DDS::DataWriterListener::_nil(),
00313                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00314 
00315   if (CORBA::is_nil(dataWriter.in())) {
00316     ACE_ERROR((LM_ERROR,
00317                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00318                ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
00319                this->id().id()));
00320     throw Incomplete();
00321   }
00322 
00323   this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
00324 
00325   if (::CORBA::is_nil(this->ownerWriter_.in())) {
00326     ACE_ERROR((LM_ERROR,
00327                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00328                ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
00329     throw Incomplete();
00330 
00331   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00332     OpenDDS::DCPS::DataWriterImpl* servant
00333     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00334 
00335     if (0 == servant) {
00336       ACE_DEBUG((LM_WARNING,
00337                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00338                  ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
00339 
00340     } else {
00341       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00342       ACE_DEBUG((LM_DEBUG,
00343                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00344                  ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
00345                  std::string(converter).c_str(),
00346                  this->id().id()));
00347     }
00348   }
00349 
00350   description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
00351   dataReader  = subscriber->create_datareader(
00352                   description.in(),
00353                   readerQos,
00354                   &this->ownerListener_,
00355                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00356 
00357   if (CORBA::is_nil(dataReader.in())) {
00358     ACE_ERROR((LM_ERROR,
00359                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00360                ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
00361                this->id().id()));
00362     throw Incomplete();
00363 
00364   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00365     OpenDDS::DCPS::DataReaderImpl* servant
00366     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00367 
00368     if (0 == servant) {
00369       ACE_DEBUG((LM_WARNING,
00370                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00371                  ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
00372 
00373     } else {
00374       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00375       ACE_DEBUG((LM_DEBUG,
00376                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00377                  ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
00378                  std::string(converter).c_str(),
00379                  this->id().id()));
00380     }
00381   }
00382 
00383   topic = this->federationParticipant_->create_topic(
00384             TOPICUPDATETOPICNAME,
00385             TOPICUPDATETYPENAME,
00386             TOPIC_QOS_DEFAULT,
00387             DDS::TopicListener::_nil(),
00388             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00389   dataWriter = publisher->create_datawriter(
00390                  topic.in(),
00391                  writerQos,
00392                  DDS::DataWriterListener::_nil(),
00393                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00394 
00395   if (CORBA::is_nil(dataWriter.in())) {
00396     ACE_ERROR((LM_ERROR,
00397                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00398                ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
00399                this->id().id()));
00400     throw Incomplete();
00401   }
00402 
00403   this->topicWriter_
00404   = TopicUpdateDataWriter::_narrow(dataWriter.in());
00405 
00406   if (::CORBA::is_nil(this->topicWriter_.in())) {
00407     ACE_ERROR((LM_ERROR,
00408                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00409                ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
00410     throw Incomplete();
00411 
00412   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00413     OpenDDS::DCPS::DataWriterImpl* servant
00414     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00415 
00416     if (0 == servant) {
00417       ACE_DEBUG((LM_WARNING,
00418                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00419                  ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
00420 
00421     } else {
00422       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00423       ACE_DEBUG((LM_DEBUG,
00424                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00425                  ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
00426                  std::string(converter).c_str(),
00427                  this->id().id()));
00428     }
00429   }
00430 
00431   description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
00432   dataReader  = subscriber->create_datareader(
00433                   description.in(),
00434                   readerQos,
00435                   &this->topicListener_,
00436                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00437 
00438   if (CORBA::is_nil(dataReader.in())) {
00439     ACE_ERROR((LM_ERROR,
00440                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00441                ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
00442                this->id().id()));
00443     throw Incomplete();
00444 
00445   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00446     OpenDDS::DCPS::DataReaderImpl* servant
00447     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00448 
00449     if (0 == servant) {
00450       ACE_DEBUG((LM_WARNING,
00451                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00452                  ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
00453 
00454     } else {
00455       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00456       ACE_DEBUG((LM_DEBUG,
00457                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00458                  ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
00459                  std::string(converter).c_str(),
00460                  this->id().id()));
00461     }
00462   }
00463 
00464   topic = this->federationParticipant_->create_topic(
00465             PARTICIPANTUPDATETOPICNAME,
00466             PARTICIPANTUPDATETYPENAME,
00467             TOPIC_QOS_DEFAULT,
00468             DDS::TopicListener::_nil(),
00469             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00470   dataWriter = publisher->create_datawriter(
00471                  topic.in(),
00472                  writerQos,
00473                  DDS::DataWriterListener::_nil(),
00474                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00475 
00476   if (CORBA::is_nil(dataWriter.in())) {
00477     ACE_ERROR((LM_ERROR,
00478                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00479                ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
00480                this->id().id()));
00481     throw Incomplete();
00482   }
00483 
00484   this->participantWriter_
00485   = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
00486 
00487   if (::CORBA::is_nil(this->participantWriter_.in())) {
00488     ACE_ERROR((LM_ERROR,
00489                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00490                ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
00491     throw Incomplete();
00492 
00493   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00494     OpenDDS::DCPS::DataWriterImpl* servant
00495     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00496 
00497     if (0 == servant) {
00498       ACE_DEBUG((LM_WARNING,
00499                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00500                  ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
00501 
00502     } else {
00503       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00504       ACE_DEBUG((LM_DEBUG,
00505                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00506                  ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
00507                  std::string(converter).c_str(),
00508                  this->id().id()));
00509     }
00510   }
00511 
00512   description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
00513   dataReader  = subscriber->create_datareader(
00514                   description.in(),
00515                   readerQos,
00516                   &this->participantListener_,
00517                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00518 
00519   if (CORBA::is_nil(dataReader.in())) {
00520     ACE_ERROR((LM_ERROR,
00521                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00522                ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
00523                this->id().id()));
00524     throw Incomplete();
00525 
00526   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00527     OpenDDS::DCPS::DataReaderImpl* servant
00528     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00529 
00530     if (0 == servant) {
00531       ACE_DEBUG((LM_WARNING,
00532                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00533                  ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
00534 
00535     } else {
00536       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00537       ACE_DEBUG((LM_DEBUG,
00538                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00539                  ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
00540                  std::string(converter).c_str(),
00541                  this->id().id()));
00542     }
00543   }
00544 
00545   topic = this->federationParticipant_->create_topic(
00546             PUBLICATIONUPDATETOPICNAME,
00547             PUBLICATIONUPDATETYPENAME,
00548             TOPIC_QOS_DEFAULT,
00549             DDS::TopicListener::_nil(),
00550             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00551   dataWriter = publisher->create_datawriter(
00552                  topic.in(),
00553                  writerQos,
00554                  DDS::DataWriterListener::_nil(),
00555                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00556 
00557   if (CORBA::is_nil(dataWriter.in())) {
00558     ACE_ERROR((LM_ERROR,
00559                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00560                ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
00561                this->id().id()));
00562     throw Incomplete();
00563   }
00564 
00565   this->publicationWriter_
00566   = PublicationUpdateDataWriter::_narrow(dataWriter.in());
00567 
00568   if (::CORBA::is_nil(this->publicationWriter_.in())) {
00569     ACE_ERROR((LM_ERROR,
00570                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00571                ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
00572     throw Incomplete();
00573 
00574   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00575     OpenDDS::DCPS::DataWriterImpl* servant
00576     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00577 
00578     if (0 == servant) {
00579       ACE_DEBUG((LM_WARNING,
00580                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00581                  ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
00582 
00583     } else {
00584       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00585       ACE_DEBUG((LM_DEBUG,
00586                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00587                  ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
00588                  std::string(converter).c_str(),
00589                  this->id().id()));
00590     }
00591   }
00592 
00593   description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
00594   dataReader  = subscriber->create_datareader(
00595                   description.in(),
00596                   readerQos,
00597                   &this->publicationListener_,
00598                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00599 
00600   if (CORBA::is_nil(dataReader.in())) {
00601     ACE_ERROR((LM_ERROR,
00602                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00603                ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
00604                this->id().id()));
00605     throw Incomplete();
00606 
00607   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00608     OpenDDS::DCPS::DataReaderImpl* servant
00609     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00610 
00611     if (0 == servant) {
00612       ACE_DEBUG((LM_WARNING,
00613                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00614                  ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
00615 
00616     } else {
00617       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00618       ACE_DEBUG((LM_DEBUG,
00619                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00620                  ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
00621                  std::string(converter).c_str(),
00622                  this->id().id()));
00623     }
00624   }
00625 
00626   topic = this->federationParticipant_->create_topic(
00627             SUBSCRIPTIONUPDATETOPICNAME,
00628             SUBSCRIPTIONUPDATETYPENAME,
00629             TOPIC_QOS_DEFAULT,
00630             DDS::TopicListener::_nil(),
00631             OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00632   dataWriter = publisher->create_datawriter(
00633                  topic.in(),
00634                  writerQos,
00635                  DDS::DataWriterListener::_nil(),
00636                  OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00637 
00638   if (CORBA::is_nil(dataWriter.in())) {
00639     ACE_ERROR((LM_ERROR,
00640                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00641                ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
00642                this->id().id()));
00643     throw Incomplete();
00644   }
00645 
00646   this->subscriptionWriter_
00647   = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
00648 
00649   if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
00650     ACE_ERROR((LM_ERROR,
00651                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00652                ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
00653     throw Incomplete();
00654 
00655   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00656     OpenDDS::DCPS::DataWriterImpl* servant
00657     = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00658 
00659     if (0 == servant) {
00660       ACE_DEBUG((LM_WARNING,
00661                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00662                  ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
00663 
00664     } else {
00665       OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00666       ACE_DEBUG((LM_DEBUG,
00667                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00668                  ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
00669                  std::string(converter).c_str(),
00670                  this->id().id()));
00671     }
00672   }
00673 
00674   description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
00675   dataReader  = subscriber->create_datareader(
00676                   description.in(),
00677                   readerQos,
00678                   &this->subscriptionListener_,
00679                   OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00680 
00681   if (CORBA::is_nil(dataReader.in())) {
00682     ACE_ERROR((LM_ERROR,
00683                ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00684                ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
00685                this->id().id()));
00686     throw Incomplete();
00687 
00688   } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00689     OpenDDS::DCPS::DataReaderImpl* servant
00690     = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00691 
00692     if (0 == servant) {
00693       ACE_DEBUG((LM_WARNING,
00694                  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00695                  ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
00696 
00697     } else {
00698       OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00699       ACE_DEBUG((LM_DEBUG,
00700                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00701                  ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
00702                  std::string(converter).c_str(),
00703                  this->id().id()));
00704     }
00705   }
00706 
00707   // JSP
00708 #if defined (ACE_HAS_IP_MULTICAST)
00709 
00710   if (this->multicastEnabled_) {
00711     //
00712     // Install ior multicast handler.
00713     //
00714     // Get reactor instance from TAO.
00715     ACE_Reactor *reactor = this->orb_->orb_core()->reactor();
00716 
00717     // See if the -ORBMulticastDiscoveryEndpoint option was specified.
00718     ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
00719 
00720     // First, see if the user has given us a multicast port number
00721     // on the command-line;
00722     u_short port = 0;
00723 
00724     // Check environment var. for multicast port.
00725     const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
00726 
00727     if (port_number != 0) {
00728       port = static_cast<u_short>(ACE_OS::atoi(port_number));
00729     }
00730 
00731     // Port wasn't specified on the command-line -
00732     // use the default.
00733     if (port == 0)
00734       port = OpenDDS::Federator::Defaults::DiscoveryRequestPort;
00735 
00736     // Initialize the handler
00737     if (mde.length() != 0) {
00738       if (this->multicastResponder_.init(
00739             this->orb_.in(),
00740             mde.c_str()) == -1) {
00741         ACE_ERROR((LM_ERROR,
00742                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00743                    ACE_TEXT("the multicast responder for repository %d.\n"),
00744                    this->id().id()));
00745         throw Incomplete();
00746       }
00747 
00748     } else {
00749       if (this->multicastResponder_.init(
00750             this->orb_.in(),
00751             port,
00752 #if defined (ACE_HAS_IPV6)
00753             ACE_DEFAULT_MULTICASTV6_ADDR
00754 #else
00755             ACE_DEFAULT_MULTICAST_ADDR
00756 #endif /* ACE_HAS_IPV6 */
00757           )) {
00758         ACE_ERROR((LM_ERROR,
00759                    ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00760                    ACE_TEXT("the multicast responder for repository %d.\n"),
00761                    this->id().id()));
00762         throw Incomplete();
00763       }
00764     }
00765 
00766     // Register event handler for the ior multicast.
00767     if (reactor->register_handler(&this->multicastResponder_,
00768                                   ACE_Event_Handler::READ_MASK) == -1) {
00769       ACE_ERROR((LM_ERROR,
00770                  ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
00771                  ACE_TEXT("for repository %d.\n"),
00772                  this->id().id()));
00773       throw Incomplete();
00774     }
00775 
00776     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00777       ACE_DEBUG((LM_DEBUG,
00778                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00779                  ACE_TEXT("multicast server setup is complete.\n")));
00780     }
00781   }
00782 
00783 #else
00784   ACE_UNUSED_ARG(this->multicastEnabled_);
00785 #endif /* ACE_HAS_IP_MULTICAST */
00786 }
00787 
00788 void
00789 ManagerImpl::finalize()
00790 {
00791   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00792     ACE_DEBUG((LM_DEBUG,
00793                ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
00794   }
00795 
00796   ownerListener_.stop();
00797   topicListener_.stop();
00798   participantListener_.stop();
00799   publicationListener_.stop();
00800   subscriptionListener_.stop();
00801   ownerListener_.join();
00802   topicListener_.join();
00803   participantListener_.join();
00804   publicationListener_.join();
00805   subscriptionListener_.join();
00806 
00807   if (this->federated_) {
00808     try {
00809       IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
00810 
00811       if (where == this->peers_.end()) {
00812         ACE_DEBUG((LM_DEBUG,
00813                    ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
00814                    ACE_TEXT("repository %d - all attachment to federation left.\n"),
00815                    this->id().id()));
00816 
00817       } else {
00818         if (CORBA::is_nil(where->second.in())) {
00819           ACE_ERROR((LM_ERROR,
00820                      ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
00821                      ACE_TEXT("repository %d not currently attached to a federation.\n"),
00822                      this->id().id()));
00823 
00824         } else {
00825           where->second->leave_federation(this->id().id());
00826           this->federated_ = false;
00827         }
00828       }
00829 
00830     } catch (const CORBA::Exception& ex) {
00831       ex._tao_print_exception(
00832         ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
00833         ACE_TEXT("unable to leave remote federation "));
00834       throw Incomplete();
00835     }
00836   }
00837 
00838   if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
00839     this->orb_->orb_core()->reactor()->remove_handler(
00840       &this->multicastResponder_,
00841       ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00842   }
00843 
00844   // Remove our local participant and contained entities.
00845   if (0 == CORBA::is_nil(this->federationParticipant_.in())) {
00846     DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00847     if (DDS::RETCODE_PRECONDITION_NOT_MET
00848         == this->federationParticipant_->delete_contained_entities()) {
00849       ACE_ERROR((LM_ERROR,
00850                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00851                  ACE_TEXT("unable to release resources for repository %d.\n"),
00852                  this->id().id()));
00853 
00854     } else if (DDS::RETCODE_PRECONDITION_NOT_MET
00855                == dpf->delete_participant(this->federationParticipant_.in())) {
00856       ACE_ERROR((LM_ERROR,
00857                  ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00858                  ACE_TEXT("unable to release the participant for repository %d.\n"),
00859                  this->id().id()));
00860     }
00861   }
00862 }
00863 
00864 // IDL methods.
00865 
00866 RepoKey
00867 ManagerImpl::federation_id()
00868 {
00869   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00870     ACE_DEBUG((LM_DEBUG,
00871                ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
00872   }
00873 
00874   return this->id().id();
00875 }
00876 
00877 OpenDDS::DCPS::DCPSInfo_ptr
00878 ManagerImpl::repository()
00879 {
00880   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00881     ACE_DEBUG((LM_DEBUG,
00882                ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
00883   }
00884 
00885   OpenDDS::DCPS::Discovery_rch disco
00886   = TheServiceParticipant->get_discovery(
00887       this->config_.federationDomain());
00888   OpenDDS::DCPS::DCPSInfo_var repo;
00889   if (!disco.is_nil()) {
00890     OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco =
00891       DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco);
00892     repo = irDisco->get_dcps_info();
00893   }
00894 
00895   if (CORBA::is_nil(repo.in())) {
00896     return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
00897 
00898   } else {
00899     return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
00900   }
00901 }
00902 
00903 CORBA::Boolean
00904 ManagerImpl::discover_federation(const char * ior)
00905 {
00906   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00907     ACE_DEBUG((LM_DEBUG,
00908                ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
00909                ior));
00910   }
00911 
00912   ///@TODO: Implement this.
00913   return false;
00914 }
00915 
00916 Manager_ptr
00917 ManagerImpl::join_federation(
00918   Manager_ptr peer,
00919   FederationDomain federation
00920 )
00921 {
00922   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00923     ACE_DEBUG((LM_DEBUG,
00924                ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
00925                federation));
00926   }
00927 
00928   RepoKey remote = NIL_REPOSITORY;
00929 
00930   try {
00931     // Obtain the remote repository federator Id value.
00932     remote = peer->federation_id();
00933 
00934     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00935       ACE_DEBUG((LM_DEBUG,
00936                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00937                  ACE_TEXT("repo id %d entered from repository with id %d.\n"),
00938                  this->id().id(),
00939                  remote));
00940     }
00941 
00942   } catch (const CORBA::Exception& ex) {
00943     ex._tao_print_exception(
00944       ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
00945       ACE_TEXT("unable to obtain remote federation Id value: "));
00946     throw Incomplete();
00947   }
00948 
00949   // If we are recursing, then we are done.
00950   if (this->joiner_ == remote) {
00951     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00952       ACE_DEBUG((LM_DEBUG,
00953                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00954                  ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
00955                  this->id().id(),
00956                  remote));
00957     }
00958 
00959     return this->_this();
00960 
00961   } else {
00962     // Block while any different repository is joining.
00963     ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00964 
00965     while (this->joiner_ != NIL_REPOSITORY) {
00966       // This releases the lock while we block.
00967       this->joining_.wait();
00968 
00969       // We are now recursing - curses!
00970       if (this->joiner_ == remote) {
00971         return this->_this();
00972       }
00973     }
00974 
00975     // Note that we are joining the remote repository now.
00976     this->joiner_ = remote;
00977   }
00978 
00979   //
00980   // We only reach this point if:
00981   //   1) No other repository is processing past this point;
00982   //   2) We are not recursing.
00983   //
00984 
00985   // Check if we already have Federation repository.
00986   // Check if we are already federated.
00987   if (this->federated_ == false) {
00988     // Go ahead and add the joining repository as our Federation
00989     // repository.
00990     try {
00991       // Mark this repository as the point to which we are joined to
00992       // the federation.
00993       this->joinRepo_ = remote;
00994 
00995       // Obtain a reference to the remote repository.
00996       OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
00997 
00998       CORBA::ORB_var orb = remoteRepo->_get_orb();
00999       CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
01000       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01001         ACE_DEBUG((LM_DEBUG,
01002                    ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
01003                    ACE_TEXT("id %d obtained reference to id %d:\n")
01004                    ACE_TEXT("\t%C\n"),
01005                    this->id().id(),
01006                    remote,
01007                    remoteRepoIor.in()));
01008       }
01009 
01010       // Add remote repository to Service_Participant in the Federation domain
01011       std::ostringstream oss;
01012       oss << remote;
01013       std::string key_string = oss.str();
01014       TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
01015       TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
01016 
01017     } catch (const CORBA::Exception& ex) {
01018       ex._tao_print_exception(
01019         "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
01020       throw Incomplete();
01021     }
01022   }
01023 
01024   // Symmetrical joining behavior.
01025   try {
01026     Manager_var self = this->_this();
01027     Manager_var remoteManager
01028     = peer->join_federation(self, this->config_.federationDomain());
01029 
01030     if (this->joinRepo_ == remote) {
01031       this->peers_[ this->joinRepo_]
01032       = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
01033     }
01034 
01035     //
01036     // Push our initial state out to the joining repository *after* we call
01037     // him back to join.  This reduces the amount of duplicate data pushed
01038     // when a new (empty) repository is joining an existing federation.
01039     //
01040     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01041       ACE_DEBUG((LM_DEBUG,
01042                  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01043                  ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
01044                  this->id().id(),
01045                  remote));
01046     }
01047 
01048     this->pushState(peer);
01049 
01050   } catch (const CORBA::Exception& ex) {
01051     ex._tao_print_exception(
01052       "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
01053     throw Incomplete();
01054   }
01055 
01056   if (CORBA::is_nil(this->participantWriter_.in())) {
01057     //
01058     // Establish our update publications and subscriptions *after* we
01059     // have exchanged internal state with the first joining repository.
01060     //
01061     this->initialize();
01062   }
01063 
01064   // Adjust our joining state and give others the opportunity to proceed.
01065   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01066     ACE_DEBUG((LM_DEBUG,
01067                ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01068                ACE_TEXT("repo id %d joined to repository with id %d.\n"),
01069                this->id().id(),
01070                remote));
01071   }
01072 
01073   this->federated_ = true;
01074   this->joiner_    = NIL_REPOSITORY;
01075   this->joining_.signal();
01076   return this->_this();
01077 }
01078 
01079 void
01080 ManagerImpl::leave_federation(
01081   RepoKey id)
01082 {
01083   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01084     ACE_DEBUG((LM_DEBUG,
01085                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
01086                this->id().id()));
01087   }
01088 
01089   // Remove the leaving repository from our outbound mappings.
01090   IdToManagerMap::iterator where = this->peers_.find(id);
01091 
01092   if (where != this->peers_.end()) {
01093     this->peers_.erase(where);
01094   }
01095 
01096   // Remove all the internal Entities owned by the leaving repository.
01097   if (false
01098       == this->info_->remove_by_owner(this->config_.federationDomain(), id)) {
01099     throw Incomplete();
01100   }
01101 
01102   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01103     ACE_DEBUG((LM_DEBUG,
01104                ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
01105                this->id().id()));
01106   }
01107 }
01108 
01109 void
01110 ManagerImpl::leave_and_shutdown(
01111   void)
01112 {
01113   // Shutdown the process via the repository object.
01114   this->info_->shutdown();
01115 }
01116 
01117 void
01118 ManagerImpl::shutdown(
01119   void)
01120 {
01121   // Prevent the removal of this repository from the federation during
01122   // shutdown processing.
01123   this->federated_ = false;
01124 
01125   // Shutdown the process via the repository object.
01126   this->info_->shutdown();
01127 }
01128 
01129 void
01130 ManagerImpl::initializeOwner(
01131   const OpenDDS::Federator::OwnerUpdate & data)
01132 {
01133   this->processCreate(&data, 0);
01134 }
01135 
01136 void
01137 ManagerImpl::initializeTopic(
01138   const OpenDDS::Federator::TopicUpdate & data)
01139 {
01140   this->processCreate(&data, 0);
01141 }
01142 
01143 void
01144 ManagerImpl::initializeParticipant(
01145   const OpenDDS::Federator::ParticipantUpdate & data)
01146 {
01147   this->processCreate(&data, 0);
01148 }
01149 
01150 void
01151 ManagerImpl::initializePublication(
01152   const OpenDDS::Federator::PublicationUpdate & data)
01153 {
01154   this->processCreate(&data, 0);
01155 }
01156 
01157 void
01158 ManagerImpl::initializeSubscription(
01159   const OpenDDS::Federator::SubscriptionUpdate & data)
01160 {
01161   this->processCreate(&data, 0);
01162 }
01163 
01164 } // namespace Federator
01165 } // namespace OpenDDS
01166 
01167 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1