OpenDDS  Snapshot(2023/04/28-20:55)
DCPS_IR_Subscription.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 
10 #include /**/ "DCPS_IR_Subscription.h"
11 
12 #include /**/ "DCPS_IR_Publication.h"
13 #include /**/ "DCPS_IR_Participant.h"
14 #include /**/ "DCPS_IR_Topic_Description.h"
15 #include /**/ "DCPS_IR_Domain.h"
16 #include /**/ "dds/DCPS/DCPS_Utils.h"
17 #include /**/ "dds/DCPS/RepoIdConverter.h"
18 #include /**/ "dds/DCPS/Qos_Helper.h"
19 #include /**/ "tao/debug.h"
20 
21 #include /**/ "ace/OS_NS_unistd.h"
22 
24 
26  DCPS_IR_Participant* participant,
27  DCPS_IR_Topic* topic,
28  OpenDDS::DCPS::DataReaderRemote_ptr reader,
29  const DDS::DataReaderQos& qos,
31  ACE_CDR::ULong transportContext,
32  const DDS::SubscriberQos& subscriberQos,
33  const char* filterClassName,
34  const char* filterExpression,
35  const DDS::StringSeq& exprParams,
36  const DDS::OctetSeq& serializedTypeInfo)
37  : id_(id),
38  participant_(participant),
39  topic_(topic),
40  handle_(0),
41  isBIT_(0),
42  qos_(qos),
43  info_(info),
44  transportContext_(transportContext),
45  subscriberQos_(subscriberQos),
46  filterClassName_(filterClassName),
47  filterExpression_(filterExpression),
48  exprParams_(exprParams),
49  serializedTypeInfo_(serializedTypeInfo)
50 {
    // Every member except handle_ and isBIT_ (both started at 0) is copied
    // from the matching constructor argument in the initializer list above.
    // reader_ is the exception: it is assigned here from a _duplicate() of
    // the `reader` argument.
51  reader_ = OpenDDS::DCPS::DataReaderRemote::_duplicate(reader);
52 
55 }
56 
58 {
59 }
60 
62  bool active)
63 {
    // Records `pub` in associations_ and, when the insert reports a new
    // entry (status 0), forwards the writer's association data to reader_.
    // Returns the insert status (0 added, 1 already present, -1 failure);
    // the status is also forced to -1 when the remote call throws.
64  // keep track of the association locally
65  int status = associations_.insert(pub);
66 
67  switch (status) {
68  case 0: {
69  // inform the datareader about the association
    // `association` is filled in entirely from the publication's accessors.
71  association.writerTransInfo = pub->get_transportLocatorSeq();
72  association.transportContext = pub->get_transportContext();
73  association.writerId = pub->get_id();
74  association.pubQos = *(pub->get_publisher_qos());
75  association.writerQos = *(pub->get_datawriter_qos());
76  association.serializedTypeInfo = pub->get_serialized_type_info();
    // reader_ is only contacted while the participant is alive and this
    // repository owns it; otherwise the association is recorded locally only.
77  if (participant_->is_alive() && this->participant_->isOwner()) {
78  try {
80  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
81  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
83  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication:")
84  ACE_TEXT(" subscription %C adding publication %C.\n"),
85  std::string(sub_converter).c_str(),
86  std::string(pub_converter).c_str()));
87  }
88 
89  reader_->add_association(id_, association, active);
90 
93  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication: ")
94  ACE_TEXT("successfully added publication %x\n"),
95  pub));
96  }
97  } catch (const CORBA::Exception& ex) {
99  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:");
    // A failed remote call is reported to the caller as -1.
    // NOTE(review): pub is not taken back out of associations_ on this
    // path in the visible code - confirm that is intended.
101  status = -1;
102  }
103  }
104 
105  }
106  break;
107 
    // insert() returned 1: logged as an attempt to re-add the publication.
108  case 1: {
109  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
110  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
112  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
113  ACE_TEXT("subscription %C attempted to re-add publication %C\n"),
114  std::string(sub_converter).c_str(),
115  std::string(pub_converter).c_str()));
116  }
117  break;
118 
    // insert() returned -1: logged as a failure to add the publication.
