OpenDDS  Snapshot(2023/03/10-19:29)
Public Member Functions | Private Attributes | List of all members
DCPS_IR_Subscription Class Reference

Representative of a Subscription. More...

#include <DCPS_IR_Subscription.h>

Inheritance diagram for DCPS_IR_Subscription:
Inheritance graph
[legend]
Collaboration diagram for DCPS_IR_Subscription:
Collaboration graph
[legend]

Public Member Functions

 DCPS_IR_Subscription (const OpenDDS::DCPS::GUID_t &id, DCPS_IR_Participant *participant, DCPS_IR_Topic *topic, OpenDDS::DCPS::DataReaderRemote_ptr reader, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &info, ACE_CDR::ULong transportContext, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
 
 ~DCPS_IR_Subscription ()
 
int add_associated_publication (DCPS_IR_Publication *pub, bool active)
 
int remove_associated_publication (DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
 
int remove_associations (CORBA::Boolean notify_lost)
 
void disassociate_participant (OpenDDS::DCPS::GUID_t id, bool reassociate=false)
 Remove any publications whose participant has the id. More...
 
void disassociate_topic (OpenDDS::DCPS::GUID_t id)
 Remove any publications whose topic has the id. More...
 
void disassociate_publication (OpenDDS::DCPS::GUID_t id, bool reassociate=false)
 Remove any publications with id. More...
 
void update_incompatible_qos ()
 
CORBA::Boolean is_publication_ignored (OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t pubId)
 
const DDS::DataReaderQos * get_datareader_qos ()
 
const DDS::SubscriberQos * get_subscriber_qos ()
 
bool set_qos (const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
 
void set_qos (const DDS::DataReaderQos &qos)
 Update DataReaderQos only. More...
 
void set_qos (const DDS::SubscriberQos &qos)
 Update SubscriberQos only. More...
 
void reevaluate_defunct_associations ()
 
void reevaluate_existing_associations ()
 
bool reevaluate_association (DCPS_IR_Publication *publication)
 
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq () const
 
ACE_CDR::ULong get_transportContext () const
 
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus ()
 
OpenDDS::DCPS::GUID_t get_id ()
 
OpenDDS::DCPS::GUID_t get_topic_id ()
 
OpenDDS::DCPS::GUID_t get_participant_id ()
 
DCPS_IR_Topic_Description * get_topic_description ()
 
DCPS_IR_Topic * get_topic ()
 
DDS::InstanceHandle_t get_handle ()
 
void set_handle (DDS::InstanceHandle_t handle)
 
CORBA::Boolean is_bit ()
 
void set_bit_status (CORBA::Boolean isBIT)
 
OpenDDS::DCPS::DataReaderRemote_ptr reader ()
 
std::string get_filter_class_name () const
 
std::string get_filter_expression () const
 
DDS::StringSeq get_expr_params () const
 
void update_expr_params (const DDS::StringSeq &params)
 Calls associated Publications. More...
 
std::string dump_to_string (const std::string &prefix, int depth) const
 
const DDS::OctetSeq & get_serialized_type_info () const
 

Private Attributes

OpenDDS::DCPS::GUID_t id_
 
DCPS_IR_Participant * participant_
 
DCPS_IR_Topic * topic_
 
DDS::InstanceHandle_t handle_
 
CORBA::Boolean isBIT_
 
OpenDDS::DCPS::DataReaderRemote_var reader_
 the corresponding DataReaderRemote object More...
 
DDS::DataReaderQos qos_
 
OpenDDS::DCPS::TransportLocatorSeq info_
 
ACE_CDR::ULong transportContext_
 
DDS::SubscriberQos subscriberQos_
 
std::string filterClassName_
 
std::string filterExpression_
 
DDS::StringSeq exprParams_
 
DDS::OctetSeq serializedTypeInfo_
 
DCPS_IR_Publication_Set associations_
 
DCPS_IR_Publication_Set defunct_
 
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::EnableContainerSupportedUniquePtr< DCPS_IR_Subscription >
 EnableContainerSupportedUniquePtr ()
 
void _add_ref ()
 
void _remove_ref ()
 
long ref_count () const
 

Detailed Description

Representative of a Subscription.

Definition at line 41 of file DCPS_IR_Subscription.h.

Constructor & Destructor Documentation

◆ DCPS_IR_Subscription()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL DCPS_IR_Subscription::DCPS_IR_Subscription ( const OpenDDS::DCPS::GUID_t &  id,
DCPS_IR_Participant *  participant,
DCPS_IR_Topic *  topic,
OpenDDS::DCPS::DataReaderRemote_ptr  reader,
const DDS::DataReaderQos &  qos,
const OpenDDS::DCPS::TransportLocatorSeq &  info,
ACE_CDR::ULong  transportContext,
const DDS::SubscriberQos &  subscriberQos,
const char *  filterClassName,
const char *  filterExpression,
const DDS::StringSeq &  exprParams,
const DDS::OctetSeq &  serializedTypeInfo 
)

Definition at line 25 of file DCPS_IR_Subscription.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, incompatibleQosStatus_, reader_, and OpenDDS::DCPS::IncompatibleQosStatus::total_count.

37  : id_(id),
38  participant_(participant),
39  topic_(topic),
40  handle_(0),
41  isBIT_(0),
42  qos_(qos),
43  info_(info),
44  transportContext_(transportContext),
45  subscriberQos_(subscriberQos),
46  filterClassName_(filterClassName),
47  filterExpression_(filterExpression),
48  exprParams_(exprParams),
49  serializedTypeInfo_(serializedTypeInfo)
50 {
51  reader_ = OpenDDS::DCPS::DataReaderRemote::_duplicate(reader);
52 
55 }
DDS::InstanceHandle_t handle_
DCPS_IR_Participant * participant_
OpenDDS::DCPS::DataReaderRemote_ptr reader()
OpenDDS::DCPS::TransportLocatorSeq info_
DDS::DataReaderQos qos_
DDS::SubscriberQos subscriberQos_
DDS::OctetSeq serializedTypeInfo_
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
ACE_CDR::ULong transportContext_
OpenDDS::DCPS::GUID_t id_
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_

◆ ~DCPS_IR_Subscription()

DCPS_IR_Subscription::~DCPS_IR_Subscription ( )

Definition at line 57 of file DCPS_IR_Subscription.cpp.

58 {
59 }

Member Function Documentation

◆ add_associated_publication()

int DCPS_IR_Subscription::add_associated_publication ( DCPS_IR_Publication *  pub,
bool  active 
)

Associate with the publication. Adds the publication to the list of associated publications and notifies the datareader if successfully added. This method can mark the participant dead. Returns 0 if added, 1 if already exists, -1 on other failure.

Definition at line 61 of file DCPS_IR_Subscription.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Publication::get_id(), DCPS_IR_Publication::get_publisher_qos(), DCPS_IR_Publication::get_serialized_type_info(), DCPS_IR_Publication::get_transportContext(), DCPS_IR_Publication::get_transportLocatorSeq(), id_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::insert(), DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), participant_, OpenDDS::DCPS::WriterAssociation::pubQos, reader_, OpenDDS::DCPS::WriterAssociation::serializedTypeInfo, OpenDDS::DCPS::WriterAssociation::transportContext, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

Referenced by DCPS_IR_Topic_Description::associate().

63 {
64  // keep track of the association locally
65  int status = associations_.insert(pub);
66 
67  switch (status) {
68  case 0: {
69  // inform the datareader about the association
71  association.writerTransInfo = pub->get_transportLocatorSeq();
72  association.transportContext = pub->get_transportContext();
73  association.writerId = pub->get_id();
74  association.pubQos = *(pub->get_publisher_qos());
75  association.writerQos = *(pub->get_datawriter_qos());
76  association.serializedTypeInfo = pub->get_serialized_type_info();
77  if (participant_->is_alive() && this->participant_->isOwner()) {
78  try {
80  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
81  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
82  ACE_DEBUG((LM_DEBUG,
83  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication:")
84  ACE_TEXT(" subscription %C adding publication %C.\n"),
85  std::string(sub_converter).c_str(),
86  std::string(pub_converter).c_str()));
87  }
88 
89  reader_->add_association(id_, association, active);
90 
92  ACE_DEBUG((LM_DEBUG,
93  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication: ")
94  ACE_TEXT("successfully added publication %x\n"),
95  pub));
96  }
97  } catch (const CORBA::Exception& ex) {
99  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:");
101  status = -1;
102  }
103  }
104 
105  }
106  break;
107 
108  case 1: {
109  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
110  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
111  ACE_ERROR((LM_ERROR,
112  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
113  ACE_TEXT("subscription %C attempted to re-add publication %C\n"),
114  std::string(sub_converter).c_str(),
115  std::string(pub_converter).c_str()));
116  }
117  break;
118 
119  case -1: {
120  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
121  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
122  ACE_ERROR((LM_ERROR,
123  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
124  ACE_TEXT("subscription %C failed to add publication %C\n"),
125  std::string(sub_converter).c_str(),
126  std::string(pub_converter).c_str()));
127  }
128  }
129 
130  return status;
131 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
DCPS_IR_Participant * participant_
ACE_CDR::ULong get_transportContext() const
OpenDDS::DCPS::GUID_t get_id()
DDS::DataWriterQos * get_datawriter_qos()
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
OpenDDS::DCPS::GUID_t id_
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Publication_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
const DDS::OctetSeq & get_serialized_type_info() const

◆ disassociate_participant()

void DCPS_IR_Subscription::disassociate_participant ( OpenDDS::DCPS::GUID_t  id,
bool  reassociate = false 
)

Remove any publications whose participant has the id.

Definition at line 236 of file DCPS_IR_Subscription.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), OpenDDS::DCPS::DCPS_debug_level, defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Publication::get_id(), DCPS_IR_Publication::get_participant_id(), id_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::insert(), DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), participant_, reader_, remove_associated_publication(), DCPS_IR_Publication::remove_associated_subscription(), send(), and ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size().

