OpenDDS  Snapshot(2023/04/28-20:55)
DCPS_IR_Topic_Description.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DcpsInfo_pch.h"
9 #include /**/ "DCPS_IR_Topic_Description.h"
10 
11 #include /**/ "DCPS_IR_Subscription.h"
12 #include /**/ "DCPS_IR_Publication.h"
13 
14 #include /**/ "DCPS_IR_Topic.h"
15 #include /**/ "DCPS_IR_Domain.h"
16 
17 #include /**/ "dds/DCPS/DCPS_Utils.h"
18 
19 #include /**/ "tao/debug.h"
20 
21 #include /**/ "dds/DCPS/RepoIdConverter.h"
22 
24 
26  const char* name,
27  const char* dataTypeName)
28  : name_(name),
29  dataTypeName_(dataTypeName),
30  domain_(domain)
31 {
32 }
33 
35 {
36 }
37 
39  , bool associate)
40 {
41  int status = subscriptionRefs_.insert(subscription);
42 
43  switch (status) {
44  case 0:
45 
46  // Publish the BIT information
47  domain_->publish_subscription_bit(subscription);
48 
49  if (associate) {
50  try_associate_subscription(subscription);
51  // Do not check incompatible qos here. The check is done
52  // in the DCPS_IR_Topic_Description::try_associate_subscription method
53  }
54 
56  OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
58  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_subscription_reference: ")
59  ACE_TEXT("topic description %C added subscription %C at %x\n"),
60  this->name_.c_str(),
61  std::string(converter).c_str(),
62  subscription));
63  }
64 
65  break;
66 
67  case 1: {
68  OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
70  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
71  ACE_TEXT("topic description %C attempt to re-add subscription %C.\n"),
72  this->name_.c_str(),
73  std::string(converter).c_str()));
74  }
75  break;
76 
77  case -1: {
78  OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
80  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
81  ACE_TEXT("topic description %C failed to add subscription %C.\n"),
82  this->name_.c_str(),
83  std::string(converter).c_str()));
84  }
85  };
86 
87  return status;
88 }
89 
91 {
92  int status = subscriptionRefs_.remove(subscription);
93 
94  if (0 == status) {
96  OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
98  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_subscription_reference: ")
99  ACE_TEXT("topic description %C removed subscription %C.\n"),
100  this->name_.c_str(),
101  std::string(converter).c_str()));
102  }
103 
104  } else {
105  OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
107  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_subscription_reference: ")
108  ACE_TEXT("topic description %C failed to remove subscription %C.\n"),
109  this->name_.c_str(),
110  std::string(converter).c_str()));
111  } // if (0 == status)
112 
113  return status;
114 }
115 
117 {
118  int status = topics_.insert(topic);
119 
120  switch (status) {
121  case 0:
122 
124  OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
126  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_topic: ")
127  ACE_TEXT("topic description %C added topic %C at %x.\n"),
128  this->name_.c_str(),
129  std::string(converter).c_str(),
130  topic));
131  }
132 
133  break;
134  case 1:
135 
137  OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
139  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic_Description::add_topic: ")
140  ACE_TEXT("topic description %C attempt to re-add topic %C.\n"),
141  this->name_.c_str(),
142  std::string(converter).c_str()));
143  }
144 
145  break;
146  case -1: {
147  OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
149  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_topic: ")
150  ACE_TEXT("topic description %C failed to add topic %C.\n"),
151  this->name_.c_str(),
152  std::string(converter).c_str()));
153  }
154  break;
155  };
156 
157  return status;
158 }
159 
161 {
162  int status = topics_.remove(topic);
163 
164  if (0 == status) {
166  OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
168  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_topic: ")
169  ACE_TEXT("topic description %C removed topic %C.\n"),
170  this->name_.c_str(),
171  std::string(converter).c_str()));
172  }
173 
174  } else {
175  OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
177  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_topic: ")
178  ACE_TEXT("topic description failed to remove topic %C.\n"),
179  this->name_.c_str(),
180  std::string(converter).c_str()));
181  }
182 
183  return status;
184 }
185 
187 {
188  DCPS_IR_Topic* topic = 0;
189 
190  if (0 < topics_.size()) {
192  topic = *iter;
193 
195  OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
197  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::get_first_topic: ")
198  ACE_TEXT("topic description %C first topic %C.\n"),
199  this->name_.c_str(),
200  std::string(converter).c_str()));
201  }
202  }
203 
204  return topic;
205 }
206 
208 {
209  // for each subscription check for compatibility
210  DCPS_IR_Subscription* subscription = 0;
212 
215 
216  while (iter != end) {
217  subscription = *iter;
218  ++iter;
219  try_associate(publication, subscription);
220 
221  // Check the subscriptions QOS status
222  qosStatus = subscription->get_incompatibleQosStatus();
223 
224  if (0 < qosStatus->count_since_last_send) {
225  subscription->update_incompatible_qos();
226  }
227  }
228 
229  // Check the publications QOS status
230  qosStatus = publication->get_incompatibleQosStatus();
231 
232  if (0 < qosStatus->count_since_last_send) {
233  publication->update_incompatible_qos();
234  }
235 }
236 
238 {
239  // check all topics for compatible publications
240 
241  DCPS_IR_Topic* topic = 0;
242 
245 
246  while (iter != end) {
247  topic = *iter;
248  ++iter;
249 
250  topic->try_associate(subscription);
251  }
252 
253  // Check the subscriptions QOS status
255  subscription->get_incompatibleQosStatus();
256 
257  if (0 < qosStatus->total_count) {
259  OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
261  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate_subscription: ")
262  ACE_TEXT("topic description %C has %d incompatible publications ")
263  ACE_TEXT("with subscription %C.\n"),
264  this->name_.c_str(),
265  qosStatus->total_count,
266  std::string(converter).c_str()));
267  }
268 
269  subscription->update_incompatible_qos();
270  }
271 }
272 
273 bool
275  DCPS_IR_Subscription* subscription)
276 {
277  if (publication->is_subscription_ignored(subscription->get_participant_id(),
278  subscription->get_topic_id(),
279  subscription->get_id())) {
281  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
282  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
284  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
285  ACE_TEXT("topic description %C publication %C ignores subscription %C.\n"),
286  this->name_.c_str(),
287  std::string(pub_converter).c_str(),
288  std::string(sub_converter).c_str()));
289  }
290  }
291 
292  else if (subscription->is_publication_ignored(publication->get_participant_id(),
293  publication->get_topic_id(),
294  publication->get_id())) {
296  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
297  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
299  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
300  ACE_TEXT("topic description %C subscription %C ignores publication %C.\n"),
301  this->name_.c_str(),
302  std::string(pub_converter).c_str(),
303  std::string(sub_converter).c_str()));
304  }
305 
306  } else {
308  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
309  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
311  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
312  ACE_TEXT("topic description %C checking compatibility of ")
313  ACE_TEXT("publication %C with subscription %C.\n"),
314  this->name_.c_str(),
315  std::string(pub_converter).c_str(),
316  std::string(sub_converter).c_str()));
317  }
318 
320  subscription->get_incompatibleQosStatus(),
321  publication->get_transportLocatorSeq(),
322  subscription->get_transportLocatorSeq(),
323  publication->get_datawriter_qos(),
324  subscription->get_datareader_qos(),
325  publication->get_publisher_qos(),
326  subscription->get_subscriber_qos())) {
327  associate(publication, subscription);
328  return true;
329  }
330 
331  // Dont notify that there is an incompatible qos here
332  // notify where we can distinguish which one is being added
333  // so we only send one response(with all incompatible qos) to it
334  }
335 
336  return false;
337 }
338 
340  DCPS_IR_Subscription* subscription)
341 {
343  OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
344  OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
346  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::associate: ")
347  ACE_TEXT("topic description %C associating ")
348  ACE_TEXT("publication %C with subscription %C.\n"),
349  this->name_.c_str(),
350  std::string(pub_converter).c_str(),
351  std::string(sub_converter).c_str()));
352  }
353 
354  // The publication must be told first because it will be the connector
355  // if a data link needs to be created.
356  // This is only required if the publication and subscription are being
357  // handed by the same process and thread. Order when there is
358  // another thread or process is not important.
359  // Note: the client thread may process the add_associations() oneway
360  // call instead of the ORB thread because it is currently
361  // in a two-way call to the Repo.
362  int error = publication->add_associated_subscription(subscription, true);
363 
364  // If there was no TAO error contacting the publication (This can happen if
365  // an old publisher has exited non-gracefully)
366  if (error != -1) {
367  // Associate the subscription with the publication
368  subscription->add_associated_publication(publication, false);
369  } else {
370  ACE_DEBUG((LM_INFO, ACE_TEXT("Invalid publication detected, NOT notifying subscription of association\n")));
371  }
372 }
373 
375 {
376  DCPS_IR_Topic* topic = 0;
377 
380 
381  while (iter != end) {
382  topic = *iter;
383  ++iter;
384 
385  topic->reevaluate_associations(subscription);
386  }
387 }
388 
390 {
391  DCPS_IR_Subscription * sub = 0;
394 
395  while (iter != end) {
396  sub = *iter;
397  ++iter;
398  publication->reevaluate_association(sub);
399  sub->reevaluate_association(publication);
400  }
401 }
402 
404 {
405  return name_.c_str();
406 }
407 
409 {
410  return dataTypeName_.c_str();
411 }
412 
414 {
415  return static_cast<CORBA::ULong>(topics_.size());
416 }
417 
418 std::string
420  int depth) const
421 {
422  std::string str;
423 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
424  for (int i=0; i < depth; i++)
425  str += prefix;
426  std::string indent = str + prefix;
427  str += "DCPS_IR_Topic_Description [";
428  str += name_.c_str();
429  str += "][";
430  str += dataTypeName_.c_str();
431  str += "]\n";
432 
433  str += indent + "Subscription References [ ";
435  sub != subscriptionRefs_.end();
436  sub++)
437  {
438  OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
439  str += std::string(sub_converter);
440  str += " ";
441  }
442  str += "]\n";
443 
444  str += indent + "Topics [ ";
446  top != topics_.end();
447  top++)
448  {
449  OpenDDS::DCPS::RepoIdConverter top_converter((*top)->get_id());
450  str += std::string(top_converter);
451  str += " ";
452  }
453  str += "]\n";
454 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
455  return str;
456 }
457 
#define ACE_DEBUG(X)
void try_associate_publication(DCPS_IR_Publication *publication)
#define ACE_ERROR(X)
const char * c_str(void) const
OpenDDS::DCPS::GUID_t get_participant_id()
DCPS_IR_Topic_Description(DCPS_IR_Domain *domain, const char *name, const char *dataTypeName)
LM_INFO
OpenDDS::DCPS::GUID_t get_id()
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
OpenDDS::DCPS::GUID_t get_id() const
CORBA::ULong get_number_topics() const
Returns the number of topics.
int add_topic(DCPS_IR_Topic *topic)
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
Definition: DCPS_Utils.cpp:237
OpenDDS::DCPS::GUID_t get_topic_id()
void publish_subscription_bit(DCPS_IR_Subscription *subscription)
Publish Subscription in the Subscription Built-In Topic.
DCPS_IR_Subscription_Set subscriptionRefs_
void try_associate_subscription(DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
LM_DEBUG
int remove_subscription_reference(DCPS_IR_Subscription *subscription)
int add_associated_subscription(DCPS_IR_Subscription *sub, bool active)
ACE_CDR::ULong ULong
void associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
Associate the publication and subscription.
OpenDDS::DCPS::IncompatibleQosStatus * get_incompatibleQosStatus()
int remove_topic(DCPS_IR_Topic *topic)
Representative of a Topic.
Definition: DCPS_IR_Topic.h:43
CORBA::Boolean is_subscription_ignored(OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t subId)
bool reevaluate_association(DCPS_IR_Subscription *subscription)
LM_WARNING
DDS::PublisherQos * get_publisher_qos()
OpenDDS::DCPS::GUID_t get_id()
const char *const name
Definition: debug.cpp:60
void try_associate(DCPS_IR_Subscription *subscription)
DCPS_IR_Topic * get_first_topic()
Gets the first topic in the topic list.
ACE_TEXT("TCP_Factory")
void reevaluate_associations(DCPS_IR_Publication *publication)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
CORBA::Boolean is_publication_ignored(OpenDDS::DCPS::GUID_t partId, OpenDDS::DCPS::GUID_t topicId, OpenDDS::DCPS::GUID_t pubId)
bool try_associate(DCPS_IR_Publication *publication, DCPS_IR_Subscription *subscription)
OpenDDS::DCPS::GUID_t get_participant_id()
const DDS::SubscriberQos * get_subscriber_qos()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int add_associated_publication(DCPS_IR_Publication *pub, bool active)
void reevaluate_associations(DCPS_IR_Subscription *subscription)
std::string dump_to_string(const std::string &prefix, int depth) const
Representative of a Subscription.
OpenDDS::DCPS::GUID_t get_topic_id()
Representation of a Domain in the system.
OpenDDS::DCPS::TransportLocatorSeq get_transportLocatorSeq() const
LM_ERROR
int add_subscription_reference(DCPS_IR_Subscription *subscription, bool associate=true)
Representative of a Publication.
DDS::DataWriterQos * get_datawriter_qos()
bool reevaluate_association(DCPS_IR_Publication *publication)
const DDS::DataReaderQos * get_datareader_qos()