OpenDDS  Snapshot(2023/04/28-20:55)
DCPS_IR_Participant.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 #include /**/ "DCPS_IR_Participant.h"
10 #include "FederationId.h"
11 #include "UpdateManager.h"
12 
13 #include /**/ "DCPS_IR_Domain.h"
14 #include /**/ "DCPS_IR_Subscription.h"
15 #include /**/ "DCPS_IR_Publication.h"
16 #include /**/ "DCPS_IR_Topic.h"
17 #include /**/ "DCPS_IR_Topic_Description.h"
18 
19 #include /**/ "dds/DCPS/RepoIdConverter.h"
20 #include <sstream>
21 
22 #include /**/ "tao/debug.h"
23 
25 
28  DCPS_IR_Domain* domain,
30  Update::Manager* um,
31  bool isBit)
32  : id_(id),
33  domain_(domain),
34  qos_(qos),
35  aliveStatus_(1),
36  handle_(0),
37  federationId_(federationId),
38  owner_(federationId.overridden() ? OWNER_NONE : federationId.id()),
39  topicIdGenerator_(
40  federationId.id(),
41  OpenDDS::DCPS::RepoIdConverter(id).participantId(),
42  isBit ? OpenDDS::DCPS::KIND_BUILTIN_TOPIC : OpenDDS::DCPS::KIND_USER_TOPIC),
43  publicationIdGenerator_(
44  federationId.id(),
45  OpenDDS::DCPS::RepoIdConverter(id).participantId(),
46  isBit ? OpenDDS::DCPS::KIND_BUILTIN_WRITER : OpenDDS::DCPS::KIND_USER_WRITER),
47  subscriptionIdGenerator_(
48  federationId.id(),
49  OpenDDS::DCPS::RepoIdConverter(id).participantId(),
50  isBit ? OpenDDS::DCPS::KIND_BUILTIN_READER : OpenDDS::DCPS::KIND_USER_READER),
51  um_(um),
52  isBitPublisher_(isBit)
53 {
54 }
55 
57 {
58 }
59 
62 {
63  return this->publications_;
64 }
65 
68 {
69  return this->subscriptions_;
70 }
71 
72 const DCPS_IR_Topic_Map&
74 {
75  return this->topicRefs_;
76 }
77 
78 void
80 {
81  /// Publish an update with our ownership.
82  if (this->um_ && (this->isBitPublisher() == false)) {
83  this->um_->create(
85  this->domain_->get_id(),
86  this->id_,
87  this->federationId_.id()));
88 
92  ACE_TEXT("(%P|%t) DCPS_IR_Participant::take_ownership: ")
93  ACE_TEXT("pushing ownership %C in domain %d.\n"),
94  std::string(converter).c_str(),
95  this->domain_->get_id()));
96  }
97  }
98 
99  // And now handle our internal ownership processing.
100  this->changeOwner(this->federationId_.id(), this->federationId_.id());
101 }
102 
103 void
105 {
106  {
107  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ownerLock_);
108 
109  if ((owner == OWNER_NONE)
110  && (this->isOwner() || (this->owner_ != sender))) {
111  // Do not eliminate ownership if we are the owner or if the update
112  // does not come from the current owner.
113  return;
114  }
115 
116  // Finally. Change the value.
117  this->owner_ = owner;
118 
119  } // End of lock scope.
120 
121  if (this->isOwner()) {
122  /// @TODO: Ensure that any stalled callbacks are made.
123  }
124 }
125 
126 long
128 {
129  return this->owner_;
130 }
131 
132 bool
134 {
135  return this->owner_ == this->federationId_.id();
136 }
137 
138 bool&
140 {
141  return this->isBitPublisher_;
142 }
143 
144 bool
146 {
147  return this->isBitPublisher_;
148 }
149 
151 {
152  OpenDDS::DCPS::GUID_t pubId = pub->get_id();
153  DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
154 
155  if (where == this->publications_.end()) {
156  DCPS_IR_Publication* pubptr = pub.get();
157  this->publications_.insert(
158  where, DCPS_IR_Publication_Map::value_type(pubId, OpenDDS::DCPS::move(pub)));
159 
160  if (isBitPublisher_) {
162  }
163 
165  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
166  OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
168  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_publication: ")
169  ACE_TEXT("participant %C successfully added publication %C at 0x%x.\n"),
170  std::string(part_converter).c_str(),
171  std::string(pub_converter).c_str(),
172  pubptr));
173  }
174 
175  return 0;
176 
177  } else {
179  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
180  OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
182  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_publication: ")
183  ACE_TEXT("participant %C attempted to add existing publication %C.\n"),
184  std::string(part_converter).c_str(),
185  std::string(pub_converter).c_str()));
186  }
187 
188  return 1;
189  }
190 }
191 
193  DCPS_IR_Publication* & pub)
194 {
195  DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
196 
197  if (where != this->publications_.end()) {
198  pub = where->second.get();
199 
201  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
202  OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
204  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_publication_reference: ")
205  ACE_TEXT("participant %C found publication %C at 0x%x.\n"),
206  std::string(part_converter).c_str(),
207  std::string(pub_converter).c_str(),
208  pub));
209  }
210 
211  return 0;
212 
213  } else {
215  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
216  OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
218  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_publication_reference: ")
219  ACE_TEXT("participant %C could not find publication %C.\n"),
220  std::string(part_converter).c_str(),
221  std::string(pub_converter).c_str()));
222  }
223  pub = 0;
224  return -1;
225  }
226 }
227 
229 {
230  DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
231 
232  if (where != this->publications_.end()) {
233  DCPS_IR_Topic* topic = where->second->get_topic();
234  topic->remove_publication_reference(where->second.get());
235 
236  if (0 != where->second->remove_associations(false)) {
237  // N.B. As written today, this branch will never be taken.
238  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
239  OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
241  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_publication: ")
242  ACE_TEXT("participant %C unable to remove associations from publication %C\n"),
243  std::string(part_converter).c_str(),
244  std::string(pub_converter).c_str()));
245  return -1;
246  }
247 
248  this->domain_->dispose_publication_bit(where->second.get());
249  topic->release(false);
250  this->publications_.erase(where);
251 
253  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
254  OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
256  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_publication: ")
257  ACE_TEXT("participant %C removed publication %C.\n"),
258  std::string(part_converter).c_str(),
259  std::string(pub_converter).c_str()));
260  }
261 
262  return 0;
263 
264  } else {
265  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
266  OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
268  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_publication: ")
269  ACE_TEXT("participant %C unable to remove publication %C.\n"),
270  std::string(part_converter).c_str(),
271  std::string(pub_converter).c_str()));
272  return -1;
273  }
274 }
275 
277 {
278  OpenDDS::DCPS::GUID_t subId = sub->get_id();
279  DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
280 
281  if (where == this->subscriptions_.end()) {
282 
283  DCPS_IR_Subscription* subptr = sub.get();
284  this->subscriptions_.insert(
285  where, DCPS_IR_Subscription_Map::value_type(subId, OpenDDS::DCPS::move(sub)));
286 
287  if (isBitPublisher_) {
289  }
290 
292  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
293  OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
295  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_subscription: ")
296  ACE_TEXT("participant %C successfully added subscription %C at 0x%x.\n"),
297  std::string(part_converter).c_str(),
298  std::string(sub_converter).c_str(),
299  subptr));
300  }
301 
302  return 0;
303 
304  } else {
306  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
307  OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
309  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_subscription: ")
310  ACE_TEXT("participant %C attempted to add existing subscription %C.\n"),
311  std::string(part_converter).c_str(),
312  std::string(sub_converter).c_str()));
313  }
314 
315  return 1;
316  }
317 }
318 
320  DCPS_IR_Subscription*& sub)
321 {
322  DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
323 
324  if (where != this->subscriptions_.end()) {
325  sub = where->second.get();
326 
328  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
329  OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
331  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_subscription_reference: ")
332  ACE_TEXT("participant %C found subscription %C at 0x%x.\n"),
333  std::string(part_converter).c_str(),
334  std::string(sub_converter).c_str(),
335  sub));
336  }
337 
338  return 0;
339 
340  } else {
342  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
343  OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
345  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_subscription_reference: ")
346  ACE_TEXT("participant %C could not find subscription %C.\n"),
347  std::string(part_converter).c_str(),
348  std::string(sub_converter).c_str()));
349  }
350  sub = 0;
351  return -1;
352  }
353 }
354 
356 {
357  DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
358 
359  if (where != this->subscriptions_.end()) {
360  DCPS_IR_Topic* topic = where->second->get_topic();
361  topic->remove_subscription_reference(where->second.get());
362 
363  if (0 != where->second->remove_associations(false)) {
364  // N.B. As written today, this branch will never be taken.
365  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
366  OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
368  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_subscription: ")
369  ACE_TEXT("participant %C unable to remove associations from subscription %C\n"),
370  std::string(part_converter).c_str(),
371  std::string(sub_converter).c_str()));
372  return -1;
373  }
374 
375  this->domain_->dispose_subscription_bit(where->second.get());
376  topic->release(false);
377  this->subscriptions_.erase(where);
378 
380  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
381  OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
383  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_subscription: ")
384  ACE_TEXT("participant %C removed subscription %C.\n"),
385  std::string(part_converter).c_str(),
386  std::string(sub_converter).c_str()));
387  }
388 
389  return 0;
390 
391  } else {
392  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
393  OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
395  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_subscription: ")
396  ACE_TEXT("participant %C unable to remove subscription %C.\n"),
397  std::string(part_converter).c_str(),
398  std::string(sub_converter).c_str()));
399  return -1;
400  }
401 }
402 
404 {
405  OpenDDS::DCPS::GUID_t topicId = topic->get_id();
406  DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
407 
408  if (where == this->topicRefs_.end()) {
409  this->topicRefs_.insert(
410  where, DCPS_IR_Topic_Map::value_type(topicId, topic));
411 
412  if (isBitPublisher_) {
414  }
415 
417  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
418  OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
420  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_topic_reference: ")
421  ACE_TEXT("participant %C successfully added topic %C at 0x%x.\n"),
422  std::string(part_converter).c_str(),
423  std::string(topic_converter).c_str(),
424  topic));
425  }
426 
427  return 0;
428 
429  } else {
431  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
432  OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
434  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_topic_reference: ")
435  ACE_TEXT("participant %C attempted to add existing topic %C.\n"),
436  std::string(part_converter).c_str(),
437  std::string(topic_converter).c_str()));
438  }
439 
440  return 1;
441  }
442 }
443 
445  DCPS_IR_Topic*& topic)
446 {
447  DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
448 
449  if (where != this->topicRefs_.end()) {
450  topic = where->second;
451  this->topicRefs_.erase(where);
452 
454  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
455  OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
457  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_topic_reference: ")
458  ACE_TEXT("participant %C removed topic %C at 0x%x.\n"),
459  std::string(part_converter).c_str(),
460  std::string(topic_converter).c_str(),
461  topic));
462  }
463 
464  return 0;
465 
466  } else {
468  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
469  OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
471  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Participant::remove_topic_reference: ")
472  ACE_TEXT("participant %C unable to find topic %C for removal.\n"),
473  std::string(part_converter).c_str(),
474  std::string(topic_converter).c_str()));
475  }
476 
477  return -1;
478  }
479 }
480 
482  DCPS_IR_Topic*& topic)
483 {
484  DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
485 
486  if (where != this->topicRefs_.end()) {
487  topic = where->second;
488 
490  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
491  OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
493  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_topic_reference: ")
494  ACE_TEXT("participant %C found topic %C at %x.\n"),
495  std::string(part_converter).c_str(),
496  std::string(topic_converter).c_str(),
497  topic));
498  }
499 
500  return 0;
501 
502  } else {
503  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
504  OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
506  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::find_topic_reference: ")
507  ACE_TEXT("participant %C unable to find topic %C.\n"),
508  std::string(part_converter).c_str(),
509  std::string(topic_converter).c_str()));
510  topic = 0;
511  return -1;
512  }
513 }
514 
516 {
517  // remove all the publications associations
518  {
519  DCPS_IR_Publication_Map::const_iterator next = this->publications_.begin();
520 
521  while (next != this->publications_.end()) {
522  DCPS_IR_Publication_Map::const_iterator current = next;
523  ++ next;
524  DCPS_IR_Topic* topic = current->second->get_topic();
525  topic->remove_publication_reference(current->second.get());
526 
527  if (0 != current->second->remove_associations(notify_lost)) {
528  return;
529  }
530 
531  topic->release(false);
532  }
533  }
534 
535  {
536  DCPS_IR_Subscription_Map::const_iterator next = this->subscriptions_.begin();
537 
538  while (next != this->subscriptions_.end()) {
539  DCPS_IR_Subscription_Map::const_iterator current = next;
540  ++ next;
541  DCPS_IR_Topic* topic = current->second->get_topic();
542  topic->remove_subscription_reference(current->second.get());
543 
544  if (0 != current->second->remove_associations(notify_lost)) {
545  return;
546  }
547 
548  topic->release(false);
549  }
550  }
551 
552  {
553  DCPS_IR_Topic_Map::const_iterator next = this->topicRefs_.begin();
554 
555  while (next != this->topicRefs_.end()) {
556  DCPS_IR_Topic_Map::const_iterator current = next;
557  ++ next;
558 
559  // Notify the federation to remove the topic.
560  if (this->um_ && (this->isBitPublisher() == false)) {
561  Update::IdPath path(
562  this->domain_->get_id(),
563  this->get_id(),
564  current->second->get_id());
565  this->um_->destroy(path, Update::Topic);
566 
568  OpenDDS::DCPS::GUID_t id = current->second->get_id();
569  OpenDDS::DCPS::RepoIdConverter converter(id);
571  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
572  ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
573  std::string(converter).c_str(),
574  this->domain_->get_id()));
575  }
576 
577  // Remove the topic ourselves.
578  // N.B. This call sets the second (reference) argument to 0, so when
579  // clear() is called below, no destructor is (re)called.
580 
581  // Get the topic id and topic pointer before remove_topic since it
582  // invalidates the iterator. Accessing them after removal caused a SEGV.
583  OpenDDS::DCPS::GUID_t id = current->first;
584  DCPS_IR_Topic* topic = current->second;
585 
586  this->domain_->remove_topic(this, topic);
587 
589  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
590  OpenDDS::DCPS::RepoIdConverter topic_converter(id);
592  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
593  ACE_TEXT("domain %d participant %C removed topic %C.\n"),
594  this->domain_->get_id(),
595  std::string(part_converter).c_str(),
596  std::string(topic_converter).c_str()));
597  }
598  }
599  }
600  }
601 
602  // Clear the Topic container of null pointers.
603  this->topicRefs_.clear();
604 
605  // The publications and subscriptions can NOT be deleted until after all
606  // the associations have been removed. Otherwise an access violation
607  // can occur because a publication and subscription of this participant
608  // could be associated.
609 
610  // delete all the publications
611  for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
612  current != this->publications_.end();
613  ++current) {
614  // Notify the federation to destroy the publication.
615  if (this->um_ && (this->isBitPublisher() == false)) {
616  Update::IdPath path(
617  this->domain_->get_id(),
618  this->get_id(),
619  current->second->get_id());
621 
623  OpenDDS::DCPS::GUID_t id = current->second->get_id();
624  OpenDDS::DCPS::RepoIdConverter converter(id);
626  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
627  ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
628  std::string(converter).c_str(),
629  this->domain_->get_id()));
630  }
631  }
632  }
633 
634  // Clear the container.
635  this->publications_.clear();
636 
637  // delete all the subscriptions
638  for (DCPS_IR_Subscription_Map::const_iterator current
639  = this->subscriptions_.begin();
640  current != this->subscriptions_.end();
641  ++current) {
642  // Notify the federation to destroy the subscription.
643  if (this->um_ && (this->isBitPublisher() == false)) {
644  Update::IdPath path(
645  this->domain_->get_id(),
646  this->get_id(),
647  current->second->get_id());
649 
651  OpenDDS::DCPS::GUID_t id = current->second->get_id();
652  OpenDDS::DCPS::RepoIdConverter converter(id);
654  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
655  ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
656  std::string(converter).c_str(),
657  this->domain_->get_id()));
658  }
659  }
660  }
661 
662  // Clear the container.
663  this->subscriptions_.clear();
664 }
665 
667 {
668  aliveStatus_ = 0;
670 }
671 
673 {
674  return id_;
675 }
676 
678 {
679  return aliveStatus_;
680 }
681 
683 {
684  aliveStatus_ = alive;
685 }
686 
688 {
690  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
691  OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
693  ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_participant: ")
694  ACE_TEXT("participant %C now ignoring participant %C.\n"),
695  std::string(part_converter).c_str(),
696  std::string(ignore_converter).c_str()));
697  }
698 
700 
701  // disassociate any publications
702  for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
703  current != this->publications_.end();
704  ++current) {
705  current->second->disassociate_participant(id);
706  }
707 
708  // disassociate any subscriptions
709  for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
710  current != this->subscriptions_.end();
711  ++current) {
712  current->second->disassociate_participant(id);
713  }
714 }
715 
717 {
719  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
720  OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
722  ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_topic: ")
723  ACE_TEXT("participant %C now ignoring topic %C.\n"),
724  std::string(part_converter).c_str(),
725  std::string(ignore_converter).c_str()));
726  }
727 
729 
730  // disassociate any publications
731  for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
732  current != this->publications_.end();
733  ++current) {
734  current->second->disassociate_topic(id);
735  }
736 
737  // disassociate any subscriptions
738  for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
739  current != this->subscriptions_.end();
740  ++current) {
741  current->second->disassociate_topic(id);
742  }
743 }
744 
746 {
748  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
749  OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
751  ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_publication: ")
752  ACE_TEXT("participant %C now ignoring publication %C.\n"),
753  std::string(part_converter).c_str(),
754  std::string(ignore_converter).c_str()));
755  }
756 
758 
759  // disassociate any subscriptions
760  for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
761  current != this->subscriptions_.end();
762  ++current) {
763  current->second->disassociate_publication(id);
764  }
765 }
766 
768 {
770  OpenDDS::DCPS::RepoIdConverter part_converter(id_);
771  OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
773  ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_subscription: ")
774  ACE_TEXT("participant %C now ignoring subscription %C.\n"),
775  std::string(part_converter).c_str(),
776  std::string(ignore_converter).c_str()));
777  }
778 
780 
781  // disassociate any publications
782  for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
783  current != this->publications_.end();
784  ++current) {
785  current->second->disassociate_subscription(id);
786  }
787 }
788 
790 {
791  return (0 == ignoredParticipants_.find(id));
792 }
793 
795 {
796  return (0 == ignoredTopics_.find(id));
797 }
798 
800 {
801  return (0 == ignoredPublications_.find(id));
802 }
803 
805 {
806  return (0 == ignoredSubscriptions_.find(id));
807 }
808 
810 {
811  return handle_;
812 }
813 
815 {
816  handle_ = handle;
817 }
818 
820 {
821  return &qos_;
822 }
823 
825 {
826  // No need to re-evaluate compatibility and associations when
827  // DomainParticipantQos changes, since only datareader and datawriter
828  // QoS are evaluated during normal association establishment.
829 
830  // No need to publish the QoS change to the topic, datareader, or
831  // datawriter BITs, as they are independent.
832  qos_ = qos;
833  this->domain_->publish_participant_bit(this);
834 
835  return true;
836 }
837 
839 {
840  return domain_;
841 }
842 
845 {
846  return this->topicIdGenerator_.next(builtin);
847 }
848 
851 {
852  return this->publicationIdGenerator_.next(builtin);
853 }
854 
857 {
858  return this->subscriptionIdGenerator_.next(builtin);
859 }
860 
861 void
863 {
864  return this->topicIdGenerator_.last(key);
865 }
866 
867 void
869 {
870  return this->publicationIdGenerator_.last(key);
871 }
872 
873 void
875 {
876  return this->subscriptionIdGenerator_.last(key);
877 }
878 
879 std::string
880 DCPS_IR_Participant::dump_to_string(const std::string& prefix, int depth) const
881 {
882  std::string str;
883 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
884  OpenDDS::DCPS::RepoIdConverter local_converter(id_);
885 
886  for (int i=0; i < depth; i++)
887  str += prefix;
888  std::string indent = str + prefix;
889  str += "DCPS_IR_Participant[";
890  str += std::string(local_converter);
891  str += "]";
892  if (isBitPublisher_)
893  str += " (BIT)";
894  std::ostringstream os;
895  os << "federation id[" << federationId_.id();
897  os << "(federated)";
898 
899  os << "] owner[" << owner_ << "]";
900  str += os.str();
901  str += aliveStatus_ ? " (alive)" : " (not alive)";
902  str += "\n";
903 
904  str += indent + "Topics:\n";
905  for (DCPS_IR_Topic_Map::const_iterator tm = topicRefs_.begin();
906  tm != topicRefs_.end();
907  tm++)
908  {
909  str += tm->second->dump_to_string(prefix, depth+1);
910  }
911 
912  str += indent + "Publications:\n";
913  for (DCPS_IR_Publication_Map::const_iterator pm = publications_.begin();
914  pm != publications_.end();
915  pm++)
916  {
917  str += pm->second->dump_to_string(prefix, depth+1);
918  }
919 
920  str += indent + "Subscriptions:\n";
921  for (DCPS_IR_Subscription_Map::const_iterator sm = subscriptions_.begin();
922  sm != subscriptions_.end();
923  sm++)
924  {
925  str += sm->second->dump_to_string(prefix, depth+1);
926  }
927 
928  str += indent + "ignored Participants [ ";
930  ipart != ignoredParticipants_.end();
931  ipart++)
932  {
933  OpenDDS::DCPS::RepoIdConverter ipart_converter(*ipart);
934  str += std::string(ipart_converter);
935  str += " ";
936  }
937  str += "]\n";
938  str += indent + "ignored Topics [ ";
939 
941  itop != ignoredTopics_.end();
942  itop++)
943  {
944  OpenDDS::DCPS::RepoIdConverter itop_converter(*itop);
945  str += std::string(itop_converter);
946  str += " ";
947  }
948  str += "]\n";
949  str += indent + "ignored Publications [ ";
950 
952  ipub != ignoredPublications_.end();
953  ipub++)
954  {
955  OpenDDS::DCPS::RepoIdConverter ipub_converter(*ipub);
956  str += std::string(ipub_converter);
957  str += " ";
958  }
959  str += "]\n";
960  str += indent + "ignored Subscriptions [ ";
961 
963  isub != ignoredSubscriptions_.end();
964  isub++)
965  {
966  OpenDDS::DCPS::RepoIdConverter isub_converter(*isub);
967  str += std::string(isub_converter);
968  str += " ";
969  }
970  str += "]\n";
971 
972 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
973  return str;
974 }
975 
TAO_DDS_RepoId_Set ignoredPublications_
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
int find_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void set_bit_status(CORBA::Boolean isBIT)
#define ACE_SYNCH_MUTEX
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t id)
Update::Manager * um_
int add_subscription(OpenDDS::DCPS::unique_ptr< DCPS_IR_Subscription > sub)
OpenDDS::DCPS::GUID_t get_id()
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
OpenDDS::DCPS::RepoIdGenerator publicationIdGenerator_
DDS::DomainParticipantQos qos_
OpenDDS::DCPS::GUID_t get_id() const
ENTITYKIND_BUILTIN_WRITER_WITH_KEY and ENTITYKIND_USER_WRITER_NO_KEY.
Definition: GuidUtils.h:72
sequence< octet > key
const DDS::DomainParticipantQos * get_qos()
iterator end(void)
const DCPS_IR_Topic_Map & topics() const
Expose a readable reference to the topic map.
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
void set_bit_status(CORBA::Boolean isBIT)
OpenDDS::DCPS::GUID_t get_next_publication_id(bool builtin)
int remove_publication_reference(DCPS_IR_Publication *publication)
void ignore_participant(OpenDDS::DCPS::GUID_t id)
Ignore the participant with the id.
OpenDDS::DCPS::GUID_t get_next_topic_id(bool builtin)
OpenDDS::DCPS::GUID_t get_next_subscription_id(bool builtin)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
int remove_subscription(OpenDDS::DCPS::GUID_t subId)
ENTITYKIND_USER_WRITER_WITH_KEY and ENTITYKIND_USER_WRITER_NO_KEY.
Definition: GuidUtils.h:69
bool set_qos(const DDS::DomainParticipantQos &qos)
void create(const UType &info)
void ignore_publication(OpenDDS::DCPS::GUID_t id)
Ignore the publication with the id.
ENTITYKIND_BUILTIN_TOPIC.
Definition: GuidUtils.h:74
OpenDDS::DCPS::GUID_t id_
CORBA::Boolean aliveStatus_
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Publication >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Publication_Map
OpenDDS::DCPS::TopicStatus remove_topic(DCPS_IR_Participant *part, DCPS_IR_Topic *&topic)
const DCPS_IR_Publication_Map & publications() const
Expose a readable reference to the publication map.
DCPS_IR_Topic_Map topicRefs_
DCPS_IR_Subscription_Map subscriptions_
iterator begin(void)
LM_DEBUG
ENTITYKIND_OPENDDS_TOPIC.
Definition: GuidUtils.h:71
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
void changeOwner(long sender, long owner)
Process an incoming update that changes ownership.
ENTITYKIND_USER_READER_WITH_KEY and ENTITYKIND_USER_READER_NO_KEY.
Definition: GuidUtils.h:70
DDS::DomainId_t get_id()
const TAO_DDS_DCPSFederationId & federationId_
TAO_DDS_RepoId_Set ignoredSubscriptions_
ACE_CDR::Boolean Boolean
ACE_SYNCH_MUTEX ownerLock_
Lock portions of ownership processing.
DCPS_IR_Publication_Map publications_
TAO_DDS_RepoId_Set ignoredParticipants_
long owner() const
Value of the owner for this participant.
int insert(const T &new_item)
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
CORBA::Boolean is_publication_ignored(OpenDDS::DCPS::GUID_t id)
LM_NOTICE
DDS::InstanceHandle_t get_handle()
void set_handle(DDS::InstanceHandle_t handle)
LM_WARNING
DDS::InstanceHandle_t handle_
int remove_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
bool isBitPublisher_
Flag indicating this participant publishes built-in topics.
OpenDDS::DCPS::GUID_t get_id()
GUID_t next(bool builtin=false)
Obtain the next GUID_t value.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)
int find_publication_reference(OpenDDS::DCPS::GUID_t pubId, DCPS_IR_Publication *&pub)
Return the publication object.
void add_dead_participant(DCPS_IR_Participant_rch participant)
std::string dump_to_string(const std::string &prefix, int depth) const
std::map< OpenDDS::DCPS::GUID_t, DCPS_IR_Topic *, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Topic_Map
void publish_participant_bit(DCPS_IR_Participant *participant)
Publish Participant in the Participant Built-In Topic.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::GUID_t get_id()
void set_bit_status(CORBA::Boolean isBIT)
TAO_DDS_RepoId_Set ignoredTopics_
void ignore_subscription(OpenDDS::DCPS::GUID_t id)
Ignore the subscription with the id.
int remove_publication(OpenDDS::DCPS::GUID_t pubId)
OpenDDS::DCPS::RepoIdGenerator topicIdGenerator_
int find(const T &item) const
DCPS_IR_Domain * domain_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
void set_alive(CORBA::Boolean alive)
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Subscription >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Subscription_Map
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
Representative of a Subscription.
void dispose_subscription_bit(DCPS_IR_Subscription *subscription)
Dispose Subscription in the Subscription Built-In Topic.
int add_publication(OpenDDS::DCPS::unique_ptr< DCPS_IR_Publication > pub)
Representation of a Domain in the system.
DCPS_IR_Domain * get_domain_reference() const
int add_topic_reference(DCPS_IR_Topic *topic)
DCPS_IR_Participant(const TAO_DDS_DCPSFederationId &federationId, OpenDDS::DCPS::GUID_t id, DCPS_IR_Domain *domain, DDS::DomainParticipantQos qos, Update::Manager *um, bool isBit)
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
void dispose_publication_bit(DCPS_IR_Publication *publication)
Dispose Publication in the Publication Built-In Topic.
void last_subscription_key(long key)
OpenDDS::DCPS::RepoIdGenerator subscriptionIdGenerator_
void ignore_topic(OpenDDS::DCPS::GUID_t id)
Ignore the topic with the id.
int remove_subscription_reference(DCPS_IR_Subscription *subscription)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
ENTITYKIND_BUILTIN_READER_WITH_KEY and ENTITYKIND_USER_READER_NO_KEY.
Definition: GuidUtils.h:73
void takeOwnership()
Take local ownership of this participant and publish an update.
const DCPS_IR_Subscription_Map & subscriptions() const
Expose a readable reference to the subscription map.
Representative of a Publication.
void remove_all_dependents(CORBA::Boolean notify_lost)
Removes all topics, publications and subscriptions.
void release(bool removing)
void id(RepoKey fedId)
void last_publication_key(long key)