OpenDDS  Snapshot(2023/04/28-20:55)
InfoRepoDiscovery.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 #include "InfoRepoDiscovery.h"
8 
9 #include "DataReaderRemoteImpl.h"
10 #include "DataWriterRemoteC.h"
11 #include "DataWriterRemoteImpl.h"
12 #include "FailoverListener.h"
14 #include "dds/DCPS/RepoIdBuilder.h"
15 #include "dds/DCPS/ConfigUtils.h"
16 #include "dds/DCPS/DCPS_Utils.h"
18 
22 
23 #include "tao/ORB_Core.h"
24 #include "tao/BiDir_GIOP/BiDirGIOP.h"
25 #include "ace/Reactor.h"
26 
27 #if !defined (DDS_HAS_MINIMUM_BIT)
31 
33 
36 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
37 #endif
38 
39 namespace {
40 const char ROOT_POA[] = "RootPOA";
41 const char BIDIR_POA[] = "BiDirPOA";
42 
43 struct DestroyPolicy {
44  explicit DestroyPolicy(const CORBA::Policy_ptr& p)
45  : p_(CORBA::Policy::_duplicate(p)) {}
46 
47  ~DestroyPolicy() { p_->destroy(); }
48 
49  CORBA::Policy_var p_;
50 };
51 
53 {
54  CORBA::Object_var obj =
55  orb->resolve_initial_references(ROOT_POA);
56  PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj.in());
57 
58  if (TheServiceParticipant->use_bidir_giop()) {
59  while (true) {
60  try {
61  return root_poa->find_POA(BIDIR_POA, false /*activate*/);
63  // go ahead and create it...
64  }
65  CORBA::PolicyList policies(1);
66  policies.length(1);
67  CORBA::Any policy;
68  policy <<= BiDirPolicy::BOTH;
69  policies[0] =
70  orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policy);
71  DestroyPolicy destroy(policies[0]);
72  PortableServer::POAManager_var manager = root_poa->the_POAManager();
73  try {
74  return root_poa->create_POA(BIDIR_POA, manager, policies);
76  // another thread created it, try to find it again
77  }
78  }
79  }
80 
81  return root_poa._retn();
82 }
83 
84 /// Get a servant pointer given an object reference.
85 /// @throws PortableServer::POA::ObjectNotActive
86 /// PortableServer::POA::WrongAdapter
87 /// PortableServer::POA::WrongPolicy
88 template <class T_impl, class T_ptr>
89 T_impl* remote_reference_to_servant(T_ptr p, CORBA::ORB_ptr orb)
90 {
91  if (CORBA::is_nil(p)) {
92  return 0;
93  }
94 
95  PortableServer::POA_var poa = get_POA(orb);
96 
97  T_impl* the_servant =
98  dynamic_cast<T_impl*>(poa->reference_to_servant(p));
99 
100  // Use the ServantBase_var so that the servant's reference
101  // count will not be changed by this operation.
102  PortableServer::ServantBase_var servant = the_servant;
103 
104  return the_servant;
105 }
106 
107 /// Given a servant, return the remote object reference from the local POA.
108 /// @throws PortableServer::POA::ServantNotActive,
109 /// PortableServer::POA::WrongPolicy
110 template <class T>
111 typename T::_stub_ptr_type servant_to_remote_reference(T* servant, CORBA::ORB_ptr orb)
112 {
113  PortableServer::POA_var poa = get_POA(orb);
114  PortableServer::ObjectId_var oid = poa->activate_object(servant);
115  CORBA::Object_var obj = poa->id_to_reference(oid.in());
116 
117  typename T::_stub_ptr_type the_obj = T::_stub_type::_narrow(obj.in());
118  return the_obj;
119 }
120 
121 template <class T>
122 void deactivate_remote_object(T obj, CORBA::ORB_ptr orb)
123 {
124  PortableServer::POA_var poa = get_POA(orb);
126  poa->reference_to_id(obj);
127  poa->deactivate_object(oid.in());
128 }
129 
130 }
131 
133 
134 namespace OpenDDS {
135 namespace DCPS {
136 
137 InfoRepoDiscovery::InfoRepoDiscovery(const RepoKey& key,
138  const std::string& ior)
139  : Discovery(key),
140  ior_(ior),
141  bit_transport_port_(0),
142  use_local_bit_config_(false),
143  orb_from_user_(false)
144 {
145 }
146 
148  const DCPSInfo_var& info)
149  : Discovery(key),
150  info_(info),
152  use_local_bit_config_(false),
153  orb_from_user_(false)
154 {
155 }
156 
158 {
159  if (!orb_from_user_ && orb_runner_) {
160  if (0 == --orb_runner_->use_count_) {
161  try {
163  }
164  catch (const CORBA::Exception& ex) {
166  ACE_TEXT("ERROR: InfoRepoDiscovery::~InfoRepoDiscovery - ")
167  ACE_TEXT("Exception caught during ORB shutdown: %C.\n"),
168  ex._info().c_str()));
169  }
170 
171  delete orb_runner_;
172  orb_runner_ = 0;
173  }
174  }
175 }
176 
177 bool
179 {
180  if (!CORBA::is_nil (orb_.in()) || CORBA::is_nil (orb)) {
181  return false;
182  }
183 
185  orb_from_user_ = true;
186  return true;
187 }
188 
189 namespace
190 {
191  DCPSInfo_ptr get_repo(const char* ior, CORBA::ORB_ptr orb)
192  {
194  try {
195  o = orb->string_to_object(ior);
196  } catch (CORBA::INV_OBJREF&) {
197  // host:port format causes an exception; try again
198  // with corbaloc format
199  std::string second_try("corbaloc:iiop:");
200  second_try += ior;
201  second_try += "/DCPSInfoRepo";
202 
203  o = orb->string_to_object(second_try.c_str());
204  }
205 
206  return DCPSInfo::_narrow(o.in());
207  }
208 }
209 
210 DCPSInfo_var
212 {
214  if (CORBA::is_nil(this->info_.in())) {
215 
216  if (!orb_) {
218  if (!orb_runner_) {
219  orb_runner_ = new OrbRunner;
220  ACE_ARGV* argv = TheServiceParticipant->ORB_argv();
221  int argc = argv->argc();
222  orb_runner_->orb_ =
223  CORBA::ORB_init(argc, argv->argv(), DEFAULT_ORB_NAME);
224  orb_runner_->use_count_ = 1;
226 
227  CORBA::Object_var rp =
228  orb_runner_->orb_->resolve_initial_references(ROOT_POA);
229  PortableServer::POA_var poa = PortableServer::POA::_narrow(rp);
230  PortableServer::POAManager_var poa_manager = poa->the_POAManager();
231  poa_manager->activate();
232  } else {
234  }
235  orb_ = orb_runner_->orb_;
236  }
237 
238  try {
239  this->info_ = get_repo(this->ior_.c_str(), orb_);
240 
241  if (CORBA::is_nil(this->info_.in())) {
243  ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::get_dcps_info: ")
244  ACE_TEXT("unable to narrow DCPSInfo (%C) for key %C.\n"),
245  this->ior_.c_str(),
246  this->key().c_str()));
247  return DCPSInfo::_nil();
248  }
249 
250  } catch (const CORBA::Exception& ex) {
251  ex._tao_print_exception("ERROR: InfoRepoDiscovery::get_dcps_info: failed to resolve ior - ");
252  return DCPSInfo::_nil();
253  }
254  }
255 
256  return this->info_;
257 }
258 
259 std::string
261 {
263  return this->ior_;
264 }
265 
268 {
269 #if !defined (DDS_HAS_MINIMUM_BIT)
271  if (bit_config_.is_nil()) {
272  const std::string cfg_name = TransportRegistry::DEFAULT_INST_PREFIX +
273  std::string("_BITTransportConfig_") + key();
275 
276  const std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX +
277  std::string("_BITTCPTransportInst_") + key();
278  TransportInst_rch inst =
279  TransportRegistry::instance()->create_inst(inst_name, "tcp");
280  bit_config_->instances_.push_back(inst);
281 
282  if (!use_local_bit_config_) {
283  bit_transport_ip_ = TheServiceParticipant->bit_transport_ip();
284  bit_transport_port_ = TheServiceParticipant->bit_transport_port();
285  }
286 
287  // Use a static cast to avoid dependency on the Tcp library
288  TcpInst_rch tcp_inst = static_rchandle_cast<TcpInst>(inst);
289 
290  tcp_inst->datalink_release_delay_ = 0;
291  if (!bit_transport_ip_.empty()) {
292  tcp_inst->local_address(bit_transport_port_,
293  bit_transport_ip_.c_str());
294  } else {
295  tcp_inst->local_address_set_port(bit_transport_port_);
296  }
297 
298  if (DCPS_debug_level) {
299  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) InfoRepoDiscovery::bit_config")
300  ACE_TEXT(" - BIT tcp transport %C\n"), tcp_inst->local_address_string().c_str()));
301  }
302  }
303  return bit_config_;
304 #else
305  return TransportConfig_rch();
306 #endif
307 }
308 
311 {
312 #if defined (DDS_HAS_MINIMUM_BIT)
313  ACE_UNUSED_ARG(participant);
314  return RcHandle<BitSubscriber>();
315 #else
316  if (!TheServiceParticipant->get_BIT()) {
317  return RcHandle<BitSubscriber>();
318  }
319 
320  if (create_bit_topics(participant) != DDS::RETCODE_OK) {
321  return RcHandle<BitSubscriber>();
322  }
323 
324  DDS::Subscriber_var bit_subscriber =
326  DDS::SubscriberListener::_nil(),
328  try {
329  TransportConfig_rch config = bit_config();
330  TransportRegistry::instance()->bind_config(config, bit_subscriber);
331 
332  } catch (const Transport::Exception&) {
333  ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
334  "exception during transport initialization\n"));
335  return RcHandle<BitSubscriber>();
336  }
337 
338  // DataReaders
339  try {
340  DDS::DataReaderQos participantReaderQos;
341  bit_subscriber->get_default_datareader_qos(participantReaderQos);
342  participantReaderQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
343 
344  if (participant->federated()) {
345  participantReaderQos.liveliness.lease_duration.nanosec = 0;
346  participantReaderQos.liveliness.lease_duration.sec =
347  TheServiceParticipant->federation_liveliness();
348  }
349 
350  DDS::TopicDescription_var bit_part_topic =
352 
353  DDS::DataReader_var dr =
354  bit_subscriber->create_datareader(bit_part_topic,
355  participantReaderQos,
356  DDS::DataReaderListener::_nil(),
358 
359  if (participant->federated()) {
360  DDS::ParticipantBuiltinTopicDataDataReader_var pbit_dr =
361  DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr.in());
362 
363  DataReaderListener_var failover = new FailoverListener(key());
364  pbit_dr->set_listener(failover, DEFAULT_STATUS_MASK);
365  // No need to invoke the listener.
366  }
367 
368  DDS::DataReaderQos dr_qos;
369  bit_subscriber->get_default_datareader_qos(dr_qos);
371 
372  DDS::TopicDescription_var bit_topic_topic =
374 
375  dr = bit_subscriber->create_datareader(bit_topic_topic,
376  dr_qos,
377  DDS::DataReaderListener::_nil(),
379 
380  DDS::TopicDescription_var bit_pub_topic =
382 
383  dr = bit_subscriber->create_datareader(bit_pub_topic,
384  dr_qos,
385  DDS::DataReaderListener::_nil(),
387 
388  DDS::TopicDescription_var bit_sub_topic =
390 
391  dr = bit_subscriber->create_datareader(bit_sub_topic,
392  dr_qos,
393  DDS::DataReaderListener::_nil(),
395 
396  const DDS::ReturnCode_t ret = bit_subscriber->enable();
397  if (ret != DDS::RETCODE_OK) {
398  if (DCPS_debug_level) {
399  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) InfoRepoDiscovery::init_bit")
400  ACE_TEXT(" - Error <%C> enabling subscriber\n"), retcode_to_string(ret)));
401  }
402  return RcHandle<BitSubscriber>();
403  }
404 
405  } catch (const CORBA::Exception&) {
406  ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, "
407  "exception during DataReader initialization\n"));
408  return RcHandle<BitSubscriber>();
409  }
410  return make_rch<BitSubscriber>(bit_subscriber);
411 #endif
412 }
413 
414 void
416 {
417  // nothing to do for DCPSInfoRepo
418 }
419 
420 bool
422 {
423  try {
424  // invoke a CORBA call, if we are active then there will be no exception
425  get_dcps_info()->_is_a("Not_An_IDL_Type");
426  return true;
427  } catch (const CORBA::Exception&) {
428  return false;
429  }
430 }
431 
432 // Participant operations:
433 
434 bool
436  const GUID_t& participantId)
437 {
438  try {
439  return get_dcps_info()->attach_participant(domainId, participantId);
440  } catch (const CORBA::Exception& ex) {
441  ex._tao_print_exception("ERROR: InfoRepoDiscovery::attach_participant: ");
442  return false;
443  }
444 }
445 
448 {
449  return GUID_UNKNOWN;
450 }
451 
454  const DDS::DomainParticipantQos& qos,
456 {
457  try {
458  const DCPSInfo_var info = get_dcps_info();
459  if (!CORBA::is_nil(info)) {
460  return info->add_domain_participant(domainId, qos);
461  }
462  } catch (const CORBA::Exception& ex) {
463  ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_domain_participant: ");
464  }
465  const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
466  return ads;
467 }
468 
469 #if defined(OPENDDS_SECURITY)
472  DDS::DomainId_t /*domain*/,
473  const DDS::DomainParticipantQos& /*qos*/,
475  const OpenDDS::DCPS::GUID_t& /*guid*/,
479 {
480  const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
481  return ads;
482 }
483 #endif
484 
485 bool
487  const GUID_t& participantId)
488 {
489  try {
490  get_dcps_info()->remove_domain_participant(domainId, participantId);
491  return true;
492  } catch (const CORBA::Exception& ex) {
493  ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_domain_participant: ");
494  return false;
495  }
496 }
497 
498 bool
500  const GUID_t& myParticipantId,
501  const GUID_t& ignoreId)
502 {
503  try {
504  get_dcps_info()->ignore_domain_participant(domainId, myParticipantId, ignoreId);
505  return true;
506  } catch (const CORBA::Exception& ex) {
507  ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_domain_participant: ");
508  return false;
509  }
510 }
511 
512 bool
514  const GUID_t& participant,
515  const DDS::DomainParticipantQos& qos)
516 {
517  try {
518  return get_dcps_info()->update_domain_participant_qos(domainId, participant, qos);
519  } catch (const CORBA::Exception& ex) {
520  ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_domain_participant_qos: ");
521  return false;
522  }
523 }
524 
525 // Topic operations:
526 
528 InfoRepoDiscovery::assert_topic(DCPS::GUID_t_out topicId, DDS::DomainId_t domainId,
529  const GUID_t& participantId, const char* topicName,
530  const char* dataTypeName, const DDS::TopicQos& qos,
531  bool hasDcpsKey, TopicCallbacks* /*topic_callbacks*/)
532 {
533  try {
534  return get_dcps_info()->assert_topic(topicId, domainId, participantId, topicName,
535  dataTypeName, qos, hasDcpsKey);
536  } catch (const CORBA::Exception& ex) {
537  ex._tao_print_exception("ERROR: InfoRepoDiscovery::assert_topic: ");
538  return DCPS::INTERNAL_ERROR;
539  }
540 }
541 
544  const DCPS::GUID_t& /*participantId*/,
545  const char* topicName,
546  CORBA::String_out dataTypeName,
547  DDS::TopicQos_out qos,
548  DCPS::GUID_t_out topicId)
549 {
550  try {
551  return get_dcps_info()->find_topic(domainId, topicName, dataTypeName, qos, topicId);
552  } catch (const CORBA::Exception& ex) {
553  ex._tao_print_exception("ERROR: InfoRepoDiscovery::find_topic: ");
554  return DCPS::INTERNAL_ERROR;
555  }
556 }
557 
560  const GUID_t& topicId)
561 {
562  try {
563  return get_dcps_info()->remove_topic(domainId, participantId, topicId);
564  } catch (const CORBA::Exception& ex) {
565  ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_topic: ");
566  return DCPS::INTERNAL_ERROR;
567  }
568 }
569 
570 bool
571 InfoRepoDiscovery::ignore_topic(DDS::DomainId_t domainId, const GUID_t& myParticipantId,
572  const GUID_t& ignoreId)
573 {
574  try {
575  get_dcps_info()->ignore_topic(domainId, myParticipantId, ignoreId);
576  return true;
577  } catch (const CORBA::Exception& ex) {
578  ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_topic: ");
579  return false;
580  }
581 }
582 
583 bool
585  const GUID_t& participantId, const DDS::TopicQos& qos)
586 {
587  try {
588  return get_dcps_info()->update_topic_qos(topicId, domainId, participantId, qos);
589  } catch (const CORBA::Exception& ex) {
590  ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_topic_qos: ");
591  return false;
592  }
593 }
594 
595 
596 // Publication operations:
597 
598 GUID_t
600  const GUID_t& participantId,
601  const GUID_t& topicId,
602  DCPS::DataWriterCallbacks_rch publication,
603  const DDS::DataWriterQos& qos,
604  const DCPS::TransportLocatorSeq& transInfo,
605  const DDS::PublisherQos& publisherQos,
606  const XTypes::TypeInformation& type_info)
607 {
608  GUID_t pubId;
609 
610  try {
611  DCPS::DataWriterRemoteImpl* writer_remote_impl = 0;
612  ACE_NEW_RETURN(writer_remote_impl,
613  DataWriterRemoteImpl(*publication),
615 
616  //this is taking ownership of the DataWriterRemoteImpl (server side) allocated above
617  PortableServer::ServantBase_var writer_remote(writer_remote_impl);
618 
619  //this is the client reference to the DataWriterRemoteImpl
620  OpenDDS::DCPS::DataWriterRemote_var dr_remote_obj =
621  servant_to_remote_reference(writer_remote_impl, orb_);
622  //turn into a octet seq to pass through generated files
623  DDS::OctetSeq serializedTypeInfo;
624  XTypes::serialize_type_info(type_info, serializedTypeInfo);
625 
626  pubId = get_dcps_info()->add_publication(domainId, participantId, topicId,
627  dr_remote_obj, qos, transInfo, publisherQos, serializedTypeInfo);
628 
630  // take ownership of the client allocated above
631  dataWriterMap_[pubId] = dr_remote_obj;
632 
633  } catch (const CORBA::Exception& ex) {
634  ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_publication: ");
635  pubId = DCPS::GUID_UNKNOWN;
636  }
637 
638  return pubId;
639 }
640 
641 bool
643  const GUID_t& participantId,
644  const GUID_t& publicationId)
645 {
646  {
647  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
648  removeDataWriterRemote(publicationId);
649  }
650  bool removed = false;
651  try {
652  get_dcps_info()->remove_publication(domainId, participantId, publicationId);
653  removed = true;
654  } catch (const CORBA::Exception& ex) {
655  ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_publication: ");
656  }
657 
658  return removed;
659 }
660 
661 bool
663  const GUID_t& participantId,
664  const GUID_t& ignoreId)
665 {
666  try {
667  get_dcps_info()->ignore_publication(domainId, participantId, ignoreId);
668  return true;
669  } catch (const CORBA::Exception& ex) {
670  ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_publication: ");
671  return false;
672  }
673 }
674 
675 bool
677  const GUID_t& participantId,
678  const GUID_t& dwId,
679  const DDS::DataWriterQos& qos,
680  const DDS::PublisherQos& publisherQos)
681 {
682  try {
683  return get_dcps_info()->update_publication_qos(domainId, participantId, dwId,
684  qos, publisherQos);
685  } catch (const CORBA::Exception& ex) {
686  ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_publication_qos: ");
687  return false;
688  }
689 }
690 
691 
692 // Subscription operations:
693 
694 GUID_t
696  const GUID_t& participantId,
697  const GUID_t& topicId,
698  DCPS::DataReaderCallbacks_rch subscription,
699  const DDS::DataReaderQos& qos,
700  const DCPS::TransportLocatorSeq& transInfo,
701  const DDS::SubscriberQos& subscriberQos,
702  const char* filterClassName,
703  const char* filterExpr,
704  const DDS::StringSeq& params,
705  const XTypes::TypeInformation& type_info)
706 {
707  GUID_t subId;
708 
709  try {
710  DCPS::DataReaderRemoteImpl* reader_remote_impl = 0;
711  ACE_NEW_RETURN(reader_remote_impl,
712  DataReaderRemoteImpl(*subscription),
714 
715  //this is taking ownership of the DataReaderRemoteImpl (server side) allocated above
716  PortableServer::ServantBase_var reader_remote(reader_remote_impl);
717 
718  //this is the client reference to the DataReaderRemoteImpl
719  OpenDDS::DCPS::DataReaderRemote_var dr_remote_obj =
720  servant_to_remote_reference(reader_remote_impl, orb_);
721  //turn into a octet seq to pass through generated files
722  DDS::OctetSeq serializedTypeInfo;
723  XTypes::serialize_type_info(type_info, serializedTypeInfo);
724 
725  subId = get_dcps_info()->add_subscription(domainId, participantId, topicId,
726  dr_remote_obj, qos, transInfo, subscriberQos,
727  filterClassName, filterExpr, params,
728  serializedTypeInfo);
729 
731  // take ownership of the client allocated above
732  dataReaderMap_[subId] = dr_remote_obj;
733 
734  } catch (const CORBA::Exception& ex) {
735  ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_subscription: ");
736  subId = DCPS::GUID_UNKNOWN;
737  }
738  return subId;
739 }
740 
741 bool
743  const GUID_t& participantId,
744  const GUID_t& subscriptionId)
745 {
746  {
747  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false);
748  removeDataReaderRemote(subscriptionId);
749  }
750  bool removed = false;
751  try {
752  get_dcps_info()->remove_subscription(domainId, participantId, subscriptionId);
753  removed = true;
754  } catch (const CORBA::Exception& ex) {
755  ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_subscription: ");
756  }
757 
758  return removed;
759 }
760 
761 bool
763  const GUID_t& participantId,
764  const GUID_t& ignoreId)
765 {
766  try {
767  get_dcps_info()->ignore_subscription(domainId, participantId, ignoreId);
768  return true;
769  } catch (const CORBA::Exception& ex) {
770  ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_subscription: ");
771  return false;
772  }
773 }
774 
775 bool
777  const GUID_t& participantId,
778  const GUID_t& drId,
779  const DDS::DataReaderQos& qos,
780  const DDS::SubscriberQos& subQos)
781 {
782  try {
783  return get_dcps_info()->update_subscription_qos(domainId, participantId,
784  drId, qos, subQos);
785  } catch (const CORBA::Exception& ex) {
786  ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_qos: ");
787  return false;
788  }
789 }
790 
791 bool
793  const GUID_t& participantId,
794  const GUID_t& subId,
795  const DDS::StringSeq& params)
796 
797 {
798  try {
799  return get_dcps_info()->update_subscription_params(domainId, participantId,
800  subId, params);
801  } catch (const CORBA::Exception& ex) {
802  ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_params: ");
803  return false;
804  }
805 }
806 
807 
808 // Managing reader/writer associations:
809 
810 void
812 {
813  DataReaderMap::iterator drr = dataReaderMap_.find(subscriptionId);
814  if (drr == dataReaderMap_.end()) {
816  ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataReaderRemote: ")
817  ACE_TEXT(" could not find DataReader for subscriptionId.\n")));
818  return;
819  }
820 
821  try {
822  DataReaderRemoteImpl* impl =
823  remote_reference_to_servant<DataReaderRemoteImpl>(drr->second.in(), orb_);
824  impl->detach_parent();
825  deactivate_remote_object(drr->second.in(), orb_);
826  } catch (const CORBA::BAD_INV_ORDER&) {
827  // The orb may throw ::CORBA::BAD_INV_ORDER when is has been shutdown.
828  // Ignore it anyway.
829  } catch (const CORBA::OBJECT_NOT_EXIST&) {
830  // Same for CORBA::OBJECT_NOT_EXIST
831  }
832 
833  dataReaderMap_.erase(drr);
834 }
835 
836 void
838 {
839  DataWriterMap::iterator dwr = dataWriterMap_.find(publicationId);
840  if (dwr == dataWriterMap_.end()) {
842  ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataWriterRemote: ")
843  ACE_TEXT(" could not find DataWriter for publicationId.\n")));
844  return;
845  }
846 
847  try {
848  DataWriterRemoteImpl* impl =
849  remote_reference_to_servant<DataWriterRemoteImpl>(dwr->second.in(), orb_);
850  impl->detach_parent();
851  deactivate_remote_object(dwr->second.in(), orb_);
852  } catch (const CORBA::BAD_INV_ORDER&) {
853  // The orb may throw ::CORBA::BAD_INV_ORDER when is has been shutdown.
854  // Ignore it anyway.
855  } catch (const CORBA::OBJECT_NOT_EXIST&) {
856  // Same for CORBA::OBJECT_NOT_EXIST
857  }
858 
859  dataWriterMap_.erase(dwr);
860 }
861 
862 namespace {
863  const ACE_TCHAR REPO_SECTION_NAME[] = ACE_TEXT("repository");
864 }
865 
866 int
868 {
869  const ACE_Configuration_Section_Key& root = cf.root_section();
871 
872  if (cf.open_section(root, REPO_SECTION_NAME, 0, repo_sect) != 0) {
873  if (DCPS_debug_level > 0) {
874  // This is not an error if the configuration file does not have
875  // any repository (sub)section. The code default configuration will be used.
877  ACE_TEXT("(%P|%t) NOTICE: InfoRepoDiscovery::Config::discovery_config ")
878  ACE_TEXT("failed to open [%s] section.\n"),
880  }
881 
882  return 0;
883 
884  } else {
885  // Ensure there are no properties in this section
886  ValueMap vm;
887  if (pullValues(cf, repo_sect, vm) > 0) {
888  // There are values inside [repo]
890  ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
891  ACE_TEXT("repo sections must have a subsection name\n")),
892  -1);
893  }
894  // Process the subsections of this section (the individual repos)
895  KeyList keys;
896  if (processSections( cf, repo_sect, keys ) != 0) {
898  ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
899  ACE_TEXT("too many nesting layers in the [repo] section.\n")),
900  -1);
901  }
902 
903  // Loop through the [repo/*] sections
904  for (KeyList::const_iterator it=keys.begin(); it != keys.end(); ++it) {
905  std::string repo_name = (*it).first;
906 
907  ValueMap values;
908  pullValues( cf, (*it).second, values );
910  bool repoKeySpecified = false, bitIpSpecified = false,
911  bitPortSpecified = false;
912  std::string repoIor;
913  int bitPort = 0;
914  std::string bitIp;
915  for (ValueMap::const_iterator it=values.begin(); it != values.end(); ++it) {
916  std::string name = (*it).first;
917  if (name == "RepositoryKey") {
918  repoKey = (*it).second;
919  repoKeySpecified = true;
920  if (DCPS_debug_level > 0) {
922  ACE_TEXT("(%P|%t) [repository/%C]: RepositoryKey == %C\n"),
923  repo_name.c_str(), repoKey.c_str()));
924  }
925 
926  } else if (name == "RepositoryIor") {
927  repoIor = (*it).second;
928 
929  if (DCPS_debug_level > 0) {
931  ACE_TEXT("(%P|%t) [repository/%C]: RepositoryIor == %C\n"),
932  repo_name.c_str(), repoIor.c_str()));
933  }
934  } else if (name == "DCPSBitTransportIPAddress") {
935  bitIp = (*it).second;
936  bitIpSpecified = true;
937  if (DCPS_debug_level > 0) {
939  ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportIPAddress == %C\n"),
940  repo_name.c_str(), bitIp.c_str()));
941  }
942  } else if (name == "DCPSBitTransportPort") {
943  std::string value = (*it).second;
944  bitPort = ACE_OS::atoi(value.c_str());
945  bitPortSpecified = true;
946  if (convertToInteger(value, bitPort)) {
947  } else {
949  ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
950  ACE_TEXT("Illegal integer value for DCPSBitTransportPort (%C) in [repository/%C] section.\n"),
951  value.c_str(), repo_name.c_str()),
952  -1);
953  }
954  if (DCPS_debug_level > 0) {
956  ACE_TEXT("(%P|%t) [repository/%C]: DCPSBitTransportPort == %d\n"),
957  repo_name.c_str(), bitPort));
958  }
959  } else {
961  ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
962  ACE_TEXT("Unexpected entry (%C) in [repository/%C] section.\n"),
963  name.c_str(), repo_name.c_str()),
964  -1);
965  }
966  }
967 
968  if (values.find("RepositoryIor") == values.end()) {
970  ACE_TEXT("(%P|%t) InfoRepoDiscovery::Config::discovery_config ")
971  ACE_TEXT("Repository section [repository/%C] section is missing RepositoryIor value.\n"),
972  repo_name.c_str()),
973  -1);
974  }
975 
976  if (!repoKeySpecified) {
977  // If the RepositoryKey option was not specified, use the section
978  // name as the repo key
979  repoKey = repo_name;
980  }
981  InfoRepoDiscovery_rch discovery(
982  make_rch<InfoRepoDiscovery>(repoKey, repoIor.c_str()));
983  if (bitPortSpecified) discovery->bit_transport_port(bitPort);
984  if (bitIpSpecified) discovery->bit_transport_ip(bitIp);
985  TheServiceParticipant->add_discovery(
986  DCPS::static_rchandle_cast<Discovery>(discovery));
987  }
988  }
989 
990  return 0;
991 }
992 
993 void
995 {
996  orb_->shutdown();
997  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
998  {
999  ThreadStatusManager::Sleeper s(thread_status_manager);
1000  wait();
1001  }
1002  orb_->destroy();
1003 }
1004 
1007 
1008 int
1010 {
1011  ThreadStatusManager::Start s(TheServiceParticipant->get_thread_status_manager(), "OrbRunner");
1012 
1013  // this method was originally Service_Participant::svc()
1014  bool done = false;
1015 
1016  // Ignore all signals to avoid
1017  // ERROR: <something descriptive> Interrupted system call
1018  // The main thread will handle signals.
1019  sigset_t set;
1020  ACE_OS::sigfillset(&set);
1021  ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
1022 
1023  while (!done) {
1024  try {
1025  if (orb_->orb_core()->has_shutdown() == false) {
1026  orb_->run();
1027  }
1028 
1029  done = true;
1030 
1031  } catch (const CORBA::SystemException& sysex) {
1032  sysex._tao_print_exception(
1033  "ERROR: InfoRepoDiscovery::OrbRunner");
1034 
1035  } catch (const CORBA::UserException& userex) {
1036  userex._tao_print_exception(
1037  "ERROR: InfoRepoDiscovery::OrbRunner");
1038 
1039  } catch (const CORBA::Exception& ex) {
1041  "ERROR: InfoRepoDiscovery::OrbRunner");
1042  }
1043 
1044  if (orb_->orb_core()->has_shutdown()) {
1045  done = true;
1046 
1047  } else {
1048  orb_->orb_core()->reactor()->reset_reactor_event_loop();
1049  }
1050  }
1051 
1052  return 0;
1053 }
1054 
1055 class InfoRepoType : public TransportType {
1056 public:
1057  const char* name() { return "repository"; }
1058 
1059  TransportInst_rch new_inst(const std::string&)
1060  {
1061  return TransportInst_rch();
1062  }
1063 };
1064 
1066 {
1068  if (!registry->register_type(make_rch<InfoRepoType>())) {
1069  return;
1070  }
1071  TheServiceParticipant->register_discovery_type("repository", new Config);
1072 }
1073 
1074 int
1076 {
1077  // no-op: since the library is loaded, InfoRepoDiscovery::StaticInitializer
1078  // has already been constructed.
1079  return 0;
1080 }
1081 
1082 ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader);
1085  ACE_TEXT("OpenDDS_InfoRepoDiscovery"),
1086  ACE_SVC_OBJ_T,
1089  0)
1090 
1091 } // namespace DCPS
1092 } // namespace OpenDDS
1093 
virtual bool ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
#define TheTransportRegistry
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
CORBA::Object_ptr resolve_initial_references(const char *name, ACE_Time_Value *timeout=0)
CORBA::Object_ptr string_to_object(const char *str)
#define ACE_ERROR(X)
Implements the OpenDDS::DCPS::ReaderRemote interface that is used to add and remove associations...
const char * c_str(void) const
static const char * DEFAULT_REPO
Key value for the default repository IOR.
Definition: Discovery.h:85
const LogLevel::Value value
Definition: debug.cpp:61
int bit_transport_port_
The builtin topic transport port number.
RepoKey key() const
Definition: Discovery.h:96
virtual int init(int argc, ACE_TCHAR *argv[])
CORBA::Policy_ptr create_policy(CORBA::PolicyType type, const CORBA::Any &val)
ACE_FACTORY_DEFINE(OpenDDS_InfoRepoDiscovery, IRDiscoveryLoader)
LM_INFO
virtual bool attach_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
virtual bool update_subscription_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
DDS::ReturnCode_t create_bit_topics(DomainParticipantImpl *participant)
Definition: Discovery.cpp:51
Implements the OpenDDS::DCPS::DataWriterRemote interface.
sequence< octet > key
& ACE_SVC_NAME(TAO_AV_TCP_Factory)
CORBA::OctetSeq_var ObjectId_var
static const ACE_TCHAR REPO_SECTION_NAME[]
virtual DDS::Subscriber_ptr create_subscriber(const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
bool set_ORB(CORBA::ORB_ptr orb)
virtual void fini_bit(DCPS::DomainParticipantImpl *participant)
virtual bool update_subscription_params(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId, const DDS::StringSeq &params)
static ACE_Thread_Mutex mtx_orb_runner_
LivelinessQosPolicy liveliness
virtual const ACE_Configuration_Section_Key & root_section(void) const
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderCallbacks_rch subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const XTypes::TypeInformation &type_info)
RcHandle< TransportConfig > TransportConfig_rch
int discovery_config(ACE_Configuration_Heap &cf)
pid_t wait(int *=0)
virtual bool update_publication_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
sequence< TransportLocator > TransportLocatorSeq
virtual bool update_domain_participant_qos(DDS::DomainId_t domain, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
std::string bit_transport_ip_
The builtin topic transport address.
ACE_STATIC_SVC_DEFINE(ACE_Logging_Strategy, ACE_TEXT("Logging_Strategy"), ACE_Service_Type::SERVICE_OBJECT, &ACE_SVC_NAME(ACE_Logging_Strategy), ACE_Service_Type::DELETE_THIS|ACE_Service_Type::DELETE_OBJ, 0) extern "C" int _get_dll_unload_policy()
ACE_SVC_OBJ_T
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
T * _duplicate(T *st)
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
const DDS::StatusMask DEFAULT_STATUS_MASK
const char *const BUILT_IN_PUBLICATION_TOPIC
const char DEFAULT_ORB_NAME[]
virtual bool remove_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId)
DOMAINID_TYPE_NATIVE DomainId_t
TransportInst_rch new_inst(const std::string &)
LM_DEBUG
virtual bool ignore_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
DurabilityQosPolicyKind kind
DurabilityQosPolicy durability
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
char ACE_TCHAR
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void serialize_type_info(const TypeInformation &type_info, T &seq, const DCPS::Encoding *encoding_option=0)
Definition: TypeObject.h:3382
int argc(void) const
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
LM_NOTICE
sequence< Policy > PolicyList
virtual ACE_CString _info(void) const=0
virtual OpenDDS::DCPS::TopicStatus assert_topic(OpenDDS::DCPS::GUID_t_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, TopicCallbacks *topic_callbacks)
const char *const BUILT_IN_PARTICIPANT_TOPIC
_retn_type _retn(void)
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
const char *const name
Definition: debug.cpp:60
virtual OpenDDS::DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId)
void removeDataWriterRemote(const GUID_t &publicationId)
_in_type in(void) const
virtual bool ignore_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
CHAR_TYPE ** argv(void)
#define SUBSCRIBER_QOS_DEFAULT
#define SIG_SETMASK
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual OpenDDS::DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::GUID_t_out topicId)
virtual bool update_topic_qos(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::TopicQos &qos)
ORB_ptr ORB_init(int &argc, char *argv[], const char *orb_name=0)
Base class for concrete transports to provide new objects.
Definition: TransportType.h:37
const char *const BUILT_IN_TOPIC_TOPIC
virtual int open_section(const ACE_Configuration_Section_Key &base, const ACE_TCHAR *sub_section, bool create, ACE_Configuration_Section_Key &result)
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls)
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant_secure(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls, const OpenDDS::DCPS::GUID_t &guid, DDS::Security::IdentityHandle id, DDS::Security::PermissionsHandle perm, DDS::Security::ParticipantCryptoHandle part_crypto)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual RcHandle< BitSubscriber > init_bit(DomainParticipantImpl *participant)
Discovery Strategy interface class.
Definition: Discovery.h:76
static const char DEFAULT_INST_PREFIX[]
static TransportRegistry * instance()
Return a singleton instance of this class.
int processSections(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, KeyList &subsections)
Definition: ConfigUtils.cpp:41
virtual bool remove_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
const ReturnCode_t RETCODE_OK
virtual OpenDDS::DCPS::GUID_t generate_participant_guid()
virtual bool ignore_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
#define ACE_ERROR_RETURN(X, Y)
void removeDataReaderRemote(const GUID_t &subscriptionId)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
int pullValues(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, ValueMap &values)
Definition: ConfigUtils.cpp:17
Defines the interface for Discovery callbacks into the Topic.
#define TheServiceParticipant
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterCallbacks_rch publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const XTypes::TypeInformation &type_info)
OPENDDS_STRING RepoKey
Definition: Discovery.h:80
int sigfillset(sigset_t *s)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
virtual DDS::TopicDescription_ptr lookup_topicdescription(const char *name)
virtual bool remove_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &publicationId)
Boolean is_nil(T x)
bool register_type(const TransportType_rch &type)
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
bool convertToInteger(const String &s, T &value)
InfoRepoDiscovery(const RepoKey &key, const std::string &ior)