119  case -1: {
120  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
121  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
123  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
124  ACE_TEXT("subscription %C failed to add publication %C\n"),
125  std::string(sub_converter).c_str(),
126  std::string(pub_converter).c_str()));
127  }
128  }
129 
130  return status;
131 }
132 
134  CORBA::Boolean sendNotify,
135  CORBA::Boolean notify_lost,
136  bool notify_both_side)
137 {
    // Detaches `pub` from this subscription.
    //  - sendNotify: when set (and the participant is alive and owned by this
    //    repository) reader_ is told to drop the writer before the local
    //    bookkeeping is updated.
    //  - notify_lost: passed through to reader_->remove_associations().
    //  - notify_both_side: additionally asks `pub` to drop this subscription.
    // Returns the result of associations_.remove(pub), or -1 when the remote
    // call threw (marked_dead).
138  bool marked_dead = false;
139 
140  if (sendNotify) {
141 
142  if (participant_->is_alive() && this->participant_->isOwner()) {
143  try {
145  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
146  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
148  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication:")
149  ACE_TEXT(" calling sub %C with pub %C\n"),
150  std::string(sub_converter).c_str(),
151  std::string(pub_converter).c_str()
152  ));
153  }
154 
    // idSeq holds just the id of the publication being removed.
156  idSeq.length(1);
157  idSeq[0] = pub->get_id();
158 
159  reader_->remove_associations(idSeq, notify_lost);
160 
161  if (notify_both_side) {
162  pub->remove_associated_subscription(this, sendNotify, notify_lost);
163  }
164 
165  } catch (const CORBA::Exception& ex) {
168  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:");
169  }
170 
    // Remember the failure so that -1 is returned regardless of the local
    // removal result below.
172  marked_dead = true;
173  }
174  }
175  }
176 
    // The local bookkeeping is updated whether or not reader_ was notified.
