OpenDDS  Snapshot(2023/03/10-19:29)
Public Member Functions | Private Attributes | List of all members
DCPS_IR_Publication Class Reference

Representative of a Publication. More...

#include <DCPS_IR_Publication.h>

Inheritance diagram for DCPS_IR_Publication:
Inheritance graph
[legend]
Collaboration diagram for DCPS_IR_Publication:
Collaboration graph
[legend]

Public Member Functions

 DCPS_IR_Publication (const OpenDDS::DCPS::GUID_t &id, DCPS_IR_Participant *participant, DCPS_IR_Topic *topic, OpenDDS::DCPS::DataWriterRemote_ptr writer, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &info, ACE_CDR::ULong transportContext, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
 
 ~DCPS_IR_Publication ()
 
int add_associated_subscription (DCPS_IR_Subscription *sub, bool active)
 
int remove_associated_subscription (DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
 
int remove_associations (CORBA::Boolean notify_lost)
 
void disassociate_participant (OpenDDS::DCPS::GUID_t id, bool reassociate=false)
 Remove any subscriptions whose participant has the id. More...
 
void disassociate_topic (OpenDDS::DCPS::GUID_t id)
 Remove any subscriptions whose topic has the id. More...
 
void disassociate_subscription (OpenDDS::DCPS::GUID_t id, bool reassociate=false)
 Remove any subscriptions with the id. More...
 
void update_incompatible_qos ()
 
CORBA::Boolean is_subscription_ignored (OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t subId)
 
DDS::DataWriterQos * get_datawriter_qos ()
 
DDS::PublisherQos * get_publisher_qos ()
 
bool set_qos (const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
 
void set_qos (const DDS::DataWriterQos &qos)
 Update DataWriterQos only. More...
 
void set_qos (const DDS::PublisherQos &qos)
 Update PublisherQos only. More...
 
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq () const
 
ACE_CDR::ULong get_transportContext () const
 
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus ()
 
OpenDDS::DCPS::GUID_t get_id ()
 
OpenDDS::DCPS::GUID_t get_topic_id ()
 
OpenDDS::DCPS::GUID_t get_participant_id ()
 
DCPS_IR_Topic * get_topic ()
 
DCPS_IR_Topic_Description * get_topic_description ()
 
DDS::InstanceHandle_t get_handle ()
 
void set_handle (DDS::InstanceHandle_t handle)
 
CORBA::Boolean is_bit ()
 
void set_bit_status (CORBA::Boolean isBIT)
 
OpenDDS::DCPS::DataWriterRemote_ptr writer ()
 
void reevaluate_defunct_associations ()
 
void reevaluate_existing_associations ()
 
bool reevaluate_association (DCPS_IR_Subscription *subscription)
 
void update_expr_params (OpenDDS::DCPS::GUID_t readerId, const DDS::StringSeq &params)
 
std::string dump_to_string (const std::string &prefix, int depth) const
 
const DDS::OctetSeq & get_serialized_type_info () const
 

Private Attributes

OpenDDS::DCPS::GUID_t id_
 
DCPS_IR_Participant * participant_
 
DCPS_IR_Topic * topic_
 
DDS::InstanceHandle_t handle_
 
CORBA::Boolean isBIT_
 
OpenDDS::DCPS::DataWriterRemote_var writer_
 the corresponding DataWriterRemote object More...
 
DDS::DataWriterQos qos_
 
OpenDDS::DCPS::TransportLocatorSeq info_
 
ACE_CDR::ULong transportContext_
 
DDS::PublisherQos publisherQos_
 
DDS::OctetSeq serializedTypeInfo_
 
DCPS_IR_Subscription_Set associations_
 
DCPS_IR_Subscription_Set defunct_
 
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::EnableContainerSupportedUniquePtr< DCPS_IR_Publication >
 EnableContainerSupportedUniquePtr ()
 
void _add_ref ()
 
void _remove_ref ()
 
long ref_count () const
 

Detailed Description

Representative of a Publication.

Definition at line 41 of file DCPS_IR_Publication.h.

Constructor & Destructor Documentation

◆ DCPS_IR_Publication()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL DCPS_IR_Publication::DCPS_IR_Publication ( const OpenDDS::DCPS::GUID_t & id,
DCPS_IR_Participant * participant,
DCPS_IR_Topic * topic,
OpenDDS::DCPS::DataWriterRemote_ptr  writer,
const DDS::DataWriterQos & qos,
const OpenDDS::DCPS::TransportLocatorSeq & info,
ACE_CDR::ULong  transportContext,
const DDS::PublisherQos & publisherQos,
const DDS::OctetSeq & serializedTypeInfo 
)

Definition at line 26 of file DCPS_IR_Publication.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, incompatibleQosStatus_, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and writer_.

35  : id_(id),
36  participant_(participant),
37  topic_(topic),
38  handle_(0),
39  isBIT_(0),
40  qos_(qos),
41  info_(info),
42  transportContext_(transportContext),
43  publisherQos_(publisherQos),
44  serializedTypeInfo_(serializedTypeInfo)
45 {
46  writer_ = OpenDDS::DCPS::DataWriterRemote::_duplicate(writer);
47 
50 }
DDS::OctetSeq serializedTypeInfo_
DDS::PublisherQos publisherQos_
DCPS_IR_Participant * participant_
OpenDDS::DCPS::DataWriterRemote_ptr writer()
OpenDDS::DCPS::TransportLocatorSeq info_
DDS::DataWriterQos qos_
ACE_CDR::ULong transportContext_
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
DDS::InstanceHandle_t handle_
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ ~DCPS_IR_Publication()

DCPS_IR_Publication::~DCPS_IR_Publication ( )

Definition at line 52 of file DCPS_IR_Publication.cpp.

53 {
54 }

Member Function Documentation

◆ add_associated_subscription()

int DCPS_IR_Publication::add_associated_subscription ( DCPS_IR_Subscription * sub,
bool  active 
)

Associate with the subscription. Adds the subscription to the list of associated subscriptions and notifies the datawriter if successfully added. This method can mark the participant dead. Returns 0 if added, 1 if already exists, -1 on other failure.

Definition at line 56 of file DCPS_IR_Publication.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, DCPS_IR_Subscription::get_datareader_qos(), DCPS_IR_Subscription::get_expr_params(), DCPS_IR_Subscription::get_filter_class_name(), DCPS_IR_Subscription::get_filter_expression(), DCPS_IR_Subscription::get_id(), DCPS_IR_Subscription::get_serialized_type_info(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Subscription::get_transportContext(), DCPS_IR_Subscription::get_transportLocatorSeq(), id_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::insert(), DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), participant_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, OpenDDS::DCPS::ReaderAssociation::serializedTypeInfo, OpenDDS::DCPS::ReaderAssociation::subQos, OpenDDS::DCPS::ReaderAssociation::transportContext, and writer_.

Referenced by DCPS_IR_Topic_Description::associate().

58 {
59  // keep track of the association locally
60  int status = associations_.insert(sub);
61 
62  switch (status) {
63  case 0: {
64  // inform the datawriter about the association
66  association.readerTransInfo = sub->get_transportLocatorSeq();
67  association.transportContext = sub->get_transportContext();
68  association.readerId = sub->get_id();
69  association.subQos = *(sub->get_subscriber_qos());
70  association.readerQos = *(sub->get_datareader_qos());
71  association.filterClassName = sub->get_filter_class_name().c_str();
72  association.filterExpression = sub->get_filter_expression().c_str();
73  association.exprParams = sub->get_expr_params();
74  association.serializedTypeInfo = sub->get_serialized_type_info();
75 
76  if (participant_->is_alive() && this->participant_->isOwner()) {
77  try {
79  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
80  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
81  ACE_DEBUG((LM_DEBUG,
82  ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription:")
83  ACE_TEXT(" publication %C adding subscription %C.\n"),
84  std::string(pub_converter).c_str(),
85  std::string(sub_converter).c_str()));
86  }
87 
88  writer_->add_association(id_, association, active);
89 
91  ACE_DEBUG((LM_DEBUG,
92  ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription: ")
93  ACE_TEXT("successfully added subscription %x.\n"),
94  sub));
95  }
96  } catch (const CORBA::Exception& ex) {
98  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:");
100  status = -1;
101  }
102  }
103  }
104  break;
105 
106  case 1: {
107  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
108  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
109  ACE_ERROR((LM_ERROR,
110  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
111  ACE_TEXT("publication %C attempted to re-add subscription %C.\n"),
112  std::string(pub_converter).c_str(),
113  std::string(sub_converter).c_str()));
114  }
115  break;
116 
117  case -1: {
118  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
119  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
120  ACE_ERROR((LM_ERROR,
121  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
122  ACE_TEXT("publication %C failed to add subscription %C.\n"),
123  std::string(pub_converter).c_str(),
124  std::string(sub_converter).c_str()));
125  }
126  }
127 
128  return status;
129 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const DDS::DataReaderQos * get_datareader_qos()
ACE_CDR::ULong get_transportContext() const
const DDS::SubscriberQos * get_subscriber_qos()
const DDS::OctetSeq & get_serialized_type_info() const
std::string get_filter_expression() const
DCPS_IR_Participant * participant_
DDS::StringSeq get_expr_params() const
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::GUID_t get_id()
std::string get_filter_class_name() const
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Subscription_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ disassociate_participant()

void DCPS_IR_Publication::disassociate_participant ( OpenDDS::DCPS::GUID_t  id,
bool  reassociate = false 
)

Remove any subscriptions whose participant has the id.

Definition at line 230 of file DCPS_IR_Publication.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), OpenDDS::DCPS::DCPS_debug_level, defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Subscription::get_id(), DCPS_IR_Subscription::get_participant_id(), id_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::insert(), DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), participant_, DCPS_IR_Subscription::remove_associated_publication(), remove_associated_subscription(), send(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size(), and writer_.

232 {
233  DCPS_IR_Subscription* sub = 0;
234  size_t numAssociations = associations_.size();
235  CORBA::Boolean send = 1;
236  CORBA::Boolean dontSend = 0;
237  long count = 0;
238 
239  if (0 < numAssociations) {
240  OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
241  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
242 
245 
246  while (iter != end) {
247  sub = *iter;
248  ++iter;
249 
251  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
252  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
253  OpenDDS::DCPS::RepoIdConverter pub_part_converter(id);
254  OpenDDS::DCPS::RepoIdConverter sub_part_converter(sub->get_participant_id());
255  ACE_DEBUG((LM_DEBUG,
256  ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_participant: ")
257  ACE_TEXT("publication %C testing if subscription %C particpant %C == %C.\n"),
258  std::string(pub_converter).c_str(),
259  std::string(sub_converter).c_str(),
260  std::string(sub_part_converter).c_str(),
261  std::string(pub_part_converter).c_str()));
262  }
263 
264  if (id == sub->get_participant_id()) {
265  CORBA::Boolean dont_notify_lost = 0;
266  sub->remove_associated_publication(this, send, dont_notify_lost);
267  remove_associated_subscription(sub, dontSend, dont_notify_lost);
268 
269  idSeq[count] = sub->get_id();
270  ++count;
271 
272  if (reassociate && this->defunct_.insert(sub) != 0) {
273  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
274  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
275  ACE_ERROR((LM_ERROR,
276  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ")
277  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
278  std::string(pub_converter).c_str(),
279  std::string(sub_converter).c_str(),
280  sub));
281  }
282  }
283  }
284 
285  if (0 < count) {
286  idSeq.length(count);
287 
288  if (participant_->is_alive() && this->participant_->isOwner()) {
289  try {
290  CORBA::Boolean dont_notify_lost = 0;
291  writer_->remove_associations(idSeq, dont_notify_lost);
292 
293  } catch (const CORBA::Exception& ex) {
296  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:");
297  }
298 
300  }
301  }
302  }
303  }
304 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
OpenDDS::DCPS::GUID_t get_participant_id()
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DCPS_IR_Participant * participant_
DCPS_IR_Subscription_Set defunct_
Representative of a Subscription.
ACE_CDR::Boolean Boolean
sequence< GUID_t > ReaderIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::GUID_t get_id()
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Subscription_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ disassociate_subscription()

