00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "FederatorManagerImpl.h"
00010 #include "DCPSInfo_i.h"
00011 #include "DefaultValues.h"
00012 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00013 #include "dds/DCPS/SubscriberImpl.h"
00014 #include "dds/DCPS/Service_Participant.h"
00015 #include "dds/DCPS/Marked_Default_Qos.h"
00016 #include "dds/DCPS/RepoIdConverter.h"
00017 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00018 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00019 #include "dds/DCPS/transport/tcp/TcpInst.h"
00020 #include "dds/DCPS/transport/tcp/Tcp.h"
00021 #include "tao/ORB_Core.h"
00022 #include "ace/Log_Priority.h"
00023 #include "ace/Log_Msg.h"
00024
00025 #include "FederatorTypeSupportC.h"
00026 #include "FederatorTypeSupportImpl.h"
00027
00028 #include <sstream>
00029
00030 #if !defined (__ACE_INLINE__)
00031 # include "FederatorManagerImpl.inl"
00032 #endif
00033
00034 namespace OpenDDS {
00035 namespace Federator {
00036
00037 ManagerImpl::ManagerImpl(Config& config)
00038 : joining_(this->lock_),
00039 joiner_(NIL_REPOSITORY),
00040 joinRepo_(NIL_REPOSITORY),
00041 federated_(false),
00042 config_(config),
00043 info_(0),
00044 ownerListener_(*this),
00045 topicListener_(*this),
00046 participantListener_(*this),
00047 publicationListener_(*this),
00048 subscriptionListener_(*this),
00049 multicastEnabled_(false)
00050 {
00051 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00052 ACE_DEBUG((LM_DEBUG,
00053 ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
00054 }
00055
00056 char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
00057
00058 if (mdec != 0) {
00059 std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
00060
00061 if (mde != "0") {
00062 multicastEnabled_ = true;
00063 }
00064 }
00065 }
00066
00067 ManagerImpl::~ManagerImpl()
00068 {
00069 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00070 ACE_DEBUG((LM_DEBUG,
00071 ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
00072 }
00073 }
00074
00075 void
00076 ManagerImpl::initialize()
00077 {
00078 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00079 ACE_DEBUG((LM_DEBUG,
00080 ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
00081 }
00082
00083
00084
00085 this->ownerListener_.federationId(this->id());
00086 this->topicListener_.federationId(this->id());
00087 this->participantListener_.federationId(this->id());
00088 this->publicationListener_.federationId(this->id());
00089 this->subscriptionListener_.federationId(this->id());
00090
00091
00092 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00093 this->federationParticipant_
00094 = dpf->create_participant(
00095 this->config_.federationDomain(),
00096 PARTICIPANT_QOS_DEFAULT,
00097 DDS::DomainParticipantListener::_nil(),
00098 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00099
00100 if (CORBA::is_nil(this->federationParticipant_.in())) {
00101 ACE_ERROR((LM_ERROR,
00102 ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
00103 ACE_TEXT("repository %d in federation domain %d.\n"),
00104 this->id().id(),
00105 this->config_.federationDomain()));
00106 throw Incomplete();
00107 }
00108
00109
00110
00111
00112 OwnerUpdateTypeSupportImpl* ownerUpdate = new OwnerUpdateTypeSupportImpl();
00113
00114 if (DDS::RETCODE_OK != ownerUpdate->register_type(
00115 this->federationParticipant_.in(),
00116 OWNERUPDATETYPENAME)) {
00117 ACE_ERROR((LM_ERROR,
00118 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00119 ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
00120 this->id().id()));
00121 throw Incomplete();
00122 }
00123
00124 ParticipantUpdateTypeSupportImpl* participantUpdate = new ParticipantUpdateTypeSupportImpl();
00125
00126 if (DDS::RETCODE_OK != participantUpdate->register_type(
00127 this->federationParticipant_.in(),
00128 PARTICIPANTUPDATETYPENAME)) {
00129 ACE_ERROR((LM_ERROR,
00130 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00131 ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
00132 this->id().id()));
00133 throw Incomplete();
00134 }
00135
00136 TopicUpdateTypeSupportImpl* topicUpdate = new TopicUpdateTypeSupportImpl();
00137
00138 if (DDS::RETCODE_OK != topicUpdate->register_type(
00139 this->federationParticipant_.in(),
00140 TOPICUPDATETYPENAME)) {
00141 ACE_ERROR((LM_ERROR,
00142 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00143 ACE_TEXT("TopicUpdate type support for repository %d.\n"),
00144 this->id().id()));
00145 throw Incomplete();
00146 }
00147
00148 PublicationUpdateTypeSupportImpl* publicationUpdate = new PublicationUpdateTypeSupportImpl();
00149
00150 if (DDS::RETCODE_OK != publicationUpdate->register_type(
00151 this->federationParticipant_.in(),
00152 PUBLICATIONUPDATETYPENAME)) {
00153 ACE_ERROR((LM_ERROR,
00154 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00155 ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
00156 this->id().id()));
00157 throw Incomplete();
00158 }
00159
00160 SubscriptionUpdateTypeSupportImpl* subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
00161
00162 if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
00163 this->federationParticipant_.in(),
00164 SUBSCRIPTIONUPDATETYPENAME)) {
00165 ACE_ERROR((LM_ERROR,
00166 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00167 ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
00168 this->id().id()));
00169 throw Incomplete();
00170 }
00171
00172
00173
00174
00175 std::string config_name =
00176 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00177 + std::string("FederationBITTransportConfig");
00178 OpenDDS::DCPS::TransportConfig_rch config =
00179 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
00180
00181 std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00182 + std::string("FederationBITTCPTransportInst");
00183 OpenDDS::DCPS::TransportInst_rch inst =
00184 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
00185 "tcp");
00186 config->instances_.push_back(inst);
00187
00188
00189
00190
00191
00192 DDS::Subscriber_var subscriber
00193 = this->federationParticipant_->create_subscriber(
00194 SUBSCRIBER_QOS_DEFAULT,
00195 DDS::SubscriberListener::_nil(),
00196 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00197
00198 if (CORBA::is_nil(subscriber.in())) {
00199 ACE_ERROR((LM_ERROR,
00200 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00201 ACE_TEXT("failed to create subscriber for repository %d\n"),
00202 this->id().id()));
00203 throw Incomplete();
00204
00205 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00206 ACE_DEBUG((LM_DEBUG,
00207 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00208 ACE_TEXT("created federation subscriber for repository %d\n"),
00209 this->id().id()));
00210
00211 }
00212
00213
00214
00215 try {
00216 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00217 subscriber.in());
00218 } catch (const OpenDDS::DCPS::Transport::Exception&) {
00219 ACE_ERROR((LM_ERROR,
00220 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00221 ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
00222 throw Incomplete();
00223 }
00224
00225
00226
00227
00228
00229 DDS::Publisher_var publisher
00230 = this->federationParticipant_->create_publisher(
00231 PUBLISHER_QOS_DEFAULT,
00232 DDS::PublisherListener::_nil(),
00233 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00234
00235 if (CORBA::is_nil(publisher.in())) {
00236 ACE_ERROR((LM_ERROR,
00237 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00238 ACE_TEXT("failed to create publisher for repository %d\n"),
00239 this->id().id()));
00240 throw Incomplete();
00241
00242 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00243 ACE_DEBUG((LM_DEBUG,
00244 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00245 ACE_TEXT("created federation publisher for repository %d\n"),
00246 this->id().id()));
00247
00248 }
00249
00250
00251
00252 try {
00253 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00254 publisher.in());
00255 } catch (const OpenDDS::DCPS::Transport::Exception&) {
00256 ACE_ERROR((LM_ERROR,
00257 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00258 ACE_TEXT("failed to bind transport config to federation publisher.\n")));
00259 throw Incomplete();
00260 }
00261
00262
00263
00264
00265 DDS::Topic_var topic;
00266 DDS::TopicDescription_var description;
00267 DDS::DataReader_var dataReader;
00268 DDS::DataWriter_var dataWriter;
00269
00270 DDS::DataReaderQos readerQos;
00271 subscriber->get_default_datareader_qos(readerQos);
00272 readerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00273 readerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
00274 readerQos.history.depth = 50;
00275 readerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00276 readerQos.reliability.max_blocking_time.sec = 0;
00277 readerQos.reliability.max_blocking_time.nanosec = 0;
00278
00279 DDS::DataWriterQos writerQos;
00280 publisher->get_default_datawriter_qos(writerQos);
00281 writerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00282 writerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
00283 writerQos.history.depth = 50;
00284 writerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00285 writerQos.reliability.max_blocking_time.sec = 0;
00286 writerQos.reliability.max_blocking_time.nanosec = 0;
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300 topic = this->federationParticipant_->create_topic(
00301 OWNERUPDATETOPICNAME,
00302 OWNERUPDATETYPENAME,
00303 TOPIC_QOS_DEFAULT,
00304 DDS::TopicListener::_nil(),
00305 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00306
00307 dataWriter = publisher->create_datawriter(
00308 topic.in(),
00309 writerQos,
00310 DDS::DataWriterListener::_nil(),
00311 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00312
00313 if (CORBA::is_nil(dataWriter.in())) {
00314 ACE_ERROR((LM_ERROR,
00315 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00316 ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
00317 this->id().id()));
00318 throw Incomplete();
00319 }
00320
00321 this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
00322
00323 if (::CORBA::is_nil(this->ownerWriter_.in())) {
00324 ACE_ERROR((LM_ERROR,
00325 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00326 ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
00327 throw Incomplete();
00328
00329 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00330 OpenDDS::DCPS::DataWriterImpl* servant
00331 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00332
00333 if (0 == servant) {
00334 ACE_DEBUG((LM_WARNING,
00335 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00336 ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
00337
00338 } else {
00339 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00340 ACE_DEBUG((LM_DEBUG,
00341 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00342 ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
00343 std::string(converter).c_str(),
00344 this->id().id()));
00345 }
00346 }
00347
00348 description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
00349 dataReader = subscriber->create_datareader(
00350 description.in(),
00351 readerQos,
00352 &this->ownerListener_,
00353 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00354
00355 if (CORBA::is_nil(dataReader.in())) {
00356 ACE_ERROR((LM_ERROR,
00357 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00358 ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
00359 this->id().id()));
00360 throw Incomplete();
00361
00362 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00363 OpenDDS::DCPS::DataReaderImpl* servant
00364 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00365
00366 if (0 == servant) {
00367 ACE_DEBUG((LM_WARNING,
00368 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00369 ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
00370
00371 } else {
00372 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00373 ACE_DEBUG((LM_DEBUG,
00374 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00375 ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
00376 std::string(converter).c_str(),
00377 this->id().id()));
00378 }
00379 }
00380
00381 topic = this->federationParticipant_->create_topic(
00382 TOPICUPDATETOPICNAME,
00383 TOPICUPDATETYPENAME,
00384 TOPIC_QOS_DEFAULT,
00385 DDS::TopicListener::_nil(),
00386 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00387 dataWriter = publisher->create_datawriter(
00388 topic.in(),
00389 writerQos,
00390 DDS::DataWriterListener::_nil(),
00391 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00392
00393 if (CORBA::is_nil(dataWriter.in())) {
00394 ACE_ERROR((LM_ERROR,
00395 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00396 ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
00397 this->id().id()));
00398 throw Incomplete();
00399 }
00400
00401 this->topicWriter_
00402 = TopicUpdateDataWriter::_narrow(dataWriter.in());
00403
00404 if (::CORBA::is_nil(this->topicWriter_.in())) {
00405 ACE_ERROR((LM_ERROR,
00406 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00407 ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
00408 throw Incomplete();
00409
00410 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00411 OpenDDS::DCPS::DataWriterImpl* servant
00412 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00413
00414 if (0 == servant) {
00415 ACE_DEBUG((LM_WARNING,
00416 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00417 ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
00418
00419 } else {
00420 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00421 ACE_DEBUG((LM_DEBUG,
00422 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00423 ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
00424 std::string(converter).c_str(),
00425 this->id().id()));
00426 }
00427 }
00428
00429 description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
00430 dataReader = subscriber->create_datareader(
00431 description.in(),
00432 readerQos,
00433 &this->topicListener_,
00434 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00435
00436 if (CORBA::is_nil(dataReader.in())) {
00437 ACE_ERROR((LM_ERROR,
00438 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00439 ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
00440 this->id().id()));
00441 throw Incomplete();
00442
00443 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00444 OpenDDS::DCPS::DataReaderImpl* servant
00445 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00446
00447 if (0 == servant) {
00448 ACE_DEBUG((LM_WARNING,
00449 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00450 ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
00451
00452 } else {
00453 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00454 ACE_DEBUG((LM_DEBUG,
00455 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00456 ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
00457 std::string(converter).c_str(),
00458 this->id().id()));
00459 }
00460 }
00461
00462 topic = this->federationParticipant_->create_topic(
00463 PARTICIPANTUPDATETOPICNAME,
00464 PARTICIPANTUPDATETYPENAME,
00465 TOPIC_QOS_DEFAULT,
00466 DDS::TopicListener::_nil(),
00467 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00468 dataWriter = publisher->create_datawriter(
00469 topic.in(),
00470 writerQos,
00471 DDS::DataWriterListener::_nil(),
00472 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00473
00474 if (CORBA::is_nil(dataWriter.in())) {
00475 ACE_ERROR((LM_ERROR,
00476 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00477 ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
00478 this->id().id()));
00479 throw Incomplete();
00480 }
00481
00482 this->participantWriter_
00483 = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
00484
00485 if (::CORBA::is_nil(this->participantWriter_.in())) {
00486 ACE_ERROR((LM_ERROR,
00487 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00488 ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
00489 throw Incomplete();
00490
00491 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00492 OpenDDS::DCPS::DataWriterImpl* servant
00493 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00494
00495 if (0 == servant) {
00496 ACE_DEBUG((LM_WARNING,
00497 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00498 ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
00499
00500 } else {
00501 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00502 ACE_DEBUG((LM_DEBUG,
00503 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00504 ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
00505 std::string(converter).c_str(),
00506 this->id().id()));
00507 }
00508 }
00509
00510 description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
00511 dataReader = subscriber->create_datareader(
00512 description.in(),
00513 readerQos,
00514 &this->participantListener_,
00515 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00516
00517 if (CORBA::is_nil(dataReader.in())) {
00518 ACE_ERROR((LM_ERROR,
00519 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00520 ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
00521 this->id().id()));
00522 throw Incomplete();
00523
00524 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00525 OpenDDS::DCPS::DataReaderImpl* servant
00526 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00527
00528 if (0 == servant) {
00529 ACE_DEBUG((LM_WARNING,
00530 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00531 ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
00532
00533 } else {
00534 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00535 ACE_DEBUG((LM_DEBUG,
00536 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00537 ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
00538 std::string(converter).c_str(),
00539 this->id().id()));
00540 }
00541 }
00542
00543 topic = this->federationParticipant_->create_topic(
00544 PUBLICATIONUPDATETOPICNAME,
00545 PUBLICATIONUPDATETYPENAME,
00546 TOPIC_QOS_DEFAULT,
00547 DDS::TopicListener::_nil(),
00548 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00549 dataWriter = publisher->create_datawriter(
00550 topic.in(),
00551 writerQos,
00552 DDS::DataWriterListener::_nil(),
00553 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00554
00555 if (CORBA::is_nil(dataWriter.in())) {
00556 ACE_ERROR((LM_ERROR,
00557 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00558 ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
00559 this->id().id()));
00560 throw Incomplete();
00561 }
00562
00563 this->publicationWriter_
00564 = PublicationUpdateDataWriter::_narrow(dataWriter.in());
00565
00566 if (::CORBA::is_nil(this->publicationWriter_.in())) {
00567 ACE_ERROR((LM_ERROR,
00568 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00569 ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
00570 throw Incomplete();
00571
00572 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00573 OpenDDS::DCPS::DataWriterImpl* servant
00574 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00575
00576 if (0 == servant) {
00577 ACE_DEBUG((LM_WARNING,
00578 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00579 ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
00580
00581 } else {
00582 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00583 ACE_DEBUG((LM_DEBUG,
00584 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00585 ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
00586 std::string(converter).c_str(),
00587 this->id().id()));
00588 }
00589 }
00590
00591 description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
00592 dataReader = subscriber->create_datareader(
00593 description.in(),
00594 readerQos,
00595 &this->publicationListener_,
00596 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00597
00598 if (CORBA::is_nil(dataReader.in())) {
00599 ACE_ERROR((LM_ERROR,
00600 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00601 ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
00602 this->id().id()));
00603 throw Incomplete();
00604
00605 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00606 OpenDDS::DCPS::DataReaderImpl* servant
00607 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00608
00609 if (0 == servant) {
00610 ACE_DEBUG((LM_WARNING,
00611 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00612 ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
00613
00614 } else {
00615 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00616 ACE_DEBUG((LM_DEBUG,
00617 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00618 ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
00619 std::string(converter).c_str(),
00620 this->id().id()));
00621 }
00622 }
00623
00624 topic = this->federationParticipant_->create_topic(
00625 SUBSCRIPTIONUPDATETOPICNAME,
00626 SUBSCRIPTIONUPDATETYPENAME,
00627 TOPIC_QOS_DEFAULT,
00628 DDS::TopicListener::_nil(),
00629 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00630 dataWriter = publisher->create_datawriter(
00631 topic.in(),
00632 writerQos,
00633 DDS::DataWriterListener::_nil(),
00634 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00635
00636 if (CORBA::is_nil(dataWriter.in())) {
00637 ACE_ERROR((LM_ERROR,
00638 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00639 ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
00640 this->id().id()));
00641 throw Incomplete();
00642 }
00643
00644 this->subscriptionWriter_
00645 = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
00646
00647 if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
00648 ACE_ERROR((LM_ERROR,
00649 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00650 ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
00651 throw Incomplete();
00652
00653 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00654 OpenDDS::DCPS::DataWriterImpl* servant
00655 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00656
00657 if (0 == servant) {
00658 ACE_DEBUG((LM_WARNING,
00659 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00660 ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
00661
00662 } else {
00663 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00664 ACE_DEBUG((LM_DEBUG,
00665 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00666 ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
00667 std::string(converter).c_str(),
00668 this->id().id()));
00669 }
00670 }
00671
00672 description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
00673 dataReader = subscriber->create_datareader(
00674 description.in(),
00675 readerQos,
00676 &this->subscriptionListener_,
00677 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00678
00679 if (CORBA::is_nil(dataReader.in())) {
00680 ACE_ERROR((LM_ERROR,
00681 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00682 ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
00683 this->id().id()));
00684 throw Incomplete();
00685
00686 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00687 OpenDDS::DCPS::DataReaderImpl* servant
00688 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00689
00690 if (0 == servant) {
00691 ACE_DEBUG((LM_WARNING,
00692 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00693 ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
00694
00695 } else {
00696 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00697 ACE_DEBUG((LM_DEBUG,
00698 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00699 ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
00700 std::string(converter).c_str(),
00701 this->id().id()));
00702 }
00703 }
00704
00705
00706 #if defined (ACE_HAS_IP_MULTICAST)
00707
00708 if (this->multicastEnabled_) {
00709
00710
00711
00712
00713 ACE_Reactor *reactor = this->orb_->orb_core()->reactor();
00714
00715
00716 ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
00717
00718
00719
00720 u_short port = 0;
00721
00722
00723 const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
00724
00725 if (port_number != 0) {
00726 port = static_cast<u_short>(ACE_OS::atoi(port_number));
00727 }
00728
00729
00730
00731 if (port == 0)
00732 port = OpenDDS::Federator::Defaults::DiscoveryRequestPort;
00733
00734
00735 if (mde.length() != 0) {
00736 if (this->multicastResponder_.init(
00737 this->orb_.in(),
00738 mde.c_str()) == -1) {
00739 ACE_ERROR((LM_ERROR,
00740 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00741 ACE_TEXT("the multicast responder for repository %d.\n"),
00742 this->id().id()));
00743 throw Incomplete();
00744 }
00745
00746 } else {
00747 if (this->multicastResponder_.init(
00748 this->orb_.in(),
00749 port,
00750 #if defined (ACE_HAS_IPV6)
00751 ACE_DEFAULT_MULTICASTV6_ADDR
00752 #else
00753 ACE_DEFAULT_MULTICAST_ADDR
00754 #endif
00755 )) {
00756 ACE_ERROR((LM_ERROR,
00757 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00758 ACE_TEXT("the multicast responder for repository %d.\n"),
00759 this->id().id()));
00760 throw Incomplete();
00761 }
00762 }
00763
00764
00765 if (reactor->register_handler(&this->multicastResponder_,
00766 ACE_Event_Handler::READ_MASK) == -1) {
00767 ACE_ERROR((LM_ERROR,
00768 ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
00769 ACE_TEXT("for repository %d.\n"),
00770 this->id().id()));
00771 throw Incomplete();
00772 }
00773
00774 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00775 ACE_DEBUG((LM_DEBUG,
00776 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00777 ACE_TEXT("multicast server setup is complete.\n")));
00778 }
00779 }
00780
00781 #else
00782 ACE_UNUSED_ARG(this->multicastEnabled_);
00783 #endif
00784 }
00785
00786 void
00787 ManagerImpl::finalize()
00788 {
00789 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00790 ACE_DEBUG((LM_DEBUG,
00791 ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
00792 }
00793
00794 ownerListener_.stop();
00795 topicListener_.stop();
00796 participantListener_.stop();
00797 publicationListener_.stop();
00798 subscriptionListener_.stop();
00799 ownerListener_.join();
00800 topicListener_.join();
00801 participantListener_.join();
00802 publicationListener_.join();
00803 subscriptionListener_.join();
00804
00805 if (this->federated_) {
00806 try {
00807 IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
00808
00809 if (where == this->peers_.end()) {
00810 ACE_DEBUG((LM_DEBUG,
00811 ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
00812 ACE_TEXT("repository %d - all attachment to federation left.\n"),
00813 this->id().id()));
00814
00815 } else {
00816 if (CORBA::is_nil(where->second.in())) {
00817 ACE_ERROR((LM_ERROR,
00818 ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
00819 ACE_TEXT("repository %d not currently attached to a federation.\n"),
00820 this->id().id()));
00821
00822 } else {
00823 where->second->leave_federation(this->id().id());
00824 this->federated_ = false;
00825 }
00826 }
00827
00828 } catch (const CORBA::Exception& ex) {
00829 ex._tao_print_exception(
00830 ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
00831 ACE_TEXT("unable to leave remote federation "));
00832 throw Incomplete();
00833 }
00834 }
00835
00836 if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
00837 this->orb_->orb_core()->reactor()->remove_handler(
00838 &this->multicastResponder_,
00839 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00840 }
00841
00842
00843 if (0 == CORBA::is_nil(this->federationParticipant_.in())) {
00844 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00845 if (DDS::RETCODE_PRECONDITION_NOT_MET
00846 == this->federationParticipant_->delete_contained_entities()) {
00847 ACE_ERROR((LM_ERROR,
00848 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00849 ACE_TEXT("unable to release resources for repository %d.\n"),
00850 this->id().id()));
00851
00852 } else if (DDS::RETCODE_PRECONDITION_NOT_MET
00853 == dpf->delete_participant(this->federationParticipant_.in())) {
00854 ACE_ERROR((LM_ERROR,
00855 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00856 ACE_TEXT("unable to release the participant for repository %d.\n"),
00857 this->id().id()));
00858 }
00859 }
00860 }
00861
00862
00863
00864 RepoKey
00865 ManagerImpl::federation_id()
00866 {
00867 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00868 ACE_DEBUG((LM_DEBUG,
00869 ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
00870 }
00871
00872 return this->id().id();
00873 }
00874
00875 OpenDDS::DCPS::DCPSInfo_ptr
00876 ManagerImpl::repository()
00877 {
00878 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00879 ACE_DEBUG((LM_DEBUG,
00880 ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
00881 }
00882
00883 OpenDDS::DCPS::Discovery_rch disco
00884 = TheServiceParticipant->get_discovery(
00885 this->config_.federationDomain());
00886 OpenDDS::DCPS::DCPSInfo_var repo;
00887 if (!disco.is_nil()) {
00888 OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco =
00889 DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco);
00890 repo = irDisco->get_dcps_info();
00891 }
00892
00893 if (CORBA::is_nil(repo.in())) {
00894 return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
00895
00896 } else {
00897 return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
00898 }
00899 }
00900
00901 CORBA::Boolean
00902 ManagerImpl::discover_federation(const char * ior)
00903 {
00904 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00905 ACE_DEBUG((LM_DEBUG,
00906 ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
00907 ior));
00908 }
00909
00910
00911 return false;
00912 }
00913
00914 Manager_ptr
00915 ManagerImpl::join_federation(
00916 Manager_ptr peer,
00917 FederationDomain federation
00918 )
00919 {
00920 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00921 ACE_DEBUG((LM_DEBUG,
00922 ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
00923 federation));
00924 }
00925
00926 RepoKey remote = NIL_REPOSITORY;
00927
00928 try {
00929
00930 remote = peer->federation_id();
00931
00932 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00933 ACE_DEBUG((LM_DEBUG,
00934 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00935 ACE_TEXT("repo id %d entered from repository with id %d.\n"),
00936 this->id().id(),
00937 remote));
00938 }
00939
00940 } catch (const CORBA::Exception& ex) {
00941 ex._tao_print_exception(
00942 ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
00943 ACE_TEXT("unable to obtain remote federation Id value: "));
00944 throw Incomplete();
00945 }
00946
00947
00948 if (this->joiner_ == remote) {
00949 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00950 ACE_DEBUG((LM_DEBUG,
00951 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00952 ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
00953 this->id().id(),
00954 remote));
00955 }
00956
00957 return this->_this();
00958
00959 } else {
00960
00961 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00962
00963 while (this->joiner_ != NIL_REPOSITORY) {
00964
00965 this->joining_.wait();
00966
00967
00968 if (this->joiner_ == remote) {
00969 return this->_this();
00970 }
00971 }
00972
00973
00974 this->joiner_ = remote;
00975 }
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985 if (this->federated_ == false) {
00986
00987
00988 try {
00989
00990
00991 this->joinRepo_ = remote;
00992
00993
00994 OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
00995
00996 CORBA::ORB_var orb = remoteRepo->_get_orb();
00997 CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
00998 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00999 ACE_DEBUG((LM_DEBUG,
01000 ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
01001 ACE_TEXT("id %d obtained reference to id %d:\n")
01002 ACE_TEXT("\t%C\n"),
01003 this->id().id(),
01004 remote,
01005 remoteRepoIor.in()));
01006 }
01007
01008
01009 std::ostringstream oss;
01010 oss << remote;
01011 std::string key_string = oss.str();
01012 TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
01013 TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
01014
01015 } catch (const CORBA::Exception& ex) {
01016 ex._tao_print_exception(
01017 "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
01018 throw Incomplete();
01019 }
01020 }
01021
01022
01023 try {
01024 Manager_var remoteManager
01025 = peer->join_federation(this->_this(), this->config_.federationDomain());
01026
01027 if (this->joinRepo_ == remote) {
01028 this->peers_[ this->joinRepo_]
01029 = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
01030 }
01031
01032
01033
01034
01035
01036
01037 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01038 ACE_DEBUG((LM_DEBUG,
01039 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01040 ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
01041 this->id().id(),
01042 remote));
01043 }
01044
01045 this->pushState(peer);
01046
01047 } catch (const CORBA::Exception& ex) {
01048 ex._tao_print_exception(
01049 "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
01050 throw Incomplete();
01051 }
01052
01053 if (CORBA::is_nil(this->participantWriter_.in())) {
01054
01055
01056
01057
01058 this->initialize();
01059 }
01060
01061
01062 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01063 ACE_DEBUG((LM_DEBUG,
01064 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01065 ACE_TEXT("repo id %d joined to repository with id %d.\n"),
01066 this->id().id(),
01067 remote));
01068 }
01069
01070 this->federated_ = true;
01071 this->joiner_ = NIL_REPOSITORY;
01072 this->joining_.signal();
01073 return this->_this();
01074 }
01075
01076 void
01077 ManagerImpl::leave_federation(
01078 RepoKey id)
01079 {
01080 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01081 ACE_DEBUG((LM_DEBUG,
01082 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
01083 this->id().id()));
01084 }
01085
01086
01087 IdToManagerMap::iterator where = this->peers_.find(id);
01088
01089 if (where != this->peers_.end()) {
01090 this->peers_.erase(where);
01091 }
01092
01093
01094 if (false
01095 == this->info_->remove_by_owner(this->config_.federationDomain(), id)) {
01096 throw Incomplete();
01097 }
01098
01099 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01100 ACE_DEBUG((LM_DEBUG,
01101 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
01102 this->id().id()));
01103 }
01104 }
01105
01106 void
01107 ManagerImpl::leave_and_shutdown(
01108 void)
01109 {
01110
01111 this->info_->shutdown();
01112 }
01113
01114 void
01115 ManagerImpl::shutdown(
01116 void)
01117 {
01118
01119
01120 this->federated_ = false;
01121
01122
01123 this->info_->shutdown();
01124 }
01125
01126 void
01127 ManagerImpl::initializeOwner(
01128 const OpenDDS::Federator::OwnerUpdate & data)
01129 {
01130 this->processCreate(&data, 0);
01131 }
01132
01133 void
01134 ManagerImpl::initializeTopic(
01135 const OpenDDS::Federator::TopicUpdate & data)
01136 {
01137 this->processCreate(&data, 0);
01138 }
01139
01140 void
01141 ManagerImpl::initializeParticipant(
01142 const OpenDDS::Federator::ParticipantUpdate & data)
01143 {
01144 this->processCreate(&data, 0);
01145 }
01146
01147 void
01148 ManagerImpl::initializePublication(
01149 const OpenDDS::Federator::PublicationUpdate & data)
01150 {
01151 this->processCreate(&data, 0);
01152 }
01153
01154 void
01155 ManagerImpl::initializeSubscription(
01156 const OpenDDS::Federator::SubscriptionUpdate & data)
01157 {
01158 this->processCreate(&data, 0);
01159 }
01160
01161 }
01162 }