OpenDDS  Snapshot(2023/04/28-20:55)
FederatorManagerImpl_updates.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 #include "FederatorManagerImpl.h"
10 #include "DCPSInfo_i.h"
11 #include "DCPS_IR_Domain.h"
12 #include "DCPS_IR_Participant.h"
13 
15 
17 
18 namespace OpenDDS {
19 namespace Federator {
20 
// Deliberately empty method body.
// NOTE(review): the signature line for this definition is absent from this
// listing, so the method's name and parameters cannot be confirmed here.
21 void
23 {
24  /* This method intentionally left unimplemented. */
25 }
26 
// Deliberately empty method body.
// NOTE(review): the signature line for this definition is absent from this
// listing, so the method's name and parameters cannot be confirmed here.
27 void
29 {
30  /* This method intentionally left unimplemented. */
31 }
32 
33 ////////////////////////////////////////////////////////////////////////
34 //
35 // The following methods publish updates to the remainder of the
36 // federation.
37 //
38 
// Publishes a TopicUpdate sample with action CreateEntity via topicWriter_.
// The sample is filled from `topic` (id, domain, participant, name, data
// type, QoS) and stamped with this repository's id as sender.
// Returns without publishing while topicWriter_ is nil.
// NOTE(review): the signature line and the condition guarding the log
// statement are absent from this listing; the log text identifies this as
// ManagerImpl::create( TopicUpdate).
39 void
41 {
42  if (CORBA::is_nil(this->topicWriter_.in())) {
43  // Decline to publish data until we can.
44  return;
45  }
46 
47  TopicUpdate sample = TopicUpdate();
48  sample.sender = this->id().id();
49  sample.action = CreateEntity;
50 
51  sample.id = topic.topicId;
52  sample.domain = topic.domainId;
53  sample.participant = topic.participantId;
54  sample.topic = topic.name.c_str();
55  sample.datatype = topic.dataType.c_str();
56  sample.qos = topic.topicQos;
57 
59  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
60  OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
62  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( TopicUpdate): ")
63  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
64  this->id().id(),
65  sample.domain,
66  std::string(part_converter).c_str(),
67  std::string(topic_converter).c_str()));
68  }
69 
70  this->topicWriter_->write(sample, DDS::HANDLE_NIL);
71 }
72 
// Publishes a participant sample with action CreateEntity via
// participantWriter_. The sample carries owner, domain, id and QoS copied
// from `participant`, plus this repository's id as sender.
// Returns without publishing while participantWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::create( ParticipantUpdate).
73 void
75 {
76  if (CORBA::is_nil(this->participantWriter_.in())) {
77  // Decline to publish data until we can.
78  return;
79  }
80 
82  sample.sender = this->id().id();
83  sample.action = CreateEntity;
84 
85  sample.owner = participant.owner;
86  sample.domain = participant.domainId;
87  sample.id = participant.participantId;
88  sample.qos = participant.participantQos;
89 
91  OpenDDS::DCPS::RepoIdConverter converter(sample.id);
93  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( ParticipantUpdate): ")
94  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
95  this->id().id(),
96  sample.domain,
97  std::string(converter).c_str()));
98  }
99 
100  this->participantWriter_->write(sample, DDS::HANDLE_NIL);
101 }
102 
// Publishes a subscription sample with action CreateEntity via
// subscriptionWriter_. The sample is filled from `reader`: identifiers,
// callback, reader/subscriber QoS, transport info and the content filter
// class name, expression and parameters.
// Returns without publishing while subscriptionWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::create( SubscriptionUpdate).
103 void
105 {
106  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
107  // Decline to publish data until we can.
108  return;
109  }
110 
112  sample.sender = this->id().id();
113  sample.action = CreateEntity;
114 
115  sample.domain = reader.domainId;
116  sample.participant = reader.participantId;
117  sample.topic = reader.topicId;
118  sample.id = reader.actorId;
119  sample.callback = reader.callback.c_str();
120  sample.datareader_qos = reader.drdwQos;
121  sample.subscriber_qos = reader.pubsubQos;
122  sample.transport_info = reader.transportInterfaceInfo;
123  sample.filter_class_name = reader.contentSubscriptionProfile.filterClassName;
124  sample.filter_expression = reader.contentSubscriptionProfile.filterExpr;
125  sample.expression_params = reader.contentSubscriptionProfile.exprParams;
127 
129  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
130  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
132  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( SubscriptionUpdate): ")
133  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
134  this->id().id(),
135  sample.domain,
136  std::string(part_converter).c_str(),
137  std::string(sub_converter).c_str()));
138  }
139 
140  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
141 }
142 
// Publishes a publication sample with action CreateEntity via
// publicationWriter_. The sample is filled from `writer`: identifiers,
// callback, writer/publisher QoS and transport info.
// Returns without publishing while publicationWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::create( PublicationUpdate).
143 void
145 {
146  if (CORBA::is_nil(this->publicationWriter_.in())) {
147  // Decline to publish data until we can.
148  return;
149  }
150 
152  sample.sender = this->id().id();
153  sample.action = CreateEntity;
154 
155  sample.domain = writer.domainId;
156  sample.participant = writer.participantId;
157  sample.topic = writer.topicId;
158  sample.id = writer.actorId;
159  sample.callback = writer.callback.c_str();
160  sample.datawriter_qos = writer.drdwQos;
161  sample.publisher_qos = writer.pubsubQos;
162  sample.transport_info = writer.transportInterfaceInfo;
164 
166  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
167  OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
169  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( PublicationUpdate): ")
170  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
171  this->id().id(),
172  sample.domain,
173  std::string(part_converter).c_str(),
174  std::string(pub_converter).c_str()));
175  }
176 
177  this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
178 }
179 
// Publishes an OwnerUpdate sample with action CreateEntity via ownerWriter_.
// Domain, participant and owner are copied from `data`; sender is this
// repository's id. Returns without publishing while ownerWriter_ is nil.
// NOTE(review): the signature line, the declaration of `converter` used in
// the log arguments, and the condition guarding the log statement are absent
// from this listing; the log text identifies this as
// ManagerImpl::create( OwnerUpdate).
180 void
182 {
183  if (CORBA::is_nil(this->ownerWriter_.in())) {
184  // Decline to publish data until we can.
185  return;
186  }
187 
188  OwnerUpdate sample = OwnerUpdate();
189  sample.sender = this->id().id();
190  sample.action = CreateEntity;
191 
192  sample.domain = data.domain;
193  sample.participant = data.participant;
194  sample.owner = data.owner;
195 
199  ACE_TEXT("(%P|%t) Federator::ManagerImpl::create( OwnerUpdate): ")
200  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
201  this->id().id(),
202  sample.domain,
203  std::string(converter).c_str(),
204  sample.sender,
205  sample.owner));
206  }
207 
208  this->ownerWriter_->write(sample, DDS::HANDLE_NIL);
209 }
210 
// Publishes a DestroyEntity sample for the entity identified by `id`.
//
// `type` selects the kind of entity (Topic, Participant or Actor); for an
// Actor, `actor` further selects DataWriter or DataReader. Each case fills a
// sample with sender, action, domain and id (plus the participant for topics
// and actors) and writes it through the matching writer. A case returns
// without publishing while its writer is nil.
//
// Nothing is published when `id.domain` equals the configured federation
// domain.
//
// NOTE(review): the line carrying the function name, the `sample`
// declarations in the Participant / DataWriter / DataReader cases, and the
// conditions guarding the log statements are absent from this listing; the
// log text identifies this as ManagerImpl::destroy.
211 void
213  const Update::IdPath& id,
214  Update::ItemType type,
215  Update::ActorType actor)
216 {
217  //
218  // Do not propagate any destroy() messages within the FederationDomain.
219  // This domain will be managed separately.
220  //
221  if (id.domain == this->config_.federationDomain()) {
222  return;
223  }
224 
225  switch (type) {
226  case Update::Topic: {
227  if (CORBA::is_nil(this->topicWriter_.in())) {
228  // Decline to publish data until we can.
229  return;
230  }
231 
232  TopicUpdate sample = TopicUpdate();
233  sample.sender = this->id().id();
234  sample.action = DestroyEntity;
235 
236  sample.id = id.id;
237  sample.domain = id.domain;
238  sample.participant = id.participant;
239 
241  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
242  OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
244  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( TopicUpdate): ")
245  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
246  this->id().id(),
247  sample.domain,
248  std::string(part_converter).c_str(),
249  std::string(topic_converter).c_str()));
250  }
251 
252  this->topicWriter_->write(sample, DDS::HANDLE_NIL);
253  }
254  break;
255 
256  case Update::Participant: {
257  if (CORBA::is_nil(this->participantWriter_.in())) {
258  // Decline to publish data until we can.
259  return;
260  }
261 
263  sample.sender = this->id().id();
264  sample.action = DestroyEntity;
265 
266  sample.domain = id.domain;
267  sample.id = id.id;
268 
270  OpenDDS::DCPS::RepoIdConverter converter(sample.id);
272  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( ParticipantUpdate): ")
273  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
274  this->id().id(),
275  sample.domain,
276  std::string(converter).c_str()));
277  }
278 
279  this->participantWriter_->write(sample, DDS::HANDLE_NIL);
280  }
281  break;
282 
283  case Update::Actor:
284 
285  // This is VERY annoying.
286  switch (actor) {
287  case Update::DataWriter: {
288  if (CORBA::is_nil(this->publicationWriter_.in())) {
289  // Decline to publish data until we can.
290  return;
291  }
292 
294  sample.sender = this->id().id();
295  sample.action = DestroyEntity;
296 
297  sample.domain = id.domain;
298  sample.participant = id.participant;
299  sample.id = id.id;
300 
302  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
303  OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
305  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( PublicationUpdate): ")
306  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
307  this->id().id(),
308  sample.domain,
309  std::string(part_converter).c_str(),
310  std::string(pub_converter).c_str()));
311  }
312 
313  this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
314  }
315  break;
316 
317  case Update::DataReader: {
318  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
319  // Decline to publish data until we can.
320  return;
321  }
322 
324  sample.sender = this->id().id();
325  sample.action = DestroyEntity;
326 
327  sample.domain = id.domain;
328  sample.participant = id.participant;
329  sample.id = id.id;
330 
332  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
333  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
335  ACE_TEXT("(%P|%t) Federator::ManagerImpl::destroy( SubscriptionUpdate): ")
336  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
337  this->id().id(),
338  sample.domain,
339  std::string(part_converter).c_str(),
340  std::string(sub_converter).c_str()));
341  }
342 
343  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
344  }
345  break;
346  }
347 
348  break;
349  }
350 }
351 
// Publishes a participant sample with action UpdateQosValue1 via
// participantWriter_, carrying the domain and id from `id` and the new `qos`.
// Returns without publishing while participantWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::update( ParticipantUpdate).
352 void
354 {
355  if (CORBA::is_nil(this->participantWriter_.in())) {
356  // Decline to publish data until we can.
357  return;
358  }
359 
361  sample.sender = this->id().id();
362  sample.action = UpdateQosValue1;
363 
364  sample.domain = id.domain;
365  sample.id = id.id;
366  sample.qos = qos;
367 
369  OpenDDS::DCPS::RepoIdConverter converter(sample.id);
371  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ParticipantUpdate): ")
372  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
373  this->id().id(),
374  sample.domain,
375  std::string(converter).c_str()));
376  }
377 
378  this->participantWriter_->write(sample, DDS::HANDLE_NIL);
379 }
380 
// Publishes a TopicUpdate sample with action UpdateQosValue1 via
// topicWriter_, carrying the id, domain and participant from `id` and the
// new `qos`. Returns without publishing while topicWriter_ is nil.
// NOTE(review): the signature line and the condition guarding the log
// statement are absent from this listing; the log text identifies this as
// ManagerImpl::update( TopicUpdate).
381 void
383 {
384  if (CORBA::is_nil(this->topicWriter_.in())) {
385  // Decline to publish data until we can.
386  return;
387  }
388 
389  TopicUpdate sample = TopicUpdate();
390  sample.sender = this->id().id();
391  sample.action = UpdateQosValue1;
392 
393  sample.id = id.id;
394  sample.domain = id.domain;
395  sample.participant = id.participant;
396  sample.qos = qos;
397 
399  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
400  OpenDDS::DCPS::RepoIdConverter topic_converter(sample.id);
402  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( TopicUpdate): ")
403  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
404  this->id().id(),
405  sample.domain,
406  std::string(part_converter).c_str(),
407  std::string(topic_converter).c_str()));
408  }
409 
410  this->topicWriter_->write(sample, DDS::HANDLE_NIL);
411 }
412 
// Publishes a publication sample with action UpdateQosValue1 via
// publicationWriter_; `qos` is stored in the sample's datawriter_qos field.
// Returns without publishing while publicationWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::update( WriterUpdate).
413 void
415 {
416  if (CORBA::is_nil(this->publicationWriter_.in())) {
417  // Decline to publish data until we can.
418  return;
419  }
420 
422  sample.sender = this->id().id();
423  sample.action = UpdateQosValue1;
424 
425  sample.domain = id.domain;
426  sample.participant = id.participant;
427  sample.id = id.id;
428  sample.datawriter_qos = qos;
429 
431  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
432  OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
434  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( WriterUpdate): ")
435  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
436  this->id().id(),
437  sample.domain,
438  std::string(part_converter).c_str(),
439  std::string(pub_converter).c_str()));
440  }
441 
442  this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
443 }
444 
// Publishes a publication sample with action UpdateQosValue2 via
// publicationWriter_; `qos` is stored in the sample's publisher_qos field.
// Returns without publishing while publicationWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::update( PublisherUpdate).
445 void
447 {
448  if (CORBA::is_nil(this->publicationWriter_.in())) {
449  // Decline to publish data until we can.
450  return;
451  }
452 
454  sample.sender = this->id().id();
455  sample.action = UpdateQosValue2;
456 
457  sample.domain = id.domain;
458  sample.participant = id.participant;
459  sample.id = id.id;
460  sample.publisher_qos = qos;
461 
463  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
464  OpenDDS::DCPS::RepoIdConverter pub_converter(sample.id);
466  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( PublisherUpdate): ")
467  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
468  this->id().id(),
469  sample.domain,
470  std::string(part_converter).c_str(),
471  std::string(pub_converter).c_str()));
472  }
473 
474  this->publicationWriter_->write(sample, DDS::HANDLE_NIL);
475 }
476 
// Publishes a subscription sample with action UpdateQosValue1 via
// subscriptionWriter_; `qos` is stored in the sample's datareader_qos field.
// Returns without publishing while subscriptionWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::update( ReaderUpdate).
477 void
479 {
480  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
481  // Decline to publish data until we can.
482  return;
483  }
484 
486  sample.sender = this->id().id();
487  sample.action = UpdateQosValue1;
488 
489  sample.domain = id.domain;
490  sample.participant = id.participant;
491  sample.id = id.id;
492  sample.datareader_qos = qos;
493 
495  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
496  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
498  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( ReaderUpdate): ")
499  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
500  this->id().id(),
501  sample.domain,
502  std::string(part_converter).c_str(),
503  std::string(sub_converter).c_str()));
504  }
505 
506  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
507 }
508 
// Publishes a subscription sample carrying new filter expression parameters
// (`params`) via subscriptionWriter_. Returns without publishing while
// subscriptionWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, the
// condition guarding the log statement, and any assignment to sample.action
// are absent from this listing (embedded line 519 is missing) - confirm the
// action value against the original file. The log text identifies this as
// ManagerImpl::update(FilterParams).
509 void
511 {
512  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
513  // Decline to publish data until we can.
514  return;
515  }
516 
518  sample.sender = this->id().id();
520  sample.domain = id.domain;
521  sample.participant = id.participant;
522  sample.id = id.id;
523  sample.expression_params = params;
524 
526  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
527  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
529  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update(FilterParams): ")
530  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
531  this->id().id(),
532  sample.domain,
533  std::string(part_converter).c_str(),
534  std::string(sub_converter).c_str()));
535  }
536 
537  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
538 }
539 
// Publishes a subscription sample with action UpdateQosValue2 via
// subscriptionWriter_; `qos` is stored in the sample's subscriber_qos field.
// Returns without publishing while subscriptionWriter_ is nil.
// NOTE(review): the signature line, the declaration of `sample`, and the
// condition guarding the log statement are absent from this listing; the log
// text identifies this as ManagerImpl::update( SubscriberUpdate).
540 void
542 {
543  if (CORBA::is_nil(this->subscriptionWriter_.in())) {
544  // Decline to publish data until we can.
545  return;
546  }
547 
549  sample.sender = this->id().id();
550  sample.action = UpdateQosValue2;
551 
552  sample.domain = id.domain;
553  sample.participant = id.participant;
554  sample.id = id.id;
555  sample.subscriber_qos = qos;
556 
558  OpenDDS::DCPS::RepoIdConverter part_converter(sample.participant);
559  OpenDDS::DCPS::RepoIdConverter sub_converter(sample.id);
561  ACE_TEXT("(%P|%t) Federator::ManagerImpl::update( SubscriberUpdate): ")
562  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
563  this->id().id(),
564  sample.domain,
565  std::string(part_converter).c_str(),
566  std::string(sub_converter).c_str()));
567  }
568 
569  this->subscriptionWriter_->write(sample, DDS::HANDLE_NIL);
570 }
571 
572 ////////////////////////////////////////////////////////////////////////
573 //
574 // The following methods process updates received from the remainder
575 // of the federation.
576 //
577 
// Applies a received OwnerUpdate by calling info_->changeOwnership() with the
// sample's domain, participant, sender and owner.
//
// When changeOwnership() returns false, a copy of the sample is appended to
// deferredOwnerships_ (the visible `guard` / deferred_lock_ arguments indicate
// this happens under deferred_lock_). processDeferred() is called in every
// case before returning.
//
// The SampleInfo parameter is unnamed and unused.
// NOTE(review): the lines opening the log guards and the lock guard are
// absent from this listing.
578 void
579 ManagerImpl::processCreate(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
580 {
582  OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
584  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
585  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
586  this->id().id(),
587  sample->domain,
588  std::string(converter).c_str(),
589  sample->sender,
590  sample->owner));
591  }
592 
593  // We could generate an error message here. Instead we let action be irrelevant.
594  if (false == this->info_->changeOwnership(sample->domain,
595  sample->participant,
596  sample->sender,
597  sample->owner)) {
598  {
600  guard,
601  this->deferred_lock_);
602  this->deferredOwnerships_.push_back(*sample);
603  }
604 
607  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( OwnerUpdate): ")
608  ACE_TEXT("deferred update.\n")));
609  }
610  }
611 
612  this->processDeferred();
613 }
614 
// Applies a received publication sample by calling info_->add_publication()
// with the sample's fields and a trailing `true` argument.
//
// When add_publication() returns false, a copy of the sample is appended to
// deferredPublications_ (the visible `guard` / deferred_lock_ arguments
// indicate this happens under deferred_lock_). processDeferred() is called in
// every case before returning.
//
// NOTE(review): the signature line and the lines opening the log guards and
// the lock guard are absent from this listing; the log text identifies this
// as ManagerImpl::processCreate( PublicationUpdate).
615 void
617 {
619  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
620  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
622  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
623  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
624  this->id().id(),
625  sample->domain,
626  std::string(part_converter).c_str(),
627  std::string(pub_converter).c_str()));
628  }
629 
630  if (false == this->info_->add_publication(sample->domain,
631  sample->participant,
632  sample->topic,
633  sample->id,
634  sample->callback,
635  sample->datawriter_qos,
636  sample->transport_info,
637  sample->transport_context,
638  sample->publisher_qos,
639  sample->serialized_type_info,
640  true)) {
641  {
643  guard,
644  this->deferred_lock_);
645  this->deferredPublications_.push_back(*sample);
646  }
647 
650  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( PublicationUpdate): ")
651  ACE_TEXT("deferred update.\n")));
652  }
653  }
654 
655  this->processDeferred();
656 }
657 
// Applies a received subscription sample by calling info_->add_subscription()
// with the sample's fields (including the content filter class name,
// expression and parameters) and a trailing `true` argument.
//
// When add_subscription() returns false, a copy of the sample is appended to
// deferredSubscriptions_ (the visible `guard` / deferred_lock_ arguments
// indicate this happens under deferred_lock_). processDeferred() is called in
// every case before returning.
//
// NOTE(review): the signature line and the lines opening the log guards and
// the lock guard are absent from this listing; the log text identifies this
// as ManagerImpl::processCreate( SubscriptionUpdate).
658 void
660 {
662  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
663  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
665  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
666  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
667  this->id().id(),
668  sample->domain,
669  std::string(part_converter).c_str(),
670  std::string(sub_converter).c_str()));
671  }
672 
673  if (false == this->info_->add_subscription(sample->domain,
674  sample->participant,
675  sample->topic,
676  sample->id,
677  sample->callback,
678  sample->datareader_qos,
679  sample->transport_info,
680  sample->transport_context,
681  sample->subscriber_qos,
682  sample->filter_class_name,
683  sample->filter_expression,
684  sample->expression_params,
685  sample->serialized_type_info,
686  true)) {
687  {
689  guard,
690  this->deferred_lock_);
691  this->deferredSubscriptions_.push_back(*sample);
692  }
693 
696  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( SubscriptionUpdate): ")
697  ACE_TEXT("deferred update.\n")));
698  }
699  }
700 
701  this->processDeferred();
702 }
703 
// Applies a received participant sample. A call taking the sample's domain,
// id and qos is made first, then info_->changeOwnership() is called with the
// sample's domain, id, sender and owner. An ERROR message is logged when
// changeOwnership() returns false. processDeferred() is called in every case.
//
// NOTE(review): the signature line, the head of the first call (the line
// naming the function that receives domain/id/qos), and the lines opening the
// log statements are absent from this listing; the log text identifies this
// as ManagerImpl::processCreate( ParticipantUpdate).
704 void
706 {
708  OpenDDS::DCPS::RepoIdConverter converter(sample->id);
710  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( ParticipantUpdate): ")
711  ACE_TEXT("repo %d - [ domain %d/ participant %C/ owner %d ]\n"),
712  this->id().id(),
713  sample->domain,
714  std::string(converter).c_str(),
715  sample->owner));
716  }
717 
719  sample->domain,
720  sample->id,
721  sample->qos);
722  bool ownershipChanged = this->info_->changeOwnership(
723  sample->domain,
724  sample->id,
725  sample->sender,
726  sample->owner);
727  if (!ownershipChanged) {
729  ACE_TEXT("(%P|%t) ERROR: ")
730  ACE_TEXT("OpenDDS::Federator::ManagerImpl::processCreate(), ")
731  ACE_TEXT("Could not change ownership\n")));
732  }
733  this->processDeferred();
734 }
735 
// Applies a received TopicUpdate by calling info_->add_topic() with the
// sample's id, domain, participant, topic name, data type and QoS.
//
// When add_topic() returns false, a copy of the sample is appended to
// deferredTopics_ (the visible `guard` / deferred_lock_ arguments indicate
// this happens under deferred_lock_). processDeferred() is called in every
// case before returning.
//
// The SampleInfo parameter is unnamed and unused.
// NOTE(review): the lines opening the log guards and the lock guard are
// absent from this listing.
736 void
737 ManagerImpl::processCreate(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
738 {
740  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
741  OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
743  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
744  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
745  this->id().id(),
746  sample->domain,
747  std::string(part_converter).c_str(),
748  std::string(topic_converter).c_str()));
749  }
750 
751  if (false == this->info_->add_topic(sample->id,
752  sample->domain,
753  sample->participant,
754  sample->topic,
755  sample->datatype,
756  sample->qos)) {
757  {
759  guard,
760  this->deferred_lock_);
761  this->deferredTopics_.push_back(*sample);
762  }
763 
766  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processCreate( TopicUpdate): ")
767  ACE_TEXT("deferred update.\n")));
768  }
769  }
770 
771  this->processDeferred();
772 }
773 
// Retries every update that was previously deferred.
//
// Four lists are walked in this order: deferredOwnerships_, deferredTopics_,
// deferredPublications_, deferredSubscriptions_. For each entry the matching
// info_ call (changeOwnership / add_topic / add_publication /
// add_subscription) is repeated; an entry is erased from its list when the
// call returns true, and left in place (iterator advanced) otherwise.
//
// The `guard` / deferred_lock_ arguments at the top indicate the whole pass
// runs under deferred_lock_.
// NOTE(review): the signature line, the line opening the lock guard, and the
// lines opening the log guards are absent from this listing; the log text
// identifies this as ManagerImpl::processDeferred.
774 void
776 {
778  guard,
779  this->deferred_lock_);
780 
781  {
782  std::list<OwnerUpdate>::iterator current = this->deferredOwnerships_.begin();
783 
784  while (current != this->deferredOwnerships_.end()) {
785  if (this->info_->changeOwnership(current->domain,
786  current->participant,
787  current->sender,
788  current->owner)) {
790  OpenDDS::DCPS::RepoIdConverter converter(current->participant);
792  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( OwnerUpdate): ")
793  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
794  this->id().id(),
795  current->domain,
796  std::string(converter).c_str(),
797  current->sender,
798  current->owner));
799  }
800 
801  current = this->deferredOwnerships_.erase(current);
802 
803  } else {
804  ++ current;
805  }
806  }
807  }
808 
809  {
810  std::list<TopicUpdate>::iterator current = this->deferredTopics_.begin();
811 
812  while (current != this->deferredTopics_.end()) {
813  if (true == this->info_->add_topic(current->id,
814  current->domain,
815  current->participant,
816  current->topic,
817  current->datatype,
818  current->qos)) {
820  OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
821  OpenDDS::DCPS::RepoIdConverter topic_converter(current->id);
823  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( TopicUpdate): ")
824  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
825  this->id().id(),
826  current->domain,
827  std::string(part_converter).c_str(),
828  std::string(topic_converter).c_str()));
829  }
830 
831  current = this->deferredTopics_.erase(current);
832 
833  } else {
834  ++ current;
835  }
836  }
837  }
838 
839  {
840  std::list<PublicationUpdate>::iterator current = this->deferredPublications_.begin();
841 
842  while (current != this->deferredPublications_.end()) {
843 
844  if (true == this->info_->add_publication(current->domain,
845  current->participant,
846  current->topic,
847  current->id,
848  current->callback,
849  current->datawriter_qos,
850  current->transport_info,
851  current->transport_context,
852  current->publisher_qos,
853  current->serialized_type_info,
854  true)) {
856  OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
857  OpenDDS::DCPS::RepoIdConverter pub_converter(current->id);
859  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( PublicationUpdate): ")
860  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
861  this->id().id(),
862  current->domain,
863  std::string(part_converter).c_str(),
864  std::string(pub_converter).c_str()));
865  }
866 
867  current = this->deferredPublications_.erase(current);
868 
869  } else {
870  ++ current;
871  }
872  }
873  }
874 
875  {
876  std::list<SubscriptionUpdate>::iterator current = this->deferredSubscriptions_.begin();
877 
878  while (current != this->deferredSubscriptions_.end()) {
879 
880  if (true == this->info_->add_subscription(current->domain,
881  current->participant,
882  current->topic,
883  current->id,
884  current->callback,
885  current->datareader_qos,
886  current->transport_info,
887  current->transport_context,
888  current->subscriber_qos,
889  current->filter_class_name,
890  current->filter_expression,
891  current->expression_params,
892  current->serialized_type_info,
893  true)) {
895  OpenDDS::DCPS::RepoIdConverter part_converter(current->participant);
896  OpenDDS::DCPS::RepoIdConverter sub_converter(current->id);
898  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDeferred( SubscriptionUpdate): ")
899  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
900  this->id().id(),
901  current->domain,
902  std::string(part_converter).c_str(),
903  std::string(sub_converter).c_str()));
904  }
905 
906  current = this->deferredSubscriptions_.erase(current);
907 
908  } else {
909  ++ current;
910  }
911  }
912  }
913 
914 }
915 
// Handles an owner sample by calling info_->changeOwnership() with the
// sample's domain, participant, sender and owner. When that returns false, a
// copy of the sample is appended to deferredOwnerships_ (the visible `guard`
// / deferred_lock_ arguments indicate this happens under deferred_lock_).
// No call to processDeferred() is visible in this handler.
//
// NOTE(review): the signature line and the lines opening the log guards and
// the lock guard are absent from this listing; the log text identifies this
// as ManagerImpl::processUpdateQos1( OwnerUpdate).
916 void
918 {
920  OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
922  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
923  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
924  this->id().id(),
925  sample->domain,
926  std::string(converter).c_str(),
927  sample->sender,
928  sample->owner));
929  }
930 
931  if (false == this->info_->changeOwnership(sample->domain,
932  sample->participant,
933  sample->sender,
934  sample->owner)) {
935  {
937  guard,
938  this->deferred_lock_);
939 
940  this->deferredOwnerships_.push_back(*sample);
941  }
943  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( OwnerUpdate): ")
944  ACE_TEXT("deferred update.\n")));
945  }
946 }
947 
// Handles a publication sample: logs it, then makes a call that receives the
// sample's domain, participant, id and datawriter_qos.
// NOTE(review): the signature line, the line opening the log statement, and
// the head of the final call (the function name) are absent from this
// listing; the log text identifies this as
// ManagerImpl::processUpdateQos1( PublicationUpdate).
948 void
950 {
952  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
953  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
955  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( PublicationUpdate): ")
956  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
957  this->id().id(),
958  sample->domain,
959  std::string(part_converter).c_str(),
960  std::string(pub_converter).c_str()));
961  }
962 
964  sample->domain,
965  sample->participant,
966  sample->id,
967  sample->datawriter_qos);
968 }
969 
// Handles a publication sample: logs it, then makes a call that receives the
// sample's domain, participant, id and publisher_qos.
// NOTE(review): the signature line, the line opening the log statement, and
// the head of the final call (the function name) are absent from this
// listing; the log text identifies this as
// ManagerImpl::processUpdateQos2( PublicationUpdate).
970 void
972 {
974  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
975  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
977  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( PublicationUpdate): ")
978  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
979  this->id().id(),
980  sample->domain,
981  std::string(part_converter).c_str(),
982  std::string(pub_converter).c_str()));
983  }
984 
986  sample->domain,
987  sample->participant,
988  sample->id,
989  sample->publisher_qos);
990 }
991 
// Handles a subscription sample: logs it, then makes a call that receives the
// sample's domain, participant, id and datareader_qos.
// NOTE(review): the signature line, the line opening the log statement, and
// the head of the final call (the function name) are absent from this
// listing; the log text identifies this as
// ManagerImpl::processUpdateQos1( SubscriptionUpdate).
992 void
994 {
996  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
997  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
999  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( SubscriptionUpdate): ")
1000  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1001  this->id().id(),
1002  sample->domain,
1003  std::string(part_converter).c_str(),
1004  std::string(sub_converter).c_str()));
1005  }
1006 
1008  sample->domain,
1009  sample->participant,
1010  sample->id,
1011  sample->datareader_qos);
1012 }
1013 
// Handles a subscription sample: logs it, then makes a call that receives the
// sample's domain, participant, id and subscriber_qos.
// NOTE(review): the signature line, the line opening the log statement, and
// the head of the final call (the function name) are absent from this
// listing; the log text identifies this as
// ManagerImpl::processUpdateQos2( SubscriptionUpdate).
1014 void
1016 {
1018  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1019  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
1021  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos2( SubscriptionUpdate): ")
1022  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1023  this->id().id(),
1024  sample->domain,
1025  std::string(part_converter).c_str(),
1026  std::string(sub_converter).c_str()));
1027  }
1028 
1030  sample->domain,
1031  sample->participant,
1032  sample->id,
1033  sample->subscriber_qos);
1034 }
1035 
// Handles a SubscriptionUpdate carrying new filter expression parameters:
// logs it, then makes a call that receives the sample's domain, participant,
// id and expression_params. The SampleInfo parameter is unnamed and unused.
// NOTE(review): the line carrying the function name, the line opening the log
// statement, and the head of the final call (the function name) are absent
// from this listing; the log text identifies this as
// ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate).
1036 void
1038  const SubscriptionUpdate* sample, const DDS::SampleInfo* /* info */)
1039 {
1041  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1042  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
1044  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateFilterExpressionParams(SubscriptionUpdate): ")
1045  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1046  this->id().id(),
1047  sample->domain,
1048  std::string(part_converter).c_str(),
1049  std::string(sub_converter).c_str()));
1050  }
1051 
1053  sample->domain,
1054  sample->participant,
1055  sample->id,
1056  sample->expression_params);
1057 }
1058 
// Handles a participant sample: logs it, then makes a call that receives the
// sample's domain, id and qos.
// NOTE(review): the signature line, the line opening the log statement, and
// the head of the final call (the function name) are absent from this
// listing; the log text identifies this as
// ManagerImpl::processUpdateQos1( ParticipantUpdate).
1059 void
1061 {
1063  OpenDDS::DCPS::RepoIdConverter converter(sample->id);
1065  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( ParticipantUpdate): ")
1066  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
1067  this->id().id(),
1068  sample->domain,
1069  std::string(converter).c_str()));
1070  }
1071 
1073  sample->domain,
1074  sample->id,
1075  sample->qos);
1076 }
1077 
// Handles a topic sample: logs it, then calls info_->update_topic_qos() with
// the sample's id, domain, participant and qos.
// NOTE(review): the signature line and the line opening the log statement are
// absent from this listing; the log text identifies this as
// ManagerImpl::processUpdateQos1( TopicUpdate).
1078 void
1080 {
1082  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1083  OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
1085  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processUpdateQos1( TopicUpdate): ")
1086  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
1087  this->id().id(),
1088  sample->domain,
1089  std::string(part_converter).c_str(),
1090  std::string(topic_converter).c_str()));
1091  }
1092 
1093  this->info_->update_topic_qos(
1094  sample->id,
1095  sample->domain,
1096  sample->participant,
1097  sample->qos);
1098 }
1099 
// Handles an OwnerUpdate delivered as a delete. Nothing is removed here: it
// calls info_->changeOwnership() with the sample's domain, participant,
// sender and owner, exactly as the other OwnerUpdate handlers do. When that
// returns false, a copy of the sample is appended to deferredOwnerships_ (the
// visible `guard` / deferred_lock_ arguments indicate this happens under
// deferred_lock_). No call to processDeferred() is visible in this handler.
//
// The SampleInfo parameter is unnamed and unused.
// NOTE(review): the lines opening the log guards and the lock guard are
// absent from this listing.
1100 void
1101 ManagerImpl::processDelete(const OwnerUpdate* sample, const DDS::SampleInfo* /* info */)
1102 {
1104  OpenDDS::DCPS::RepoIdConverter converter(sample->participant);
1106  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
1107  ACE_TEXT("repo %d - [ domain %d/ participant %C/ sender %d/ owner %d ]\n"),
1108  this->id().id(),
1109  sample->domain,
1110  std::string(converter).c_str(),
1111  sample->sender,
1112  sample->owner));
1113  }
1114 
1115  // We could generate an error message here. Instead we let action be irrelevant.
1116  if (false == this->info_->changeOwnership(sample->domain,
1117  sample->participant,
1118  sample->sender,
1119  sample->owner)) {
1120  {
1122  guard,
1123  this->deferred_lock_);
1124  this->deferredOwnerships_.push_back(*sample);
1125  }
1127  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( OwnerUpdate): ")
1128  ACE_TEXT("deferred update.\n")));
1129  }
1130 }
1131 
// Handles a publication delete: logs it, then calls
// info_->remove_publication() with the sample's domain, participant and id
// inside a try block. A message saying the participant was already removed is
// logged on the failure path.
// NOTE(review): the signature line, the lines opening the log statements, and
// the catch clause following the try block are absent from this listing, so
// the exception type handled cannot be confirmed here. The log text
// identifies this as ManagerImpl::processDelete( PublicationUpdate).
1132 void
1134 {
1136  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1137  OpenDDS::DCPS::RepoIdConverter pub_converter(sample->id);
1139  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
1140  ACE_TEXT("repo %d - [ domain %d/ participant %C/ publication %C ]\n"),
1141  this->id().id(),
1142  sample->domain,
1143  std::string(part_converter).c_str(),
1144  std::string(pub_converter).c_str()));
1145  }
1146 
1147  try {
1148  this->info_->remove_publication(
1149  sample->domain,
1150  sample->participant,
1151  sample->id);
1152 
1156  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( PublicationUpdate): ")
1157  ACE_TEXT("the participant was already removed.\n")));
1158  }
1159  }
1160 }
1161 
// Handles a subscription delete: logs it, then calls
// info_->remove_subscription() with the sample's domain, participant and id
// inside a try block. A message saying the participant was already removed is
// logged on the failure path.
// NOTE(review): the signature line, the lines opening the log statements, and
// the catch clause following the try block are absent from this listing, so
// the exception type handled cannot be confirmed here. The log text
// identifies this as ManagerImpl::processDelete( SubscriptionUpdate).
1162 void
1164 {
1166  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1167  OpenDDS::DCPS::RepoIdConverter sub_converter(sample->id);
1169  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
1170  ACE_TEXT("repo %d - [ domain %d/ participant %C/ subscription %C ]\n"),
1171  this->id().id(),
1172  sample->domain,
1173  std::string(part_converter).c_str(),
1174  std::string(sub_converter).c_str()));
1175  }
1176 
1177  try {
1178  this->info_->remove_subscription(
1179  sample->domain,
1180  sample->participant,
1181  sample->id);
1182 
1186  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( SubscriptionUpdate): ")
1187  ACE_TEXT("the participant was already removed.\n")));
1188  }
1189  }
1190 }
1191 
// Handles a participant delete: logs it, then, inside a try block, makes a
// call that receives the sample's domain and id. A message saying the
// participant was already removed is logged on the failure path.
// NOTE(review): the signature line, the head of the call inside the try block
// (the function name), the catch clause, and the lines opening the log
// statements are absent from this listing. The log text identifies this as
// ManagerImpl::processDelete( ParticipantUpdate).
1192 void
1194 {
1196  OpenDDS::DCPS::RepoIdConverter converter(sample->id);
1198  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
1199  ACE_TEXT("repo %d - [ domain %d/ participant %C ]\n"),
1200  this->id().id(),
1201  sample->domain,
1202  std::string(converter).c_str()));
1203  }
1204  try {
1206  sample->domain,
1207  sample->id);
1211  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( ParticipantUpdate): ")
1212  ACE_TEXT("the participant was already removed.\n")));
1213  }
1214  }
1215 }
1216 
// Handles a TopicUpdate delete: logs it, then calls info_->remove_topic()
// with the sample's domain, participant and id inside a try block.
//
// Two failure paths are logged rather than propagated: one reports that the
// participant was already removed, and the visible
// catch (OpenDDS::DCPS::Invalid_Domain&) reports that the domain no longer
// exists.
//
// The SampleInfo parameter is unnamed and unused.
// NOTE(review): the first catch clause and the lines opening the log
// statements are absent from this listing, so the exception type behind the
// "participant was already removed" message cannot be confirmed here.
1217 void
1218 ManagerImpl::processDelete(const TopicUpdate* sample, const DDS::SampleInfo* /* info */)
1219 {
1221  OpenDDS::DCPS::RepoIdConverter part_converter(sample->participant);
1222  OpenDDS::DCPS::RepoIdConverter topic_converter(sample->id);
1224  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
1225  ACE_TEXT("repo %d - [ domain %d/ participant %C/ topic %C ]\n"),
1226  this->id().id(),
1227  sample->domain,
1228  std::string(part_converter).c_str(),
1229  std::string(topic_converter).c_str()));
1230  }
1231 
1232  try {
1233  this->info_->remove_topic(
1234  sample->domain,
1235  sample->participant,
1236  sample->id);
1237 
1241  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
1242  ACE_TEXT("the participant was already removed.\n")));
1243  }
1244  } catch (OpenDDS::DCPS::Invalid_Domain&) {
1247  ACE_TEXT("(%P|%t) Federator::ManagerImpl::processDelete( TopicUpdate): ")
1248  ACE_TEXT("the domain %d no longer exists.\n"),sample->domain));
1249  }
1250  }
1251 }
1252 
1253 void
1254 ManagerImpl::pushState(Manager_ptr peer)
1255 {
1256  // foreach DCPS_IR_Domain
1257  // foreach DCPS_IR_Participant
1258  // peer->initializeParticipant(...)
1259  // peer->initializeOwner(...)
1260  // foreach DCPS_IR_Participant
1261  // foreach DCPS_IR_Topic
1262  // peer->initializeTopic(...)
1263  // foreach DCPS_IR_Publication
1264  // peer->initializePublication(...)
1265  // foreach DCPS_IR_Subscription
1266  // peer->initializeSubscription(...)
1267 
1268  // Process each domain within the repository.
1269  for (DCPS_IR_Domain_Map::const_iterator currentDomain
1270  = this->info_->domains().begin();
1271  currentDomain != this->info_->domains().end();
1272  ++currentDomain) {
1273 
1274  if (currentDomain->second->get_id() == this->config_.federationDomain()) {
1275  // Do not push the Federation domain publications.
1276  //continue;
1277  }
1278 
1279  // Process each participant within the current domain.
1280  for (DCPS_IR_Participant_Map::const_iterator currentParticipant
1281  = currentDomain->second->participants().begin();
1282  currentParticipant != currentDomain->second->participants().end();
1283  ++currentParticipant) {
1284 
1285  if (currentParticipant->second->isBitPublisher() == true) {
1286  // Do not push the built-in topic publications.
1287  continue;
1288  }
1289 
1290  // Initialize the participant on the peer.
1291  ParticipantUpdate participantSample;
1292  participantSample.sender = this->id().id();
1293  participantSample.action = CreateEntity;
1294 
1295  participantSample.owner = currentParticipant->second->owner();
1296  participantSample.domain = currentDomain->second->get_id();
1297  participantSample.id = currentParticipant->second->get_id();
1298  participantSample.qos = *currentParticipant->second->get_qos();
1299 
1300  peer->initializeParticipant(participantSample);
1301 
1302  // Initialize the ownership of the participant on the peer.
1303  OwnerUpdate ownerSample;
1304  ownerSample.sender = this->id().id();
1305  ownerSample.action = CreateEntity;
1306 
1307  ownerSample.domain = currentDomain->second->get_id();
1308  ownerSample.participant = currentParticipant->second->get_id();
1309  ownerSample.owner = currentParticipant->second->owner();
1310 
1311  peer->initializeOwner(ownerSample);
1312  }
1313 
1314  // Process each participant within the current domain.
1315  for (DCPS_IR_Participant_Map::const_iterator currentParticipant
1316  = currentDomain->second->participants().begin();
1317  currentParticipant != currentDomain->second->participants().end();
1318  ++currentParticipant) {
1319 
1320  if (currentParticipant->second->isBitPublisher() == true) {
1321  // Do not push the built-in topic publications.
1322  continue;
1323  }
1324 
1325  // Process each topic within the current particpant.
1326  for (DCPS_IR_Topic_Map::const_iterator currentTopic
1327  = currentParticipant->second->topics().begin();
1328  currentTopic != currentParticipant->second->topics().end();
1329  ++currentTopic) {
1330  TopicUpdate topicSample;
1331  topicSample.sender = this->id().id();
1332  topicSample.action = CreateEntity;
1333 
1334  topicSample.id = currentTopic->second->get_id();
1335  topicSample.domain = currentDomain->second->get_id();
1336  topicSample.participant = currentTopic->second->get_participant_id();
1337  topicSample.topic = currentTopic->second->get_topic_description()->get_name();
1338  topicSample.datatype = currentTopic->second->get_topic_description()->get_dataTypeName();
1339  topicSample.qos = *currentTopic->second->get_topic_qos();
1340 
1341  peer->initializeTopic(topicSample);
1342  }
1343 
1344  // Process each publication within the current particpant.
1345  for (DCPS_IR_Publication_Map::const_iterator currentPublication
1346  = currentParticipant->second->publications().begin();
1347  currentPublication != currentParticipant->second->publications().end();
1348  ++currentPublication) {
1349  PublicationUpdate publicationSample;
1350  publicationSample.sender = this->id().id();
1351  publicationSample.action = CreateEntity;
1352 
1353  DCPS_IR_Publication* p = currentPublication->second.get();
1354  CORBA::ORB_var orb = this->info_->orb();
1355  CORBA::String_var callback = orb->object_to_string(p->writer());
1356 
1357  publicationSample.domain = currentDomain->second->get_id();
1358  publicationSample.participant = p->get_participant_id();
1359  publicationSample.topic = p->get_topic_id();
1360  publicationSample.id = p->get_id();
1361  publicationSample.callback = callback.in();
1362  publicationSample.datawriter_qos = *p->get_datawriter_qos();
1363  publicationSample.publisher_qos = *p->get_publisher_qos();
1364  publicationSample.transport_info = p->get_transportLocatorSeq();
1365 
1366  peer->initializePublication(publicationSample);
1367  }
1368 
1369  // Process each subscription within the current particpant.
1370  for (DCPS_IR_Subscription_Map::const_iterator currentSubscription
1371  = currentParticipant->second->subscriptions().begin();
1372  currentSubscription != currentParticipant->second->subscriptions().end();
1373  ++currentSubscription) {
1374  SubscriptionUpdate subscriptionSample;
1375  subscriptionSample.sender = this->id().id();
1376  subscriptionSample.action = CreateEntity;
1377 
1378  DCPS_IR_Subscription* s = currentSubscription->second.get();
1379  CORBA::ORB_var orb = this->info_->orb();
1380  CORBA::String_var callback = orb->object_to_string(s->reader());
1381 
1382  subscriptionSample.domain = currentDomain->second->get_id();
1383  subscriptionSample.participant = s->get_participant_id();
1384  subscriptionSample.topic = s->get_topic_id();
1385  subscriptionSample.id = s->get_id();
1386  subscriptionSample.callback = callback.in();
1387  subscriptionSample.datareader_qos = *s->get_datareader_qos();
1388  subscriptionSample.subscriber_qos = *s->get_subscriber_qos();
1389  subscriptionSample.transport_info = s->get_transportLocatorSeq();
1390  subscriptionSample.filter_expression = s->get_filter_expression().c_str();
1391  subscriptionSample.expression_params = s->get_expr_params();
1392 
1393  peer->initializeSubscription(subscriptionSample);
1394  }
1395  }
1396  }
1397 }
1398 
1399 } // namespace Federator
1400 } // namespace OpenDDS
1401 
virtual void remove_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId)
Definition: DCPSInfo_i.cpp:962
#define ACE_DEBUG(X)
virtual void destroy(const Update::IdPath &id, Update::ItemType type, Update::ActorType actor)
Propagate that an entity has been destroyed.
std::list< TopicUpdate > deferredTopics_
Deferred topic updates.
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS::DCPS::GUID_t get_participant_id()
virtual ::CORBA::Boolean update_subscription_params(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId, const DDS::StringSeq &params)
const InstanceHandle_t HANDLE_NIL
void federationDomain(long domain)
Federation Id value.
TAO_DDS_DCPSInfo_i * info_
The Info object reference to update.
OpenDDS::DCPS::GUID_t get_id()
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
const TAO_DDS_DCPSFederationId & id() const
Accessors for the federation Id value.
const DCPS_IR_Domain_Map & domains() const
Expose a readable reference of the domain map.
ParticipantUpdateDataWriter_var participantWriter_
ParticipantUpdate writer.
CORBA::ORB_ptr orb()
Expose the ORB.
Definition: DCPSInfo_i.cpp:119
Config & config_
The configuration information for this manager.
SubscriptionUpdateDataWriter_var subscriptionWriter_
SubscriptionUpdate writer.
void processUpdateFilterExpressionParams(const SubscriptionUpdate *sample, const DDS::SampleInfo *info)
Update the proxy filter expression params for a subscription.
OpenDDS::DCPS::GUID_t get_topic_id()
void processDelete(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.
OwnerUpdateDataWriter_var ownerWriter_
OwnerUpdate writer.
DDS::DomainParticipantQos qos
Definition: Federator.idl:91
virtual void create(const Update::UTopic &topic)
CORBA::ORB_ptr orb()
Accessors for the ORB.
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:681
virtual OpenDDS::DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId)
Definition: DCPSInfo_i.cpp:325
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
std::string get_filter_expression() const
LM_DEBUG
virtual CORBA::Boolean update_domain_participant_qos(DDS::DomainId_t domain, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
bool add_topic(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
Add a previously existing topic to the repository.
Definition: DCPSInfo_i.cpp:232
virtual void remove_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &publicationId)
Definition: DCPSInfo_i.cpp:630
DomainIdType domainId
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::GUID_t get_id()
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
Definition: DCPSInfo_i.cpp:374
void processCreate(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Null implementation for OwnerUpdate samples.
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
OpenDDS::DCPS::DataReaderRemote_ptr reader()
void processDeferred()
Handle any deferred updates that might have become processable.
ACE_TEXT("TCP_Factory")
OpenDDS::DCPS::DataWriterRemote_ptr writer()
OpenDDS::DCPS::TransportLocatorSeq transport_info
Definition: Federator.idl:141
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void processUpdateQos2(const PublicationUpdate *sample, const DDS::SampleInfo *info)
Update the proxy PublisherQos for a publication.
std::list< PublicationUpdate > deferredPublications_
Deferred publication updates.
OpenDDS::DCPS::GUID_t get_participant_id()
virtual CORBA::Boolean update_subscription_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
const DDS::SubscriberQos * get_subscriber_qos()
virtual void update(const Update::IdPath &id, const DDS::DomainParticipantQos &qos)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
Representative of a Subscription.
ACE_Thread_Mutex deferred_lock_
Protect deferred updates.
std::list< OwnerUpdate > deferredOwnerships_
Deferred ownership updates.
OpenDDS::DCPS::GUID_t get_topic_id()
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
assert new ownership for a participant and its contained entities.
Definition: DCPSInfo_i.cpp:152
PublicationUpdateDataWriter_var publicationWriter_
PublicationUpdate writer.
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
const character_type * in(void) const
DomainIdType domainId
TopicUpdateDataWriter_var topicWriter_
TopicUpdate writer.
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void remove_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
void pushState(Manager_ptr peer)
Push our current state to a remote repository.
std::list< SubscriptionUpdate > deferredSubscriptions_
Deferred subscription updates.
Boolean is_nil(T x)
virtual CORBA::Boolean update_topic_qos(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::TopicQos &qos)
void processUpdateQos1(const OwnerUpdate *sample, const DDS::SampleInfo *info)
Process ownership changes.
Representative of a Publication.
DDS::StringSeq get_expr_params() const
DDS::DataWriterQos * get_datawriter_qos()
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
OpenDDS::DCPS::TransportLocatorSeq transport_info
Definition: Federator.idl:115
virtual CORBA::Boolean update_publication_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
const DDS::DataReaderQos * get_datareader_qos()
void id(RepoKey fedId)