void DCPS_IR_Publication::disassociate_subscription ( OpenDDS::DCPS::GUID_t  id,
bool  reassociate = false 
)

Remove any subscriptions with the id.

Definition at line 370 of file DCPS_IR_Publication.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), OpenDDS::DCPS::DCPS_debug_level, defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Subscription::get_id(), id_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::insert(), DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), participant_, DCPS_IR_Subscription::remove_associated_publication(), remove_associated_subscription(), send(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size(), and writer_.

Referenced by TAO_DDS_DCPSInfo_i::disassociate_publication().

372 {
373  DCPS_IR_Subscription* sub = 0;
374  size_t numAssociations = associations_.size();
375  CORBA::Boolean send = 1;
376  CORBA::Boolean dontSend = 0;
377  long count = 0;
378 
379  if (0 < numAssociations) {
380  OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
381  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
382 
385 
386  while (iter != end) {
387  sub = *iter;
388  ++iter;
389 
391  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
392  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
393  OpenDDS::DCPS::RepoIdConverter pub_sub_converter(id);
394  ACE_DEBUG((LM_DEBUG,
395  ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_subscription: ")
396  ACE_TEXT("publication %C testing if subscription %C == %C.\n"),
397  std::string(pub_converter).c_str(),
398  std::string(sub_converter).c_str(),
399  std::string(pub_sub_converter).c_str()));
400  }
401 
402  if (id == sub->get_id()) {
403  CORBA::Boolean dont_notify_lost = 0;
404  sub->remove_associated_publication(this, send, dont_notify_lost);
405  remove_associated_subscription(sub, dontSend, dont_notify_lost);
406 
407  idSeq[count] = sub->get_id();
408  ++count;
409 
410  if (reassociate && this->defunct_.insert(sub) != 0) {
411  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
412  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
413  ACE_ERROR((LM_ERROR,
414  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ")
415  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
416  std::string(pub_converter).c_str(),
417  std::string(sub_converter).c_str(),
418  sub));
419  }
420  }
421  }
422 
423  if (0 < count) {
424  idSeq.length(count);
425 
426  if (participant_->is_alive() && this->participant_->isOwner()) {
427  try {
428  CORBA::Boolean dont_notify_lost = 0;
429  writer_->remove_associations(idSeq, dont_notify_lost);
430 
431  } catch (const CORBA::Exception& ex) {
434  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
435  }
436 
438  }
439  }
440  }
441  }
442 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DCPS_IR_Participant * participant_
DCPS_IR_Subscription_Set defunct_
Representative of a Subscription.
ACE_CDR::Boolean Boolean
sequence< GUID_t > ReaderIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::GUID_t get_id()
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Subscription_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ disassociate_topic()