177  int status = associations_.remove(pub);
178 
179  if (0 == status) {
181  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
182  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
184  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ")
185  ACE_TEXT("subscription %C removed publication %C at %x.\n"),
186  std::string(sub_converter).c_str(),
187  std::string(pub_converter).c_str(),
188  pub));
189  }
190 
191  } else {
192  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
193  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
195  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ")
196  ACE_TEXT("subscription %C failed to remove publication %C at %x.\n"),
197  std::string(sub_converter).c_str(),
198  std::string(pub_converter).c_str(),
199  pub));
200  } // if (0 == status)
201 
202  if (marked_dead) {
203  return -1;
204 
205  } else {
206  return status;
207  }
208 }
209 
211 {
    // Removes every associated publication and then clears defunct_.
    // `status` is never modified after initialisation, so this always
    // returns 0.
    // NOTE(review): the declarator is not visible here; the name
    // remove_associations(notify_lost) is taken from the class's symbol
    // list - confirm.
212  int status = 0;
213  DCPS_IR_Publication* pub = 0;
214  size_t numAssociations = associations_.size();
215  CORBA::Boolean dontSend = 0;
216  CORBA::Boolean send = 1;
217 
218  if (0 < numAssociations) {
221 
222  while (iter != end) {
223  pub = *iter;
    // Step past the current element before it is removed below
    // (presumably so the removal cannot invalidate iter - confirm).
224  ++iter;
225 
    // First ask the publication to drop this subscription (sendNotify = 1),
    // then drop the publication locally with sendNotify = 0, so that this
    // second call does not contact reader_.
226  pub->remove_associated_subscription(this, send, notify_lost);
227  CORBA::Boolean dont_notify_lost = 0;
228  remove_associated_publication(pub, dontSend, dont_notify_lost);
229  }
230  }
231  this->defunct_.reset();
232 
233  return status;
234 }
235 
237  bool reassociate)
238 {
    // Drops every associated publication whose participant id equals `id`,
    // then tells reader_ about all of them in a single remove_associations()
    // call. With `reassociate` set, each removed publication is also
    // remembered in defunct_.
239  DCPS_IR_Publication* pub = 0;
240  size_t numAssociations = associations_.size();
241  CORBA::Boolean dontSend = 0;
242  CORBA::Boolean send = 1;
243  long count = 0;
244 
245  if (0 < numAssociations) {
    // Sized for the worst case (every association matches); trimmed to
    // `count` after the loop.
246  OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
247  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
248 
251 
252  while (iter != end) {
253  pub = *iter;
    // Advance before pub may be removed from associations_ below.
254  ++iter;
255 
257  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
258  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
259  OpenDDS::DCPS::RepoIdConverter sub_part_converter(id);
260  OpenDDS::DCPS::RepoIdConverter pub_part_converter(pub->get_participant_id());
262  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_participant: ")
263  ACE_TEXT("subscription %C testing if publication %C particpant %C == %C.\n"),
264  std::string(sub_converter).c_str(),
265  std::string(pub_converter).c_str(),
266  std::string(sub_part_converter).c_str(),
267  std::string(pub_part_converter).c_str()));
268  }
269 
270  if (id == pub->get_participant_id()) {
    // The publication side is asked to notify (send); the local removal
    // uses dontSend because reader_ is informed in one batch further down.
271  CORBA::Boolean dont_notify_lost = 0;
272  pub->remove_associated_subscription(this, send, dont_notify_lost);
273  remove_associated_publication(pub, dontSend, dont_notify_lost);
274 
275  idSeq[count] = pub->get_id();
276  ++count;
277 
    // defunct_.insert() is only attempted when reassociate is set; a
    // non-zero result is logged as an error.
278  if (reassociate && this->defunct_.insert(pub) != 0) {
279  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
280  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
282  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ")
283  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
284  std::string(sub_converter).c_str(),
285  std::string(pub_converter).c_str(),
286  pub));
287  }
288  }
289  }
290 
291  if (0 < count) {
    // Shrink the sequence to the ids actually collected.
292  idSeq.length(count);
293 
294  if (participant_->is_alive() && this->participant_->isOwner()) {
295  try {
296  CORBA::Boolean dont_notify_lost = 0;
297  reader_->remove_associations(idSeq, dont_notify_lost);
298 
299  } catch (const CORBA::Exception& ex) {
302  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:");
303  }
304 
306  }
307  }
308  }
309  }
310 }
311 
313 {
    // Drops every associated publication whose topic id equals `id`, then
    // tells reader_ about all of them in a single remove_associations() call.
    // Unlike disassociate_participant, nothing is added to defunct_ here.
314  DCPS_IR_Publication* pub = 0;
315  size_t numAssociations = associations_.size();
316  CORBA::Boolean dontSend = 0;
317  CORBA::Boolean send = 1;
318  long count = 0;
319 
320  if (0 < numAssociations) {
    // Sized for the worst case; trimmed to `count` after the loop.
321  OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
322  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
323 
326 
327  while (iter != end) {
328  pub = *iter;
    // Advance before pub may be removed from associations_ below.
329  ++iter;
330 
332  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
333  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
334  OpenDDS::DCPS::RepoIdConverter sub_topic_converter(id);
335  OpenDDS::DCPS::RepoIdConverter pub_topic_converter(pub->get_topic_id());
337  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_topic: ")
338  ACE_TEXT("subscription %C testing if publication %C topic %C == %C.\n"),
339  std::string(sub_converter).c_str(),
340  std::string(pub_converter).c_str(),
341  std::string(sub_topic_converter).c_str(),
342  std::string(pub_topic_converter).c_str()));
343  }
344 
345  if (id == pub->get_topic_id()) {
    // The publication side is asked to notify (send); the local removal
    // uses dontSend because reader_ is informed in one batch further down.
346  CORBA::Boolean dont_notify_lost = 0;
347  pub->remove_associated_subscription(this, send, dont_notify_lost);
348  remove_associated_publication(pub, dontSend, dont_notify_lost);
349 
350  idSeq[count] = pub->get_id();
351  ++count;
352  }
353  }
354 
355  if (0 < count) {
    // Shrink the sequence to the ids actually collected.
356  idSeq.length(count);
357 
358  if (participant_->is_alive() && this->participant_->isOwner()) {
359  try {
360  CORBA::Boolean dont_notify_lost = false;
361  reader_->remove_associations(idSeq, dont_notify_lost);
362 
363  } catch (const CORBA::Exception& ex) {
    // NOTE(review): the message below names remove_associations although
    // the debug message earlier in this function identifies it as
    // disassociate_topic - the log text looks copy-pasted.
366  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
367  }
368 
370  }
371  }
372  }
373  }
374 }
375 
377  bool reassociate)
378 {
    // Drops every associated publication whose id equals `id`, then tells
    // reader_ about the removal in a single remove_associations() call.
    // With `reassociate` set, the removed publication is also remembered in
    // defunct_.
379  DCPS_IR_Publication* pub = 0;
380  size_t numAssociations = associations_.size();
381  CORBA::Boolean dontSend = 0;
382  CORBA::Boolean send = 1;
383  long count = 0;
384 
385  if (0 < numAssociations) {
    // Sized for the worst case; trimmed to `count` after the loop.
386  OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
387  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
388 
391 
392  while (iter != end) {
393  pub = *iter;
    // Advance before pub may be removed from associations_ below.
394  ++iter;
395 
397  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
398  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
399  OpenDDS::DCPS::RepoIdConverter sub_pub_converter(id);
401  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_publication: ")
402  ACE_TEXT("subscription %C testing if publication %C == %C.\n"),
403  std::string(sub_converter).c_str(),
404  std::string(pub_converter).c_str(),
405  std::string(sub_pub_converter).c_str()));
406  }
407 
408  if (id == pub->get_id()) {
    // The publication side is asked to notify (send); the local removal
    // uses dontSend because reader_ is informed in one batch further down.
409  CORBA::Boolean dont_notify_lost = 0;
410  pub->remove_associated_subscription(this, send, dont_notify_lost);
411  remove_associated_publication(pub, dontSend, dont_notify_lost);
412 
413  idSeq[count] = pub->get_id();
414  ++count;
415 
    // defunct_.insert() is only attempted when reassociate is set; a
    // non-zero result is logged as an error.
416  if (reassociate && this->defunct_.insert(pub) != 0) {
417  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
418  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
420  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ")
421  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
422  std::string(sub_converter).c_str(),
423  std::string(pub_converter).c_str(),
424  pub));
425  }
426  }
427  }
428 
429  if (0 < count) {
    // Shrink the sequence to the ids actually collected.
430  idSeq.length(count);
431 
432  if (participant_->is_alive() && this->participant_->isOwner()) {
433  try {
434  CORBA::Boolean dont_notify_lost = 0;
435  reader_->remove_associations(idSeq, dont_notify_lost);
436 
437  } catch (const CORBA::Exception& ex) {
    // NOTE(review): the message below names remove_associations although
    // the other messages in this function identify it as
    // disassociate_publication - the log text looks copy-pasted.
440  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
441  }
442 
444  }
445  }
446  }
447  }
448 }
449 
451 {
    // Pushes the current incompatibleQosStatus_ to reader_, but only while
    // the participant is alive and this repository owns it. A CORBA
    // exception from the remote call is caught here rather than propagated.
452  if (participant_->is_alive() && this->participant_->isOwner()) {
453  try {
454  reader_->update_incompatible_qos(incompatibleQosStatus_);
456  } catch (const CORBA::Exception& ex) {
459  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:");
460  }
461 
463  }
464  }
465 }
466 
468  OpenDDS::DCPS::GUID_t topicId,
469  OpenDDS::DCPS::GUID_t pubId)
470 {
    // True when participant_ reports the given participant or topic as
    // ignored. The final operand of the || chain is on a line not visible
    // here (presumably the matching check on pubId - confirm).
471  CORBA::Boolean ignored = (participant_->is_participant_ignored(partId) ||
472  participant_->is_topic_ignored(topicId) ||
474 
475  return ignored;
476 }
477 
479 {
480  return info_;
481 }
482 
484 {
485  return &incompatibleQosStatus_;
486 }
487 
489 {
490  return &qos_;
491 }
492 
494 {
495  return &subscriberQos_;
496 }
497 
498 using OpenDDS::DCPS::operator==;
499 
// Replaces the stored DataReader QoS (qos_) when `qos` differs from it and,
// if `check` says so, re-evaluates this subscription's associations.
// Nothing in the visible body runs when the two values compare equal.
500 void
502 {
503  if (false == (qos == this->qos_)) {
504  // Decide whether associations must be re-examined while both the old
    // and the new value are still available.
505  bool check =
507 
508  // Store the new, compatible, value.
509  this->qos_ = qos;
510 
511  if (check) {
512  // This will remove any newly stale associations.
514 
515  // Sleep briefly so that the DataWriter can handle remove_association
516  // before add_association is issued. Otherwise the new association
517  // may have trouble connecting.
    // NOTE(review): this blocks the calling thread for 250 ms.
518  ACE_OS::sleep(ACE_Time_Value(0, 250000));
519 
520  // This will establish any newly made associations.
521  DCPS_IR_Topic_Description* description
522  = this->topic_->get_topic_description();
523  description->reevaluate_associations(this);
524  }
525 
527  }
528 }
529 
// Replaces the stored Subscriber QoS (subscriberQos_) when `qos` differs
// from it and, if `check` says so, re-evaluates this subscription's
// associations. Nothing in the visible body runs when the two values
// compare equal.
530 void
532 {
533  if (false == (qos == this->subscriberQos_)) {
534  // Decide whether associations must be re-examined while both the old
    // and the new value are still available (`check` is declared on a line
    // not visible here).
536 
537  // Store the new, compatible, value.
538  this->subscriberQos_ = qos;
539 
540  if (check) {
541  // This will remove any newly stale associations.
543 
544  // Sleep briefly so that the DataWriter can handle remove_association
545  // before add_association is issued. Otherwise the new association
546  // may have trouble connecting.
    // NOTE(review): this blocks the calling thread for 250 ms.
547  ACE_OS::sleep(ACE_Time_Value(0, 250000));
548 
549  // This will establish any newly made associations.
550  DCPS_IR_Topic_Description* description
551  = this->topic_->get_topic_description();
552  description->reevaluate_associations(this);
553  }
554 
556  }
557 }
558 
560  const DDS::SubscriberQos & subscriberQos,
561  Update::SpecificQos& specificQos)
562 {
    // Updates qos_ and/or subscriberQos_ from the arguments when they differ
    // from the stored values, re-evaluates associations when need_evaluate
    // was set, and reports through specificQos which QoS changed. The only
    // return visible in this body is `true`.
563  bool need_evaluate = false;
564  bool u_dr_qos = !(qos_ == qos);
565 
566  if (u_dr_qos) {
568  need_evaluate = true;
569  }
570 
571  qos_ = qos;
572  }
573 
574  bool u_sub_qos = !(subscriberQos_ == subscriberQos);
575 
576  if (u_sub_qos) {
578  need_evaluate = true;
579  }
580 
581  subscriberQos_ = subscriberQos;
582  }
583 
584  if (need_evaluate) {
585  // Check whether any existing association needs to be removed first.
587 
589  description->reevaluate_associations(this);
590  }
591 
    // A DataReader QoS change takes precedence over a Subscriber QoS change
    // when both occurred; the fallback value is on a line not visible here.