238 {
239  DCPS_IR_Publication* pub = 0;
240  size_t numAssociations = associations_.size();
241  CORBA::Boolean dontSend = 0;
242  CORBA::Boolean send = 1;
243  long count = 0;
244 
245  if (0 < numAssociations) {
246  OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
247  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
248 
251 
252  while (iter != end) {
253  pub = *iter;
254  ++iter;
255 
257  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
258  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
259  OpenDDS::DCPS::RepoIdConverter sub_part_converter(id);
260  OpenDDS::DCPS::RepoIdConverter pub_part_converter(pub->get_participant_id());
261  ACE_DEBUG((LM_DEBUG,
262  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_participant: ")
263  ACE_TEXT("subscription %C testing if publication %C particpant %C == %C.\n"),
264  std::string(sub_converter).c_str(),
265  std::string(pub_converter).c_str(),
266  std::string(sub_part_converter).c_str(),
267  std::string(pub_part_converter).c_str()));
268  }
269 
270  if (id == pub->get_participant_id()) {
271  CORBA::Boolean dont_notify_lost = 0;
272  pub->remove_associated_subscription(this, send, dont_notify_lost);
273  remove_associated_publication(pub, dontSend, dont_notify_lost);
274 
275  idSeq[count] = pub->get_id();
276  ++count;
277 
278  if (reassociate && this->defunct_.insert(pub) != 0) {
279  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
280  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
281  ACE_ERROR((LM_ERROR,
282  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ")
283  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
284  std::string(sub_converter).c_str(),
285  std::string(pub_converter).c_str(),
286  pub));
287  }
288  }
289  }
290 
291  if (0 < count) {
292  idSeq.length(count);
293 
294  if (participant_->is_alive() && this->participant_->isOwner()) {
295  try {
296  CORBA::Boolean dont_notify_lost = 0;
297  reader_->remove_associations(idSeq, dont_notify_lost);
298 
299  } catch (const CORBA::Exception& ex) {
302  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:");
303  }
304 
306  }
307  }
308  }
309  }
310 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
DCPS_IR_Participant * participant_
Representative of a Publication.
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
ACE_CDR::Boolean Boolean
DCPS_IR_Publication_Set defunct_
OpenDDS::DCPS::GUID_t id_
sequence< GUID_t > WriterIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::GUID_t get_participant_id()
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Publication_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ disassociate_publication()