void DCPS_IR_Publication::disassociate_topic ( OpenDDS::DCPS::GUID_t  id)

Remove any subscriptions whose topic has the id.

Definition at line 306 of file DCPS_IR_Publication.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_TEXT(), associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), OpenDDS::DCPS::DCPS_debug_level, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Subscription::get_id(), DCPS_IR_Subscription::get_topic_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, DCPS_IR_Participant::mark_dead(), participant_, DCPS_IR_Subscription::remove_associated_publication(), remove_associated_subscription(), send(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size(), and writer_.

307 {
308  DCPS_IR_Subscription* sub = 0;
309  size_t numAssociations = associations_.size();
310  CORBA::Boolean send = 1;
311  CORBA::Boolean dontSend = 0;
312  long count = 0;
313 
314  if (0 < numAssociations) {
315  OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
316  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
317 
320 
321  while (iter != end) {
322  sub = *iter;
323  ++iter;
324 
326  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
327  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
328  OpenDDS::DCPS::RepoIdConverter pub_topic_converter(id);
329  OpenDDS::DCPS::RepoIdConverter sub_topic_converter(sub->get_topic_id());
330  ACE_DEBUG((LM_DEBUG,
331  ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_topic: ")
332  ACE_TEXT("publication %C testing if subscription %C topic %C == %C.\n"),
333  std::string(pub_converter).c_str(),
334  std::string(sub_converter).c_str(),
335  std::string(sub_topic_converter).c_str(),
336  std::string(pub_topic_converter).c_str()));
337  }
338 
339  if (id == sub->get_topic_id()) {
340  CORBA::Boolean dont_notify_lost = 0;
341  sub->remove_associated_publication(this, send, dont_notify_lost);
342  remove_associated_subscription(sub, dontSend, dont_notify_lost);
343 
344  idSeq[count] = sub->get_id();
345  ++count;
346  }
347  }
348 
349  if (0 < count) {
350  idSeq.length(count);
351 
352  if (participant_->is_alive() && this->participant_->isOwner()) {
353  try {
354  CORBA::Boolean dont_notify_lost = 0;
355  writer_->remove_associations(idSeq, dont_notify_lost);
356 
357  } catch (const CORBA::Exception& ex) {
360  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
361  }
362 
364  }
365  }
366  }
367  }
368 }
#define ACE_DEBUG(X)
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DCPS_IR_Participant * participant_
Representative of a Subscription.
ACE_CDR::Boolean Boolean
sequence< GUID_t > ReaderIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::GUID_t get_topic_id()
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Subscription_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ dump_to_string()

