OpenDDS  Snapshot(2023/04/28-20:55)
DCPSInfo_i.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 
10 #include "tao/ORB_Core.h"
11 
12 #include /**/ "DCPSInfo_i.h"
13 
14 #include "dds/DCPS/InfoRepoDiscovery/InfoC.h"
21 #include "UpdateManager.h"
22 #include "ShutdownInterface.h"
23 
25 #include "dds/DCPS/GuidUtils.h"
27 
28 #include /**/ "tao/debug.h"
29 
30 #include /**/ "ace/Read_Buffer.h"
31 #include /**/ "ace/OS_NS_stdio.h"
32 #include "ace/Dynamic_Service.h"
33 #include "ace/Reactor.h"
34 
36 
37 namespace {
39 }
40 
41 // constructor
43  , bool reincarnate
44  , ShutdownInterface* shutdown
45  , const TAO_DDS_DCPSFederationId& federation)
46  : orb_(CORBA::ORB::_duplicate(orb))
47  , federation_(federation)
48  , participantIdGenerator_(federation.id())
49  , um_(0)
50  , reincarnate_(reincarnate)
51  , shutdown_(shutdown)
52  , reassociate_timer_id_(-1)
53  , dispatch_check_timer_id_(-1)
54 #ifndef DDS_HAS_MINIMUM_BIT
55  , in_cleanup_all_built_in_topics_(false)
56 #endif
57 {
58  if (!TheServiceParticipant->use_bidir_giop()) {
59  int argc = 0;
60  char** no_argv = 0;
61  dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly");
62  }
63 }
64 
66 {
67 }
68 
69 int
71  const void* arg)
72 {
74 
75  if (arg == this) {
76  if (dispatchingOrb_) {
77  if (dispatchingOrb_->work_pending()) {
78  // Ten microseconds
79  ACE_Time_Value smallval(0, 10);
80  dispatchingOrb_->perform_work(smallval);
81  }
82  }
83  } else {
84  // NOTE: This is a purposefully naive approach to addressing defunct
85  // associations. In the future, it may be worthwhile to introduce a
86  // callback model to fix the heinous runtime cost below:
87  for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
88  dom != this->domains_.end(); ++dom) {
89 
90  const DCPS_IR_Participant_Map& participants(dom->second->participants());
91  for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
92  part != participants.end(); ++part) {
93 
94  const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
95  for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
96  sub != subscriptions.end(); ++sub) {
97  sub->second->reevaluate_defunct_associations();
98  }
99 
100  const DCPS_IR_Publication_Map& publications(part->second->publications());
101  for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
102  pub != publications.end(); ++pub) {
103  pub->second->reevaluate_defunct_associations();
104  }
105  }
106  }
107  }
108 
109  return 0;
110 }
111 
112 void
114 {
115  this->shutdown_->shutdown();
116 }
117 
120 {
121  return CORBA::ORB::_duplicate(this->orb_.in());
122 }
123 
125  DDS::DomainId_t domainId,
126  const OpenDDS::DCPS::GUID_t& participantId)
127 {
129 
130  // Grab the domain.
131  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
132 
133  if (where == this->domains_.end()) {
135  }
136 
137  // Grab the participant.
138  DCPS_IR_Participant* participant
139  = where->second->participant(participantId);
140 
141  if (0 == participant) {
143  }
144 
145  // Establish ownership within the local repository.
146  participant->takeOwnership();
147 
148  return false;
149 }
150 
151 bool
153  DDS::DomainId_t domainId,
154  const OpenDDS::DCPS::GUID_t& participantId,
155  long sender,
156  long owner)
157 {
158  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
159 
160  // Grab the domain.
161  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
162 
163  if (where == this->domains_.end()) {
164  return false;
165  }
166 
167  // Grab the participant.
168  DCPS_IR_Participant* participant
169  = where->second->participant(participantId);
170 
171  if (0 == participant) {
172  return false;
173  }
174 
175  // Establish the ownership.
176  participant->changeOwner(sender, owner);
177  return true;
178 }
179 
181  OpenDDS::DCPS::GUID_t_out topicId,
182  DDS::DomainId_t domainId,
183  const OpenDDS::DCPS::GUID_t& participantId,
184  const char * topicName,
185  const char * dataTypeName,
186  const DDS::TopicQos & qos,
187  bool /*hasDcpsKey -- only used for RTPS Discovery*/)
188 {
190  // Grab the domain.
191  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
192 
193  if (where == this->domains_.end()) {
195  }
196 
197  // Grab the participant.
198  DCPS_IR_Participant* participantPtr
199  = where->second->participant(participantId);
200 
201  if (0 == participantPtr) {
203  }
204 
205  OpenDDS::DCPS::TopicStatus topicStatus
206  = where->second->add_topic(
207  topicId,
208  topicName,
209  dataTypeName,
210  qos,
211  participantPtr);
212 
213  if (this->um_ && (participantPtr->isBitPublisher() == false)) {
214  Update::UTopic topic(domainId, topicId, participantId
215  , topicName, dataTypeName
216  , const_cast<DDS::TopicQos &>(qos));
217  this->um_->create(topic);
218 
220  OpenDDS::DCPS::RepoIdConverter converter(topicId);
222  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
223  ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
224  std::string(converter).c_str(),
225  domainId));
226  }
227  }
228  return topicStatus;
229 }
230 
231 bool
233  DDS::DomainId_t domainId,
234  const OpenDDS::DCPS::GUID_t& participantId,
235  const char* topicName,
236  const char* dataTypeName,
237  const DDS::TopicQos& qos)
238 {
239  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
240 
241  // Grab the domain.
242  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
243 
244  if (where == this->domains_.end()) {
247  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
248  ACE_TEXT("invalid domain %d.\n"),
249  domainId));
250  }
251 
252  return false;
253  }
254 
255  // Grab the participant.
256  DCPS_IR_Participant* participantPtr
257  = where->second->participant(participantId);
258 
259  if (0 == participantPtr) {
261  OpenDDS::DCPS::RepoIdConverter converter(participantId);
263  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
264  ACE_TEXT("invalid participant %C.\n"),
265  std::string(converter).c_str()));
266  }
267 
268  return false;
269  }
270 
271  OpenDDS::DCPS::TopicStatus topicStatus
272  = where->second->force_add_topic(topicId, topicName, dataTypeName,
273  qos, participantPtr);
274 
275  if (topicStatus != OpenDDS::DCPS::CREATED) {
276  return false;
277  }
278 
279  OpenDDS::DCPS::RepoIdConverter converter(topicId);
280 
281  // See if we are adding a topic that was created within this
282  // repository or a different repository.
283  if (converter.federationId() == federation_.id()) {
284  // Ensure the topic GUID_t values do not conflict.
285  participantPtr->last_topic_key(converter.entityKey());
286  }
287 
288  return true;
289 }
290 
292  DDS::DomainId_t domainId,
293  const char * topicName,
294  CORBA::String_out dataTypeName,
295  DDS::TopicQos_out qos,
296  OpenDDS::DCPS::GUID_t_out topicId)
297 {
299 
300  // Grab the domain.
301  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
302 
303  if (where == this->domains_.end()) {
305  }
306 
308 
309  DCPS_IR_Topic* topic = 0;
310  qos = new DDS::TopicQos;
311 
312  status = where->second->find_topic(topicName, topic);
313 
314  if (0 != topic) {
315  status = OpenDDS::DCPS::FOUND;
316  const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
317  dataTypeName = desc->get_dataTypeName();
318  *qos = *(topic->get_topic_qos());
319  topicId = topic->get_id();
320  }
321 
322  return status;
323 }
324 
326  DDS::DomainId_t domainId,
327  const OpenDDS::DCPS::GUID_t& participantId,
328  const OpenDDS::DCPS::GUID_t& topicId)
329 {
331 
332  // Grab the domain.
333  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
334 
335  if (where == this->domains_.end()) {
337  }
338 
339  // Grab the participant.
340  DCPS_IR_Participant* partPtr
341  = where->second->participant(participantId);
342 
343  if (0 == partPtr) {
345  }
346 
347  DCPS_IR_Topic* topic;
348 
349  if (partPtr->find_topic_reference(topicId, topic) != 0) {
351  }
352 
353  OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
354 
355  if (this->um_
356  && (partPtr->isOwner() == true)
357  && (partPtr->isBitPublisher() == false)) {
358  Update::IdPath path(domainId, participantId, topicId);
359  this->um_->destroy(path, Update::Topic);
360 
362  OpenDDS::DCPS::RepoIdConverter converter(topicId);
364  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
365  ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
366  std::string(converter).c_str(),
367  domainId));
368  }
369  }
370 
371  return removedStatus;
372 }
373 
375  DDS::DomainId_t domainId,
376  const OpenDDS::DCPS::GUID_t& participantId,
377  const OpenDDS::DCPS::GUID_t& topicId,
378  OpenDDS::DCPS::DataWriterRemote_ptr publication,
379  const DDS::DataWriterQos & qos,
380  const OpenDDS::DCPS::TransportLocatorSeq& transInfo,
381  const DDS::PublisherQos& publisherQos,
382  const DDS::OctetSeq& serializedTypeInfo)
383 {
384  if (CORBA::is_nil(publication)) {
387  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
388  ACE_TEXT("invalid publication reference.\n")));
389  }
391  }
392 
394 
395  // Grab the domain.
396  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
397 
398  if (where == this->domains_.end()) {
400  }
401 
402  // Grab the participant.
403  DCPS_IR_Participant* partPtr
404  = where->second->participant(participantId);
405 
406  if (0 == partPtr) {
408  }
409 
410  DCPS_IR_Topic* topic = where->second->find_topic(topicId);
411 
412  if (topic == 0) {
414  }
415 
416  // Get a Id for the Writer, make it a builtin kind if this is for a BIT
418  OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
419 
420  OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication =
421  OpenDDS::DCPS::DataWriterRemote::_duplicate(publication);
422 
423  if (dispatchingOrb_) {
424  // Remarshall the remote reference onto the dispatching orb.
425  CORBA::String_var pubStr = orb_->object_to_string(dispatchingPublication);
426  CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr);
427  if (CORBA::is_nil(pubObj)) {
430  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
431  ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
432  }
434  }
435 
436  dispatchingPublication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj);
437  }
438 
441  pubId,
442  partPtr,
443  topic,
444  dispatchingPublication.in(),
445  qos,
446  transInfo,
448  publisherQos,
449  serializedTypeInfo));
450 
451  DCPS_IR_Publication* pub = pubPtr.get();
452  if (partPtr->add_publication(OpenDDS::DCPS::move(pubPtr)) != 0) {
453  // failed to add. we are responsible for the memory.
455  } else if (topic->add_publication_reference(pub) != 0) {
456  // Failed to add to the topic
457  // so remove from participant and fail.
458  partPtr->remove_publication(pubId);
460  }
461 
462  if (this->um_ && (partPtr->isBitPublisher() == false)) {
463  CORBA::String_var callback = orb_->object_to_string(publication);
465 
466  Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
467  , callback.in()
468  , const_cast<DDS::PublisherQos&>(publisherQos)
469  , const_cast<DDS::DataWriterQos&>(qos)
470  , const_cast<OpenDDS::DCPS::TransportLocatorSeq&>(transInfo)
471  , transportContextDefault, csi
472  , const_cast<DDS::OctetSeq&>(serializedTypeInfo));
473  this->um_->create(actor);
474 
476  OpenDDS::DCPS::RepoIdConverter converter(pubId);
478  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_publication: ")
479  ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
480  std::string(converter).c_str(),
481  domainId));
482  }
483  }
484 
485  where->second->remove_dead_participants();
486  return pubId;
487 }
488 
489 bool
491  const OpenDDS::DCPS::GUID_t& participantId,
492  const OpenDDS::DCPS::GUID_t& topicId,
493  const OpenDDS::DCPS::GUID_t& pubId,
494  const char* pub_str,
495  const DDS::DataWriterQos & qos,
496  const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
497  ACE_CDR::ULong transportContext,
498  const DDS::PublisherQos & publisherQos,
499  const DDS::OctetSeq & serializedTypeInfo,
500  bool associate)
501 {
502  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
503 
504  // Grab the domain.
505  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
506 
507  if (where == this->domains_.end()) {
510  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
511  ACE_TEXT("invalid domain %d.\n"),
512  domainId));
513  }
514 
515  return false;
516  }
517 
518  // Grab the participant.
519  DCPS_IR_Participant* partPtr
520  = where->second->participant(participantId);
521 
522  if (0 == partPtr) {
524  OpenDDS::DCPS::RepoIdConverter converter(pubId);
526  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
527  ACE_TEXT("invalid participant %C in domain %d.\n"),
528  std::string(converter).c_str(),
529  domainId));
530  }
531 
532  return false;
533  }
534 
535  DCPS_IR_Topic* topic = where->second->find_topic(topicId);
536 
537  if (topic == 0) {
538  OpenDDS::DCPS::RepoIdConverter converter(topicId);
540  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
541  ACE_TEXT("invalid topic %C in domain %d.\n"),
542  std::string(converter).c_str(),
543  domainId));
544  return false;
545  }
546 
547  /// @TODO: Check if this is already stored. If so, just clear the callback IOR.
548 
549  CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_)->string_to_object(pub_str);
550  if (CORBA::is_nil(obj.in())) {
553  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
554  ACE_TEXT("failure converting string %C to objref\n"),
555  pub_str));
556  }
557  return false;
558  }
559 
560  OpenDDS::DCPS::DataWriterRemote_var publication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
561 
564  pubId,
565  partPtr,
566  topic,
567  publication.in(),
568  qos,
569  transInfo,
570  transportContext,
571  publisherQos,
572  serializedTypeInfo));
573 
574  DCPS_IR_Publication* pub = pubPtr.get();
575  switch (partPtr->add_publication(move(pubPtr))) {
576  case -1: {
577  OpenDDS::DCPS::RepoIdConverter converter(pubId);
579  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
580  ACE_TEXT("failed to add publication to participant %C.\n"),
581  std::string(converter).c_str()));
582  return false;
583  }
584 
585  case 1:
586  return false;
587  case 0:
588  default:
589  break;
590  }
591 
592  switch (topic->add_publication_reference(pub, associate)) {
593  case -1: {
594  OpenDDS::DCPS::RepoIdConverter converter(pubId);
596  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
597  ACE_TEXT("failed to add publication to participant %C topic list.\n"),
598  std::string(converter).c_str()));
599 
600  // Remove the publication.
601  partPtr->remove_publication(pubId);
602 
603  }
604  return false;
605 
606  case 1: // This is actually a really really bad place to jump to.
607  // This means that we successfully added the new publication
608  // to the participant (it had not been inserted before) but
609  // that we are adding a duplicate publication to the topic
610  // list - which should never ever be able to happen.
611  return false;
612 
613  case 0:
614  default:
615  break;
616  }
617 
618  OpenDDS::DCPS::RepoIdConverter converter(pubId);
619 
620  // See if we are adding a publication that was created within this
621  // repository or a different repository.
622  if (converter.federationId() == federation_.id()) {
623  // Ensure the publication GUID_t values do not conflict.
624  partPtr->last_publication_key(converter.entityKey());
625  }
626 
627  return true;
628 }
629 
631  DDS::DomainId_t domainId,
632  const OpenDDS::DCPS::GUID_t& participantId,
633  const OpenDDS::DCPS::GUID_t& publicationId)
634 {
636 
637  // Grab the domain.
638  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
639 
640  if (where == this->domains_.end()) {
642  }
643 
644  // Grab the participant.
645  DCPS_IR_Participant* const partPtr = where->second->participant(participantId);
646  if (!partPtr) {
648  }
649 
650  const bool in_cleanup =
651 #ifdef DDS_HAS_MINIMUM_BIT
652  false;
653 #else
655 #endif
656 
657  if (partPtr->remove_publication(publicationId) != 0) {
658  where->second->remove_dead_participants(in_cleanup);
659 
660  // throw exception because the publication was not removed!
662  }
663 
664  where->second->remove_dead_participants(in_cleanup);
665 
666  if (um_ && partPtr->isOwner() && !partPtr->isBitPublisher()) {
667  Update::IdPath path(domainId, participantId, publicationId);
669 
671  OpenDDS::DCPS::RepoIdConverter converter(publicationId);
673  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
674  ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
675  std::string(converter).c_str(),
676  domainId));
677  }
678  }
679 }
680 
682  DDS::DomainId_t domainId,
683  const OpenDDS::DCPS::GUID_t& participantId,
684  const OpenDDS::DCPS::GUID_t& topicId,
685  OpenDDS::DCPS::DataReaderRemote_ptr subscription,
686  const DDS::DataReaderQos & qos,
687  const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
688  const DDS::SubscriberQos & subscriberQos,
689  const char* filterClassName,
690  const char* filterExpression,
691  const DDS::StringSeq& exprParams,
692  const DDS::OctetSeq & serializedTypeInfo)
693 {
694  if (CORBA::is_nil(subscription)) {
697  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
698  ACE_TEXT("invalid subscription reference.\n")));
699  }
701  }
702 
703  DCPS_IR_Domain* domainPtr;
704  DCPS_IR_Participant* partPtr;
705  DCPS_IR_Topic* topic;
706  OpenDDS::DCPS::GUID_t subId;
708  {
710 
711  // Grab the domain.
712  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
713 
714  if (where == this->domains_.end()) {
716  }
717 
718  // Grab the domain and participant.
719  domainPtr = where->second.get();
720  partPtr = domainPtr->participant(participantId);
721 
722  if (0 == partPtr) {
724  }
725 
726  topic = where->second->find_topic(topicId);
727 
728  if (topic == 0) {
730  }
731 
732  // Get a Id for the Reader, make it a builtin kind if this is for a BIT
733  subId = partPtr->get_next_subscription_id(
734  OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
735 
736  OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription (
737  OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
738 
739  if (dispatchingOrb_) {
740  // Remarshall the remote reference onto the dispatching orb.
741  CORBA::String_var subStr = orb_->object_to_string(dispatchingSubscription);
742  CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr);
743  if (CORBA::is_nil(subObj.in())) {
746  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
747  ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
748  }
750  }
751  dispatchingSubscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj);
752  }
753 
754  subPtr.reset(
756  subId,
757  partPtr,
758  topic,
759  dispatchingSubscription.in(),
760  qos,
761  transInfo,
763  subscriberQos,
764  filterClassName,
765  filterExpression,
766  exprParams,
767  serializedTypeInfo));
768 
769  // Release lock
770  }
771 
772  DCPS_IR_Subscription* sub = subPtr.get();
773  if (partPtr->add_subscription(move(subPtr)) != 0) {
774  // failed to add. we are responsible for the memory.
776  } else if (topic->add_subscription_reference(sub) != 0) {
777  ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
778  // No associations were made so remove and fail.
779  partPtr->remove_subscription(subId);
781  }
782 
783  if (this->um_ && (partPtr->isBitPublisher() == false)) {
784  CORBA::String_var callback = orb_->object_to_string(subscription);
785  Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
786 
787  Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
788  , callback.in()
789  , const_cast<DDS::SubscriberQos&>(subscriberQos)
790  , const_cast<DDS::DataReaderQos&>(qos)
791  , const_cast<OpenDDS::DCPS::TransportLocatorSeq&>(transInfo)
793  , const_cast<DDS::OctetSeq&>(serializedTypeInfo));
794 
795  this->um_->create(actor);
796 
798  OpenDDS::DCPS::RepoIdConverter converter(subId);
800  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_subscription: ")
801  ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
802  std::string(converter).c_str(),
803  domainId));
804  }
805  }
806 
807  domainPtr->remove_dead_participants();
808 
809  return subId;
810 }
811 
812 bool
814  DDS::DomainId_t domainId,
815  const OpenDDS::DCPS::GUID_t& participantId,
816  const OpenDDS::DCPS::GUID_t& topicId,
817  const OpenDDS::DCPS::GUID_t& subId,
818  const char* sub_str,
819  const DDS::DataReaderQos & qos,
820  const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
821  ACE_CDR::ULong transportContext,
822  const DDS::SubscriberQos & subscriberQos,
823  const char* filterClassName,
824  const char* filterExpression,
825  const DDS::StringSeq& exprParams,
826  const DDS::OctetSeq & serializedTypeInfo,
827  bool associate)
828 {
829  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
830 
831  // Grab the domain.
832  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
833 
834  if (where == this->domains_.end()) {
837  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
838  ACE_TEXT("invalid domain %d.\n"),
839  domainId));
840  }
841 
842  return false;
843  }
844 
845  // Grab the participant.
846  DCPS_IR_Participant* partPtr
847  = where->second->participant(participantId);
848 
849  if (0 == partPtr) {
851  OpenDDS::DCPS::RepoIdConverter converter(participantId);
853  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
854  ACE_TEXT("invalid participant %C in domain %d.\n"),
855  std::string(converter).c_str(),
856  domainId));
857  }
858 
859  return false;
860  }
861 
862  DCPS_IR_Topic* topic = where->second->find_topic(topicId);
863 
864  if (topic == 0) {
866  OpenDDS::DCPS::RepoIdConverter converter(topicId);
868  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
869  ACE_TEXT("invalid topic %C in domain %d.\n"),
870  std::string(converter).c_str(),
871  domainId));
872  }
873 
874  return false;
875  }
876 
877  CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_) ->string_to_object(sub_str);
878  if (CORBA::is_nil(obj.in())) {
881  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
882  ACE_TEXT("failure converting string %C to objref\n"),
883  sub_str));
884  }
885  return false;
886  }
887 
888  OpenDDS::DCPS::DataReaderRemote_var subscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
889 
892  subId,
893  partPtr,
894  topic,
895  subscription.in(),
896  qos,
897  transInfo,
898  transportContext,
899  subscriberQos,
900  filterClassName,
901  filterExpression,
902  exprParams,
903  serializedTypeInfo));
904 
905  DCPS_IR_Subscription* sub = subPtr.get();
906  switch (partPtr->add_subscription(OpenDDS::DCPS::move(subPtr))) {
907  case -1: {
908  OpenDDS::DCPS::RepoIdConverter converter(subId);
910  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
911  ACE_TEXT("failed to add subscription to participant %C.\n"),
912  std::string(converter).c_str()));
913  return false;
914  }
915 
916  case 1:
917  return false;
918 
919  case 0:
920  default:
921  break;
922  }
923 
924  switch (topic->add_subscription_reference(sub, associate)) {
925  case -1: {
926  OpenDDS::DCPS::RepoIdConverter converter(subId);
928  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
929  ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
930  std::string(converter).c_str()));
931 
932  // Remove the subscription.
933  partPtr->remove_subscription(subId);
934 
935  }
936  return false;
937 
938  case 1: // This is actually a really really bad place to jump to.
939  // This means that we successfully added the new subscription
940  // to the participant (it had not been inserted before) but
941  // that we are adding a duplicate subscription to the topic
942  // list - which should never ever be able to happen.
943  return false;
944 
945  case 0:
946  default:
947  break;
948  }
949 
950  OpenDDS::DCPS::RepoIdConverter converter(subId);
951 
952  // See if we are adding a subscription that was created within this
953  // repository or a different repository.
954  if (converter.federationId() == federation_.id()) {
955  // Ensure the subscription GUID_t values do not conflict.
956  partPtr->last_subscription_key(converter.entityKey());
957  }
958 
959  return true;
960 }
961 
963  DDS::DomainId_t domainId,
964  const OpenDDS::DCPS::GUID_t& participantId,
965  const OpenDDS::DCPS::GUID_t& subscriptionId)
966 {
968 
969  // Grab the domain.
970  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
971 
972  if (where == this->domains_.end()) {
974  }
975 
976  // Grab the participant.
977  DCPS_IR_Participant* const partPtr = where->second->participant(participantId);
978  if (!partPtr) {
980  }
981 
982  if (partPtr->remove_subscription(subscriptionId) != 0) {
983  // throw exception because the subscription was not removed!
985  }
986 
987  where->second->remove_dead_participants(
988 #ifdef DDS_HAS_MINIMUM_BIT
989  false
990 #else
992 #endif
993  );
994 
995  if (um_ && partPtr->isOwner() && !partPtr->isBitPublisher()) {
996  Update::IdPath path(domainId, participantId, subscriptionId);
998 
1000  OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
1002  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
1003  ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
1004  std::string(converter).c_str(),
1005  domainId));
1006  }
1007  }
1008 }
1009 
1012  const DDS::DomainParticipantQos & qos)
1013 {
1014  // A value to return.
1017  value.federated = this->federation_.overridden();
1018 
1019  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
1020 
1021  // Grab the domain.
1022  DCPS_IR_Domain* domainPtr = this->domain(domain);
1023 
1024  if (0 == domainPtr) {
1026  }
1027 
1028  // Obtain a shiny new GUID value.
1029  OpenDDS::DCPS::GUID_t participantId = domainPtr->get_next_participant_id();
1030 
1031  // Determine if this is the 'special' repository internal participant
1032  // that publishes the built-in topics for a domain.
1033  bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
1034 
1035  DCPS_IR_Participant_rch participant =
1036  OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(
1037  this->federation_,
1038  participantId,
1039  domainPtr,
1040  qos, um_, isBitPart);
1041 
1042  // We created the participant, now we can return the Id value (eventually).
1043  value.id = participantId;
1044 
1045  if (isBitPart) {
1046  participant->isBitPublisher() = true;
1047 
1049  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1051  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1052  ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
1053  std::string(converter).c_str(),
1054  domain));
1055  }
1056  }
1057 
1058  // Assume responsibility for writing back to the participant.
1059  participant->takeOwnership();
1060 
1061  int status = domainPtr->add_participant(participant);
1062 
1063  if (0 != status) {
1064  // Adding the participant failed return the invalid
1065  // participant Id number.
1066  participantId = OpenDDS::DCPS::GUID_UNKNOWN;
1067 
1068  } else if (this->um_) {
1069  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1070  if (participant->isBitPublisher() == false) {
1071  // Push this participant to interested observers.
1072  Update::UParticipant updateParticipant(
1073  domain,
1074  participant->owner(),
1075  participantId,
1076  const_cast<DDS::DomainParticipantQos &>(qos));
1077  this->um_->create(updateParticipant);
1078 
1081  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1082  ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
1083  std::string(converter).c_str(),
1084  domain));
1085  }
1086  }
1087 
1088  // Update what the last participant id was
1089  um_->updateLastPartId(converter.participantId());
1090  }
1091 
1093  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1095  ACE_TEXT("(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1096  ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"),
1097  domain,
1098  std::string(converter).c_str(),
1099  participant.get()));
1100  }
1101  return value;
1102 }
1103 
1104 bool
1106  , const OpenDDS::DCPS::GUID_t& participantId
1107  , const DDS::DomainParticipantQos & qos)
1108 {
1109  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
1110 
1111  // Grab the domain.
1112  DCPS_IR_Domain* domainPtr = this->domain(domainId);
1113 
1114  if (0 == domainPtr) {
1117  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1118  ACE_TEXT("invalid domain Id: %d\n"),
1119  domainId));
1120  }
1121 
1122  return false;
1123  }
1124 
1125  // Prepare to manipulate the participant's Id value.
1126  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1127 
1128  // Determine if this is the 'special' repository internal participant
1129  // that publishes the built-in topics for a domain.
1130  bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
1131 
1132  // Grab the participant.
1133  DCPS_IR_Participant* partPtr = domainPtr->participant(participantId);
1134 
1135  if (0 != partPtr) {
1138  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1139  ACE_TEXT("participant id %C already exists.\n"),
1140  std::string(converter).c_str()));
1141  }
1142 
1143  return false;
1144  }
1145 
1146  DCPS_IR_Participant_rch participant =
1147  OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(this->federation_,
1148  participantId,
1149  domainPtr,
1150  qos, um_, isBitPart);
1151 
1152  switch (domainPtr->add_participant(participant)) {
1153  case -1: {
1155  ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1156  ACE_TEXT("failed to load participant %C in domain %d.\n"),
1157  std::string(converter).c_str(),
1158  domainId));
1159  }
1160  return false;
1161 
1162  case 1:
1163 
1166  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1167  ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
1168  std::string(converter).c_str(),
1169  domainId));
1170  }
1171 
1172  return false;
1173 
1174  case 0:
1175  default:
1176  break;
1177  }
1178 
1179  // See if we are adding a participant that was created within this
1180  // repository or a different repository.
1181  if (converter.federationId() == this->federation_.id()) {
1182  // Ensure the participant GUID values do not conflict.
1183  domainPtr->last_participant_key(converter.participantId());
1184 
1187  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1188  ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
1189  converter.participantId()));
1190  }
1191  }
1192 
1195  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1196  ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"),
1197  std::string(converter).c_str(),
1198  participant.in(),
1199  domainId));
1200  }
1201 
1202  return true;
1203 }
1204 
1205 bool
1208  long owner)
1209 {
1210  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
1211 
1212  // Grab the domain.
1213  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
1214 
1215  if (where == this->domains_.end()) {
1216  return false;
1217  }
1218 
1219  std::vector<OpenDDS::DCPS::GUID_t> candidates;
1220 
1221  for (DCPS_IR_Participant_Map::const_iterator
1222  current = where->second->participants().begin();
1223  current != where->second->participants().end();
1224  ++current) {
1225  if (current->second->owner() == owner) {
1226  candidates.push_back(current->second->get_id());
1227  }
1228  }
1229 
1232  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1233  ACE_TEXT("%d participants to remove from domain %d.\n"),
1234  candidates.size(),
1235  domain));
1236  }
1237 
1238  bool status = true;
1239 
1240  for (unsigned int index = 0; index < candidates.size(); ++index) {
1241  DCPS_IR_Participant* participant
1242  = where->second->participant(candidates[index]);
1243  if (participant) {
1244  std::vector<OpenDDS::DCPS::GUID_t> keylist;
1245 
1246  // Remove Subscriptions
1247  for (DCPS_IR_Subscription_Map::const_iterator
1248  current = participant->subscriptions().begin();
1249  current != participant->subscriptions().end();
1250  ++current) {
1251  keylist.push_back(current->second->get_id());
1252  }
1253 
1255  OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
1257  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1258  ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
1259  keylist.size(),
1260  std::string(converter).c_str()));
1261  }
1262 
1263  for (unsigned int key = 0; key < keylist.size(); ++key) {
1264  if (participant->remove_subscription(keylist[key]) != 0) {
1265  status = false;
1266  }
1267  }
1268 
1269  // Remove Publications
1270  keylist.clear();
1271 
1272  for (DCPS_IR_Publication_Map::const_iterator
1273  current = participant->publications().begin();
1274  current != participant->publications().end();
1275  ++current) {
1276  keylist.push_back(current->second->get_id());
1277  }
1278 
1280  OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
1282  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1283  ACE_TEXT("%d publications to remove from participant %C.\n"),
1284  keylist.size(),
1285  std::string(converter).c_str()));
1286  }
1287 
1288  for (unsigned int key = 0; key < keylist.size(); ++key) {
1289  if (participant->remove_publication(keylist[key]) != 0) {
1290  status = false;
1291  }
1292  }
1293 
1294  // Remove Topics
1295  keylist.clear();
1296 
1297  for (DCPS_IR_Topic_Map::const_iterator
1298  current = participant->topics().begin();
1299  current != participant->topics().end();
1300  ++current) {
1301  keylist.push_back(current->second->get_id());
1302  }
1303 
1305  OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
1307  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1308  ACE_TEXT("%d topics to remove from participant %C.\n"),
1309  keylist.size(),
1310  std::string(converter).c_str()));
1311  }
1312 
1313  for (unsigned int key = 0; key < keylist.size(); ++key) {
1314  DCPS_IR_Topic* discard;
1315 
1316  if (participant->remove_topic_reference(keylist[key], discard) != 0) {
1317  status = false;
1318  }
1319  }
1320  }
1321 
1322  // Remove Participant
1323  this->remove_domain_participant(domain, candidates[ index]);
1324  }
1325 
1326  return status;
1327 }
1328 
1329 void
1331  DDS::DomainId_t domainId,
1332  const OpenDDS::DCPS::GUID_t& local_id,
1333  const OpenDDS::DCPS::GUID_t& remote_id)
1334 {
1336 
1337  DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
1338  if (it == this->domains_.end()) {
1340  }
1341 
1342  DCPS_IR_Participant* participant = it->second->participant(local_id);
1343  if (participant == 0) {
1345  }
1346 
1347  // Disassociate from participant temporarily:
1348  const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
1349  for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
1350  sub != subscriptions.end(); ++sub) {
1351  sub->second->disassociate_participant(remote_id, true);
1352  }
1353 
1354  const DCPS_IR_Publication_Map& publications = participant->publications();
1355  for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
1356  pub != publications.end(); ++pub) {
1357  pub->second->disassociate_participant(remote_id, true);
1358  }
1359 
1360  it->second->remove_dead_participants();
1361 }
1362 
1363 void
1365  DDS::DomainId_t domainId,
1366  const OpenDDS::DCPS::GUID_t& participantId,
1367  const OpenDDS::DCPS::GUID_t& local_id,
1368  const OpenDDS::DCPS::GUID_t& remote_id)
1369 {
1371 
1372  DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
1373  if (it == this->domains_.end()) {
1375  }
1376 
1377  DCPS_IR_Participant* participant = it->second->participant(participantId);
1378  if (participant == 0) {
1380  }
1381 
1383  ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
1384  }
1385 
1386  DCPS_IR_Subscription* subscription;
1387  if (participant->find_subscription_reference(local_id, subscription)
1388  != 0 || subscription == 0) {
1389  OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
1390  OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
1392  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
1393  ACE_TEXT("participant %C could not find subscription %C.\n"),
1394  std::string(part_converter).c_str(),
1395  std::string(sub_converter).c_str()));
1397  }
1398 
1399  // Disassociate from publication temporarily:
1400  subscription->disassociate_publication(remote_id, true);
1401 
1402  it->second->remove_dead_participants();
1403 }
1404 
1405 void
1407  DDS::DomainId_t domainId,
1408  const OpenDDS::DCPS::GUID_t& participantId,
1409  const OpenDDS::DCPS::GUID_t& local_id,
1410  const OpenDDS::DCPS::GUID_t& remote_id)
1411 {
1413 
1414  DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
1415  if (it == this->domains_.end()) {
1417  }
1418 
1419  DCPS_IR_Participant* participant = it->second->participant(participantId);
1420  if (participant == 0) {
1422  }
1423 
1425  ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
1426  }
1427 
1428  DCPS_IR_Publication* publication;
1429  if (participant->find_publication_reference(local_id, publication)
1430  != 0 || publication == 0) {
1431  OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
1432  OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
1434  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
1435  ACE_TEXT("participant %C could not find publication %C.\n"),
1436  std::string(part_converter).c_str(),
1437  std::string(pub_converter).c_str()));
1439  }
1440 
1441  // Disassociate from subscription temporarily:
1442  publication->disassociate_subscription(remote_id, true);
1443 
1444  it->second->remove_dead_participants();
1445 }
1446 
1448  DDS::DomainId_t domainId,
1449  const OpenDDS::DCPS::GUID_t& participantId)
1450 {
1452 
1453  // Grab the domain.
1454  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1455 
1456  if (where == this->domains_.end()) {
1458  }
1459 
1460  DCPS_IR_Participant_rch participant = where->second->participant_rch(participantId);
1461  if (!participant) {
1462  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1464  ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
1465  ACE_TEXT("failed to locate participant %C in domain %d.\n"),
1466  std::string(converter).c_str(),
1467  domainId));
1469  }
1470 
1471  // Determine if we should propagate this event; we need to cache this
1472  // result as the participant will be gone by the time we use the result.
1473  bool sendUpdate = participant->isOwner() && !participant->isBitPublisher();
1474 
1475  CORBA::Boolean dont_notify_lost = 0;
1476  int status = where->second->remove_participant(participantId, dont_notify_lost);
1477 
1478  if (0 != status) {
1479  // Removing the participant failed
1481  }
1482 
1483  // Update any concerned observers that the participant was destroyed.
1484  if (this->um_ && sendUpdate) {
1485  Update::IdPath path(
1486  where->second->get_id(),
1487  participantId,
1488  participantId);
1489  this->um_->destroy(path, Update::Participant);
1490 
1492  OpenDDS::DCPS::RepoIdConverter converter(participantId);
1494  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
1495  ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
1496  std::string(converter).c_str(),
1497  domainId));
1498  }
1499  }
1500 
1501  if (where->second->participants().empty()
1502 #ifndef DDS_HAS_MINIMUM_BIT
1503  && !(participant->isOwner() && participant->isBitPublisher() && in_cleanup_all_built_in_topics_)
1504  // If this is false, we're running as part of cleanup_all_built_in_topics
1505  // and we can't remove the domain because we would invalid the iterator
1506  // we're using in cleanup_all_built_in_topics. cleanup_all_built_in_topics
1507  // will clear the domains once it's done.
1508 #endif
1509  ) {
1510  domains_.erase(where);
1511  }
1512 #ifndef DDS_HAS_MINIMUM_BIT
1513  else if (where->second->useBIT() &&
1514  where->second->participants().size() == 1) {
1515  // The only participant left is the one we created to publish BITs.
1516  // It can be removed now since no user participants exist in this domain,
1517  // but it has to be removed on the Service Participant's reactor thread
1518  // in order to make the locking work properly in delete_participant().
1519  BIT_Cleanup_Handler* eh_impl = new BIT_Cleanup_Handler(this, domainId);
1520  const ACE_Event_Handler_var eh = eh_impl;
1521  TheServiceParticipant->reactor()->notify(eh.handler());
1522 
1523  // Wait for that to be finished
1526  OpenDDS::DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1527  while (status == CvStatus_NoTimeout && !eh_impl->done_) {
1528  status = eh_impl->cv_.wait(thread_status_manager);
1529  }
1530  }
1531 #endif
1532 }
1533 
1534 #ifndef DDS_HAS_MINIMUM_BIT
1536 {
1537  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, parent_->lock_, 0);
1538 
1539  const DCPS_IR_Domain_Map::iterator where = parent_->domains_.find(domain_);
1540 
1541  if (where != parent_->domains_.end() && where->second->participants().size() == 1) {
1542  where->second->cleanup_built_in_topics();
1543  }
1544 
1545  done_ = true;
1546  cv_.notify_all();
1547 
1548  return 0;
1549 }
1550 #endif
1551 
1553  DDS::DomainId_t domainId,
1554  const OpenDDS::DCPS::GUID_t& myParticipantId,
1555  const OpenDDS::DCPS::GUID_t& ignoreId)
1556 {
1558 
1559  // Grab the domain.
1560  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1561 
1562  if (where == this->domains_.end()) {
1564  }
1565 
1566  // Grab the participant.
1567  DCPS_IR_Participant* partPtr
1568  = where->second->participant(myParticipantId);
1569 
1570  if (0 == partPtr) {
1572  }
1573 
1574  partPtr->ignore_participant(ignoreId);
1575 
1576  where->second->remove_dead_participants();
1577 }
1578 
1580  DDS::DomainId_t domainId,
1581  const OpenDDS::DCPS::GUID_t& myParticipantId,
1582  const OpenDDS::DCPS::GUID_t& ignoreId)
1583 {
1585 
1586  // Grab the domain.
1587  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1588 
1589  if (where == this->domains_.end()) {
1591  }
1592 
1593  // Grab the participant.
1594  DCPS_IR_Participant* partPtr
1595  = where->second->participant(myParticipantId);
1596 
1597  if (0 == partPtr) {
1599  }
1600 
1601  partPtr->ignore_topic(ignoreId);
1602 
1603  where->second->remove_dead_participants();
1604 }
1605 
1607  DDS::DomainId_t domainId,
1608  const OpenDDS::DCPS::GUID_t& myParticipantId,
1609  const OpenDDS::DCPS::GUID_t& ignoreId)
1610 {
1612 
1613  // Grab the domain.
1614  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1615 
1616  if (where == this->domains_.end()) {
1618  }
1619 
1620  // Grab the participant.
1621  DCPS_IR_Participant* partPtr
1622  = where->second->participant(myParticipantId);
1623 
1624  if (0 == partPtr) {
1626  }
1627 
1628  partPtr->ignore_subscription(ignoreId);
1629 
1630  where->second->remove_dead_participants();
1631 }
1632 
1634  DDS::DomainId_t domainId,
1635  const OpenDDS::DCPS::GUID_t& myParticipantId,
1636  const OpenDDS::DCPS::GUID_t& ignoreId)
1637 {
1639 
1640  // Grab the domain.
1641  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1642 
1643  if (where == this->domains_.end()) {
1645  }
1646 
1647  // Grab the participant.
1648  DCPS_IR_Participant* partPtr
1649  = where->second->participant(myParticipantId);
1650 
1651  if (0 == partPtr) {
1653  }
1654 
1655  partPtr->ignore_publication(ignoreId);
1656 
1657  where->second->remove_dead_participants();
1658 }
1659 
1661  DDS::DomainId_t domainId,
1662  const OpenDDS::DCPS::GUID_t& partId,
1663  const OpenDDS::DCPS::GUID_t& dwId,
1664  const DDS::DataWriterQos & qos,
1665  const DDS::PublisherQos & publisherQos)
1666 {
1668 
1669  // Grab the domain.
1670  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1671 
1672  if (where == this->domains_.end()) {
1674  }
1675 
1676  // Grab the participant.
1677  DCPS_IR_Participant* partPtr
1678  = where->second->participant(partId);
1679 
1680  if (0 == partPtr) {
1682  }
1683 
1685  ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 1\n"));
1686  }
1687 
1688  DCPS_IR_Publication* pub;
1689 
1690  if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
1691  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1692  OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
1694  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1695  ACE_TEXT("participant %C could not find publication %C.\n"),
1696  std::string(part_converter).c_str(),
1697  std::string(pub_converter).c_str()));
1699  }
1700 
1701  Update::SpecificQos qosType;
1702 
1703  if (pub->set_qos(qos, publisherQos, qosType) == false) // failed
1704  return 0;
1705 
1706  if (this->um_ && (partPtr->isBitPublisher() == false)) {
1707  Update::IdPath path(domainId, partId, dwId);
1708 
1709  switch (qosType) {
1710  case Update::DataWriterQos:
1711  this->um_->update(path, qos);
1712  break;
1713 
1714  case Update::PublisherQos:
1715  this->um_->update(path, publisherQos);
1716  break;
1717 
1718  case Update::NoQos:
1719  default:
1720  break;
1721  }
1722 
1724  OpenDDS::DCPS::RepoIdConverter converter(dwId);
1726  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1727  ACE_TEXT("pushing update of publication %C in domain %d.\n"),
1728  std::string(converter).c_str(),
1729  domainId));
1730  }
1731  }
1732 
1733  return 1;
1734 }
1735 
1736 void
1738  DDS::DomainId_t domainId,
1739  const OpenDDS::DCPS::GUID_t& partId,
1740  const OpenDDS::DCPS::GUID_t& dwId,
1741  const DDS::DataWriterQos& qos)
1742 {
1744 
1745  // Grab the domain.
1746  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1747 
1748  if (where == this->domains_.end()) {
1750  }
1751 
1752  // Grab the participant.
1753  DCPS_IR_Participant* partPtr
1754  = where->second->participant(partId);
1755 
1756  if (0 == partPtr) {
1758  }
1759 
1761  ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 2\n"));
1762  }
1763 
1764  DCPS_IR_Publication* pub;
1765 
1766  if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
1767  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1768  OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
1770  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1771  ACE_TEXT("participant %C could not find publication %C.\n"),
1772  std::string(part_converter).c_str(),
1773  std::string(pub_converter).c_str()));
1775  }
1776 
1777  pub->set_qos(qos);
1778 }
1779 
1780 void
1782  DDS::DomainId_t domainId,
1783  const OpenDDS::DCPS::GUID_t& partId,
1784  const OpenDDS::DCPS::GUID_t& dwId,
1785  const DDS::PublisherQos& qos)
1786 {
1788 
1789  // Grab the domain.
1790  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1791 
1792  if (where == this->domains_.end()) {
1794  }
1795 
1796  // Grab the participant.
1797  DCPS_IR_Participant* partPtr
1798  = where->second->participant(partId);
1799 
1800  if (0 == partPtr) {
1802  }
1803 
1805  ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 3\n"));
1806  }
1807 
1808  DCPS_IR_Publication* pub;
1809 
1810  if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
1811  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1812  OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
1814  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1815  ACE_TEXT("participant %C could not find publication %C.\n"),
1816  std::string(part_converter).c_str(),
1817  std::string(pub_converter).c_str()));
1819  }
1820 
1821  pub->set_qos(qos);
1822 }
1823 
1825  DDS::DomainId_t domainId,
1826  const OpenDDS::DCPS::GUID_t& partId,
1827  const OpenDDS::DCPS::GUID_t& drId,
1828  const DDS::DataReaderQos & qos,
1829  const DDS::SubscriberQos & subscriberQos)
1830 {
1832 
1833  // Grab the domain.
1834  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1835 
1836  if (where == this->domains_.end()) {
1838  }
1839 
1840  // Grab the participant.
1841  DCPS_IR_Participant* partPtr
1842  = where->second->participant(partId);
1843 
1844  if (0 == partPtr) {
1846  }
1847 
1848  DCPS_IR_Subscription* sub;
1849 
1851  ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
1852  }
1853 
1854  if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
1855  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1856  OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
1858  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1859  ACE_TEXT("participant %C could not find subscription %C.\n"),
1860  std::string(part_converter).c_str(),
1861  std::string(sub_converter).c_str()));
1863  }
1864 
1865  Update::SpecificQos qosType;
1866 
1867  if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed
1868  return 0;
1869 
1870  if (this->um_ && (partPtr->isBitPublisher() == false)) {
1871  Update::IdPath path(domainId, partId, drId);
1872 
1873  switch (qosType) {
1874  case Update::DataReaderQos:
1875  this->um_->update(path, qos);
1876  break;
1877 
1878  case Update::SubscriberQos:
1879  this->um_->update(path, subscriberQos);
1880  break;
1881 
1882  case Update::NoQos:
1883  default:
1884  break;
1885  }
1886 
1888  OpenDDS::DCPS::RepoIdConverter converter(drId);
1890  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1891  ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
1892  std::string(converter).c_str(),
1893  domainId));
1894  }
1895  }
1896 
1897  return 1;
1898 }
1899 
1900 void
1902  DDS::DomainId_t domainId,
1903  const OpenDDS::DCPS::GUID_t& partId,
1904  const OpenDDS::DCPS::GUID_t& drId,
1905  const DDS::DataReaderQos& qos)
1906 {
1908 
1909  // Grab the domain.
1910  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1911 
1912  if (where == this->domains_.end()) {
1914  }
1915 
1916  // Grab the participant.
1917  DCPS_IR_Participant* partPtr
1918  = where->second->participant(partId);
1919 
1920  if (0 == partPtr) {
1922  }
1923 
1924  DCPS_IR_Subscription* sub;
1925 
1927  ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
1928  }
1929 
1930  if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
1931  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1932  OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
1934  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1935  ACE_TEXT("participant %C could not find subscription %C.\n"),
1936  std::string(part_converter).c_str(),
1937  std::string(sub_converter).c_str()));
1939  }
1940 
1941  sub->set_qos(qos);
1942 }
1943 
1944 void
1946  DDS::DomainId_t domainId,
1947  const OpenDDS::DCPS::GUID_t& partId,
1948  const OpenDDS::DCPS::GUID_t& drId,
1949  const DDS::SubscriberQos& qos)
1950 {
1952 
1953  // Grab the domain.
1954  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
1955 
1956  if (where == this->domains_.end()) {
1958  }
1959 
1960  // Grab the participant.
1961  DCPS_IR_Participant* partPtr
1962  = where->second->participant(partId);
1963 
1964  if (0 == partPtr) {
1966  }
1967 
1968  DCPS_IR_Subscription* sub;
1969 
1971  ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
1972  }
1973 
1974  if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
1975  OpenDDS::DCPS::RepoIdConverter part_converter(partId);
1976  OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
1978  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1979  ACE_TEXT("participant %C could not find subscription %C.\n"),
1980  std::string(part_converter).c_str(),
1981  std::string(sub_converter).c_str()));
1983  }
1984 
1985  sub->set_qos(qos);
1986 }
1987 
1990  DDS::DomainId_t domainId,
1991  const OpenDDS::DCPS::GUID_t& participantId,
1992  const OpenDDS::DCPS::GUID_t& subscriptionId,
1993  const DDS::StringSeq& params)
1994 {
1996 
1997  DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
1998  if (domain == this->domains_.end()) {
2000  }
2001 
2002  DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
2003  if (0 == partPtr) {
2005  }
2006 
2008  ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
2009  }
2010 
2011  DCPS_IR_Subscription* sub;
2012  if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
2013  OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
2014  OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
2016  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
2017  ACE_TEXT("participant %C could not find subscription %C.\n"),
2018  std::string(part_converter).c_str(),
2019  std::string(sub_converter).c_str()));
2021  }
2022 
2023  sub->update_expr_params(params); // calls writers via DataWriterRemote
2024 
2025  if (this->um_ && !partPtr->isBitPublisher()) {
2026  Update::IdPath path(domainId, participantId, subscriptionId);
2027  this->um_->update(path, params);
2028  }
2029 
2030  return true;
2031 }
2032 
2034  const OpenDDS::DCPS::GUID_t& topicId,
2035  DDS::DomainId_t domainId,
2036  const OpenDDS::DCPS::GUID_t& participantId,
2037  const DDS::TopicQos & qos)
2038 {
2040 
2041  // Grab the domain.
2042  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
2043 
2044  if (where == this->domains_.end()) {
2046  }
2047 
2048  // Grab the participant.
2049  DCPS_IR_Participant* partPtr
2050  = where->second->participant(participantId);
2051 
2052  if (0 == partPtr) {
2054  }
2055 
2056  DCPS_IR_Topic* topic;
2057 
2058  if (partPtr->find_topic_reference(topicId, topic) != 0) {
2060  }
2061 
2062  if (topic->set_topic_qos(qos) == false)
2063  return 0;
2064 
2065  if (this->um_
2066  && (partPtr->isOwner() == true)
2067  && (partPtr->isBitPublisher() == false)) {
2068  Update::IdPath path(domainId, participantId, topicId);
2069  this->um_->update(path, qos);
2070 
2072  OpenDDS::DCPS::RepoIdConverter converter(topicId);
2074  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
2075  ACE_TEXT("pushing update of topic %C in domain %d.\n"),
2076  std::string(converter).c_str(),
2077  domainId));
2078  }
2079  }
2080 
2081  return 1;
2082 }
2083 
2085  DDS::DomainId_t domainId,
2086  const OpenDDS::DCPS::GUID_t& participantId,
2087  const DDS::DomainParticipantQos & qos)
2088 {
2090 
2091  // Grab the domain.
2092  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
2093 
2094  if (where == this->domains_.end()) {
2096  }
2097 
2098  // Grab the participant.
2099  DCPS_IR_Participant* partPtr
2100  = where->second->participant(participantId);
2101 
2102  if (0 == partPtr) {
2104  }
2105 
2106  if (partPtr->set_qos(qos) == false)
2107  return 0;
2108 
2109  if (this->um_
2110  && (partPtr->isOwner() == true)
2111  && (partPtr->isBitPublisher() == false)) {
2112  Update::IdPath path(domainId, participantId, participantId);
2113  this->um_->update(path, qos);
2114 
2116  OpenDDS::DCPS::RepoIdConverter converter(participantId);
2118  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
2119  ACE_TEXT("pushing update of participant %C in domain %d.\n"),
2120  std::string(converter).c_str(),
2121  domainId));
2122  }
2123  }
2124 
2125  return 1;
2126 }
2127 
2130 {
2133  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
2134  ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
2135  return 0;
2136  }
2137 
2138  // Check if the domain is already in the map.
2139  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
2140 
2141  if (where == this->domains_.end()) {
2142  // We will attempt to insert a new domain, go ahead and allocate it.
2144  DCPS_IR_Domain(domain, this->participantIdGenerator_));
2145 
2146  DCPS_IR_Domain* domainPtr = domain_uptr.get();
2147 
2148  // We need to insert the domain into the map at this time since it
2149  // might be looked up during the init_built_in_topics() call.
2150  this->domains_.insert(
2151  where,
2152  DCPS_IR_Domain_Map::value_type(domain, OpenDDS::DCPS::move(domain_uptr)));
2153 
2154 #ifndef DDS_HAS_MINIMUM_BIT
2155  if (TheServiceParticipant->get_BIT() && !domainPtr->useBIT() &&
2157  ) {
2159  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
2160  ACE_TEXT("failed to initialize the Built-In Topics ")
2161  ACE_TEXT("when loading domain %d.\n"),
2162  domain));
2163  this->domains_.erase(domain);
2164  return 0;
2165  }
2166 #endif
2167 
2170  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
2171  ACE_TEXT("successfully loaded domain %d at %x.\n"),
2172  domain,
2173  domainPtr));
2174  }
2175  return domainPtr;
2176 
2177  } else {
2178  return where->second.get();
2179  }
2180 }
2181 
2182 int TAO_DDS_DCPSInfo_i::init_transport(int listen_address_given,
2183  const char* listen_str)
2184 {
2185  int status = 0;
2186 
2187 #ifndef DDS_HAS_MINIMUM_BIT
2188  try {
2189 
2190 #ifndef ACE_AS_STATIC_LIBS
2191  if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
2192  < 0 /* not found (-1) or suspended (-2) */) {
2193  static const ACE_TCHAR directive[] =
2194  ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
2195  ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
2197  }
2198 #endif
2199 
2200  const std::string config_name =
2202  + std::string("InfoRepoBITTransportConfig");
2205 
2206  const std::string inst_name =
2208  + std::string("InfoRepoBITTCPTransportInst");
2211  "tcp");
2212  config->instances_.push_back(inst);
2213 
2214  OpenDDS::DCPS::TcpInst_rch tcp_inst =
2216  inst->datalink_release_delay_ = 0;
2217 
2218  tcp_inst->conn_retry_attempts_ = 0;
2219 
2220  if (listen_address_given) {
2221  tcp_inst->local_address(listen_str);
2222  }
2223 
2224  } catch (...) {
2225  // TransportRegistry is extremely varied in the exceptions that
2226  // it throws on failure; do not allow exceptions to bubble up
2227  // beyond this point.
2228  status = 1;
2229  }
2230 #else
2231  ACE_UNUSED_ARG(listen_address_given);
2232  ACE_UNUSED_ARG(listen_str);
2233 #endif
2234 
2235  return status;
2236 }
2237 
2238 bool
2240 {
2243  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2244  ACE_TEXT("processing persistent data.\n")));
2245  }
2246 
2247  // Initialize builtin topics first so that they always have the same IDs
2248 #ifndef DDS_HAS_MINIMUM_BIT
2249  if (TheServiceParticipant->get_BIT()) {
2250  for (Update::UImage::ParticipantSeq::const_iterator
2251  iter = image.participants.begin();
2252  iter != image.participants.end(); iter++) {
2253  const Update::UParticipant* part = *iter;
2254  if (!domain(part->domainId)) {
2257  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::receive_image: ")
2258  ACE_TEXT("invalid domain Id: %d\n"),
2259  part->domainId));
2260  }
2261  return false;
2262  }
2263  }
2264  }
2265 #endif
2266 
2267  // Ensure that new non-BIT participants do not reuse an id
2269 
2270  for (Update::UImage::ParticipantSeq::const_iterator
2271  iter = image.participants.begin();
2272  iter != image.participants.end(); iter++) {
2273  const Update::UParticipant* part = *iter;
2274 
2275  if (!this->add_domain_participant(part->domainId, part->participantId
2276  , part->participantQos)) {
2279  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2280  ACE_TEXT("failed to add participant %C to domain %d.\n"),
2281  std::string(converter).c_str(),
2282  part->domainId));
2283  return false;
2284 
2285  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2288  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2289  ACE_TEXT("added participant %C to domain %d.\n"),
2290  std::string(converter).c_str(),
2291  part->domainId));
2292  }
2293  }
2294 
2295  for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
2296  iter != image.topics.end(); iter++) {
2297  const Update::UTopic* topic = *iter;
2298 
2299  if (!this->add_topic(topic->topicId, topic->domainId
2300  , topic->participantId, topic->name.c_str()
2301  , topic->dataType.c_str(), topic->topicQos)) {
2302  OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
2303  OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
2305  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2306  ACE_TEXT("failed to add topic %C to participant %C.\n"),
2307  std::string(topic_converter).c_str(),
2308  std::string(part_converter).c_str()));
2309  return false;
2310 
2311  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2312  OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
2313  OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
2315  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2316  ACE_TEXT("added topic %C to participant %C.\n"),
2317  std::string(topic_converter).c_str(),
2318  std::string(part_converter).c_str()));
2319  }
2320  }
2321 
2322  for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
2323  iter != image.actors.end(); iter++) {
2324  const Update::URActor* sub = *iter;
2325  // no reason to associate, there are no publishers yet to associate with
2326  if (!this->add_subscription(sub->domainId, sub->participantId
2327  , sub->topicId, sub->actorId
2328  , sub->callback.c_str(), sub->drdwQos
2329  , sub->transportInterfaceInfo
2330  , sub->transportContext
2331  , sub->pubsubQos
2332  , sub->contentSubscriptionProfile.filterClassName
2333  , sub->contentSubscriptionProfile.filterExpr
2334  , sub->contentSubscriptionProfile.exprParams
2335  , sub->serializedTypeInfo)) {
2336  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
2337  OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
2339  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2340  ACE_TEXT("failed to add subscription %C to participant %C.\n"),
2341  std::string(sub_converter).c_str(),
2342  std::string(part_converter).c_str()));
2343  return false;
2344 
2345  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2346  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
2347  OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
2349  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2350  ACE_TEXT("added subscription %C to participant %C.\n"),
2351  std::string(sub_converter).c_str(),
2352  std::string(part_converter).c_str()));
2353  }
2354  }
2355 
2356  for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
2357  iter != image.wActors.end(); iter++) {
2358  const Update::UWActor* pub = *iter;
2359 
2360  // try to associate with any persisted subscriptions to track any expected
2361  // existing associations
2362  if (!this->add_publication(pub->domainId, pub->participantId
2363  , pub->topicId, pub->actorId
2364  , pub->callback.c_str() , pub->drdwQos
2366  , pub->pubsubQos
2367  , pub->serializedTypeInfo
2368  , true)) {
2369  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
2370  OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
2372  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2373  ACE_TEXT("failed to add publication %C to participant %C.\n"),
2374  std::string(pub_converter).c_str(),
2375  std::string(part_converter).c_str()));
2376  return false;
2377 
2378  } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
2379  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
2380  OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
2382  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2383  ACE_TEXT("added publication %C to participant %C.\n"),
2384  std::string(pub_converter).c_str(),
2385  std::string(part_converter).c_str()));
2386  }
2387  }
2388 
2389 #ifndef DDS_HAS_MINIMUM_BIT
2390  if (TheServiceParticipant->get_BIT()) {
2391  for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
2392  currentDomain != domains_.end();
2393  ++currentDomain) {
2394  currentDomain->second->reassociate_built_in_topic_pubs();
2395  }
2396  }
2397 #endif
2398 
2399  return true;
2400 }
2401 
2402 void
2404 {
2405  if (this->um_) {
2406  this->um_->add(updater);
2407  }
2408 }
2409 
2410 bool
2412 {
2414  ("UpdateManagerSvc");
2415 
2416  if (um_ != 0) {
2417  um_->add(this);
2418 
2419  // Request persistent image.
2420  if (reincarnate_) {
2421  um_->requestImage();
2422  }
2423 
2424  } else {
2425  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
2426  ACE_TEXT("UpdateManagerSvc.\n")), false);
2427  }
2428 
2429  return true;
2430 }
2431 
2432 bool
2434 {
2435  if (this->reassociate_timer_id_ != -1) return false; // already scheduled
2436 
2437  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2438 
2439  this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
2440  return this->reassociate_timer_id_ != -1;
2441 }
2442 
2443 bool
2445 {
2446  if (this->dispatch_check_timer_id_ != -1) return false; // already scheduled
2447 
2448  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2449 
2450  this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
2451  return this->dispatch_check_timer_id_ != -1;
2452 }
2453 
2454 void
2456 {
2457  if (reassociate_timer_id_ != -1) {
2458  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2459 
2460  reactor->cancel_timer(this->reassociate_timer_id_);
2461  this->reassociate_timer_id_ = -1;
2462  }
2463 
2464  if (dispatch_check_timer_id_ != -1) {
2465  ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
2466 
2467  reactor->cancel_timer(this->dispatch_check_timer_id_);
2468  this->dispatch_check_timer_id_ = -1;
2469  }
2470 }
2471 
2472 const DCPS_IR_Domain_Map&
2474 {
2475  return this->domains_;
2476 }
2477 
2478 
2479 char*
2481 {
2482  std::string dump;
2483 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
2484  std::string indent (" ");
2485 
2486  for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
2487  dm != domains_.end();
2488  dm++)
2489  {
2490  dump += dm->second->dump_to_string(indent, 0);
2491  }
2492 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
2493  return CORBA::string_dup(dump.c_str());
2494 
2495 }
2496 
2498 {
2499 #ifndef DDS_HAS_MINIMUM_BIT
2501  {
2503  if (domains_.empty() || in_cleanup_all_built_in_topics_) {
2504  return;
2505  }
2506  copy = domains_;
2508  }
2509 
2510  for (DCPS_IR_Domain_Map::iterator it = copy.begin(); it != copy.end(); ++it) {
2511  it->second->cleanup_built_in_topics();
2512  }
2513 
2514  {
2517  copy.clear();
2518  domains_.clear();
2519  }
2520 #endif
2521 }
2522 
virtual void remove_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId)
Definition: DCPSInfo_i.cpp:962
virtual void disassociate_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
#define ACE_DEBUG(X)
int find_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool set_qos(const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
const LogLevel::Value value
Definition: debug.cpp:61
virtual ::CORBA::Boolean update_subscription_params(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId, const DDS::StringSeq &params)
int add_subscription_reference(DCPS_IR_Subscription *subscription, bool associate=true)
virtual void disassociate_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
ACE_CDR::ULong transportContext
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL typedef std::map< DDS::DomainId_t, OpenDDS::DCPS::RcHandle< DCPS_IR_Domain > > DCPS_IR_Domain_Map
Definition: DCPSInfo_i.h:36
OpenDDS::DCPS::GUID_t get_next_participant_id()
Next Entity Id value in sequence.
virtual void shutdown()
Cause the entire repository to exit.
Definition: DCPSInfo_i.cpp:113
bool set_topic_qos(const DDS::TopicQos &qos)
virtual CORBA::Boolean attach_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
Definition: DCPSInfo_i.cpp:124
int add_subscription(OpenDDS::DCPS::unique_ptr< DCPS_IR_Subscription > sub)
LM_INFO
virtual ~TAO_DDS_DCPSInfo_i()
Definition: DCPSInfo_i.cpp:65
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
void remove_dead_participants(bool part_of_cleanup=false)
Remove any participants currently marked as dead.
const ACE_CDR::ULong transportContextDefault
Definition: DCPSInfo_i.cpp:38
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const DCPS_IR_Domain_Map & domains() const
Expose a readable reference of the domain map.
OpenDDS::DCPS::GUID_t get_id() const
sequence< octet > key
static int process_directive(const ACE_TCHAR directive[])
const DCPS_IR_Topic_Map & topics() const
Expose a readable reference to the topic map.
ParticipantId participantId() const
Get the participant id from the GUID.
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
DCPS_IR_Domain_Map domains_
Definition: DCPSInfo_i.h:414
CORBA::ORB_ptr orb()
Expose the ORB.
Definition: DCPSInfo_i.cpp:119
static TYPE * instance(const ACE_TCHAR *name)
bool receive_image(const Update::UImage &image)
OpenDDS::DCPS::GUID_t get_next_publication_id(bool builtin)
DCPS_IR_Topic_Description * get_topic_description()
virtual OpenDDS::DCPS::TopicStatus assert_topic(OpenDDS::DCPS::GUID_t_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey)
Definition: DCPSInfo_i.cpp:180
void ignore_participant(OpenDDS::DCPS::GUID_t id)
Ignore the participant with the id.
sequence< TransportLocator > TransportLocatorSeq
CORBA::ORB_var orb_
Definition: DCPSInfo_i.h:415
OpenDDS::Federator::RepoKey federationId() const
Get the federeation id from the GUID.
OpenDDS::DCPS::GUID_t get_next_subscription_id(bool builtin)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Update::Manager * um_
Definition: DCPSInfo_i.h:421
int remove_subscription(OpenDDS::DCPS::GUID_t subId)
int init_built_in_topics(bool federated, bool persistent)
CORBA::ORB_var dispatchingOrb_
Definition: DCPSInfo_i.h:416
bool init_dispatchChecking(const ACE_Time_Value &delay)
T * _duplicate(T *st)
bool set_qos(const DDS::DomainParticipantQos &qos)
void create(const UType &info)
void ignore_publication(OpenDDS::DCPS::GUID_t id)
Ignore the publication with the id.
DCPS_IR_Participant * participant(const OpenDDS::DCPS::GUID_t &id) const
Find the participant with the id.
DDS::TopicQos * get_topic_qos()
void disassociate_publication(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any publications with id.
OpenDDS::DCPS::ConditionVariable< ACE_Recursive_Thread_Mutex > cv_
Definition: DCPSInfo_i.h:446
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Publication >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Publication_Map
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:681
const DCPS_IR_Publication_Map & publications() const
Expose a readable reference to the publication map.
DOMAINID_TYPE_NATIVE DomainId_t
virtual OpenDDS::DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId)
Definition: DCPSInfo_i.cpp:325
char * string_dup(const char *)
LM_DEBUG
ACE_Event_Handler * handler(void) const
void finalize()
Cleanup state for shutdown.
void add(TAO_DDS_DCPSInfo_i *info)
long entityKey() const
Extract the EntityKey value.
void changeOwner(long sender, long owner)
Process an incoming update that changes ownership.
bool remove_by_owner(DDS::DomainId_t domain, long owner)
char ACE_TCHAR
virtual CORBA::Boolean update_domain_participant_qos(DDS::DomainId_t domain, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
ACE_CDR::Boolean Boolean
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
virtual void ignore_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
bool add_topic(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
Add a previously existing topic to the repository.
Definition: DCPSInfo_i.cpp:232
void requestImage()
Force a clean shutdown.
Representative of the Domain Participant.
virtual void remove_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &publicationId)
Definition: DCPSInfo_i.cpp:630
bool set_qos(const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
PartIdType lastPartId
What the last participant id is/was.
void cleanup_all_built_in_topics()
void add(Update::Updater *updater)
Add an additional Updater interface.
int add_publication_reference(DCPS_IR_Publication *publication, bool associate=true)
ParticipantSeq participants
LM_WARNING
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
static ACE_Service_Gestalt * current(void)
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
ACE_UINT32 ULong
DomainIdType domainId
virtual void ignore_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
int remove_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
virtual ACE_Reactor * reactor(void) const
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:374
ShutdownInterface * shutdown_
Interface to effect shutdown of the process.
Definition: DCPSInfo_i.h:425
int find(const PG_Property_Set &decoder, const ACE_CString &key, TYPE &value)
long dispatch_check_timer_id_
Definition: DCPSInfo_i.h:430
_in_type in(void) const
virtual void disassociate_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
ACE_TEXT("TCP_Factory")
virtual OpenDDS::DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::GUID_t_out topicId)
Definition: DCPSInfo_i.cpp:291
TAO_DDS_DCPSInfo_i(CORBA::ORB_ptr orb, bool reincarnate, ShutdownInterface *shutdown, const TAO_DDS_DCPSFederationId &federation)
Definition: DCPSInfo_i.cpp:42
int find_publication_reference(OpenDDS::DCPS::GUID_t pubId, DCPS_IR_Publication *&pub)
Return the publication object.
virtual char * dump_to_string()
Dump the Repos state to string.
ACE_recursive_thread_mutex_t lock_
void update(const IdPath &id, const QosType &qos)
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
virtual void ignore_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const DCPS_IR_Participant_Map & participants() const
Expose a readable reference to the participant map.
virtual void shutdown()=0
void update_expr_params(const DDS::StringSeq &params)
Calls associated Publications.
ORB_ptr ORB_init(int &argc, char *argv[], const char *orb_name=0)
void disassociate_subscription(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any subscriptions with the id.
bool in_cleanup_all_built_in_topics_
Definition: DCPSInfo_i.h:450
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
virtual CORBA::Boolean update_subscription_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
void ignore_subscription(OpenDDS::DCPS::GUID_t id)
Ignore the subscription with the id.
int remove_publication(OpenDDS::DCPS::GUID_t pubId)
DDS::ReturnCode_t copy(DDS::DynamicData_ptr dest, DDS::DynamicData_ptr src)
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Subscription >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Subscription_Map
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static const char DEFAULT_INST_PREFIX[]
virtual int handle_timeout(const ACE_Time_Value &now, const void *arg)
Definition: DCPSInfo_i.cpp:70
static TransportRegistry * instance()
Return a singleton instance of this class.
Representative of a Subscription.
int add_publication(OpenDDS::DCPS::unique_ptr< DCPS_IR_Publication > pub)
Representation of a Domain in the system.
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
assert new ownership for a participant and its contained entities.
Definition: DCPSInfo_i.cpp:152
const TAO_DDS_DCPSFederationId & federation_
Definition: DCPSInfo_i.h:418
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
#define ACE_ERROR_RETURN(X, Y)
The wait has returned because it was woken up.
const character_type * in(void) const
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
DomainIdType domainId
int init_transport(int listen_address_given, const char *listen_str)
#define TheServiceParticipant
void last_subscription_key(long key)
void ignore_topic(OpenDDS::DCPS::GUID_t id)
Ignore the topic with the id.
long reassociate_timer_id_
Definition: DCPSInfo_i.h:429
LM_ERROR
virtual void remove_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
void takeOwnership()
Take local ownership of this participant and publish an update.
Representative of a Topic Description.
bool init_reassociation(const ACE_Time_Value &delay)
Boolean is_nil(T x)
std::map< OpenDDS::DCPS::GUID_t, DCPS_IR_Participant_rch, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Participant_Map
virtual CORBA::Boolean update_topic_qos(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::TopicQos &qos)
const DCPS_IR_Subscription_Map & subscriptions() const
Expose a readable reference to the subscription map.
Representative of a Publication.
OpenDDS::DCPS::RepoIdGenerator participantIdGenerator_
Definition: DCPSInfo_i.h:419
virtual void ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
bool useBIT() const
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
ACE_Recursive_Thread_Mutex lock_
Definition: DCPSInfo_i.h:427
virtual CORBA::Boolean update_publication_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
void id(RepoKey fedId)
void last_publication_key(long key)