OpenDDS  Snapshot(2023/04/28-20:55)
DCPS_IR_Topic.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 
10 #include /**/ "DCPS_IR_Topic.h"
11 #include /**/ "DCPS_IR_Domain.h"
12 
13 #include /**/ "DCPS_IR_Subscription.h"
14 #include /**/ "DCPS_IR_Publication.h"
15 #include /**/ "DCPS_IR_Participant.h"
16 #include /**/ "DCPS_IR_Topic_Description.h"
17 
18 #include /**/ "dds/DCPS/DCPS_Utils.h"
19 #include /**/ "dds/DCPS/RepoIdConverter.h"
20 #include /**/ "dds/DCPS/Qos_Helper.h"
21 #include /**/ "tao/debug.h"
22 
24 
26  const DDS::TopicQos& qos,
27  DCPS_IR_Domain* domain,
28  DCPS_IR_Participant* creator,
29  DCPS_IR_Topic_Description* description,
30  bool isBIT)
31  : id_(id),
32  qos_(qos),
33  domain_(domain),
34  participant_(creator),
35  description_(description),
36  handle_(0),
37  isBIT_(isBIT),
38  removed_(false)
39 {
40 }
41 
43 {
44 }
45 
46 void DCPS_IR_Topic::release(bool removing)
47 {
48  if (removing || this->removed_) {
49  this->removed_ = true;
50 
51  if (publicationRefs_.size() == 0 && subscriptionRefs_.size() == 0) {
52  this->domain_->remove_topic_id_mapping(this->id_);
53  }
54  }
55 }
56 
58  , bool associate)
59 {
60  int status = publicationRefs_.insert(publication);
61 
62  switch (status) {
63  case 0:
64 
65  // Publish the BIT information
66  domain_->publish_publication_bit(publication);
67 
68  if (associate) {
70  // Do not check incompatible qos here. The check is done
71  // in the DCPS_IR_Topic_Description::try_associate_publication method
72  }
73 
75  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
76  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
78  ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_publication_reference: ")
79  ACE_TEXT("topic %C added publication %C at %x\n"),
80  std::string(topic_converter).c_str(),
81  std::string(pub_converter).c_str(),
82  publication));
83  }
84 
85  break;
86 
87  case 1:
88 
90  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
91  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
93  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic::add_publication_reference: ")
94  ACE_TEXT("topic %C attempt to re-add publication %C.\n"),
95  std::string(topic_converter).c_str(),
96  std::string(pub_converter).c_str()));
97  }
98 
99  break;
100 
101  case -1: {
102  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
103  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
105  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_publication_reference: ")
106  ACE_TEXT("topic %C failed to add publication %C\n"),
107  std::string(topic_converter).c_str(),
108  std::string(pub_converter).c_str()));
109  }
110  };
111 
112  return status;
113 }
114 
116 {
117  int status = publicationRefs_.remove(publication);
118 
119  if (0 == status) {
121  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
122  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
124  ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_publication_reference: ")
125  ACE_TEXT("topic %C removed publication %C.\n"),
126  std::string(topic_converter).c_str(),
127  std::string(pub_converter).c_str()));
128  }
129 
130  } else {
131  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
132  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
134  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_publication_reference: ")
135  ACE_TEXT("topic %C failed to remove publication %C.\n"),
136  std::string(topic_converter).c_str(),
137  std::string(pub_converter).c_str()));
138  }
139 
140  return status;
141 }
142 
144  , bool associate)
145 {
146  int status = subscriptionRefs_.insert(subscription);
147 
148  switch (status) {
149  case 0:
150 
152  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
153  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
155  ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_subscription_reference: ")
156  ACE_TEXT("topic %C added subscription %C at %x.\n"),
157  std::string(topic_converter).c_str(),
158  std::string(sub_converter).c_str(),
159  subscription));
160  }
161 
162  status = this->description_->add_subscription_reference(subscription, associate);
163  break;
164 
165  case 1: {
166  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
167  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
169  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
170  ACE_TEXT("topic %C attempt to re-add subscription %C.\n"),
171  std::string(topic_converter).c_str(),
172  std::string(sub_converter).c_str()));
173  }
174  break;
175 
176  case -1: {
177  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
178  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
180  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
181  ACE_TEXT("topic %C failed to add subscription %C.\n"),
182  std::string(topic_converter).c_str(),
183  std::string(sub_converter).c_str()));
184  }
185  };
186 
187  return status;
188 }
189 
191 {
192  int status = subscriptionRefs_.remove(subscription);
193 
194  if (0 == status) {
196  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
197  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
199  ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_subscription_reference: ")
200  ACE_TEXT("topic %C removed subscription %C.\n"),
201  std::string(topic_converter).c_str(),
202  std::string(sub_converter).c_str()));
203  }
204 
205  this->description_->remove_subscription_reference(subscription);
206 
207  } else {
208  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
209  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
211  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_subscription_reference: ")
212  ACE_TEXT("topic %C failed to remove subscription %C.\n"),
213  std::string(topic_converter).c_str(),
214  std::string(sub_converter).c_str()));
215  } // if (0 == status)
216 
217  return status;
218 }
219 
221 {
222  return id_;
223 }
224 
226 {
227  return participant_->get_id();
228 }
229 
231 {
232  return &qos_;
233 }
234 
236 {
237  // Do not need re-evaluate compatibility and associations when
238  // TopicQos changes since only datareader and datawriter QoS
239  // are evaludated during normal associations establishment.
240  using OpenDDS::DCPS::operator==;
241  bool pub_to_rd_wr = !(qos.topic_data == qos_.topic_data);
242 
243  qos_ = qos;
244  domain_->publish_topic_bit(this);
245 
246  if (!pub_to_rd_wr)
247  return true;
248 
249  // The only changeable TopicQos used by DataWriter and DataReader
250  // is topic_data so we need publish it to DW/DR BIT to make they
251  // are consistent.
252 
253  // Update qos in datawriter BIT for associated datawriters.
254 
255  {
258 
259  while (iter != end) {
261  ++iter;
262  }
263  }
264 
265  // Update qos in datareader BIT for associated datareader.
266 
267  {
270 
271  while (iter != end) {
273  ++iter;
274  }
275  }
276 
277  return true;
278 }
279 
281 {
282  // check if we should ignore this subscription
283  if (participant_->is_subscription_ignored(subscription->get_id()) ||
285  participant_->is_topic_ignored(subscription->get_topic_id())) {
287  OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
288  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
290  ACE_TEXT("(%P|%t) DCPS_IR_Topic::try_associate: ")
291  ACE_TEXT("topic %C ignoring subscription %C.\n"),
292  std::string(topic_converter).c_str(),
293  std::string(sub_converter).c_str()));
294  }
295 
296  } else {
297  // check all publications for compatibility
298  DCPS_IR_Publication* pub = 0;
300 
303 
304  while (iter != end) {
305  pub = *iter;
306  ++iter;
307  description_->try_associate(pub, subscription);
308  // Check the publications QOS status
309  qosStatus = pub->get_incompatibleQosStatus();
310 
311  if (0 < qosStatus->count_since_last_send) {
313  }
314  } /* while (iter != end) */
315 
316  // The subscription QOS is not checked because
317  // we don't know if the subscription is finished cycling
318  // through topics.
319  }
320 }
321 
323 {
324  return description_;
325 }
326 
328 {
329  return handle_;
330 }
331 
333 {
334  handle_ = handle;
335 }
336 
338 {
339  return isBIT_;
340 }
341 
343 {
344  isBIT_ = isBIT;
345 }
346 
348 {
349  DCPS_IR_Publication * pub = 0;
352 
353  while (iter != end) {
354  pub = *iter;
355  ++iter;
356 
357  subscription->reevaluate_association(pub);
358  pub->reevaluate_association(subscription);
359  }
360 }
361 
362 
364 {
367 
368  for ( ; iter != end; ++iter)
369  {
371  }
372 }
373 
374 std::string
375 DCPS_IR_Topic::dump_to_string(const std::string& prefix, int depth) const
376 {
377  std::string str;
378 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
379  OpenDDS::DCPS::RepoIdConverter local_converter(id_);
380 
381  for (int i=0; i < depth; i++)
382  str += prefix;
383  std::string indent = str + prefix;
384  str += "DCPS_IR_Topic[";
385  str += std::string(local_converter);
386  str += "]";
387  if (isBIT_)
388  str += " (BIT)";
389  str += "\n";
390 
391  str += indent + "Publications:\n";
393  pub != publicationRefs_.end();
394  pub++)
395  {
396  OpenDDS::DCPS::RepoIdConverter pub_converter((*pub)->get_id());
397  str += indent + std::string(pub_converter);
398  str += "\n";
399 
400  }
401 
402  str += indent + "Subscriptions:\n";
404  sub != subscriptionRefs_.end();
405  sub++)
406  {
407  OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
408  str += indent + std::string(sub_converter);
409  str += "\n";
410  }
411 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
412  return str;
413 }
414 
#define ACE_DEBUG(X)
void try_associate_publication(DCPS_IR_Publication *publication)
#define ACE_ERROR(X)
OpenDDS::DCPS::GUID_t get_participant_id() const
int add_subscription_reference(DCPS_IR_Subscription *subscription, bool associate=true)
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t id)
bool set_topic_qos(const DDS::TopicQos &qos)
void reassociate_all_publications()
OpenDDS::DCPS::GUID_t get_id()
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
DCPS_IR_Topic(const OpenDDS::DCPS::GUID_t &id, const DDS::TopicQos &qos, DCPS_IR_Domain *domain, DCPS_IR_Participant *creator, DCPS_IR_Topic_Description *description, bool isBIT)
std::string dump_to_string(const std::string &prefix, int depth) const
OpenDDS::DCPS::GUID_t get_id() const
OpenDDS::DCPS::GUID_t id_
int remove_publication_reference(DCPS_IR_Publication *publication)
DCPS_IR_Topic_Description * get_topic_description()
CORBA::Boolean isBIT_
DCPS_IR_Participant * participant_
DCPS_IR_Topic_Description * description_
OpenDDS::DCPS::GUID_t get_topic_id()
DCPS_IR_Publication_Set publicationRefs_
CORBA::Boolean is_bit()
void publish_subscription_bit(DCPS_IR_Subscription *subscription)
Publish Subscription in the Subscription Built-In Topic.
DDS::InstanceHandle_t get_handle()
DDS::TopicQos * get_topic_qos()
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
LM_DEBUG
int remove_subscription_reference(DCPS_IR_Subscription *subscription)
CORBA::Boolean is_participant_ignored(OpenDDS::DCPS::GUID_t id)
DCPS_IR_Domain * domain_
ACE_CDR::Boolean Boolean
void publish_topic_bit(DCPS_IR_Topic *topic)
Publish Topic in the Topic Built-In Topic.
DDS::TopicQos qos_
Representative of the Domain Participant.
int add_publication_reference(DCPS_IR_Publication *publication, bool associate=true)
TopicDataQosPolicy topic_data
bool reevaluate_association(DCPS_IR_Subscription *subscription)
LM_WARNING
void set_handle(DDS::InstanceHandle_t handle)
OpenDDS::DCPS::GUID_t get_id()
void try_associate(DCPS_IR_Subscription *subscription)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
CORBA::Boolean is_topic_ignored(OpenDDS::DCPS::GUID_t id)
DDS::InstanceHandle_t handle_
void remove_topic_id_mapping(const OpenDDS::DCPS::GUID_t &topicId)
DCPS_IR_Subscription_Set subscriptionRefs_
void publish_publication_bit(DCPS_IR_Publication *publication)
Publish Publication in the Publication Built-In Topic.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::GUID_t get_id()
OpenDDS::DCPS::GUID_t get_participant_id()
void set_bit_status(CORBA::Boolean isBIT)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void reevaluate_associations(DCPS_IR_Subscription *subscription)
Representative of a Subscription.
Representation of a Domain in the system.
int remove_subscription_reference(DCPS_IR_Subscription *subscription)
LM_ERROR
int add_subscription_reference(DCPS_IR_Subscription *subscription, bool associate=true)
Representative of a Topic Description.
Representative of a Publication.
void release(bool removing)
bool reevaluate_association(DCPS_IR_Publication *publication)