std::string DCPS_IR_Publication::dump_to_string ( const std::string &  prefix,
int  depth 
) const

Definition at line 730 of file DCPS_IR_Publication.cpp.

References associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), id_, and isBIT_.

731 {
732  std::string str;
733 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
734  OpenDDS::DCPS::RepoIdConverter local_converter(id_);
735 
736  for (int i=0; i < depth; i++)
737  str += prefix;
738  std::string indent = str + prefix;
739  str += "DCPS_IR_Publication[";
740  str += std::string(local_converter);
741  str += "]";
742  if (isBIT_)
743  str += " (BIT)";
744  str += "\n";
745 
746  str += indent + "Associations [ ";
748  assoc != associations_.end();
749  assoc++)
750  {
751  OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
752  str += std::string(assoc_converter);
753  str += " ";
754  }
755  str += "]\n";
756 
757  str += indent + "Defunct Associations [ ";
759  def != defunct_.end();
760  def++)
761  {
762  OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
763  str += std::string(def_converter);
764  str += " ";
765  }
766  str += "]\n";
767 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
768  return str;
769 }
DCPS_IR_Subscription_Set defunct_
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
OpenDDS::DCPS::GUID_t id_
DCPS_IR_Subscription_Set associations_

◆ get_datawriter_qos()

DDS::DataWriterQos * DCPS_IR_Publication::get_datawriter_qos ( )

◆ get_handle()

DDS::InstanceHandle_t DCPS_IR_Publication::get_handle ( void  )

Definition at line 619 of file DCPS_IR_Publication.cpp.

References handle_.

Referenced by DCPS_IR_Domain::dispose_publication_bit().

620 {
621  return handle_;
622 }
DDS::InstanceHandle_t handle_

◆ get_id()

OpenDDS::DCPS::GUID_t DCPS_IR_Publication::get_id ( void  )

◆ get_incompatibleQosStatus()

OpenDDS::DCPS::IncompatibleQosStatus * DCPS_IR_Publication::get_incompatibleQosStatus ( )

Returns a pointer to the incompatible QoS status. The publication retains ownership.

Definition at line 589 of file DCPS_IR_Publication.cpp.

References incompatibleQosStatus_.

Referenced by DCPS_IR_Subscription::reevaluate_association(), reevaluate_association(), DCPS_IR_Topic_Description::try_associate(), DCPS_IR_Topic::try_associate(), and DCPS_IR_Topic_Description::try_associate_publication().

590 {
591  return &incompatibleQosStatus_;
592 }
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_

◆ get_participant_id()