void DCPS_IR_Subscription::disassociate_publication ( OpenDDS::DCPS::GUID_t  id,
bool  reassociate = false 
)

Remove any publications with id.

Definition at line 376 of file DCPS_IR_Subscription.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), OpenDDS::DCPS::DCPS_debug_level, defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Publication::get_id(), id_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::insert(), DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), participant_, reader_, remove_associated_publication(), DCPS_IR_Publication::remove_associated_subscription(), send(), and ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size().

Referenced by TAO_DDS_DCPSInfo_i::disassociate_subscription().

378 {
379  DCPS_IR_Publication* pub = 0;
380  size_t numAssociations = associations_.size();
381  CORBA::Boolean dontSend = 0;
382  CORBA::Boolean send = 1;
383  long count = 0;
384 
385  if (0 < numAssociations) {
386  OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
387  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
388 
391 
392  while (iter != end) {
393  pub = *iter;
394  ++iter;
395 
397  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
398  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
399  OpenDDS::DCPS::RepoIdConverter sub_pub_converter(id);
400  ACE_DEBUG((LM_DEBUG,
401  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_publication: ")
402  ACE_TEXT("subscription %C testing if publication %C == %C.\n"),
403  std::string(sub_converter).c_str(),
404  std::string(pub_converter).c_str(),
405  std::string(sub_pub_converter).c_str()));
406  }
407 
408  if (id == pub->get_id()) {
409  CORBA::Boolean dont_notify_lost = 0;
410  pub->remove_associated_subscription(this, send, dont_notify_lost);
411  remove_associated_publication(pub, dontSend, dont_notify_lost);
412 
413  idSeq[count] = pub->get_id();
414  ++count;
415 
416  if (reassociate && this->defunct_.insert(pub) != 0) {
417  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
418  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
419  ACE_ERROR((LM_ERROR,
420  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ")
421  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
422  std::string(sub_converter).c_str(),
423  std::string(pub_converter).c_str(),
424  pub));
425  }
426  }
427  }
428 
429  if (0 < count) {
430  idSeq.length(count);
431 
432  if (participant_->is_alive() && this->participant_->isOwner()) {
433  try {
434  CORBA::Boolean dont_notify_lost = 0;
435  reader_->remove_associations(idSeq, dont_notify_lost);
436 
437  } catch (const CORBA::Exception& ex) {
440  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
441  }
442 
444  }
445  }
446  }
447  }
448 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
DCPS_IR_Participant * participant_
Representative of a Publication.
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
ACE_CDR::Boolean Boolean
DCPS_IR_Publication_Set defunct_
OpenDDS::DCPS::GUID_t id_
sequence< GUID_t > WriterIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Publication_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ disassociate_topic()

void DCPS_IR_Subscription::disassociate_topic ( OpenDDS::DCPS::GUID_t  id)

Remove any publications whose topic has the id.

Definition at line 312 of file DCPS_IR_Subscription.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_TEXT(), associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), OpenDDS::DCPS::DCPS_debug_level, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Publication::get_id(), DCPS_IR_Publication::get_topic_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, DCPS_IR_Participant::mark_dead(), participant_, reader_, remove_associated_publication(), DCPS_IR_Publication::remove_associated_subscription(), send(), and ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size().