593  specificQos = u_dr_qos? Update::DataReaderQos:
594  u_sub_qos? Update::SubscriberQos:
596 
597  return true;
598 }
599 
// Retries every publication held in defunct_: those for which
// reevaluate_association() returns true are taken out of defunct_, the
// others are logged as failed reassociations and stay in the set.
600 void
602 {
604  while (it != this->defunct_.end()) {
605  DCPS_IR_Publication* publication = *it;
    // Advance before the current element may be removed from defunct_.
606  ++it;
607 
608  if (reevaluate_association(publication)) {
609  this->defunct_.remove(publication); // no longer defunct
610 
611  } else {
612  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
613  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
615  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ")
616  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
617  std::string(sub_converter).c_str(),
618  std::string(pub_converter).c_str(),
619  publication));
620  }
621  }
622 }
623 
625 {
    // Calls reevaluate_association() on every publication the iterator
    // yields; the return value of each call is discarded.
    // NOTE(review): the iterator setup is on lines not visible here; it
    // presumably walks associations_ - confirm.
626  DCPS_IR_Publication * pub = 0;
629 
630  while (iter != end) {
631  pub = *iter;
    // Advance first: reevaluate_association() may remove pub from
    // associations_.
632  ++iter;
633 
634  this->reevaluate_association(pub);
635  }
636 }
637 
// Re-checks the relationship with `publication`.
//  - Already associated (find() == 0): the association is removed when the
//    condition below holds (a compatibility check, per the arguments that
//    are visible), and false is returned in either case.
//  - Not associated: returns the result of description->try_associate().
638 bool
640 {
641  int status = this->associations_.find(publication);
642 
643  if (status == 0) {
644  // verify if they are still compatible after change
645 
648  publication->get_transportLocatorSeq(),
649  this->get_transportLocatorSeq(),
650  publication->get_datawriter_qos(),
651  this->get_datareader_qos(),
652  publication->get_publisher_qos(),
653  this->get_subscriber_qos())) {
654  bool sendNotify = true; // inform datareader
655  bool notify_lost = true; // invoke listener callback
656 
    // The trailing `true` is notify_both_side: the publication is asked to
    // drop this subscription as well.
657  this->remove_associated_publication(publication, sendNotify, notify_lost, true);
658  }
659 
660  } else {
662  return description->try_associate(publication, this);
663  }
664 
665  return false;
666 }
667 
669 {
670  return id_;
671 }
672 
674 {
675  return topic_->get_id();
676 }
677 
679 {
680  return participant_->get_id();
681 }
682 
684 {
685  return topic_->get_topic_description();
686 }
687 
689 {
690  return topic_;
691 }
692 
694 {
695  return handle_;
696 }
697 
699 {
700  handle_ = handle;
701 }
702 
704 {
705  return isBIT_;
706 }
707 
709 {
710  isBIT_ = isBIT;
711 }
712 
713 OpenDDS::DCPS::DataReaderRemote_ptr
715 {
716  return OpenDDS::DCPS::DataReaderRemote::_duplicate(this->reader_.in());
717 }
718 
719 std::string
721 {
722  return filterClassName_;
723 }
724 
725 std::string
727 {
728  return filterExpression_;
729 }
730 
733 {
734  return exprParams_;
735 }
736 
// Stores `params` in exprParams_ and passes them, together with this
// subscription's id_, to every publication in associations_.
737 void
739 {
740  exprParams_ = params;
741  typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
742  for (iter_t i(associations_.begin()), e(associations_.end()); i != e; ++i) {
743  (*i)->update_expr_params(id_, params);
744  }
745 }
746 
// Builds a multi-line text description of this subscription: its id, a
// " (BIT)" marker when isBIT_ is set, and the ids of the publications in
// associations_ and defunct_.
//   prefix - string repeated `depth` times to indent the first line.
//   depth  - nesting level of this object in the overall dump.
// Returns an empty string when OPENDDS_INFOREPO_REDUCED_FOOTPRINT is defined.
747 std::string
748 DCPS_IR_Subscription::dump_to_string(const std::string& prefix, int depth) const
749 {
750  std::string str;
751 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
752  OpenDDS::DCPS::RepoIdConverter local_converter(id_);
753 
754  for (int i=0; i < depth; i++)
755  str += prefix;
    // The detail lines are indented one level deeper than the heading.
756  std::string indent = str + prefix;
757  str += "DCPS_IR_Subscription[";
758  str += std::string(local_converter);
759  str += "]";
760  if (isBIT_)
761  str += " (BIT)";
762  str += "\n";
763 
    // One space-separated id per entry of associations_.
764  str += indent + "Associations [ ";
766  assoc != associations_.end();
767  assoc++)
768  {
769  OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
770  str += std::string(assoc_converter);
771  str += " ";
772  }
773  str += "]\n";
774 
    // One space-separated id per entry of defunct_.