OpenDDS::DCPS::GUID_t DCPS_IR_Publication::get_participant_id ( )

◆ get_publisher_qos()

DDS::PublisherQos * DCPS_IR_Publication::get_publisher_qos ( )

◆ get_serialized_type_info()

const DDS::OctetSeq & DCPS_IR_Publication::get_serialized_type_info ( ) const

◆ get_topic()

DCPS_IR_Topic * DCPS_IR_Publication::get_topic ( )

Definition at line 609 of file DCPS_IR_Publication.cpp.

References topic_.

Referenced by DCPS_IR_Domain::publish_publication_bit().

610 {
611  return topic_;
612 }

◆ get_topic_description()

DCPS_IR_Topic_Description * DCPS_IR_Publication::get_topic_description ( )

Definition at line 614 of file DCPS_IR_Publication.cpp.

References DCPS_IR_Topic::get_topic_description(), and topic_.

Referenced by DCPS_IR_Domain::publish_publication_bit().

615 {
616  return topic_->get_topic_description();
617 }
DCPS_IR_Topic_Description * get_topic_description()

◆ get_topic_id()

OpenDDS::DCPS::GUID_t DCPS_IR_Publication::get_topic_id ( )

◆ get_transportContext()

ACE_CDR::ULong DCPS_IR_Publication::get_transportContext ( ) const
inline

Definition at line 131 of file DCPS_IR_Publication.h.

References get_handle().

Referenced by DCPS_IR_Subscription::add_associated_publication().

131 { return transportContext_; }
ACE_CDR::ULong transportContext_

◆ get_transportLocatorSeq()

OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Publication::get_transportLocatorSeq ( ) const

◆ is_bit()

CORBA::Boolean DCPS_IR_Publication::is_bit ( )

Definition at line 629 of file DCPS_IR_Publication.cpp.

References isBIT_.

Referenced by DCPS_IR_Domain::dispose_publication_bit().

630 {
631  return isBIT_;
632 }

◆ is_subscription_ignored()

CORBA::Boolean DCPS_IR_Publication::is_subscription_ignored ( OpenDDS::DCPS::GUID_t  partId,
OpenDDS::DCPS::GUID_t  topicId,
OpenDDS::DCPS::GUID_t  subId 
)

Check that none of the ids given are ones that this publication should ignore. Returns 1 if one of these ids is an ignored id.

Definition at line 461 of file DCPS_IR_Publication.cpp.

References DCPS_IR_Participant::is_participant_ignored(), DCPS_IR_Participant::is_subscription_ignored(), DCPS_IR_Participant::is_topic_ignored(), and participant_.

Referenced by DCPS_IR_Topic_Description::try_associate().

464 {
465  CORBA::Boolean ignored;
466  ignored = (participant_->is_participant_ignored(partId) ||
467  participant_->is_topic_ignored(topicId) ||
469 
470  return ignored;
471 }
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t id)
DCPS_IR_Participant * participant_
ACE_CDR::Boolean Boolean
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)

◆ reevaluate_association()

bool DCPS_IR_Publication::reevaluate_association ( DCPS_IR_Subscription * subscription)

Definition at line 684 of file DCPS_IR_Publication.cpp.

References associations_, OpenDDS::DCPS::compatibleQOS(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::find(), DCPS_IR_Subscription::get_datareader_qos(), get_datawriter_qos(), get_incompatibleQosStatus(), DCPS_IR_Subscription::get_incompatibleQosStatus(), get_publisher_qos(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Topic::get_topic_description(), get_transportLocatorSeq(), DCPS_IR_Subscription::get_transportLocatorSeq(), remove_associated_subscription(), topic_, and DCPS_IR_Topic_Description::try_associate().

Referenced by DCPS_IR_Topic::reevaluate_associations(), DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_defunct_associations(), and reevaluate_existing_associations().

685 {
686  int status = this->associations_.find(subscription);
687 
688  if (status == 0) {
689  // verify if they are still compatible after change
690 
691 
693  subscription->get_incompatibleQosStatus(),
694  this->get_transportLocatorSeq(),
695  subscription->get_transportLocatorSeq(),
696  this->get_datawriter_qos(),
697  subscription->get_datareader_qos(),
698  this->get_publisher_qos(),
699  subscription->get_subscriber_qos())) {
700  bool sendNotify = true; // inform datawriter
701  bool notify_lost = true; // invoke listerner callback
702 
703  this->remove_associated_subscription(subscription, sendNotify, notify_lost, true);
704  }
705 
706  } else {
708  return description->try_associate(this, subscription);
709  }
710 
711  return false;
712 }
const DDS::DataReaderQos * get_datareader_qos()
const DDS::SubscriberQos * get_subscriber_qos()
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
DCPS_IR_Topic_Description * get_topic_description()
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
Definition: DCPS_Utils.cpp:237
DDS::DataWriterQos * get_datawriter_qos()
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
Representative of a Topic Description.
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
DCPS_IR_Subscription_Set associations_

◆ reevaluate_defunct_associations()

void DCPS_IR_Publication::reevaluate_defunct_associations ( )

