00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "tao/ORB_Core.h"
00010 #include "FederatorManagerImpl.h"
00011 #include "DCPSInfo_i.h"
00012 #include "DefaultValues.h"
00013 #include "dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h"
00014 #include "dds/DCPS/SubscriberImpl.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/Marked_Default_Qos.h"
00017 #include "dds/DCPS/RepoIdConverter.h"
00018 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00019 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00020 #include "dds/DCPS/transport/tcp/TcpInst.h"
00021 #include "dds/DCPS/transport/tcp/Tcp.h"
00022 #include "ace/Log_Priority.h"
00023 #include "ace/Log_Msg.h"
00024
00025 #include "FederatorTypeSupportC.h"
00026 #include "FederatorTypeSupportImpl.h"
00027
00028 #include <sstream>
00029
00030 #if !defined (__ACE_INLINE__)
00031 # include "FederatorManagerImpl.inl"
00032 #endif
00033
00034 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00035
00036 namespace OpenDDS {
00037 namespace Federator {
00038
00039 ManagerImpl::ManagerImpl(Config& config)
00040 : joining_(this->lock_),
00041 joiner_(NIL_REPOSITORY),
00042 joinRepo_(NIL_REPOSITORY),
00043 federated_(false),
00044 config_(config),
00045 info_(0),
00046 ownerListener_(*this),
00047 topicListener_(*this),
00048 participantListener_(*this),
00049 publicationListener_(*this),
00050 subscriptionListener_(*this),
00051 multicastEnabled_(false)
00052 {
00053 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00054 ACE_DEBUG((LM_DEBUG,
00055 ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
00056 }
00057
00058 char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
00059
00060 if (mdec != 0) {
00061 std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
00062
00063 if (mde != "0") {
00064 multicastEnabled_ = true;
00065 }
00066 }
00067 }
00068
00069 ManagerImpl::~ManagerImpl()
00070 {
00071 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00072 ACE_DEBUG((LM_DEBUG,
00073 ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
00074 }
00075 }
00076
00077 void
00078 ManagerImpl::initialize()
00079 {
00080 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00081 ACE_DEBUG((LM_DEBUG,
00082 ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
00083 }
00084
00085
00086
00087 this->ownerListener_.federationId(this->id());
00088 this->topicListener_.federationId(this->id());
00089 this->participantListener_.federationId(this->id());
00090 this->publicationListener_.federationId(this->id());
00091 this->subscriptionListener_.federationId(this->id());
00092
00093
00094 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00095 this->federationParticipant_
00096 = dpf->create_participant(
00097 this->config_.federationDomain(),
00098 PARTICIPANT_QOS_DEFAULT,
00099 DDS::DomainParticipantListener::_nil(),
00100 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00101
00102 if (CORBA::is_nil(this->federationParticipant_.in())) {
00103 ACE_ERROR((LM_ERROR,
00104 ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
00105 ACE_TEXT("repository %d in federation domain %d.\n"),
00106 this->id().id(),
00107 this->config_.federationDomain()));
00108 throw Incomplete();
00109 }
00110
00111
00112
00113
00114 OwnerUpdateTypeSupportImpl::_var_type ownerUpdate = new OwnerUpdateTypeSupportImpl();
00115
00116 if (DDS::RETCODE_OK != ownerUpdate->register_type(
00117 this->federationParticipant_.in(),
00118 OWNERUPDATETYPENAME)) {
00119 ACE_ERROR((LM_ERROR,
00120 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00121 ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
00122 this->id().id()));
00123 throw Incomplete();
00124 }
00125
00126 ParticipantUpdateTypeSupportImpl::_var_type participantUpdate = new ParticipantUpdateTypeSupportImpl();
00127
00128 if (DDS::RETCODE_OK != participantUpdate->register_type(
00129 this->federationParticipant_.in(),
00130 PARTICIPANTUPDATETYPENAME)) {
00131 ACE_ERROR((LM_ERROR,
00132 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00133 ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
00134 this->id().id()));
00135 throw Incomplete();
00136 }
00137
00138 TopicUpdateTypeSupportImpl::_var_type topicUpdate = new TopicUpdateTypeSupportImpl();
00139
00140 if (DDS::RETCODE_OK != topicUpdate->register_type(
00141 this->federationParticipant_.in(),
00142 TOPICUPDATETYPENAME)) {
00143 ACE_ERROR((LM_ERROR,
00144 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00145 ACE_TEXT("TopicUpdate type support for repository %d.\n"),
00146 this->id().id()));
00147 throw Incomplete();
00148 }
00149
00150 PublicationUpdateTypeSupportImpl::_var_type publicationUpdate = new PublicationUpdateTypeSupportImpl();
00151
00152 if (DDS::RETCODE_OK != publicationUpdate->register_type(
00153 this->federationParticipant_.in(),
00154 PUBLICATIONUPDATETYPENAME)) {
00155 ACE_ERROR((LM_ERROR,
00156 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00157 ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
00158 this->id().id()));
00159 throw Incomplete();
00160 }
00161
00162 SubscriptionUpdateTypeSupportImpl::_var_type subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
00163
00164 if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
00165 this->federationParticipant_.in(),
00166 SUBSCRIPTIONUPDATETYPENAME)) {
00167 ACE_ERROR((LM_ERROR,
00168 ACE_TEXT("(%P|%t) ERROR: Unable to install ")
00169 ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
00170 this->id().id()));
00171 throw Incomplete();
00172 }
00173
00174
00175
00176
00177 std::string config_name =
00178 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00179 + std::string("FederationBITTransportConfig");
00180 OpenDDS::DCPS::TransportConfig_rch config =
00181 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
00182
00183 std::string inst_name = OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
00184 + std::string("FederationBITTCPTransportInst");
00185 OpenDDS::DCPS::TransportInst_rch inst =
00186 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
00187 "tcp");
00188 config->instances_.push_back(inst);
00189
00190
00191
00192
00193
00194 DDS::Subscriber_var subscriber
00195 = this->federationParticipant_->create_subscriber(
00196 SUBSCRIBER_QOS_DEFAULT,
00197 DDS::SubscriberListener::_nil(),
00198 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00199
00200 if (CORBA::is_nil(subscriber.in())) {
00201 ACE_ERROR((LM_ERROR,
00202 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00203 ACE_TEXT("failed to create subscriber for repository %d\n"),
00204 this->id().id()));
00205 throw Incomplete();
00206
00207 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00208 ACE_DEBUG((LM_DEBUG,
00209 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00210 ACE_TEXT("created federation subscriber for repository %d\n"),
00211 this->id().id()));
00212
00213 }
00214
00215
00216
00217 try {
00218 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00219 subscriber.in());
00220 } catch (const OpenDDS::DCPS::Transport::Exception&) {
00221 ACE_ERROR((LM_ERROR,
00222 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00223 ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
00224 throw Incomplete();
00225 }
00226
00227
00228
00229
00230
00231 DDS::Publisher_var publisher
00232 = this->federationParticipant_->create_publisher(
00233 PUBLISHER_QOS_DEFAULT,
00234 DDS::PublisherListener::_nil(),
00235 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00236
00237 if (CORBA::is_nil(publisher.in())) {
00238 ACE_ERROR((LM_ERROR,
00239 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00240 ACE_TEXT("failed to create publisher for repository %d\n"),
00241 this->id().id()));
00242 throw Incomplete();
00243
00244 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00245 ACE_DEBUG((LM_DEBUG,
00246 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00247 ACE_TEXT("created federation publisher for repository %d\n"),
00248 this->id().id()));
00249
00250 }
00251
00252
00253
00254 try {
00255 OpenDDS::DCPS::TransportRegistry::instance()->bind_config(config,
00256 publisher.in());
00257 } catch (const OpenDDS::DCPS::Transport::Exception&) {
00258 ACE_ERROR((LM_ERROR,
00259 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00260 ACE_TEXT("failed to bind transport config to federation publisher.\n")));
00261 throw Incomplete();
00262 }
00263
00264
00265
00266
00267 DDS::Topic_var topic;
00268 DDS::TopicDescription_var description;
00269 DDS::DataReader_var dataReader;
00270 DDS::DataWriter_var dataWriter;
00271
00272 DDS::DataReaderQos readerQos;
00273 subscriber->get_default_datareader_qos(readerQos);
00274 readerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00275 readerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
00276 readerQos.history.depth = 50;
00277 readerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00278 readerQos.reliability.max_blocking_time.sec = 0;
00279 readerQos.reliability.max_blocking_time.nanosec = 0;
00280
00281 DDS::DataWriterQos writerQos;
00282 publisher->get_default_datawriter_qos(writerQos);
00283 writerQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00284 writerQos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
00285 writerQos.history.depth = 50;
00286 writerQos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00287 writerQos.reliability.max_blocking_time.sec = 0;
00288 writerQos.reliability.max_blocking_time.nanosec = 0;
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302 topic = this->federationParticipant_->create_topic(
00303 OWNERUPDATETOPICNAME,
00304 OWNERUPDATETYPENAME,
00305 TOPIC_QOS_DEFAULT,
00306 DDS::TopicListener::_nil(),
00307 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00308
00309 dataWriter = publisher->create_datawriter(
00310 topic.in(),
00311 writerQos,
00312 DDS::DataWriterListener::_nil(),
00313 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00314
00315 if (CORBA::is_nil(dataWriter.in())) {
00316 ACE_ERROR((LM_ERROR,
00317 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00318 ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
00319 this->id().id()));
00320 throw Incomplete();
00321 }
00322
00323 this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
00324
00325 if (::CORBA::is_nil(this->ownerWriter_.in())) {
00326 ACE_ERROR((LM_ERROR,
00327 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00328 ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
00329 throw Incomplete();
00330
00331 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00332 OpenDDS::DCPS::DataWriterImpl* servant
00333 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00334
00335 if (0 == servant) {
00336 ACE_DEBUG((LM_WARNING,
00337 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00338 ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
00339
00340 } else {
00341 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00342 ACE_DEBUG((LM_DEBUG,
00343 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00344 ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
00345 std::string(converter).c_str(),
00346 this->id().id()));
00347 }
00348 }
00349
00350 description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
00351 dataReader = subscriber->create_datareader(
00352 description.in(),
00353 readerQos,
00354 &this->ownerListener_,
00355 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00356
00357 if (CORBA::is_nil(dataReader.in())) {
00358 ACE_ERROR((LM_ERROR,
00359 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00360 ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
00361 this->id().id()));
00362 throw Incomplete();
00363
00364 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00365 OpenDDS::DCPS::DataReaderImpl* servant
00366 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00367
00368 if (0 == servant) {
00369 ACE_DEBUG((LM_WARNING,
00370 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00371 ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
00372
00373 } else {
00374 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00375 ACE_DEBUG((LM_DEBUG,
00376 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00377 ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
00378 std::string(converter).c_str(),
00379 this->id().id()));
00380 }
00381 }
00382
00383 topic = this->federationParticipant_->create_topic(
00384 TOPICUPDATETOPICNAME,
00385 TOPICUPDATETYPENAME,
00386 TOPIC_QOS_DEFAULT,
00387 DDS::TopicListener::_nil(),
00388 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00389 dataWriter = publisher->create_datawriter(
00390 topic.in(),
00391 writerQos,
00392 DDS::DataWriterListener::_nil(),
00393 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00394
00395 if (CORBA::is_nil(dataWriter.in())) {
00396 ACE_ERROR((LM_ERROR,
00397 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00398 ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
00399 this->id().id()));
00400 throw Incomplete();
00401 }
00402
00403 this->topicWriter_
00404 = TopicUpdateDataWriter::_narrow(dataWriter.in());
00405
00406 if (::CORBA::is_nil(this->topicWriter_.in())) {
00407 ACE_ERROR((LM_ERROR,
00408 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00409 ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
00410 throw Incomplete();
00411
00412 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00413 OpenDDS::DCPS::DataWriterImpl* servant
00414 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00415
00416 if (0 == servant) {
00417 ACE_DEBUG((LM_WARNING,
00418 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00419 ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
00420
00421 } else {
00422 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00423 ACE_DEBUG((LM_DEBUG,
00424 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00425 ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
00426 std::string(converter).c_str(),
00427 this->id().id()));
00428 }
00429 }
00430
00431 description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
00432 dataReader = subscriber->create_datareader(
00433 description.in(),
00434 readerQos,
00435 &this->topicListener_,
00436 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00437
00438 if (CORBA::is_nil(dataReader.in())) {
00439 ACE_ERROR((LM_ERROR,
00440 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00441 ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
00442 this->id().id()));
00443 throw Incomplete();
00444
00445 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00446 OpenDDS::DCPS::DataReaderImpl* servant
00447 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00448
00449 if (0 == servant) {
00450 ACE_DEBUG((LM_WARNING,
00451 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00452 ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
00453
00454 } else {
00455 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00456 ACE_DEBUG((LM_DEBUG,
00457 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00458 ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
00459 std::string(converter).c_str(),
00460 this->id().id()));
00461 }
00462 }
00463
00464 topic = this->federationParticipant_->create_topic(
00465 PARTICIPANTUPDATETOPICNAME,
00466 PARTICIPANTUPDATETYPENAME,
00467 TOPIC_QOS_DEFAULT,
00468 DDS::TopicListener::_nil(),
00469 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00470 dataWriter = publisher->create_datawriter(
00471 topic.in(),
00472 writerQos,
00473 DDS::DataWriterListener::_nil(),
00474 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00475
00476 if (CORBA::is_nil(dataWriter.in())) {
00477 ACE_ERROR((LM_ERROR,
00478 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00479 ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
00480 this->id().id()));
00481 throw Incomplete();
00482 }
00483
00484 this->participantWriter_
00485 = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
00486
00487 if (::CORBA::is_nil(this->participantWriter_.in())) {
00488 ACE_ERROR((LM_ERROR,
00489 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00490 ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
00491 throw Incomplete();
00492
00493 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00494 OpenDDS::DCPS::DataWriterImpl* servant
00495 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00496
00497 if (0 == servant) {
00498 ACE_DEBUG((LM_WARNING,
00499 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00500 ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
00501
00502 } else {
00503 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00504 ACE_DEBUG((LM_DEBUG,
00505 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00506 ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
00507 std::string(converter).c_str(),
00508 this->id().id()));
00509 }
00510 }
00511
00512 description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
00513 dataReader = subscriber->create_datareader(
00514 description.in(),
00515 readerQos,
00516 &this->participantListener_,
00517 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00518
00519 if (CORBA::is_nil(dataReader.in())) {
00520 ACE_ERROR((LM_ERROR,
00521 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00522 ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
00523 this->id().id()));
00524 throw Incomplete();
00525
00526 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00527 OpenDDS::DCPS::DataReaderImpl* servant
00528 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00529
00530 if (0 == servant) {
00531 ACE_DEBUG((LM_WARNING,
00532 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00533 ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
00534
00535 } else {
00536 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00537 ACE_DEBUG((LM_DEBUG,
00538 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00539 ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
00540 std::string(converter).c_str(),
00541 this->id().id()));
00542 }
00543 }
00544
00545 topic = this->federationParticipant_->create_topic(
00546 PUBLICATIONUPDATETOPICNAME,
00547 PUBLICATIONUPDATETYPENAME,
00548 TOPIC_QOS_DEFAULT,
00549 DDS::TopicListener::_nil(),
00550 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00551 dataWriter = publisher->create_datawriter(
00552 topic.in(),
00553 writerQos,
00554 DDS::DataWriterListener::_nil(),
00555 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00556
00557 if (CORBA::is_nil(dataWriter.in())) {
00558 ACE_ERROR((LM_ERROR,
00559 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00560 ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
00561 this->id().id()));
00562 throw Incomplete();
00563 }
00564
00565 this->publicationWriter_
00566 = PublicationUpdateDataWriter::_narrow(dataWriter.in());
00567
00568 if (::CORBA::is_nil(this->publicationWriter_.in())) {
00569 ACE_ERROR((LM_ERROR,
00570 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00571 ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
00572 throw Incomplete();
00573
00574 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00575 OpenDDS::DCPS::DataWriterImpl* servant
00576 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00577
00578 if (0 == servant) {
00579 ACE_DEBUG((LM_WARNING,
00580 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00581 ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
00582
00583 } else {
00584 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00585 ACE_DEBUG((LM_DEBUG,
00586 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00587 ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
00588 std::string(converter).c_str(),
00589 this->id().id()));
00590 }
00591 }
00592
00593 description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
00594 dataReader = subscriber->create_datareader(
00595 description.in(),
00596 readerQos,
00597 &this->publicationListener_,
00598 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00599
00600 if (CORBA::is_nil(dataReader.in())) {
00601 ACE_ERROR((LM_ERROR,
00602 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00603 ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
00604 this->id().id()));
00605 throw Incomplete();
00606
00607 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00608 OpenDDS::DCPS::DataReaderImpl* servant
00609 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00610
00611 if (0 == servant) {
00612 ACE_DEBUG((LM_WARNING,
00613 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00614 ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
00615
00616 } else {
00617 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00618 ACE_DEBUG((LM_DEBUG,
00619 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00620 ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
00621 std::string(converter).c_str(),
00622 this->id().id()));
00623 }
00624 }
00625
00626 topic = this->federationParticipant_->create_topic(
00627 SUBSCRIPTIONUPDATETOPICNAME,
00628 SUBSCRIPTIONUPDATETYPENAME,
00629 TOPIC_QOS_DEFAULT,
00630 DDS::TopicListener::_nil(),
00631 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00632 dataWriter = publisher->create_datawriter(
00633 topic.in(),
00634 writerQos,
00635 DDS::DataWriterListener::_nil(),
00636 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00637
00638 if (CORBA::is_nil(dataWriter.in())) {
00639 ACE_ERROR((LM_ERROR,
00640 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00641 ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
00642 this->id().id()));
00643 throw Incomplete();
00644 }
00645
00646 this->subscriptionWriter_
00647 = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
00648
00649 if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
00650 ACE_ERROR((LM_ERROR,
00651 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00652 ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
00653 throw Incomplete();
00654
00655 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00656 OpenDDS::DCPS::DataWriterImpl* servant
00657 = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
00658
00659 if (0 == servant) {
00660 ACE_DEBUG((LM_WARNING,
00661 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00662 ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
00663
00664 } else {
00665 OpenDDS::DCPS::RepoIdConverter converter(servant->get_publication_id());
00666 ACE_DEBUG((LM_DEBUG,
00667 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00668 ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
00669 std::string(converter).c_str(),
00670 this->id().id()));
00671 }
00672 }
00673
00674 description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
00675 dataReader = subscriber->create_datareader(
00676 description.in(),
00677 readerQos,
00678 &this->subscriptionListener_,
00679 OpenDDS::DCPS::DEFAULT_STATUS_MASK);
00680
00681 if (CORBA::is_nil(dataReader.in())) {
00682 ACE_ERROR((LM_ERROR,
00683 ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
00684 ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
00685 this->id().id()));
00686 throw Incomplete();
00687
00688 } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00689 OpenDDS::DCPS::DataReaderImpl* servant
00690 = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
00691
00692 if (0 == servant) {
00693 ACE_DEBUG((LM_WARNING,
00694 ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
00695 ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
00696
00697 } else {
00698 OpenDDS::DCPS::RepoIdConverter converter(servant->get_subscription_id());
00699 ACE_DEBUG((LM_DEBUG,
00700 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00701 ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
00702 std::string(converter).c_str(),
00703 this->id().id()));
00704 }
00705 }
00706
00707
00708 #if defined (ACE_HAS_IP_MULTICAST)
00709
00710 if (this->multicastEnabled_) {
00711
00712
00713
00714
00715 ACE_Reactor *reactor = this->orb_->orb_core()->reactor();
00716
00717
00718 ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
00719
00720
00721
00722 u_short port = 0;
00723
00724
00725 const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
00726
00727 if (port_number != 0) {
00728 port = static_cast<u_short>(ACE_OS::atoi(port_number));
00729 }
00730
00731
00732
00733 if (port == 0)
00734 port = OpenDDS::Federator::Defaults::DiscoveryRequestPort;
00735
00736
00737 if (mde.length() != 0) {
00738 if (this->multicastResponder_.init(
00739 this->orb_.in(),
00740 mde.c_str()) == -1) {
00741 ACE_ERROR((LM_ERROR,
00742 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00743 ACE_TEXT("the multicast responder for repository %d.\n"),
00744 this->id().id()));
00745 throw Incomplete();
00746 }
00747
00748 } else {
00749 if (this->multicastResponder_.init(
00750 this->orb_.in(),
00751 port,
00752 #if defined (ACE_HAS_IPV6)
00753 ACE_DEFAULT_MULTICASTV6_ADDR
00754 #else
00755 ACE_DEFAULT_MULTICAST_ADDR
00756 #endif
00757 )) {
00758 ACE_ERROR((LM_ERROR,
00759 ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
00760 ACE_TEXT("the multicast responder for repository %d.\n"),
00761 this->id().id()));
00762 throw Incomplete();
00763 }
00764 }
00765
00766
00767 if (reactor->register_handler(&this->multicastResponder_,
00768 ACE_Event_Handler::READ_MASK) == -1) {
00769 ACE_ERROR((LM_ERROR,
00770 ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
00771 ACE_TEXT("for repository %d.\n"),
00772 this->id().id()));
00773 throw Incomplete();
00774 }
00775
00776 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00777 ACE_DEBUG((LM_DEBUG,
00778 ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
00779 ACE_TEXT("multicast server setup is complete.\n")));
00780 }
00781 }
00782
00783 #else
00784 ACE_UNUSED_ARG(this->multicastEnabled_);
00785 #endif
00786 }
00787
00788 void
00789 ManagerImpl::finalize()
00790 {
00791 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00792 ACE_DEBUG((LM_DEBUG,
00793 ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
00794 }
00795
00796 ownerListener_.stop();
00797 topicListener_.stop();
00798 participantListener_.stop();
00799 publicationListener_.stop();
00800 subscriptionListener_.stop();
00801 ownerListener_.join();
00802 topicListener_.join();
00803 participantListener_.join();
00804 publicationListener_.join();
00805 subscriptionListener_.join();
00806
00807 if (this->federated_) {
00808 try {
00809 IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
00810
00811 if (where == this->peers_.end()) {
00812 ACE_DEBUG((LM_DEBUG,
00813 ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
00814 ACE_TEXT("repository %d - all attachment to federation left.\n"),
00815 this->id().id()));
00816
00817 } else {
00818 if (CORBA::is_nil(where->second.in())) {
00819 ACE_ERROR((LM_ERROR,
00820 ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
00821 ACE_TEXT("repository %d not currently attached to a federation.\n"),
00822 this->id().id()));
00823
00824 } else {
00825 where->second->leave_federation(this->id().id());
00826 this->federated_ = false;
00827 }
00828 }
00829
00830 } catch (const CORBA::Exception& ex) {
00831 ex._tao_print_exception(
00832 ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
00833 ACE_TEXT("unable to leave remote federation "));
00834 throw Incomplete();
00835 }
00836 }
00837
00838 if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
00839 this->orb_->orb_core()->reactor()->remove_handler(
00840 &this->multicastResponder_,
00841 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00842 }
00843
00844
00845 if (0 == CORBA::is_nil(this->federationParticipant_.in())) {
00846 DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00847 if (DDS::RETCODE_PRECONDITION_NOT_MET
00848 == this->federationParticipant_->delete_contained_entities()) {
00849 ACE_ERROR((LM_ERROR,
00850 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00851 ACE_TEXT("unable to release resources for repository %d.\n"),
00852 this->id().id()));
00853
00854 } else if (DDS::RETCODE_PRECONDITION_NOT_MET
00855 == dpf->delete_participant(this->federationParticipant_.in())) {
00856 ACE_ERROR((LM_ERROR,
00857 ACE_TEXT("(%P|%t) ERROR: Federator::Manager ")
00858 ACE_TEXT("unable to release the participant for repository %d.\n"),
00859 this->id().id()));
00860 }
00861 }
00862 }
00863
00864
00865
00866 RepoKey
00867 ManagerImpl::federation_id()
00868 {
00869 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00870 ACE_DEBUG((LM_DEBUG,
00871 ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
00872 }
00873
00874 return this->id().id();
00875 }
00876
00877 OpenDDS::DCPS::DCPSInfo_ptr
00878 ManagerImpl::repository()
00879 {
00880 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00881 ACE_DEBUG((LM_DEBUG,
00882 ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
00883 }
00884
00885 OpenDDS::DCPS::Discovery_rch disco
00886 = TheServiceParticipant->get_discovery(
00887 this->config_.federationDomain());
00888 OpenDDS::DCPS::DCPSInfo_var repo;
00889 if (!disco.is_nil()) {
00890 OpenDDS::DCPS::InfoRepoDiscovery_rch irDisco =
00891 DCPS::static_rchandle_cast<DCPS::InfoRepoDiscovery>(disco);
00892 repo = irDisco->get_dcps_info();
00893 }
00894
00895 if (CORBA::is_nil(repo.in())) {
00896 return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
00897
00898 } else {
00899 return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
00900 }
00901 }
00902
00903 CORBA::Boolean
00904 ManagerImpl::discover_federation(const char * ior)
00905 {
00906 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00907 ACE_DEBUG((LM_DEBUG,
00908 ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
00909 ior));
00910 }
00911
00912
00913 return false;
00914 }
00915
00916 Manager_ptr
00917 ManagerImpl::join_federation(
00918 Manager_ptr peer,
00919 FederationDomain federation
00920 )
00921 {
00922 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00923 ACE_DEBUG((LM_DEBUG,
00924 ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
00925 federation));
00926 }
00927
00928 RepoKey remote = NIL_REPOSITORY;
00929
00930 try {
00931
00932 remote = peer->federation_id();
00933
00934 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00935 ACE_DEBUG((LM_DEBUG,
00936 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00937 ACE_TEXT("repo id %d entered from repository with id %d.\n"),
00938 this->id().id(),
00939 remote));
00940 }
00941
00942 } catch (const CORBA::Exception& ex) {
00943 ex._tao_print_exception(
00944 ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
00945 ACE_TEXT("unable to obtain remote federation Id value: "));
00946 throw Incomplete();
00947 }
00948
00949
00950 if (this->joiner_ == remote) {
00951 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00952 ACE_DEBUG((LM_DEBUG,
00953 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
00954 ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
00955 this->id().id(),
00956 remote));
00957 }
00958
00959 return this->_this();
00960
00961 } else {
00962
00963 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00964
00965 while (this->joiner_ != NIL_REPOSITORY) {
00966
00967 this->joining_.wait();
00968
00969
00970 if (this->joiner_ == remote) {
00971 return this->_this();
00972 }
00973 }
00974
00975
00976 this->joiner_ = remote;
00977 }
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987 if (this->federated_ == false) {
00988
00989
00990 try {
00991
00992
00993 this->joinRepo_ = remote;
00994
00995
00996 OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
00997
00998 CORBA::ORB_var orb = remoteRepo->_get_orb();
00999 CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
01000 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01001 ACE_DEBUG((LM_DEBUG,
01002 ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
01003 ACE_TEXT("id %d obtained reference to id %d:\n")
01004 ACE_TEXT("\t%C\n"),
01005 this->id().id(),
01006 remote,
01007 remoteRepoIor.in()));
01008 }
01009
01010
01011 std::ostringstream oss;
01012 oss << remote;
01013 std::string key_string = oss.str();
01014 TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
01015 TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
01016
01017 } catch (const CORBA::Exception& ex) {
01018 ex._tao_print_exception(
01019 "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
01020 throw Incomplete();
01021 }
01022 }
01023
01024
01025 try {
01026 Manager_var self = this->_this();
01027 Manager_var remoteManager
01028 = peer->join_federation(self, this->config_.federationDomain());
01029
01030 if (this->joinRepo_ == remote) {
01031 this->peers_[ this->joinRepo_]
01032 = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
01033 }
01034
01035
01036
01037
01038
01039
01040 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01041 ACE_DEBUG((LM_DEBUG,
01042 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01043 ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
01044 this->id().id(),
01045 remote));
01046 }
01047
01048 this->pushState(peer);
01049
01050 } catch (const CORBA::Exception& ex) {
01051 ex._tao_print_exception(
01052 "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
01053 throw Incomplete();
01054 }
01055
01056 if (CORBA::is_nil(this->participantWriter_.in())) {
01057
01058
01059
01060
01061 this->initialize();
01062 }
01063
01064
01065 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01066 ACE_DEBUG((LM_DEBUG,
01067 ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
01068 ACE_TEXT("repo id %d joined to repository with id %d.\n"),
01069 this->id().id(),
01070 remote));
01071 }
01072
01073 this->federated_ = true;
01074 this->joiner_ = NIL_REPOSITORY;
01075 this->joining_.signal();
01076 return this->_this();
01077 }
01078
01079 void
01080 ManagerImpl::leave_federation(
01081 RepoKey id)
01082 {
01083 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01084 ACE_DEBUG((LM_DEBUG,
01085 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
01086 this->id().id()));
01087 }
01088
01089
01090 IdToManagerMap::iterator where = this->peers_.find(id);
01091
01092 if (where != this->peers_.end()) {
01093 this->peers_.erase(where);
01094 }
01095
01096
01097 if (false
01098 == this->info_->remove_by_owner(this->config_.federationDomain(), id)) {
01099 throw Incomplete();
01100 }
01101
01102 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01103 ACE_DEBUG((LM_DEBUG,
01104 ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
01105 this->id().id()));
01106 }
01107 }
01108
01109 void
01110 ManagerImpl::leave_and_shutdown(
01111 void)
01112 {
01113
01114 this->info_->shutdown();
01115 }
01116
01117 void
01118 ManagerImpl::shutdown(
01119 void)
01120 {
01121
01122
01123 this->federated_ = false;
01124
01125
01126 this->info_->shutdown();
01127 }
01128
01129 void
01130 ManagerImpl::initializeOwner(
01131 const OpenDDS::Federator::OwnerUpdate & data)
01132 {
01133 this->processCreate(&data, 0);
01134 }
01135
01136 void
01137 ManagerImpl::initializeTopic(
01138 const OpenDDS::Federator::TopicUpdate & data)
01139 {
01140 this->processCreate(&data, 0);
01141 }
01142
01143 void
01144 ManagerImpl::initializeParticipant(
01145 const OpenDDS::Federator::ParticipantUpdate & data)
01146 {
01147 this->processCreate(&data, 0);
01148 }
01149
01150 void
01151 ManagerImpl::initializePublication(
01152 const OpenDDS::Federator::PublicationUpdate & data)
01153 {
01154 this->processCreate(&data, 0);
01155 }
01156
01157 void
01158 ManagerImpl::initializeSubscription(
01159 const OpenDDS::Federator::SubscriptionUpdate & data)
01160 {
01161 this->processCreate(&data, 0);
01162 }
01163
01164 }
01165 }
01166
01167 OPENDDS_END_VERSIONED_NAMESPACE_DECL