775  str += indent + "Defunct Associations [ ";
777  def != defunct_.end();
778  def++)
779  {
780  OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
781  str += std::string(def_converter);
782  str += " ";
783  }
784  str += "]\n";
785 
786 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
787  return str;
788 }
789 
790 const DDS::OctetSeq&
792 {
793  return serializedTypeInfo_;
794 }
795 
DCPS_IR_Publication_Set associations_
const DDS::OctetSeq & get_serialized_type_info() const
int remove_associations(CORBA::Boolean notify_lost)
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
OpenDDS::DCPS::GUID_t get_participant_id()
std::string dump_to_string(const std::string &prefix, int depth) const
ACE_CDR::ULong get_transportContext() const
OpenDDS::DCPS::GUID_t get_id()
DDS::SubscriberQos subscriberQos_
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
OpenDDS::DCPS::GUID_t get_id() const
std::string get_filter_class_name() const
void set_bit_status(CORBA::Boolean isBIT)
DCPS_IR_Topic_Description * get_topic_description()
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int sleep(u_int seconds)
DDS::OctetSeq serializedTypeInfo_
sequence< TransportLocator > TransportLocatorSeq
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
Definition: DCPS_Utils.cpp:237
OpenDDS::DCPS::GUID_t get_topic_id()
void publish_subscription_bit(DCPS_IR_Subscription *subscription)
Publish Subscription in the Subscription Built-In Topic.
DDS::DataReaderQos qos_
DDS::InstanceHandle_t handle_
DCPS_IR_Subscription(const OpenDDS::DCPS::GUID_t &id, DCPS_IR_Participant *participant, DCPS_IR_Topic *topic, OpenDDS::DCPS::DataReaderRemote_ptr reader, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &info, ACE_CDR::ULong transportContext, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
void disassociate_publication(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any publications with id.
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
DDS::InstanceHandle_t get_handle()
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
std::string get_filter_expression() const
LM_DEBUG
DCPS_IR_Publication_Set defunct_
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
void set_handle(DDS::InstanceHandle_t handle)
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
void disassociate_topic(OpenDDS::DCPS::GUID_t id)
Remove any publications whose topic has the id.
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
ACE_CDR::Boolean Boolean
OpenDDS::DCPS::GUID_t id_
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
CORBA::Boolean is_publication_ignored(OpenDDS::DCPS::GUID_t id)
Representative of the Domain Participant.
bool set_qos(const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
sequence< GUID_t > WriterIdSeq
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
ACE_UINT32 ULong
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::TransportLocatorSeq info_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
OpenDDS::DCPS::DataReaderRemote_ptr reader()
ACE_TEXT("TCP_Factory")
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
void reevaluate_associations(DCPS_IR_Publication *publication)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
CORBA::Boolean is_publication_ignored(OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t pubId)
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::GUID_t get_id()
void update_expr_params(const DDS::StringSeq &params)
Calls associated Publications.
OpenDDS::DCPS::GUID_t get_participant_id()
const DDS::SubscriberQos * get_subscriber_qos()
void disassociate_participant(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any publications whose participant has the id.
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int add_associated_publication(DCPS_IR_Publication *pub, bool active)
DCPS_IR_Topic_Description * get_topic_description()
OpenDDS::DCPS::GUID_t get_topic_id()
DCPS_IR_Domain * get_domain_reference() const
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
const DDS::OctetSeq & get_serialized_type_info() const
LM_ERROR
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
Representative of a Topic Description.
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
Representative of a Publication.
DDS::StringSeq get_expr_params() const
DDS::DataWriterQos * get_datawriter_qos()
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
bool reevaluate_association(DCPS_IR_Publication *publication)
DCPS_IR_Participant * participant_
const DDS::DataReaderQos * get_datareader_qos()