Definition at line 646 of file DCPS_IR_Publication.cpp.

References ACE_ERROR, ACE_TEXT(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Subscription::get_id(), id_, LM_ERROR, reevaluate_association(), and ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::remove().

647 {
649  while (it != this->defunct_.end()) {
650  DCPS_IR_Subscription* subscription = *it;
651  ++it;
652 
653  if (reevaluate_association(subscription)) {
654  this->defunct_.remove(subscription); // no longer defunct
655 
656  } else {
657  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
658  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
659  ACE_ERROR((LM_ERROR,
660  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ")
661  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
662  std::string(pub_converter).c_str(),
663  std::string(sub_converter).c_str(),
664  subscription));
665  }
666  }
667 }
#define ACE_ERROR(X)
bool reevaluate_association(DCPS_IR_Subscription *subscription)
DCPS_IR_Subscription_Set defunct_
Representative of a Subscription.
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::GUID_t get_id()

◆ reevaluate_existing_associations()

void DCPS_IR_Publication::reevaluate_existing_associations ( )

◆ remove_associated_subscription()

int DCPS_IR_Publication::remove_associated_subscription ( DCPS_IR_Subscription sub,
CORBA::Boolean  sendNotify,
CORBA::Boolean  notify_lost,
bool  notify_both_side = false 
)

Remove the associated subscription. Removes the subscription from the list of associated subscriptions if the return is successful. sendNotify indicates whether to tell the datawriter about removing the subscription. The notify_lost parameter is passed to remove_associations(). The notify_both_side parameter indicates whether the subscription also needs to be called to remove the association. See the comments of remove_associations() in DataWriterRemote.idl or DataReaderRemote.idl. This method can mark the participant dead. Returns 0 if successful.

Definition at line 131 of file DCPS_IR_Publication.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::get_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), OPENDDS_STRING, participant_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::remove(), DCPS_IR_Subscription::remove_associated_publication(), and writer_.

Referenced by disassociate_participant(), DCPS_IR_Subscription::disassociate_participant(), DCPS_IR_Subscription::disassociate_publication(), disassociate_subscription(), disassociate_topic(), DCPS_IR_Subscription::disassociate_topic(), reevaluate_association(), DCPS_IR_Subscription::remove_associated_publication(), remove_associations(), and DCPS_IR_Subscription::remove_associations().

135 {
136  bool marked_dead = false;
137 
138  if (sendNotify) {
139  if (participant_->is_alive() && this->participant_->isOwner()) {
140  try {
142  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
143  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
144  ACE_DEBUG((LM_DEBUG,
145  ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription:")
146  ACE_TEXT(" calling pub %C with sub %C\n"),
147  OPENDDS_STRING(pub_converter).c_str(),
148  OPENDDS_STRING(sub_converter).c_str()));
149  }
150 
152  idSeq.length(1);
153  idSeq[0] = sub->get_id();
154 
155  writer_->remove_associations(idSeq, notify_lost);
156 
157  if (notify_both_side) {
158  sub->remove_associated_publication(this, sendNotify, notify_lost);
159  }
160 
161  } catch (const CORBA::Exception& ex) {
164  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_subscription:");
165  }
166 
168  marked_dead = true;
169  }
170  }
171  }
172 
173  int status = associations_.remove(sub);
174 
175  if (0 == status) {
177  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
178  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
179  ACE_DEBUG((LM_DEBUG,
180  ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ")
181  ACE_TEXT("publication %C removed subscription %C at %x.\n"),
182  std::string(pub_converter).c_str(),
183  std::string(sub_converter).c_str(),
184  sub));
185  }
186 
187  } else {
188  OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
189  OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
190  ACE_ERROR((LM_ERROR,
191  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ")
192  ACE_TEXT("publication %C failed to remove subscription %C at %x.\n"),
193  std::string(pub_converter).c_str(),
194  std::string(sub_converter).c_str(),
195  sub));
196  } // if (0 == status)
197 
198  if (marked_dead) {
199  return -1;
200 
201  } else {
202  return status;
203  }
204 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DCPS_IR_Participant * participant_
#define OPENDDS_STRING
sequence< GUID_t > ReaderIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::GUID_t get_id()
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Subscription_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ remove_associations()

int DCPS_IR_Publication::remove_associations ( CORBA::Boolean  notify_lost)

Removes all the associated subscriptions. This method can mark the participant dead. A notify_lost flag of true indicates that remove_associations is being called because the InfoRepo detected that this publication is lost due to a failed invocation on this publication. Returns 0 if successful.

Definition at line 206 of file DCPS_IR_Publication.cpp.

References associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Subscription::remove_associated_publication(), remove_associated_subscription(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::reset(), send(), and ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size().

207 {
208  int status = 0;
209  DCPS_IR_Subscription* sub = 0;
210  size_t numAssociations = associations_.size();
211  CORBA::Boolean dontSend = 0;
212  CORBA::Boolean send = 1;
213 
214  if (0 < numAssociations) {
217 
218  while (iter != end) {
219  sub = *iter;
220  ++iter;
221  sub->remove_associated_publication(this, send, notify_lost);
222  remove_associated_subscription(sub, dontSend, notify_lost);
223  }
224  }
225  this->defunct_.reset();
226 
227  return status;
228 }
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
DCPS_IR_Subscription_Set defunct_
Representative of a Subscription.
ACE_CDR::Boolean Boolean
DCPS_IR_Subscription_Set associations_

◆ set_bit_status()

void DCPS_IR_Publication::set_bit_status ( CORBA::Boolean  isBIT)

Definition at line 634 of file DCPS_IR_Publication.cpp.

References isBIT_.

Referenced by DCPS_IR_Participant::add_publication(), and DCPS_IR_Domain::publish_publication_bit().

635 {
636  isBIT_ = isBIT;
637 }

◆ set_handle()

void DCPS_IR_Publication::set_handle ( DDS::InstanceHandle_t  handle)

Definition at line 624 of file DCPS_IR_Publication.cpp.

References handle_.

Referenced by DCPS_IR_Domain::publish_publication_bit().

625 {
626  handle_ = handle;
627 }
DDS::InstanceHandle_t handle_

◆ set_qos() [1/3]

bool DCPS_IR_Publication::set_qos ( const DDS::DataWriterQos qos,
const DDS::PublisherQos publisherQos,
Update::SpecificQos specificQos 
)

Update the DataWriter or Publisher qos and also publish the qos changes to datawriter BIT.

Definition at line 538 of file DCPS_IR_Publication.cpp.

References Update::DataWriterQos, DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), Update::NoQos, participant_, DCPS_IR_Domain::publish_publication_bit(), Update::PublisherQos, publisherQos_, qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), and topic_.

Referenced by TAO_DDS_DCPSInfo_i::update_publication_qos().

541 {
542  bool need_evaluate = false;
543  bool u_dw_qos = !(qos_ == qos);
544 
545  if (u_dw_qos) {
547  need_evaluate = true;
548  }
549 
550  qos_ = qos;
551  }
552 
553  bool u_pub_qos = !(publisherQos_ == publisherQos);
554 
555  if (u_pub_qos) {
557  need_evaluate = true;
558  }
559 
560  publisherQos_ = publisherQos;
561  }
562 
563  if (need_evaluate) {
564  // Check whether any existing association needs to be removed first.
566 
568  description->reevaluate_associations(this);
569  }
570 
572  specificQos = u_dw_qos? Update::DataWriterQos:
573  u_pub_qos? Update::PublisherQos:
575 
576  return true;
577 }
DDS::PublisherQos publisherQos_
DCPS_IR_Topic_Description * get_topic_description()
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
DCPS_IR_Participant * participant_
Representative of a Topic Description.
DCPS_IR_Domain * get_domain_reference() const
DDS::DataWriterQos qos_
void publish_publication_bit(DCPS_IR_Publication *publication)
Publish Publication in the Publication Built-In Topic.
void reevaluate_associations(DCPS_IR_Publication *publication)

◆ set_qos() [2/3]

void DCPS_IR_Publication::set_qos ( const DDS::DataWriterQos qos)

Update DataWriterQos only.

Definition at line 481 of file DCPS_IR_Publication.cpp.

References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_publication_bit(), qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), ACE_OS::sleep(), and topic_.

482 {
483  if (false == (qos == this->qos_)) {
484  // Check if we should check while we have both values.
486 
487  // Store the new, compatible, value.
488  this->qos_ = qos;
489 
490  if (check) {
491  // This will remove any newly stale associations.
493 
494  // Sleep a while to let remove_association be handled by the DataWriter
495  // before add_association. Otherwise, the new associations will have
496  // trouble connecting to each other.
497  ACE_OS::sleep(ACE_Time_Value(0, 250000));
498 
499  // This will establish any newly made associations.
500  DCPS_IR_Topic_Description* description
501  = this->topic_->get_topic_description();
502  description->reevaluate_associations(this);
503  }
504 
506  }
507 }
int sleep(u_int seconds)
DCPS_IR_Topic_Description * get_topic_description()
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
DCPS_IR_Participant * participant_
Representative of a Topic Description.
DCPS_IR_Domain * get_domain_reference() const
DDS::DataWriterQos qos_
void publish_publication_bit(DCPS_IR_Publication *publication)
Publish Publication in the Publication Built-In Topic.
void reevaluate_associations(DCPS_IR_Publication *publication)

◆ set_qos() [3/3]

void DCPS_IR_Publication::set_qos ( const DDS::PublisherQos qos)

Update PublisherQos only.

Definition at line 510 of file DCPS_IR_Publication.cpp.

References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_publication_bit(), publisherQos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), ACE_OS::sleep(), and topic_.

