OpenDDS  Snapshot(2023/04/28-20:55)
FederatorManagerImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "DcpsInfo_pch.h"
7 
8 #include "FederatorManagerImpl.h"
9 
10 #include "DCPSInfo_i.h"
11 #include "DefaultValues.h"
12 #include "FederatorTypeSupportC.h"
13 #include "FederatorTypeSupportImpl.h"
14 
20 #include <dds/DCPS/DCPS_Utils.h>
25 
26 #include <tao/ORB_Core.h>
27 
28 #include <ace/Log_Priority.h>
29 #include <ace/Log_Msg.h>
30 
31 #include <sstream>
32 
33 #if !defined (__ACE_INLINE__)
34 # include "FederatorManagerImpl.inl"
35 #endif /* !__ACE_INLINE__ */
36 
38 
39 namespace OpenDDS {
40 namespace Federator {
41 
43  : joining_(this->lock_),
44  joiner_(NIL_REPOSITORY),
45  joinRepo_(NIL_REPOSITORY),
46  federated_(false),
47  config_(config),
48  info_(0),
49  ownerListener_(*this),
50  topicListener_(*this),
51  participantListener_(*this),
52  publicationListener_(*this),
53  subscriptionListener_(*this),
54  multicastEnabled_(false)
55 {
58  ACE_TEXT("(%P|%t) Federator::ManagerImpl::ManagerImpl()\n")));
59  }
60 
61  char* mdec = ACE_OS::getenv("MulticastDiscoveryEnabled");
62 
63  if (mdec != 0) {
64  std::string mde(ACE_OS::getenv("MulticastDiscoveryEnabled"));
65 
66  if (mde != "0") {
67  multicastEnabled_ = true;
68  }
69  }
70 }
71 
73 {
76  ACE_TEXT("(%P|%t) Federator::ManagerImpl::~ManagerImpl()\n")));
77  }
78 }
79 
80 void
82 {
85  ACE_TEXT("(%P|%t) Federation::ManagerImpl::initialize()\n")));
86  }
87 
88  // Let the listeners know which repository we are, at the earliest
89  // opportunity, so that they can filter samples.
90  this->ownerListener_.federationId(this->id());
91  this->topicListener_.federationId(this->id());
92  this->participantListener_.federationId(this->id());
93  this->publicationListener_.federationId(this->id());
94  this->subscriptionListener_.federationId(this->id());
95 
96  // Add participant for Federation domain
97  DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
98  federationParticipant_ = dpf->create_participant(
99  this->config_.federationDomain(),
101  DDS::DomainParticipantListener::_nil(),
103  if (!federationParticipant_) {
105  ACE_TEXT("(%P|%t) ERROR: create_participant failed for ")
106  ACE_TEXT("repository %d in federation domain %d.\n"),
107  this->id().id(),
108  this->config_.federationDomain()));
109  throw Incomplete();
110  }
111 
112  //
113  // Add type support for update topics
114  //
115 
116  OwnerUpdateTypeSupportImpl::_var_type ownerUpdate = new OwnerUpdateTypeSupportImpl();
117 
118  if (DDS::RETCODE_OK != ownerUpdate->register_type(
119  this->federationParticipant_.in(),
122  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
123  ACE_TEXT("OwnerUpdate type support for repository %d.\n"),
124  this->id().id()));
125  throw Incomplete();
126  }
127 
128  ParticipantUpdateTypeSupportImpl::_var_type participantUpdate = new ParticipantUpdateTypeSupportImpl();
129 
130  if (DDS::RETCODE_OK != participantUpdate->register_type(
131  this->federationParticipant_.in(),
134  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
135  ACE_TEXT("ParticipantUpdate type support for repository %d.\n"),
136  this->id().id()));
137  throw Incomplete();
138  }
139 
140  TopicUpdateTypeSupportImpl::_var_type topicUpdate = new TopicUpdateTypeSupportImpl();
141 
142  if (DDS::RETCODE_OK != topicUpdate->register_type(
143  this->federationParticipant_.in(),
146  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
147  ACE_TEXT("TopicUpdate type support for repository %d.\n"),
148  this->id().id()));
149  throw Incomplete();
150  }
151 
152  PublicationUpdateTypeSupportImpl::_var_type publicationUpdate = new PublicationUpdateTypeSupportImpl();
153 
154  if (DDS::RETCODE_OK != publicationUpdate->register_type(
155  this->federationParticipant_.in(),
158  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
159  ACE_TEXT("PublicationUpdate type support for repository %d.\n"),
160  this->id().id()));
161  throw Incomplete();
162  }
163 
164  SubscriptionUpdateTypeSupportImpl::_var_type subscriptionUpdate = new SubscriptionUpdateTypeSupportImpl();
165 
166  if (DDS::RETCODE_OK != subscriptionUpdate->register_type(
167  this->federationParticipant_.in(),
170  ACE_TEXT("(%P|%t) ERROR: Unable to install ")
171  ACE_TEXT("SubscriptionUpdate type support for repository %d.\n"),
172  this->id().id()));
173  throw Incomplete();
174  }
175 
176  //
177  // Create a transport config for use with federation entities.
178  //
179  std::string config_name =
181  + std::string("FederationBITTransportConfig");
184 
186  + std::string("FederationBITTCPTransportInst");
189  "tcp");
190  config->instances_.push_back(inst);
191 
192  //
193  // Create the subscriber for the update topics.
194  //
195 
196  DDS::Subscriber_var subscriber
197  = this->federationParticipant_->create_subscriber(
199  DDS::SubscriberListener::_nil(),
201 
202  if (CORBA::is_nil(subscriber.in())) {
204  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
205  ACE_TEXT("failed to create subscriber for repository %d\n"),
206  this->id().id()));
207  throw Incomplete();
208 
209  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
211  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
212  ACE_TEXT("created federation subscriber for repository %d\n"),
213  this->id().id()));
214 
215  }
216 
217  // Attach the transport to it.
218 
219  try {
221  subscriber.in());
222  } catch (const OpenDDS::DCPS::Transport::Exception&) {
224  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
225  ACE_TEXT("failed to bind transport config to federation subscriber.\n")));
226  throw Incomplete();
227  }
228 
229  //
230  // Create the publisher for the update topics.
231  //
232 
233  DDS::Publisher_var publisher
234  = this->federationParticipant_->create_publisher(
236  DDS::PublisherListener::_nil(),
238 
239  if (CORBA::is_nil(publisher.in())) {
241  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
242  ACE_TEXT("failed to create publisher for repository %d\n"),
243  this->id().id()));
244  throw Incomplete();
245 
246  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
248  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
249  ACE_TEXT("created federation publisher for repository %d\n"),
250  this->id().id()));
251 
252  }
253 
254  // Attach the transport to it.
255 
256  try {
258  publisher.in());
259  } catch (const OpenDDS::DCPS::Transport::Exception&) {
261  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
262  ACE_TEXT("failed to bind transport config to federation publisher.\n")));
263  throw Incomplete();
264  }
265 
266  //
267  // Some useful items for adding the subscriptions.
268  //
269  DDS::Topic_var topic;
270  DDS::TopicDescription_var description;
271  DDS::DataReader_var dataReader;
272  DDS::DataWriter_var dataWriter;
273 
274  DDS::DataReaderQos readerQos;
275  subscriber->get_default_datareader_qos(readerQos);
278  readerQos.history.depth = 50;
280  readerQos.reliability.max_blocking_time.sec = 0;
281  readerQos.reliability.max_blocking_time.nanosec = 0;
282 
283  DDS::DataWriterQos writerQos;
284  publisher->get_default_datawriter_qos(writerQos);
287  writerQos.history.depth = 50;
289  writerQos.reliability.max_blocking_time.sec = 0;
290  writerQos.reliability.max_blocking_time.nanosec = 0;
291 
292  //
293  // Add update subscriptions
294  //
295  // NOTE: It's OK to lose the references to the objects here since they
296  // are not needed after this point. The only thing we will do
297  // with them is to destroy them, and that will be done via a
298  // cascade delete from the participant. The listeners will
299  // survive and can be used within other participants as well,
300  // since the only state they retain is the manager, which is the
301  // same for all.
302  //
303 
304  topic = this->federationParticipant_->create_topic(
308  DDS::TopicListener::_nil(),
310 
311  dataWriter = publisher->create_datawriter(
312  topic.in(),
313  writerQos,
314  DDS::DataWriterListener::_nil(),
316 
317  if (CORBA::is_nil(dataWriter.in())) {
319  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
320  ACE_TEXT("failed to create OwnerUpdate writer for repository %d\n"),
321  this->id().id()));
322  throw Incomplete();
323  }
324 
325  this->ownerWriter_ = OwnerUpdateDataWriter::_narrow(dataWriter.in());
326 
327  if (::CORBA::is_nil(this->ownerWriter_.in())) {
329  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
330  ACE_TEXT("failed to extract typed OwnerUpdate writer.\n")));
331  throw Incomplete();
332 
333  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
335  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
336 
337  if (0 == servant) {
339  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
340  ACE_TEXT("unable to extract typed OwnerUpdate writer.\n")));
341 
342  } else {
343  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
345  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
346  ACE_TEXT("created federation OwnerUpdate writer %C for repository %d\n"),
347  std::string(converter).c_str(),
348  this->id().id()));
349  }
350  }
351 
352  description = this->federationParticipant_->lookup_topicdescription(OWNERUPDATETOPICNAME);
353  dataReader = subscriber->create_datareader(
354  description.in(),
355  readerQos,
356  &this->ownerListener_,
358 
359  if (CORBA::is_nil(dataReader.in())) {
361  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
362  ACE_TEXT("failed to create OwnerUpdate reader for repository %d\n"),
363  this->id().id()));
364  throw Incomplete();
365 
366  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
368  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
369 
370  if (0 == servant) {
372  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
373  ACE_TEXT("unable to extract typed OwnerUpdate reader.\n")));
374 
375  } else {
376  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
378  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
379  ACE_TEXT("created federation OwnerUpdate reader %C for repository %d\n"),
380  std::string(converter).c_str(),
381  this->id().id()));
382  }
383  }
384 
385  topic = this->federationParticipant_->create_topic(
389  DDS::TopicListener::_nil(),
391  dataWriter = publisher->create_datawriter(
392  topic.in(),
393  writerQos,
394  DDS::DataWriterListener::_nil(),
396 
397  if (CORBA::is_nil(dataWriter.in())) {
399  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
400  ACE_TEXT("failed to create TopicUpdate writer for repository %d\n"),
401  this->id().id()));
402  throw Incomplete();
403  }
404 
405  this->topicWriter_
406  = TopicUpdateDataWriter::_narrow(dataWriter.in());
407 
408  if (::CORBA::is_nil(this->topicWriter_.in())) {
410  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
411  ACE_TEXT("failed to extract typed TopicUpdate writer.\n")));
412  throw Incomplete();
413 
414  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
416  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
417 
418  if (0 == servant) {
420  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
421  ACE_TEXT("unable to extract typed TopicUpdate writer.\n")));
422 
423  } else {
424  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
426  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
427  ACE_TEXT("created federation TopicUpdate writer %C for repository %d\n"),
428  std::string(converter).c_str(),
429  this->id().id()));
430  }
431  }
432 
433  description = this->federationParticipant_->lookup_topicdescription(TOPICUPDATETOPICNAME);
434  dataReader = subscriber->create_datareader(
435  description.in(),
436  readerQos,
437  &this->topicListener_,
439 
440  if (CORBA::is_nil(dataReader.in())) {
442  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
443  ACE_TEXT("failed to create TopicUpdate reader for repository %d\n"),
444  this->id().id()));
445  throw Incomplete();
446 
447  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
449  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
450 
451  if (0 == servant) {
453  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
454  ACE_TEXT("unable to extract typed TopicUpdate reader.\n")));
455 
456  } else {
457  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
459  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
460  ACE_TEXT("created federation TopicUpdate reader %C for repository %d\n"),
461  std::string(converter).c_str(),
462  this->id().id()));
463  }
464  }
465 
466  topic = this->federationParticipant_->create_topic(
470  DDS::TopicListener::_nil(),
472  dataWriter = publisher->create_datawriter(
473  topic.in(),
474  writerQos,
475  DDS::DataWriterListener::_nil(),
477 
478  if (CORBA::is_nil(dataWriter.in())) {
480  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
481  ACE_TEXT("failed to create ParticipantUpdate writer for repository %d\n"),
482  this->id().id()));
483  throw Incomplete();
484  }
485 
486  this->participantWriter_
487  = ParticipantUpdateDataWriter::_narrow(dataWriter.in());
488 
489  if (::CORBA::is_nil(this->participantWriter_.in())) {
491  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
492  ACE_TEXT("failed to extract typed ParticipantUpdate writer.\n")));
493  throw Incomplete();
494 
495  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
497  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
498 
499  if (0 == servant) {
501  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
502  ACE_TEXT("unable to extract typed ParticipantUpdate writer.\n")));
503 
504  } else {
505  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
507  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
508  ACE_TEXT("created federation ParticipantUpdate writer %C for repository %d\n"),
509  std::string(converter).c_str(),
510  this->id().id()));
511  }
512  }
513 
514  description = this->federationParticipant_->lookup_topicdescription(PARTICIPANTUPDATETOPICNAME);
515  dataReader = subscriber->create_datareader(
516  description.in(),
517  readerQos,
518  &this->participantListener_,
520 
521  if (CORBA::is_nil(dataReader.in())) {
523  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
524  ACE_TEXT("failed to create ParticipantUpdate reader for repository %d\n"),
525  this->id().id()));
526  throw Incomplete();
527 
528  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
530  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
531 
532  if (0 == servant) {
534  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
535  ACE_TEXT("unable to extract typed ParticipantUpdate reader.\n")));
536 
537  } else {
538  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
540  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
541  ACE_TEXT("created federation ParticipantUpdate reader %C for repository %d\n"),
542  std::string(converter).c_str(),
543  this->id().id()));
544  }
545  }
546 
547  topic = this->federationParticipant_->create_topic(
551  DDS::TopicListener::_nil(),
553  dataWriter = publisher->create_datawriter(
554  topic.in(),
555  writerQos,
556  DDS::DataWriterListener::_nil(),
558 
559  if (CORBA::is_nil(dataWriter.in())) {
561  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
562  ACE_TEXT("failed to create PublicationUpdate writer for repository %d\n"),
563  this->id().id()));
564  throw Incomplete();
565  }
566 
567  this->publicationWriter_
568  = PublicationUpdateDataWriter::_narrow(dataWriter.in());
569 
570  if (::CORBA::is_nil(this->publicationWriter_.in())) {
572  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
573  ACE_TEXT("failed to extract typed PublicationUpdate writer.\n")));
574  throw Incomplete();
575 
576  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
578  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
579 
580  if (0 == servant) {
582  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
583  ACE_TEXT("unable to extract typed PublicationUpdate writer.\n")));
584 
585  } else {
586  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
588  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
589  ACE_TEXT("created federation PublicationUpdate writer %C for repository %d\n"),
590  std::string(converter).c_str(),
591  this->id().id()));
592  }
593  }
594 
595  description = this->federationParticipant_->lookup_topicdescription(PUBLICATIONUPDATETOPICNAME);
596  dataReader = subscriber->create_datareader(
597  description.in(),
598  readerQos,
599  &this->publicationListener_,
601 
602  if (CORBA::is_nil(dataReader.in())) {
604  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
605  ACE_TEXT("failed to create PublicationUpdate reader for repository %d\n"),
606  this->id().id()));
607  throw Incomplete();
608 
609  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
611  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
612 
613  if (0 == servant) {
615  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
616  ACE_TEXT("unable to extract typed PublicationUpdate reader.\n")));
617 
618  } else {
619  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
621  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
622  ACE_TEXT("created federation PublicationUpdate reader %C for repository %d\n"),
623  std::string(converter).c_str(),
624  this->id().id()));
625  }
626  }
627 
628  topic = this->federationParticipant_->create_topic(
632  DDS::TopicListener::_nil(),
634  dataWriter = publisher->create_datawriter(
635  topic.in(),
636  writerQos,
637  DDS::DataWriterListener::_nil(),
639 
640  if (CORBA::is_nil(dataWriter.in())) {
642  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
643  ACE_TEXT("failed to create SubscriptionUpdate writer for repository %d\n"),
644  this->id().id()));
645  throw Incomplete();
646  }
647 
648  this->subscriptionWriter_
649  = SubscriptionUpdateDataWriter::_narrow(dataWriter.in());
650 
651  if (::CORBA::is_nil(this->subscriptionWriter_.in())) {
653  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
654  ACE_TEXT("failed to extract typed SubscriptionUpdate writer.\n")));
655  throw Incomplete();
656 
657  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
659  = dynamic_cast<OpenDDS::DCPS::DataWriterImpl*>(dataWriter.in());
660 
661  if (0 == servant) {
663  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
664  ACE_TEXT("unable to extract typed SubscriptionUpdate writer.\n")));
665 
666  } else {
667  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
669  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
670  ACE_TEXT("created federation SubscriptionUpdate writer %C for repository %d\n"),
671  std::string(converter).c_str(),
672  this->id().id()));
673  }
674  }
675 
676  description = this->federationParticipant_->lookup_topicdescription(SUBSCRIPTIONUPDATETOPICNAME);
677  dataReader = subscriber->create_datareader(
678  description.in(),
679  readerQos,
680  &this->subscriptionListener_,
682 
683  if (CORBA::is_nil(dataReader.in())) {
685  ACE_TEXT("(%P|%t) ERROR: Federator::ManagerImpl::initialize() - ")
686  ACE_TEXT("failed to create SubscriptionUpdate reader for repository %d\n"),
687  this->id().id()));
688  throw Incomplete();
689 
690  } else if (OpenDDS::DCPS::DCPS_debug_level > 4) {
692  = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dataReader.in());
693 
694  if (0 == servant) {
696  ACE_TEXT("(%P|%t) WARNING: Federator::ManagerImpl::initialize() - ")
697  ACE_TEXT("unable to extract typed SubscriptionUpdate reader.\n")));
698 
699  } else {
700  OpenDDS::DCPS::RepoIdConverter converter(servant->get_guid());
702  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
703  ACE_TEXT("created federation SubscriptionUpdate reader %C for repository %d\n"),
704  std::string(converter).c_str(),
705  this->id().id()));
706  }
707  }
708 
709  // JSP
710 #if defined (ACE_HAS_IP_MULTICAST)
711 
712  if (this->multicastEnabled_) {
713  //
714  // Install ior multicast handler.
715  //
716  // Get reactor instance from TAO.
717  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
718 
719  // See if the -ORBMulticastDiscoveryEndpoint option was specified.
720  ACE_CString mde(this->orb_->orb_core()->orb_params()->mcast_discovery_endpoint());
721 
722  // The user may supply a multicast port number through the environment;
723  // start with no port selected.
724  u_short port = 0;
725 
726  // Check environment var. for multicast port.
727  const char *port_number = ACE_OS::getenv("OpenDDSFederationPort");
728 
729  if (port_number != 0) {
730  port = static_cast<u_short>(ACE_OS::atoi(port_number));
731  }
732 
733  // Port wasn't specified in the environment -
734  // use the default.
735  if (port == 0)
737 
738  // Initialize the handler
739  if (mde.length() != 0) {
740  if (this->multicastResponder_.init(
741  this->orb_.in(),
742  mde.c_str()) == -1) {
744  ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
745  ACE_TEXT("the multicast responder for repository %d.\n"),
746  this->id().id()));
747  throw Incomplete();
748  }
749 
750  } else {
751  if (this->multicastResponder_.init(
752  this->orb_.in(),
753  port,
754 #if defined (ACE_HAS_IPV6)
756 #else
758 #endif /* ACE_HAS_IPV6 */
759  )) {
761  ACE_TEXT("(%P|%t) ERROR: Unable to initialize ")
762  ACE_TEXT("the multicast responder for repository %d.\n"),
763  this->id().id()));
764  throw Incomplete();
765  }
766  }
767 
768  // Register event handler for the ior multicast.
769  if (reactor->register_handler(&this->multicastResponder_,
772  ACE_TEXT("(%P|%t) ERROR: Unable to register event handler ")
773  ACE_TEXT("for repository %d.\n"),
774  this->id().id()));
775  throw Incomplete();
776  }
777 
780  ACE_TEXT("(%P|%t) Federator::ManagerImpl::initialize() - ")
781  ACE_TEXT("multicast server setup is complete.\n")));
782  }
783  }
784 
785 #else
786  ACE_UNUSED_ARG(this->multicastEnabled_);
787 #endif /* ACE_HAS_IP_MULTICAST */
788 }
789 
790 void
792 {
795  ACE_TEXT("(%P|%t) Federator::ManagerImpl::finalize()\n")));
796  }
797 
798  ownerListener_.stop();
799  topicListener_.stop();
800  participantListener_.stop();
801  publicationListener_.stop();
802  subscriptionListener_.stop();
803  ownerListener_.join();
804  topicListener_.join();
805  participantListener_.join();
806  publicationListener_.join();
807  subscriptionListener_.join();
808 
809  if (this->federated_) {
810  try {
811  IdToManagerMap::iterator where = this->peers_.find(this->joinRepo_);
812 
813  if (where == this->peers_.end()) {
815  ACE_TEXT("(%P|%t) Federator::Manager::finalize: ")
816  ACE_TEXT("repository %d - all attachment to federation left.\n"),
817  this->id().id()));
818 
819  } else {
820  if (CORBA::is_nil(where->second.in())) {
822  ACE_TEXT("(%P|%t) ERROR: Federator::Manager::finalize: ")
823  ACE_TEXT("repository %d not currently attached to a federation.\n"),
824  this->id().id()));
825 
826  } else {
827  where->second->leave_federation(this->id().id());
828  this->federated_ = false;
829  }
830  }
831 
832  } catch (const CORBA::Exception& ex) {
834  ACE_TEXT("ERROR: Federator::ManagerImpl::finalize() - ")
835  ACE_TEXT("unable to leave remote federation "));
836  throw Incomplete();
837  }
838  }
839 
840  if (!CORBA::is_nil(this->orb_.in()) && (0 != this->orb_->orb_core())) {
841  this->orb_->orb_core()->reactor()->remove_handler(
842  &this->multicastResponder_,
844  }
845 
846  // Remove our local participant and contained entities.
848  const DDS::ReturnCode_t entities_error =
849  federationParticipant_->delete_contained_entities();
850  if (entities_error) {
851  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Federator::Manager: "
852  "unable to release resources for repository %d: %C\n",
853  id().id(), DCPS::retcode_to_string(entities_error)));
854  return;
855  }
856 
857  DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
858  const DDS::ReturnCode_t part_error = dpf->delete_participant(federationParticipant_);
859  if (part_error) {
860  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Federator::Manager: "
861  "unable to release the participant for repository %d: %C\n",
862  id().id(), DCPS::retcode_to_string(part_error)));
863  }
864  }
865 }
866 
867 // IDL methods.
868 
869 RepoKey
871 {
874  ACE_TEXT("(%P|%t) ManagerImpl::federation_id()\n")));
875  }
876 
877  return this->id().id();
878 }
879 
880 OpenDDS::DCPS::DCPSInfo_ptr
882 {
885  ACE_TEXT("(%P|%t) ManagerImpl::repository()\n")));
886  }
887 
889  = TheServiceParticipant->get_discovery(
890  this->config_.federationDomain());
891  OpenDDS::DCPS::DCPSInfo_var repo;
892  if (!disco.is_nil()) {
895  repo = irDisco->get_dcps_info();
896  }
897 
898  if (CORBA::is_nil(repo.in())) {
899  return OpenDDS::DCPS::DCPSInfo::_duplicate(this->localRepo_.in());
900 
901  } else {
902  return OpenDDS::DCPS::DCPSInfo::_duplicate(repo.in());
903  }
904 }
905 
908 {
911  ACE_TEXT("(%P|%t) ManagerImpl::discover_federation( %C)\n"),
912  ior));
913  }
914 
915  ///@TODO: Implement this.
916  return false;
917 }
918 
919 Manager_ptr
921  Manager_ptr peer,
922  FederationDomain federation
923 )
924 {
927  ACE_TEXT("(%P|%t) ManagerImpl::join_federation( peer, %d)\n"),
928  federation));
929  }
930 
931  RepoKey remote = NIL_REPOSITORY;
932 
933  try {
934  // Obtain the remote repository federator Id value.
935  remote = peer->federation_id();
936 
939  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
940  ACE_TEXT("repo id %d entered from repository with id %d.\n"),
941  this->id().id(),
942  remote));
943  }
944 
945  } catch (const CORBA::Exception& ex) {
947  ACE_TEXT("ERROR: Federator::ManagerImpl::join_federation() - ")
948  ACE_TEXT("unable to obtain remote federation Id value: "));
949  throw Incomplete();
950  }
951 
952  // If we are recursing, then we are done.
953  if (this->joiner_ == remote) {
956  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
957  ACE_TEXT("repo id %d leaving after reentry from repository with id %d.\n"),
958  this->id().id(),
959  remote));
960  }
961 
962  return this->_this();
963 
964  } else {
965  // Block while any different repository is joining.
966  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
967 
968  while (this->joiner_ != NIL_REPOSITORY) {
969  // This releases the lock while we block.
970  this->joining_.wait();
971 
972  // We are now recursing - curses!
973  if (this->joiner_ == remote) {
974  return this->_this();
975  }
976  }
977 
978  // Note that we are joining the remote repository now.
979  this->joiner_ = remote;
980  }
981 
982  //
983  // We only reach this point if:
984  // 1) No other repository is processing past this point;
985  // 2) We are not recursing.
986  //
987 
988  // Check if we already have Federation repository.
989  // Check if we are already federated.
990  if (this->federated_ == false) {
991  // Go ahead and add the joining repository as our Federation
992  // repository.
993  try {
994  // Mark the remote repository as the point through which we are joined to
995  // the federation.
996  this->joinRepo_ = remote;
997 
998  // Obtain a reference to the remote repository.
999  OpenDDS::DCPS::DCPSInfo_var remoteRepo = peer->repository();
1000 
1001  CORBA::ORB_var orb = remoteRepo->_get_orb();
1002  CORBA::String_var remoteRepoIor = orb->object_to_string(remoteRepo.in());
1005  ACE_TEXT("(%P|%t) FederatorManagerImpl::join_federation() - ")
1006  ACE_TEXT("id %d obtained reference to id %d:\n")
1007  ACE_TEXT("\t%C\n"),
1008  this->id().id(),
1009  remote,
1010  remoteRepoIor.in()));
1011  }
1012 
1013  // Add remote repository to Service_Participant in the Federation domain
1014  std::ostringstream oss;
1015  oss << remote;
1016  std::string key_string = oss.str();
1017  TheServiceParticipant->set_repo_ior(remoteRepoIor.in(), key_string);
1018  TheServiceParticipant->set_repo_domain(this->config_.federationDomain(), key_string);
1019 
1020  } catch (const CORBA::Exception& ex) {
1022  "ERROR: Federator::ManagerImpl::join_federation() - Unable to join with remote: ");
1023  throw Incomplete();
1024  }
1025  }
1026 
1027  // Symmetrical joining behavior.
1028  try {
1029  Manager_var self = this->_this();
1030  Manager_var remoteManager
1031  = peer->join_federation(self, this->config_.federationDomain());
1032 
1033  if (this->joinRepo_ == remote) {
1034  this->peers_[ this->joinRepo_]
1035  = OpenDDS::Federator::Manager::_duplicate(remoteManager.in());
1036  }
1037 
1038  //
1039  // Push our initial state out to the joining repository *after* we call
1040  // him back to join. This reduces the amount of duplicate data pushed
1041  // when a new (empty) repository is joining an existing federation.
1042  //
1045  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
1046  ACE_TEXT("repo id %d pushing state to repository with id %d.\n"),
1047  this->id().id(),
1048  remote));
1049  }
1050 
1051  this->pushState(peer);
1052 
1053  } catch (const CORBA::Exception& ex) {
1055  "ERROR: Federator::ManagerImpl::join_federation() - unsuccessful call to remote->join: ");
1056  throw Incomplete();
1057  }
1058 
1059  if (CORBA::is_nil(this->participantWriter_.in())) {
1060  //
1061  // Establish our update publications and subscriptions *after* we
1062  // have exchanged internal state with the first joining repository.
1063  //
1064  this->initialize();
1065  }
1066 
1067  // Adjust our joining state and give others the opportunity to proceed.
1070  ACE_TEXT("(%P|%t) Federator::ManagerImpl::join_federation() - ")
1071  ACE_TEXT("repo id %d joined to repository with id %d.\n"),
1072  this->id().id(),
1073  remote));
1074  }
1075 
1076  this->federated_ = true;
1077  this->joiner_ = NIL_REPOSITORY;
1078  this->joining_.signal();
1079  return this->_this();
1080 }
1081 
1082 void
1084  RepoKey id)
1085 {
1088  ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d)\n"),
1089  this->id().id()));
1090  }
1091 
1092  // Remove the leaving repository from our outbound mappings.
1093  IdToManagerMap::iterator where = this->peers_.find(id);
1094 
1095  if (where != this->peers_.end()) {
1096  this->peers_.erase(where);
1097  }
1098 
1099  // Remove all the internal Entities owned by the leaving repository.
1101  throw Incomplete();
1102  }
1103 
1106  ACE_TEXT("(%P|%t) ManagerImpl::leave_federation( %d) complete.\n"),
1107  this->id().id()));
1108  }
1109 }
1110 
1111 void
1113  void)
1114 {
1115  // Shutdown the process via the repository object.
1116  this->info_->shutdown();
1117 }
1118 
1119 void
1121  void)
1122 {
1123  // Prevent the removal of this repository from the federation during
1124  // shutdown processing.
1125  this->federated_ = false;
1126 
1127  // Shutdown the process via the repository object.
1128  this->info_->shutdown();
1129 }
1130 
1131 void
1133  const OpenDDS::Federator::OwnerUpdate & data)
1134 {
1135  this->processCreate(&data, 0);
1136 }
1137 
1138 void
1140  const OpenDDS::Federator::TopicUpdate & data)
1141 {
1142  this->processCreate(&data, 0);
1143 }
1144 
1145 void
1148 {
1149  this->processCreate(&data, 0);
1150 }
1151 
1152 void
1155 {
1156  this->processCreate(&data, 0);
1157 }
1158 
1159 void
1162 {
1163  this->processCreate(&data, 0);
1164 }
1165 
1166 } // namespace Federator
1167 } // namespace OpenDDS
1168 
bool multicastEnabled_
Is multicast enabled?
RepoKey joiner_
Simple recursion avoidance during the join operations.
#define PARTICIPANT_QOS_DEFAULT
#define ACE_DEBUG(X)
const string PUBLICATIONUPDATETOPICNAME
Definition: Federator.idl:121
HistoryQosPolicy history
#define ACE_ERROR(X)
const string TOPICUPDATETOPICNAME
Definition: Federator.idl:76
const string OWNERUPDATETYPENAME
Definition: Federator.idl:56
virtual void initializeSubscription(const OpenDDS::Federator::SubscriptionUpdate &data)
UpdateListener< SubscriptionUpdate, SubscriptionUpdateDataReader > subscriptionListener_
SubscriptionUpdate listener.
#define ACE_SYNCH_MUTEX
ReliabilityQosPolicy reliability
virtual void shutdown()
Cause the entire repository to exit.
Definition: DCPSInfo_i.cpp:113
void federationDomain(long domain)
Federation domain value.
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
InfoRepoMulticastResponder multicastResponder_
Multicast responder.
DurabilityQosPolicy durability
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
HistoryQosPolicyKind kind
CORBA::ORB_var orb_
The ORB in which we are activated.
virtual void initializePublication(const OpenDDS::Federator::PublicationUpdate &data)
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
IdToManagerMap peers_
The peers with which we have federated.
virtual OpenDDS::DCPS::DCPSInfo_ptr repository()
virtual void initializeOwner(const OpenDDS::Federator::OwnerUpdate &data)
Config & config_
The configuration information for this manager.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
char * getenv(const char *symbol)
ACE_Guard< ACE_Thread_Mutex > lock_
const string SUBSCRIPTIONUPDATETOPICNAME
Definition: Federator.idl:152
OwnerUpdateDataWriter_var ownerWriter_
OwnerUpdate writer.
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
const DDS::StatusMask DEFAULT_STATUS_MASK
Discovery Strategy class that implements InfoRepo discovery.
CORBA::ORB_ptr orb()
Accessors for the ORB.
const RepoKey NIL_REPOSITORY
Definition: Federator.idl:36
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int wait(const ACE_Time_Value *abstime)
virtual void initializeParticipant(const OpenDDS::Federator::ParticipantUpdate &data)
LM_DEBUG
ACE_Condition< ACE_SYNCH_MUTEX > joining_
Condition used to gate joining activities.
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
const string SUBSCRIPTIONUPDATETYPENAME
Definition: Federator.idl:153
ReliabilityQosPolicyKind kind
DurabilityQosPolicyKind kind
DurabilityQosPolicy durability
UpdateListener< OwnerUpdate, OwnerUpdateDataReader > ownerListener_
OwnerUpdate listener.
RepoKey joinRepo_
Repository to which we joined.
bool remove_by_owner(DDS::DomainId_t domain, long owner)
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
virtual Manager_ptr join_federation(Manager_ptr peer, FederationDomain federation)
ACE_CDR::Boolean Boolean
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void initialize()
Establish the update publications and subscriptions.
virtual void initializeTopic(const OpenDDS::Federator::TopicUpdate &data)
Implements the DDS::DataReader interface.
const string PUBLICATIONUPDATETYPENAME
Definition: Federator.idl:122
UpdateListener< PublicationUpdate, PublicationUpdateDataReader > publicationListener_
PublicationUpdate listener.
DDS::DomainId_t FederationDomain
Definition: Federator.idl:21
virtual CORBA::Boolean discover_federation(const char *ior)
UpdateListener< ParticipantUpdate, ParticipantUpdateDataReader > participantListener_
ParticipantUpdate listener.
#define PUBLISHER_QOS_DEFAULT
LM_WARNING
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
int init(CORBA::ORB_ptr orb, u_short port, const char *mcast_addr)
Initialization method.
virtual void leave_federation(RepoKey id)
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.
_in_type in(void) const
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
HistoryQosPolicy history
ACE_SYNCH_MUTEX lock_
Critical section MUTEX.
#define SUBSCRIBER_QOS_DEFAULT
#define ACE_DEFAULT_MULTICASTV6_ADDR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReliabilityQosPolicy reliability
const string PARTICIPANTUPDATETOPICNAME
Definition: Federator.idl:95
::CORBA::Long RepoKey
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void finalize()
Release resources gracefully.
UpdateListener< TopicUpdate, TopicUpdateDataReader > topicListener_
TopicUpdate listener.
static const char DEFAULT_INST_PREFIX[]
static TransportRegistry * instance()
Return a singleton instance of this class.
const ReturnCode_t RETCODE_OK
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
#define TheParticipantFactory
DDS::DomainParticipant_var federationParticipant_
local DomainParticipant
const character_type * in(void) const
const string OWNERUPDATETOPICNAME
Definition: Federator.idl:55
#define ACE_DEFAULT_MULTICAST_ADDR
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int signal(void)
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
void pushState(Manager_ptr peer)
Push our current state to a remote repository.
Boolean is_nil(T x)
const string PARTICIPANTUPDATETYPENAME
Definition: Federator.idl:96
OpenDDS::DCPS::DCPSInfo_var localRepo_
Remotely callable reference to the local repository.
#define TOPIC_QOS_DEFAULT
const string TOPICUPDATETYPENAME
Definition: Federator.idl:77
void id(RepoKey fedId)