313 {
314  DCPS_IR_Publication* pub = 0;
315  size_t numAssociations = associations_.size();
316  CORBA::Boolean dontSend = 0;
317  CORBA::Boolean send = 1;
318  long count = 0;
319 
320  if (0 < numAssociations) {
321  OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
322  idSeq.length(static_cast<CORBA::ULong>(numAssociations));
323 
326 
327  while (iter != end) {
328  pub = *iter;
329  ++iter;
330 
332  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
333  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
334  OpenDDS::DCPS::RepoIdConverter sub_topic_converter(id);
335  OpenDDS::DCPS::RepoIdConverter pub_topic_converter(pub->get_topic_id());
336  ACE_DEBUG((LM_DEBUG,
337  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_topic: ")
338  ACE_TEXT("subscription %C testing if publication %C topic %C == %C.\n"),
339  std::string(sub_converter).c_str(),
340  std::string(pub_converter).c_str(),
341  std::string(sub_topic_converter).c_str(),
342  std::string(pub_topic_converter).c_str()));
343  }
344 
345  if (id == pub->get_topic_id()) {
346  CORBA::Boolean dont_notify_lost = 0;
347  pub->remove_associated_subscription(this, send, dont_notify_lost);
348  remove_associated_publication(pub, dontSend, dont_notify_lost);
349 
350  idSeq[count] = pub->get_id();
351  ++count;
352  }
353  }
354 
355  if (0 < count) {
356  idSeq.length(count);
357 
358  if (participant_->is_alive() && this->participant_->isOwner()) {
359  try {
360  CORBA::Boolean dont_notify_lost = false;
361  reader_->remove_associations(idSeq, dont_notify_lost);
362 
363  } catch (const CORBA::Exception& ex) {
366  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
367  }
368 
370  }
371  }
372  }
373  }
374 }
#define ACE_DEBUG(X)
DCPS_IR_Participant * participant_
Representative of a Publication.
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::GUID_t get_topic_id()
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
ACE_CDR::Boolean Boolean
OpenDDS::DCPS::GUID_t id_
sequence< GUID_t > WriterIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Publication_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ dump_to_string()

std::string DCPS_IR_Subscription::dump_to_string ( const std::string &  prefix,
int  depth 
) const

Definition at line 748 of file DCPS_IR_Subscription.cpp.

References associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), id_, and isBIT_.

749 {
750  std::string str;
751 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
752  OpenDDS::DCPS::RepoIdConverter local_converter(id_);
753 
754  for (int i=0; i < depth; i++)
755  str += prefix;
756  std::string indent = str + prefix;
757  str += "DCPS_IR_Subscription[";
758  str += std::string(local_converter);
759  str += "]";
760  if (isBIT_)
761  str += " (BIT)";
762  str += "\n";
763 
764  str += indent + "Associations [ ";
766  assoc != associations_.end();
767  assoc++)
768  {
769  OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
770  str += std::string(assoc_converter);
771  str += " ";
772  }
773  str += "]\n";
774 
775  str += indent + "Defunct Associations [ ";
777  def != defunct_.end();
778  def++)
779  {
780  OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
781  str += std::string(def_converter);
782  str += " ";
783  }
784  str += "]\n";
785 
786 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
787  return str;
788 }
DCPS_IR_Publication_Set defunct_
OpenDDS::DCPS::GUID_t id_
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
DCPS_IR_Publication_Set associations_

◆ get_datareader_qos()

const DDS::DataReaderQos * DCPS_IR_Subscription::get_datareader_qos ( )

◆ get_expr_params()

DDS::StringSeq DCPS_IR_Subscription::get_expr_params ( ) const

◆ get_filter_class_name()

std::string DCPS_IR_Subscription::get_filter_class_name ( ) const

Definition at line 720 of file DCPS_IR_Subscription.cpp.

References filterClassName_.

Referenced by DCPS_IR_Publication::add_associated_subscription().

721 {
722  return filterClassName_;
723 }

◆ get_filter_expression()

std::string DCPS_IR_Subscription::get_filter_expression ( ) const

◆ get_handle()

DDS::InstanceHandle_t DCPS_IR_Subscription::get_handle ( void  )

Definition at line 693 of file DCPS_IR_Subscription.cpp.

References handle_.

Referenced by DCPS_IR_Domain::dispose_subscription_bit().

694 {
695  return handle_;
696 }
DDS::InstanceHandle_t handle_

◆ get_id()

OpenDDS::DCPS::GUID_t DCPS_IR_Subscription::get_id ( void  )

◆ get_incompatibleQosStatus()

OpenDDS::DCPS::IncompatibleQosStatus * DCPS_IR_Subscription::get_incompatibleQosStatus ( )

Return pointer to the incompatible QoS status. Subscription retains ownership.

Definition at line 483 of file DCPS_IR_Subscription.cpp.

References incompatibleQosStatus_.

Referenced by reevaluate_association(), DCPS_IR_Publication::reevaluate_association(), DCPS_IR_Topic_Description::try_associate(), DCPS_IR_Topic_Description::try_associate_publication(), and DCPS_IR_Topic_Description::try_associate_subscription().

484 {
485  return &incompatibleQosStatus_;
486 }
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_

◆ get_participant_id()

OpenDDS::DCPS::GUID_t DCPS_IR_Subscription::get_participant_id ( )

◆ get_serialized_type_info()

const DDS::OctetSeq & DCPS_IR_Subscription::get_serialized_type_info ( ) const

◆ get_subscriber_qos()