511 {
512  if (false == (qos == this->publisherQos_)) {
513  // Check if we should check while we have both values.
515 
516  // Store the new, compatible, value.
517  this->publisherQos_ = qos;
518 
519  if (check) {
520  // This will remove any newly stale associations.
522 
523  // Sleep a while to let remove_association be handled by the DataWriter
524  // before add_association. Otherwise, the new associations will have
525  // trouble connecting to each other.
526  ACE_OS::sleep(ACE_Time_Value(0, 250000));
527 
528  // This will establish any newly made associations.
529  DCPS_IR_Topic_Description* description
530  = this->topic_->get_topic_description();
531  description->reevaluate_associations(this);
532  }
533 
535  }
536 }
DDS::PublisherQos publisherQos_
int sleep(u_int seconds)
DCPS_IR_Topic_Description * get_topic_description()
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
DCPS_IR_Participant * participant_
Representative of a Topic Description.
DCPS_IR_Domain * get_domain_reference() const
void publish_publication_bit(DCPS_IR_Publication *publication)
Publish Publication in the Publication Built-In Topic.
void reevaluate_associations(DCPS_IR_Publication *publication)

◆ update_expr_params()

void DCPS_IR_Publication::update_expr_params ( OpenDDS::DCPS::GUID_t  readerId,
const DDS::StringSeq params 
)

Definition at line 715 of file DCPS_IR_Publication.cpp.

References CORBA::Exception::_tao_print_exception(), OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Participant::mark_dead(), participant_, and writer_.

717 {
718  try {
719  writer_->update_subscription_params(readerId, params);
720  } catch (const CORBA::SystemException& ex) {
722  ex._tao_print_exception("(%P|%t) ERROR: Exception caught in "
723  "DCPS_IR_Publication::update_expr_params:");
724  }
726  }
727 }
DCPS_IR_Participant * participant_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ update_incompatible_qos()

void DCPS_IR_Publication::update_incompatible_qos ( )

Notify the writer of the incompatible QoS status and reset the status's count_since_last_send to 0.

Definition at line 444 of file DCPS_IR_Publication.cpp.

References CORBA::Exception::_tao_print_exception(), OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::DCPS_debug_level, incompatibleQosStatus_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, and writer_.

Referenced by DCPS_IR_Topic::try_associate(), and DCPS_IR_Topic_Description::try_associate_publication().

445 {
446  if (participant_->is_alive() && this->participant_->isOwner()) {
447  try {
448  writer_->update_incompatible_qos(incompatibleQosStatus_);
450  } catch (const CORBA::Exception& ex) {
453  "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:");
454  }
455 
457  }
458  }
459 }
DCPS_IR_Participant * participant_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

◆ writer()

OpenDDS::DCPS::DataWriterRemote_ptr DCPS_IR_Publication::writer ( void  )

Definition at line 640 of file DCPS_IR_Publication.cpp.

References writer_.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

641 {
642  return OpenDDS::DCPS::DataWriterRemote::_duplicate(this->writer_.in());
643 }
OpenDDS::DCPS::DataWriterRemote_var writer_
the corresponding DataWriterRemote object

Member Data Documentation

◆ associations_

DCPS_IR_Subscription_Set DCPS_IR_Publication::associations_
private

◆ defunct_

DCPS_IR_Subscription_Set DCPS_IR_Publication::defunct_
private

◆ handle_

DDS::InstanceHandle_t DCPS_IR_Publication::handle_
private

Definition at line 181 of file DCPS_IR_Publication.h.

Referenced by get_handle(), and set_handle().

◆ id_

OpenDDS::DCPS::GUID_t DCPS_IR_Publication::id_
private

◆ incompatibleQosStatus_

OpenDDS::DCPS::IncompatibleQosStatus DCPS_IR_Publication::incompatibleQosStatus_
private

◆ info_

OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Publication::info_
private

Definition at line 187 of file DCPS_IR_Publication.h.

Referenced by get_transportLocatorSeq().

◆ isBIT_

CORBA::Boolean DCPS_IR_Publication::isBIT_
private

Definition at line 182 of file DCPS_IR_Publication.h.

Referenced by dump_to_string(), is_bit(), and set_bit_status().

◆ participant_

DCPS_IR_Participant* DCPS_IR_Publication::participant_
private

◆ publisherQos_

DDS::PublisherQos DCPS_IR_Publication::publisherQos_
private

Definition at line 189 of file DCPS_IR_Publication.h.

Referenced by get_publisher_qos(), and set_qos().

◆ qos_

DDS::DataWriterQos DCPS_IR_Publication::qos_
private

Definition at line 186 of file DCPS_IR_Publication.h.

Referenced by get_datawriter_qos(), and set_qos().

◆ serializedTypeInfo_

DDS::OctetSeq DCPS_IR_Publication::serializedTypeInfo_
private

Definition at line 190 of file DCPS_IR_Publication.h.

Referenced by get_serialized_type_info().

◆ topic_

DCPS_IR_Topic* DCPS_IR_Publication::topic_
private

◆ transportContext_

ACE_CDR::ULong DCPS_IR_Publication::transportContext_
private

Definition at line 188 of file DCPS_IR_Publication.h.

◆ writer_

OpenDDS::DCPS::DataWriterRemote_var DCPS_IR_Publication::writer_
private

The documentation for this class was generated from the following files: