OpenDDS  Snapshot(2023/04/28-20:55)
DCPS_IR_Publication.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 #include /**/ "DCPS_IR_Publication.h"
10 
11 #include /**/ "DCPS_IR_Participant.h"
12 #include /**/ "DCPS_IR_Topic.h"
13 #include /**/ "DCPS_IR_Subscription.h"
14 #include /**/ "DCPS_IR_Domain.h"
15 #include /**/ "DCPS_IR_Topic_Description.h"
16 #include /**/ "dds/DCPS/DCPS_Utils.h"
17 #include /**/ "dds/DdsDcpsInfoUtilsC.h"
18 #include /**/ "dds/DCPS/RepoIdConverter.h"
19 #include /**/ "dds/DCPS/Qos_Helper.h"
20 #include /**/ "tao/debug.h"
21 
22 #include /**/ "ace/OS_NS_unistd.h"
23 
25 
27  DCPS_IR_Participant* participant,
28  DCPS_IR_Topic* topic,
29  OpenDDS::DCPS::DataWriterRemote_ptr writer,
30  const DDS::DataWriterQos& qos,
32  ACE_CDR::ULong transportContext,
33  const DDS::PublisherQos& publisherQos,
34  const DDS::OctetSeq& serializedTypeInfo)
35  : id_(id),
36  participant_(participant),
37  topic_(topic),
38  handle_(0),
39  isBIT_(0),
40  qos_(qos),
41  info_(info),
42  transportContext_(transportContext),
43  publisherQos_(publisherQos),
44  serializedTypeInfo_(serializedTypeInfo)
45 {
46  writer_ = OpenDDS::DCPS::DataWriterRemote::_duplicate(writer);
47 
50 }
51 
53 {
54 }
55 
57  bool active)
58 {
59  // keep track of the association locally
60  int status = associations_.insert(sub);
61 
62  switch (status) {
63  case 0: {
64  // inform the datawriter about the association
66  association.readerTransInfo = sub->get_transportLocatorSeq();
67  association.transportContext = sub->get_transportContext();
68  association.readerId = sub->get_id();
69  association.subQos = *(sub->get_subscriber_qos());
70  association.readerQos = *(sub->get_datareader_qos());
71  association.filterClassName = sub->get_filter_class_name().c_str();
72  association.filterExpression = sub->get_filter_expression().c_str();
73  association.exprParams = sub->get_expr_params();
74  association.serializedTypeInfo = sub->get_serialized_type_info();
75 
76  if (participant_->is_alive() && this->participant_->isOwner()) {
77  try {
79  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
80  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
82  ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription:")
83  ACE_TEXT(" publication %C adding subscription %C.\n"),
84  std::string(pub_converter).c_str(),
85  std::string(sub_converter).c_str()));
86  }
87 
88  writer_->add_association(id_, association, active);
89 
92  ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription: ")
93  ACE_TEXT("successfully added subscription %x.\n"),
94  sub));
95  }
96  } catch (const CORBA::Exception& ex) {
98  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:");
100  status = -1;
101  }
102  }
103  }
104  break;
105 
106  case 1: {
107  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
108  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
110  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
111  ACE_TEXT("publication %C attempted to re-add subscription %C.\n"),
112  std::string(pub_converter).c_str(),
113  std::string(sub_converter).c_str()));
114  }
115  break;
116 
117  case -1: {
118  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
119  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
121  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
122  ACE_TEXT("publication %C failed to add subscription %C.\n"),
123  std::string(pub_converter).c_str(),
124  std::string(sub_converter).c_str()));
125  }
126  }
127 
128  return status;
129 }
130 
132  CORBA::Boolean sendNotify,
133  CORBA::Boolean notify_lost,
134  bool notify_both_side)
135 {
136  bool marked_dead = false;
137 
138  if (sendNotify) {
139  if (participant_->is_alive() && this->participant_->isOwner()) {
140  try {
142  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
143  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
145  ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription:")
146  ACE_TEXT(" calling pub %C with sub %C\n"),
147  OPENDDS_STRING(pub_converter).c_str(),
148  OPENDDS_STRING(sub_converter).c_str()));
149  }
150 
152  idSeq.length(1);
153  idSeq[0] = sub->get_id();
154 
155  writer_->remove_associations(idSeq, notify_lost);
156 
157  if (notify_both_side) {
158  sub->remove_associated_publication(this, sendNotify, notify_lost);
159  }
160 
161  } catch (const CORBA::Exception& ex) {
164  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_subscription:");
165  }
166 
168  marked_dead = true;
169  }
170  }
171  }
172 
173  int status = associations_.remove(sub);
174 
175  if (0 == status) {
177  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
178  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
180  ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ")
181  ACE_TEXT("publication %C removed subscription %C at %x.\n"),
182  std::string(pub_converter).c_str(),
183  std::string(sub_converter).c_str(),
184  sub));
185  }
186 
187  } else {
188  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
189  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
191  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ")
192  ACE_TEXT("publication %C failed to remove subscription %C at %x.\n"),
193  std::string(pub_converter).c_str(),
194  std::string(sub_converter).c_str(),
195  sub));
196  } // if (0 == status)
197 
198  if (marked_dead) {
199  return -1;
200 
201  } else {
202  return status;
203  }
204 }
205 
207 {
208  int status = 0;
209  DCPS_IR_Subscription* sub = 0;
210  size_t numAssociations = associations_.size();
211  CORBA::Boolean dontSend = 0;
212  CORBA::Boolean send = 1;
213 
214  if (0 < numAssociations) {
217 
218  while (iter != end) {
219  sub = *iter;
220  ++iter;
221  sub->remove_associated_publication(this, send, notify_lost);
222  remove_associated_subscription(sub, dontSend, notify_lost);
223  }
224  }
225  this->defunct_.reset();
226 
227  return status;
228 }
229 
231  bool reassociate)
232 {
233  DCPS_IR_Subscription* sub = 0;
234  size_t numAssociations = associations_.size();
235  CORBA::Boolean send = 1;
236  CORBA::Boolean dontSend = 0;
237  long count = 0;
238 
239  if (0 < numAssociations) {
240  OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
241  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
242 
245 
246  while (iter != end) {
247  sub = *iter;
248  ++iter;
249 
251  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
252  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
253  OpenDDS::DCPS::RepoIdConverter pub_part_converter(id);
254  OpenDDS::DCPS::RepoIdConverter sub_part_converter(sub->get_participant_id());
256  ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_participant: ")
257  ACE_TEXT("publication %C testing if subscription %C particpant %C == %C.\n"),
258  std::string(pub_converter).c_str(),
259  std::string(sub_converter).c_str(),
260  std::string(sub_part_converter).c_str(),
261  std::string(pub_part_converter).c_str()));
262  }
263 
264  if (id == sub->get_participant_id()) {
265  CORBA::Boolean dont_notify_lost = 0;
266  sub->remove_associated_publication(this, send, dont_notify_lost);
267  remove_associated_subscription(sub, dontSend, dont_notify_lost);
268 
269  idSeq[count] = sub->get_id();
270  ++count;
271 
272  if (reassociate && this->defunct_.insert(sub) != 0) {
273  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
274  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
276  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ")
277  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
278  std::string(pub_converter).c_str(),
279  std::string(sub_converter).c_str(),
280  sub));
281  }
282  }
283  }
284 
285  if (0 < count) {
286  idSeq.length(count);
287 
288  if (participant_->is_alive() && this->participant_->isOwner()) {
289  try {
290  CORBA::Boolean dont_notify_lost = 0;
291  writer_->remove_associations(idSeq, dont_notify_lost);
292 
293  } catch (const CORBA::Exception& ex) {
296  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:");
297  }
298 
300  }
301  }
302  }
303  }
304 }
305 
307 {
308  DCPS_IR_Subscription* sub = 0;
309  size_t numAssociations = associations_.size();
310  CORBA::Boolean send = 1;
311  CORBA::Boolean dontSend = 0;
312  long count = 0;
313 
314  if (0 < numAssociations) {
315  OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
316  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
317 
320 
321  while (iter != end) {
322  sub = *iter;
323  ++iter;
324 
326  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
327  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
328  OpenDDS::DCPS::RepoIdConverter pub_topic_converter(id);
329  OpenDDS::DCPS::RepoIdConverter sub_topic_converter(sub->get_topic_id());
331  ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_topic: ")
332  ACE_TEXT("publication %C testing if subscription %C topic %C == %C.\n"),
333  std::string(pub_converter).c_str(),
334  std::string(sub_converter).c_str(),
335  std::string(sub_topic_converter).c_str(),
336  std::string(pub_topic_converter).c_str()));
337  }
338 
339  if (id == sub->get_topic_id()) {
340  CORBA::Boolean dont_notify_lost = 0;
341  sub->remove_associated_publication(this, send, dont_notify_lost);
342  remove_associated_subscription(sub, dontSend, dont_notify_lost);
343 
344  idSeq[count] = sub->get_id();
345  ++count;
346  }
347  }
348 
349  if (0 < count) {
350  idSeq.length(count);
351 
352  if (participant_->is_alive() && this->participant_->isOwner()) {
353  try {
354  CORBA::Boolean dont_notify_lost = 0;
355  writer_->remove_associations(idSeq, dont_notify_lost);
356 
357  } catch (const CORBA::Exception& ex) {
360  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
361  }
362 
364  }
365  }
366  }
367  }
368 }
369 
371  bool reassociate)
372 {
373  DCPS_IR_Subscription* sub = 0;
374  size_t numAssociations = associations_.size();
375  CORBA::Boolean send = 1;
376  CORBA::Boolean dontSend = 0;
377  long count = 0;
378 
379  if (0 < numAssociations) {
380  OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
381  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
382 
385 
386  while (iter != end) {
387  sub = *iter;
388  ++iter;
389 
391  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
392  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
393  OpenDDS::DCPS::RepoIdConverter pub_sub_converter(id);
395  ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_subscription: ")
396  ACE_TEXT("publication %C testing if subscription %C == %C.\n"),
397  std::string(pub_converter).c_str(),
398  std::string(sub_converter).c_str(),
399  std::string(pub_sub_converter).c_str()));
400  }
401 
402  if (id == sub->get_id()) {
403  CORBA::Boolean dont_notify_lost = 0;
404  sub->remove_associated_publication(this, send, dont_notify_lost);
405  remove_associated_subscription(sub, dontSend, dont_notify_lost);
406 
407  idSeq[count] = sub->get_id();
408  ++count;
409 
410  if (reassociate && this->defunct_.insert(sub) != 0) {
411  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
412  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
414  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ")
415  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
416  std::string(pub_converter).c_str(),
417  std::string(sub_converter).c_str(),
418  sub));
419  }
420  }
421  }
422 
423  if (0 < count) {
424  idSeq.length(count);
425 
426  if (participant_->is_alive() && this->participant_->isOwner()) {
427  try {
428  CORBA::Boolean dont_notify_lost = 0;
429  writer_->remove_associations(idSeq, dont_notify_lost);
430 
431  } catch (const CORBA::Exception& ex) {
434  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
435  }
436 
438  }
439  }
440  }
441  }
442 }
443 
445 {
446  if (participant_->is_alive() && this->participant_->isOwner()) {
447  try {
448  writer_->update_incompatible_qos(incompatibleQosStatus_);
450  } catch (const CORBA::Exception& ex) {
453  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:");
454  }
455 
457  }
458  }
459 }
460 
462  OpenDDS::DCPS::GUID_t topicId,
463  OpenDDS::DCPS::GUID_t subId)
464 {
465  CORBA::Boolean ignored;
466  ignored = (participant_->is_participant_ignored(partId) ||
467  participant_->is_topic_ignored(topicId) ||
469 
470  return ignored;
471 }
472 
474 {
475  return &qos_;
476 }
477 
478 using OpenDDS::DCPS::operator==;
479 
480 void
482 {
483  if (false == (qos == this->qos_)) {
484  // Decide whether associations must be re-checked, while both the old and new values are available.
486 
487  // Store the new, compatible, value.
488  this->qos_ = qos;
489 
490  if (check) {
491  // This will remove any newly stale associations.
493 
494  // Sleep briefly so the DataWriter can handle remove_association
495  // before add_association; otherwise the new associations will have
496  // trouble connecting to each other.
497  ACE_OS::sleep(ACE_Time_Value(0, 250000));
498 
499  // This will establish any newly made associations.
500  DCPS_IR_Topic_Description* description
501  = this->topic_->get_topic_description();
502  description->reevaluate_associations(this);
503  }
504 
506  }
507 }
508 
509 void
511 {
512  if (false == (qos == this->publisherQos_)) {
513  // Decide whether associations must be re-checked, while both the old and new values are available.
515 
516  // Store the new, compatible, value.
517  this->publisherQos_ = qos;
518 
519  if (check) {
520  // This will remove any newly stale associations.
522 
523  // Sleep briefly so the DataWriter can handle remove_association
524  // before add_association; otherwise the new associations will have
525  // trouble connecting to each other.
526  ACE_OS::sleep(ACE_Time_Value(0, 250000));
527 
528  // This will establish any newly made associations.
529  DCPS_IR_Topic_Description* description
530  = this->topic_->get_topic_description();
531  description->reevaluate_associations(this);
532  }
533 
535  }
536 }
537 
539  const DDS::PublisherQos & publisherQos,
540  Update::SpecificQos& specificQos)
541 {
542  bool need_evaluate = false;
543  bool u_dw_qos = !(qos_ == qos);
544 
545  if (u_dw_qos) {
547  need_evaluate = true;
548  }
549 
550  qos_ = qos;
551  }
552 
553  bool u_pub_qos = !(publisherQos_ == publisherQos);
554 
555  if (u_pub_qos) {
557  need_evaluate = true;
558  }
559 
560  publisherQos_ = publisherQos;
561  }
562 
563  if (need_evaluate) {
564  // Check whether any existing associations need to be removed first.
566 
568  description->reevaluate_associations(this);
569  }
570 
572  specificQos = u_dw_qos? Update::DataWriterQos:
573  u_pub_qos? Update::PublisherQos:
575 
576  return true;
577 }
578 
580 {
581  return &publisherQos_;
582 }
583 
585 {
586  return info_;
587 }
588 
590 {
591  return &incompatibleQosStatus_;
592 }
593 
595 {
596  return id_;
597 }
598 
600 {
601  return topic_->get_id();
602 }
603 
605 {
606  return participant_->get_id();
607 }
608 
610 {
611  return topic_;
612 }
613 
615 {
616  return topic_->get_topic_description();
617 }
618 
620 {
621  return handle_;
622 }
623 
625 {
626  handle_ = handle;
627 }
628 
630 {
631  return isBIT_;
632 }
633 
635 {
636  isBIT_ = isBIT;
637 }
638 
639 OpenDDS::DCPS::DataWriterRemote_ptr
641 {
642  return OpenDDS::DCPS::DataWriterRemote::_duplicate(this->writer_.in());
643 }
644 
645 void
647 {
649  while (it != this->defunct_.end()) {
650  DCPS_IR_Subscription* subscription = *it;
651  ++it;
652 
653  if (reevaluate_association(subscription)) {
654  this->defunct_.remove(subscription); // no longer defunct
655 
656  } else {
657  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
658  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
660  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ")
661  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
662  std::string(pub_converter).c_str(),
663  std::string(sub_converter).c_str(),
664  subscription));
665  }
666  }
667 }
668 
670 {
671  DCPS_IR_Subscription* sub = 0;
674 
675  while (iter != end) {
676  sub = *iter;
677  ++iter;
678 
679  this->reevaluate_association(sub);
680  }
681 }
682 
683 bool
685 {
686  int status = this->associations_.find(subscription);
687 
688  if (status == 0) {
689  // verify if they are still compatible after change
690 
691 
693  subscription->get_incompatibleQosStatus(),
694  this->get_transportLocatorSeq(),
695  subscription->get_transportLocatorSeq(),
696  this->get_datawriter_qos(),
697  subscription->get_datareader_qos(),
698  this->get_publisher_qos(),
699  subscription->get_subscriber_qos())) {
700  bool sendNotify = true; // inform datawriter
701  bool notify_lost = true; // invoke listener callback
702 
703  this->remove_associated_subscription(subscription, sendNotify, notify_lost, true);
704  }
705 
706  } else {
708  return description->try_associate(this, subscription);
709  }
710 
711  return false;
712 }
713 
714 void
716  const DDS::StringSeq& params)
717 {
718  try {
719  writer_->update_subscription_params(readerId, params);
720  } catch (const CORBA::SystemException& ex) {
722  ex._tao_print_exception("(%P|%t) ERROR: Exception caught in "
723  "DCPS_IR_Publication::update_expr_params:");
724  }
726  }
727 }
728 
// Build a multi-line, human-readable description of this publication.
//
// @param prefix  String used as one indentation unit.
// @param depth   Number of times prefix is repeated before the header line.
// @return The formatted text; an empty string when
//         OPENDDS_INFOREPO_REDUCED_FOOTPRINT is defined, because the whole
//         formatting body is compiled out in that configuration.
729 std::string
730 DCPS_IR_Publication::dump_to_string(const std::string& prefix, int depth) const
731 {
732  std::string str;
733 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
734  OpenDDS::DCPS::RepoIdConverter local_converter(id_);
735 
// Header line: prefix repeated depth times, then "DCPS_IR_Publication[<id>]".
736  for (int i=0; i < depth; i++)
737  str += prefix;
// Capture the indentation for the detail lines (one prefix deeper than the
// header) before the header text is appended to str.
738  std::string indent = str + prefix;
739  str += "DCPS_IR_Publication[";
740  str += std::string(local_converter);
741  str += "]";
// Tag the header when isBIT_ is set.
742  if (isBIT_)
743  str += " (BIT)";
744  str += "\n";
745 
// One detail line listing the id of every entry in associations_,
// separated by spaces.
// NOTE(review): the line that opens this loop and declares assoc is not
// present in this listing - confirm against the original source.
746  str += indent + "Associations [ ";
748  assoc != associations_.end();
749  assoc++)
750  {
751  OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
752  str += std::string(assoc_converter);
753  str += " ";
754  }
755  str += "]\n";
756 
// Same layout for the entries held in defunct_.
// NOTE(review): the line that opens this loop and declares def is not
// present in this listing - confirm against the original source.
757  str += indent + "Defunct Associations [ ";
759  def != defunct_.end();
760  def++)
761  {
762  OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
763  str += std::string(def_converter);
764  str += " ";
765  }
766  str += "]\n";
767 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
768  return str;
769 }
770 
771 const DDS::OctetSeq&
773 {
774  return serializedTypeInfo_;
775 }
776 
int remove_associations(CORBA::Boolean notify_lost)
const DDS::OctetSeq & get_serialized_type_info() const
std::string dump_to_string(const std::string &prefix, int depth) const
#define ACE_DEBUG(X)
void update_expr_params(OpenDDS::DCPS::GUID_t readerId, const DDS::StringSeq &params)
#define ACE_ERROR(X)
DDS::OctetSeq serializedTypeInfo_
OpenDDS::DCPS::GUID_t get_participant_id()
bool set_qos(const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
void set_bit_status(CORBA::Boolean isBIT)
DDS::InstanceHandle_t get_handle()
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t id)
OpenDDS::DCPS::GUID_t get_id()
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
OpenDDS::DCPS::GUID_t get_id() const
DCPS_IR_Publication(const OpenDDS::DCPS::GUID_t &id, DCPS_IR_Participant *participant, DCPS_IR_Topic *topic, OpenDDS::DCPS::DataWriterRemote_ptr writer, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &info, ACE_CDR::ULong transportContext, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
std::string get_filter_class_name() const
DCPS_IR_Topic_Description * get_topic_description()
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int sleep(u_int seconds)
sequence< TransportLocator > TransportLocatorSeq
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
Definition: DCPS_Utils.cpp:237
OpenDDS::DCPS::GUID_t get_topic_id()
OpenDDS::DCPS::GUID_t id_
void disassociate_topic(OpenDDS::DCPS::GUID_t id)
Remove any subscriptions whose topic has the id.
#define OPENDDS_STRING
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
sequence< GUID_t > ReaderIdSeq
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
std::string get_filter_expression() const
LM_DEBUG
int add_associated_subscription(DCPS_IR_Subscription *sub, bool active)
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
DCPS_IR_Topic_Description * get_topic_description()
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
ACE_CDR::Boolean Boolean
OpenDDS::DCPS::TransportLocatorSeq info_
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t subId)
DCPS_IR_Participant * participant_
Representative of the Domain Participant.
DCPS_IR_Subscription_Set defunct_
bool reevaluate_association(DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
ACE_UINT32 ULong
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::GUID_t get_id()
void set_handle(DDS::InstanceHandle_t handle)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object
ACE_TEXT("TCP_Factory")
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)
DDS::DataWriterQos qos_
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DDS::PublisherQos publisherQos_
void reevaluate_associations(DCPS_IR_Publication *publication)
OpenDDS::DCPS::DataWriterRemote_ptr writer()
void publish_publication_bit(DCPS_IR_Publication *publication)
Publish Publication in the Publication Built-In Topic.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::GUID_t get_participant_id()
void disassociate_subscription(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any subscriptions with the id.
const DDS::SubscriberQos * get_subscriber_qos()
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
Representative of a Subscription.
OpenDDS::DCPS::GUID_t get_topic_id()
DCPS_IR_Domain * get_domain_reference() const
DCPS_IR_Topic * get_topic()
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DDS::InstanceHandle_t handle_
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
ACE_CDR::ULong get_transportContext() const
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
const DDS::OctetSeq & get_serialized_type_info() const
DCPS_IR_Subscription_Set associations_
LM_ERROR
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
Representative of a Topic Description.
void disassociate_participant(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any subscriptions whose participant has the id.
DDS::StringSeq get_expr_params() const
DDS::DataWriterQos * get_datawriter_qos()
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
const DDS::DataReaderQos * get_datareader_qos()