const DDS::SubscriberQos * DCPS_IR_Subscription::get_subscriber_qos ( )

◆ get_topic()

DCPS_IR_Topic * DCPS_IR_Subscription::get_topic ( )

Definition at line 688 of file DCPS_IR_Subscription.cpp.

References topic_.

Referenced by DCPS_IR_Domain::publish_subscription_bit().

689 {
690  return topic_;
691 }

◆ get_topic_description()

DCPS_IR_Topic_Description * DCPS_IR_Subscription::get_topic_description ( )

Definition at line 683 of file DCPS_IR_Subscription.cpp.

References DCPS_IR_Topic::get_topic_description(), and topic_.

Referenced by DCPS_IR_Domain::publish_subscription_bit().

684 {
685  return topic_->get_topic_description();
686 }
DCPS_IR_Topic_Description * get_topic_description()

◆ get_topic_id()

OpenDDS::DCPS::GUID_t DCPS_IR_Subscription::get_topic_id ( )

◆ get_transportContext()

ACE_CDR::ULong DCPS_IR_Subscription::get_transportContext ( ) const
inline

Definition at line 150 of file DCPS_IR_Subscription.h.

References get_handle().

Referenced by DCPS_IR_Publication::add_associated_subscription().

150 { return transportContext_; }
ACE_CDR::ULong transportContext_

◆ get_transportLocatorSeq()

OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Subscription::get_transportLocatorSeq ( ) const

◆ is_bit()

CORBA::Boolean DCPS_IR_Subscription::is_bit ( )

Definition at line 703 of file DCPS_IR_Subscription.cpp.

References isBIT_.

Referenced by DCPS_IR_Domain::dispose_subscription_bit().

704 {
705  return isBIT_;
706 }

◆ is_publication_ignored()

CORBA::Boolean DCPS_IR_Subscription::is_publication_ignored ( OpenDDS::DCPS::GUID_t  partId,
OpenDDS::DCPS::GUID_t  topicId,
OpenDDS::DCPS::GUID_t  pubId 
)

Check that none of the ids given are ones that this subscription should ignore. Returns 1 if one of these ids is an ignored id.

Definition at line 467 of file DCPS_IR_Subscription.cpp.

References DCPS_IR_Participant::is_participant_ignored(), DCPS_IR_Participant::is_publication_ignored(), DCPS_IR_Participant::is_topic_ignored(), and participant_.

Referenced by DCPS_IR_Topic_Description::try_associate().

470 {
471  CORBA::Boolean ignored = (participant_->is_participant_ignored(partId) ||
472  participant_->is_topic_ignored(topicId) ||
474 
475  return ignored;
476 }
DCPS_IR_Participant * participant_
CORBA::Boolean is_publication_ignored(OpenDDS::DCPS::GUID_t id)
ACE_CDR::Boolean Boolean
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)

◆ reader()

OpenDDS::DCPS::DataReaderRemote_ptr DCPS_IR_Subscription::reader ( void  )

Definition at line 714 of file DCPS_IR_Subscription.cpp.

References reader_.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

715 {
716  return OpenDDS::DCPS::DataReaderRemote::_duplicate(this->reader_.in());
717 }
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object

◆ reevaluate_association()

bool DCPS_IR_Subscription::reevaluate_association ( DCPS_IR_Publication *  publication)

Definition at line 639 of file DCPS_IR_Subscription.cpp.

References associations_, OpenDDS::DCPS::compatibleQOS(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::find(), get_datareader_qos(), DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Publication::get_incompatibleQosStatus(), get_incompatibleQosStatus(), DCPS_IR_Publication::get_publisher_qos(), get_subscriber_qos(), DCPS_IR_Topic::get_topic_description(), DCPS_IR_Publication::get_transportLocatorSeq(), get_transportLocatorSeq(), remove_associated_publication(), topic_, and DCPS_IR_Topic_Description::try_associate().

Referenced by DCPS_IR_Topic::reevaluate_associations(), DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_defunct_associations(), and reevaluate_existing_associations().

640 {
641  int status = this->associations_.find(publication);
642 
643  if (status == 0) {
644  // verify if they are still compatible after change
645 
648  publication->get_transportLocatorSeq(),
649  this->get_transportLocatorSeq(),
650  publication->get_datawriter_qos(),
651  this->get_datareader_qos(),
652  publication->get_publisher_qos(),
653  this->get_subscriber_qos())) {
654  bool sendNotify = true; // inform datareader
655  bool notify_lost = true; // invoke listerner callback
656 
657  this->remove_associated_publication(publication, sendNotify, notify_lost, true);
658  }
659 
660  } else {
662  return description->try_associate(publication, this);
663  }
664 
665  return false;
666 }
const DDS::DataReaderQos * get_datareader_qos()
const DDS::SubscriberQos * get_subscriber_qos()
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
DCPS_IR_Topic_Description * get_topic_description()
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
Definition: DCPS_Utils.cpp:237
DDS::DataWriterQos * get_datawriter_qos()
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
Representative of a Topic Description.
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
DCPS_IR_Publication_Set associations_

◆ reevaluate_defunct_associations()

void DCPS_IR_Subscription::reevaluate_defunct_associations ( )

Definition at line 601 of file DCPS_IR_Subscription.cpp.

References ACE_ERROR, ACE_TEXT(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), DCPS_IR_Publication::get_id(), id_, LM_ERROR, reevaluate_association(), and ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::remove().

602 {
604  while (it != this->defunct_.end()) {
605  DCPS_IR_Publication* publication = *it;
606  ++it;
607 
608  if (reevaluate_association(publication)) {
609  this->defunct_.remove(publication); // no longer defunct
610 
611  } else {
612  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
613  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
614  ACE_ERROR((LM_ERROR,
615  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ")
616  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
617  std::string(sub_converter).c_str(),
618  std::string(pub_converter).c_str(),
619  publication));
620  }
621  }
622 }
#define ACE_ERROR(X)
Representative of a Publication.
OpenDDS::DCPS::GUID_t get_id()
DCPS_IR_Publication_Set defunct_
OpenDDS::DCPS::GUID_t id_
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
bool reevaluate_association(DCPS_IR_Publication *publication)

◆ reevaluate_existing_associations()

void DCPS_IR_Subscription::reevaluate_existing_associations ( )

◆ remove_associated_publication()

int DCPS_IR_Subscription::remove_associated_publication ( DCPS_IR_Publication * pub,
CORBA::Boolean  sendNotify,
CORBA::Boolean  notify_lost,
bool  notify_both_side = false 
)

Remove the associated publication. Removes the publication from the list of associated publications if the return is successful. sendNotify indicates whether to tell the datareader about removing the publication. The notify_lost flag, when true, indicates this remove_associations is called when the InfoRepo detects this subscription is lost because of the failure of invocation on this subscription. The notify_both_side parameter indicates whether it needs to call pub to remove the association as well. This method can mark the participant dead. Returns 0 if successful.

Definition at line 133 of file DCPS_IR_Subscription.cpp.

References CORBA::Exception::_tao_print_exception(), ACE_DEBUG, ACE_ERROR, ACE_TEXT(), associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::get_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, DCPS_IR_Participant::mark_dead(), participant_, reader_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::remove(), and DCPS_IR_Publication::remove_associated_subscription().

Referenced by DCPS_IR_Publication::disassociate_participant(), disassociate_participant(), disassociate_publication(), DCPS_IR_Publication::disassociate_subscription(), DCPS_IR_Publication::disassociate_topic(), disassociate_topic(), reevaluate_association(), DCPS_IR_Publication::remove_associated_subscription(), DCPS_IR_Publication::remove_associations(), and remove_associations().

137 {
138  bool marked_dead = false;
139 
140  if (sendNotify) {
141 
142  if (participant_->is_alive() && this->participant_->isOwner()) {
143  try {
145  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
146  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
147  ACE_DEBUG((LM_DEBUG,
148  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication:")
149  ACE_TEXT(" calling sub %C with pub %C\n"),
150  std::string(sub_converter).c_str(),
151  std::string(pub_converter).c_str()
152  ));
153  }
154 
156  idSeq.length(1);
157  idSeq[0] = pub->get_id();
158 
159  reader_->remove_associations(idSeq, notify_lost);
160 
161  if (notify_both_side) {
162  pub->remove_associated_subscription(this, sendNotify, notify_lost);
163  }
164 
165  } catch (const CORBA::Exception& ex) {
168  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:");
169  }
170 
172  marked_dead = true;
173  }
174  }
175  }
176 
177  int status = associations_.remove(pub);
178 
179  if (0 == status) {
181  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
182  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
183  ACE_DEBUG((LM_DEBUG,
184  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ")
185  ACE_TEXT("subscription %C removed publication %C at %x.\n"),
186  std::string(sub_converter).c_str(),
187  std::string(pub_converter).c_str(),
188  pub));
189  }
190 
191  } else {
192  OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
193  OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
194  ACE_ERROR((LM_ERROR,
195  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ")
196  ACE_TEXT("subscription %C failed to remove publication %C at %x.\n"),
197  std::string(sub_converter).c_str(),
198  std::string(pub_converter).c_str(),
199  pub));
200  } // if (0 == status)
201 
202  if (marked_dead) {
203  return -1;
204 
205  } else {
206  return status;
207  }
208 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
DCPS_IR_Participant * participant_
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
OpenDDS::DCPS::GUID_t id_
sequence< GUID_t > WriterIdSeq
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
DCPS_IR_Publication_Set associations_
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

◆ remove_associations()

int DCPS_IR_Subscription::remove_associations ( CORBA::Boolean  notify_lost)

Removes all the associated publications. This method can mark the participant dead. The notify_lost flag, when true, indicates this remove_associations is called when the InfoRepo detects this subscription is lost because of the failure of invocation on this subscription. Returns 0 if successful.

Definition at line 210 of file DCPS_IR_Subscription.cpp.

References associations_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::begin(), defunct_, ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::end(), remove_associated_publication(), DCPS_IR_Publication::remove_associated_subscription(), ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::reset(), send(), and ACE_Unbounded_Set_Ex< T, ACE_Unbounded_Set_Default_Comparator< T > >::size().

211 {
212  int status = 0;
213  DCPS_IR_Publication* pub = 0;
214  size_t numAssociations = associations_.size();
215  CORBA::Boolean dontSend = 0;
216  CORBA::Boolean send = 1;
217 
218  if (0 < numAssociations) {
221 
222  while (iter != end) {
223  pub = *iter;
224  ++iter;
225 
226  pub->remove_associated_subscription(this, send, notify_lost);
227  CORBA::Boolean dont_notify_lost = 0;
228  remove_associated_publication(pub, dontSend, dont_notify_lost);
229  }
230  }
231  this->defunct_.reset();
232 
233  return status;
234 }
Representative of a Publication.
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
int remove_associated_subscription(DCPS_IR_Subscription *sub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
int remove_associated_publication(DCPS_IR_Publication *pub, CORBA::Boolean sendNotify, CORBA::Boolean notify_lost, bool notify_both_side=false)
ACE_CDR::Boolean Boolean
DCPS_IR_Publication_Set defunct_
DCPS_IR_Publication_Set associations_

◆ set_bit_status()

void DCPS_IR_Subscription::set_bit_status ( CORBA::Boolean  isBIT)

Definition at line 708 of file DCPS_IR_Subscription.cpp.

References isBIT_.

Referenced by DCPS_IR_Participant::add_subscription(), and DCPS_IR_Domain::publish_subscription_bit().

709 {
710  isBIT_ = isBIT;
711 }

◆ set_handle()

void DCPS_IR_Subscription::set_handle ( DDS::InstanceHandle_t  handle)

Definition at line 698 of file DCPS_IR_Subscription.cpp.

References handle_.

Referenced by DCPS_IR_Domain::publish_subscription_bit().

699 {
700  handle_ = handle;
701 }
DDS::InstanceHandle_t handle_

◆ set_qos() [1/3]

bool DCPS_IR_Subscription::set_qos ( const DDS::DataReaderQos & qos,
const DDS::SubscriberQos & subscriberQos,
Update::SpecificQos & specificQos 
)

Update the DataReader or Subscriber qos and also publish the qos changes to the datareader BIT.

Definition at line 559 of file DCPS_IR_Subscription.cpp.

References Update::DataReaderQos, DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), Update::NoQos, participant_, DCPS_IR_Domain::publish_subscription_bit(), qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), Update::SubscriberQos, subscriberQos_, and topic_.

Referenced by TAO_DDS_DCPSInfo_i::update_subscription_qos().

562 {
563  bool need_evaluate = false;
564  bool u_dr_qos = !(qos_ == qos);
565 
566  if (u_dr_qos) {
568  need_evaluate = true;
569  }
570 
571  qos_ = qos;
572  }
573 
574  bool u_sub_qos = !(subscriberQos_ == subscriberQos);
575 
576  if (u_sub_qos) {
578  need_evaluate = true;
579  }
580 
581  subscriberQos_ = subscriberQos;
582  }
583 
584  if (need_evaluate) {
585  // Check if any existing association need be removed first.
587 
589  description->reevaluate_associations(this);
590  }
591 
593  specificQos = u_dr_qos? Update::DataReaderQos:
594  u_sub_qos? Update::SubscriberQos:
596 
597  return true;
598 }
DCPS_IR_Participant * participant_
DCPS_IR_Topic_Description * get_topic_description()
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
DDS::DataReaderQos qos_
DDS::SubscriberQos subscriberQos_
Representative of a Topic Description.
void publish_subscription_bit(DCPS_IR_Subscription *subscription)
Publish Subscription in the Subscription Built-In Topic.
DCPS_IR_Domain * get_domain_reference() const
void reevaluate_associations(DCPS_IR_Publication *publication)

◆ set_qos() [2/3]

void DCPS_IR_Subscription::set_qos ( const DDS::DataReaderQos & qos)

Update DataReaderQos only.

Definition at line 501 of file DCPS_IR_Subscription.cpp.

References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_subscription_bit(), qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), ACE_OS::sleep(), and topic_.

502 {
503  if (false == (qos == this->qos_)) {
504  // Check if we should check while we have both values.
505  bool check =
507 
508  // Store the new, compatible, value.
509  this->qos_ = qos;
510 
511  if (check) {
512  // This will remove any newly stale associations.
514 
515  // Sleep a while to let remove_association handled by DataWriter
516  // before add_association. Otherwise, new association will have
517  // trouble to connect each other.
518  ACE_OS::sleep(ACE_Time_Value(0, 250000));
519 
520  // This will establish any newly made associations.
521  DCPS_IR_Topic_Description* description
522  = this->topic_->get_topic_description();
523  description->reevaluate_associations(this);
524  }
525 
527  }
528 }
DCPS_IR_Participant * participant_
int sleep(u_int seconds)
DCPS_IR_Topic_Description * get_topic_description()
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
DDS::DataReaderQos qos_
Representative of a Topic Description.
void publish_subscription_bit(DCPS_IR_Subscription *subscription)
Publish Subscription in the Subscription Built-In Topic.
DCPS_IR_Domain * get_domain_reference() const
void reevaluate_associations(DCPS_IR_Publication *publication)

◆ set_qos() [3/3]

void DCPS_IR_Subscription::set_qos ( const DDS::SubscriberQos & qos)

Update SubscriberQos only.

Definition at line 531 of file DCPS_IR_Subscription.cpp.

References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_subscription_bit(), DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), ACE_OS::sleep(), subscriberQos_, and topic_.

532 {
533  if (false == (qos == this->subscriberQos_)) {
534  // Check if we should check while we have both values.
536 
537  // Store the new, compatible, value.
538  this->subscriberQos_ = qos;
539 
540  if (check) {
541  // This will remove any newly stale associations.
543 
544  // Sleep a while to let remove_association handled by DataWriter
545  // before add_association. Otherwise, new association will have
546  // trouble to connect each other.
547  ACE_OS::sleep(ACE_Time_Value(0, 250000));
548 
549  // This will establish any newly made associations.
550  DCPS_IR_Topic_Description* description
551  = this->topic_->get_topic_description();
552  description->reevaluate_associations(this);
553  }
554 
556  }
557 }
DCPS_IR_Participant * participant_
int sleep(u_int seconds)
DCPS_IR_Topic_Description * get_topic_description()
bool should_check_association_upon_change(const DDS::DataReaderQos &qos1, const DDS::DataReaderQos &qos2)
Definition: DCPS_Utils.cpp:415
DDS::SubscriberQos subscriberQos_
Representative of a Topic Description.
void publish_subscription_bit(DCPS_IR_Subscription *subscription)
Publish Subscription in the Subscription Built-In Topic.
DCPS_IR_Domain * get_domain_reference() const
void reevaluate_associations(DCPS_IR_Publication *publication)

◆ update_expr_params()

void DCPS_IR_Subscription::update_expr_params ( const DDS::StringSeq params)

◆ update_incompatible_qos()

void DCPS_IR_Subscription::update_incompatible_qos ( )

Notify the reader of incompatible qos status and reset the status' count_since_last_send to 0

Definition at line 450 of file DCPS_IR_Subscription.cpp.

References CORBA::Exception::_tao_print_exception(), OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::DCPS_debug_level, incompatibleQosStatus_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, and reader_.

Referenced by DCPS_IR_Topic_Description::try_associate_publication(), and DCPS_IR_Topic_Description::try_associate_subscription().

451 {
452  if (participant_->is_alive() && this->participant_->isOwner()) {
453  try {
454  reader_->update_incompatible_qos(incompatibleQosStatus_);
456  } catch (const CORBA::Exception& ex) {
459  "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:");
460  }
461 
463  }
464  }
465 }
DCPS_IR_Participant * participant_
OpenDDS::DCPS::DataReaderRemote_var reader_
the corresponding DataReaderRemote object
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::IncompatibleQosStatus incompatibleQosStatus_
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
bool isOwner() const
Indication of whether the current repository is the owner of this participant.

Member Data Documentation

◆ associations_

DCPS_IR_Publication_Set DCPS_IR_Subscription::associations_
private

◆ defunct_

DCPS_IR_Publication_Set DCPS_IR_Subscription::defunct_
private

◆ exprParams_

DDS::StringSeq DCPS_IR_Subscription::exprParams_
private

Definition at line 200 of file DCPS_IR_Subscription.h.

Referenced by get_expr_params(), and update_expr_params().

◆ filterClassName_

std::string DCPS_IR_Subscription::filterClassName_
private

Definition at line 198 of file DCPS_IR_Subscription.h.

Referenced by get_filter_class_name().

◆ filterExpression_

std::string DCPS_IR_Subscription::filterExpression_
private

Definition at line 199 of file DCPS_IR_Subscription.h.

Referenced by get_filter_expression().

◆ handle_

DDS::InstanceHandle_t DCPS_IR_Subscription::handle_
private

Definition at line 189 of file DCPS_IR_Subscription.h.

Referenced by get_handle(), and set_handle().

◆ id_

OpenDDS::DCPS::GUID_t DCPS_IR_Subscription::id_
private

◆ incompatibleQosStatus_

OpenDDS::DCPS::IncompatibleQosStatus DCPS_IR_Subscription::incompatibleQosStatus_
private

◆ info_

OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Subscription::info_
private

Definition at line 195 of file DCPS_IR_Subscription.h.

Referenced by get_transportLocatorSeq().

◆ isBIT_

CORBA::Boolean DCPS_IR_Subscription::isBIT_
private

Definition at line 190 of file DCPS_IR_Subscription.h.

Referenced by dump_to_string(), is_bit(), and set_bit_status().

◆ participant_

DCPS_IR_Participant* DCPS_IR_Subscription::participant_
private

◆ qos_

DDS::DataReaderQos DCPS_IR_Subscription::qos_
private

Definition at line 194 of file DCPS_IR_Subscription.h.

Referenced by get_datareader_qos(), and set_qos().

◆ reader_

OpenDDS::DCPS::DataReaderRemote_var DCPS_IR_Subscription::reader_
private

◆ serializedTypeInfo_

DDS::OctetSeq DCPS_IR_Subscription::serializedTypeInfo_
private

Definition at line 201 of file DCPS_IR_Subscription.h.

Referenced by get_serialized_type_info().

◆ subscriberQos_

DDS::SubscriberQos DCPS_IR_Subscription::subscriberQos_
private

Definition at line 197 of file DCPS_IR_Subscription.h.

Referenced by get_subscriber_qos(), and set_qos().

◆ topic_

DCPS_IR_Topic* DCPS_IR_Subscription::topic_
private

◆ transportContext_

ACE_CDR::ULong DCPS_IR_Subscription::transportContext_
private

Definition at line 196 of file DCPS_IR_Subscription.h.


The documentation for this class was generated from the following files: