OpenDDS  Snapshot(2023/04/28-20:55)
StaticDiscovery.cpp
Go to the documentation of this file.
1 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
2 
3 #include "StaticDiscovery.h"
4 
5 #include "debug.h"
6 #include "ConfigUtils.h"
8 #include "Marked_Default_Qos.h"
9 #include "SubscriberImpl.h"
10 #include "BuiltInTopicUtils.h"
11 #include "Registered_Data_Types.h"
12 #include "Qos_Helper.h"
13 #include "DataWriterImpl.h"
14 #include "DcpsUpcalls.h"
17 
18 #include <ctype.h>
19 
21 
22 namespace OpenDDS {
23 namespace DCPS {
24 
// File-local sizing constants.
// BYTES_IN_ENTITY is the user_data length that assign_publication_key() /
// assign_subscription_key() require, and the number of bytes they copy into
// GUID_t::entityId.entityKey. The HEX_DIGITS_* values are the same sizes
// expressed as hexadecimal text (two digits per byte).
// NOTE(review): BYTES_IN_PARTICIPANT and TYPE_NAME_MAX are not used in the
// part of the file reviewed here; presumably they bound values parsed from
// the static discovery configuration -- confirm against the parsing code.
namespace {
  const size_t BYTES_IN_PARTICIPANT = 6;
  const size_t HEX_DIGITS_IN_PARTICIPANT = 2 * BYTES_IN_PARTICIPANT;
  const size_t BYTES_IN_ENTITY = 3;
  const size_t HEX_DIGITS_IN_ENTITY = 2 * BYTES_IN_ENTITY;
  const size_t TYPE_NAME_MAX = 128;
}
32 
// Walks every (writer, reader) pair held in writer_map / reader_map and
// records the compatible ones on both sides: a pair is recorded when the
// two GUID prefixes satisfy StaticDiscGuidDomainEqual but not
// StaticDiscGuidPartEqual, both endpoints name the same topic, and
// compatibleQOS() accepts their transport and QoS settings.
// NOTE(review): the function signature (listing line 33) is missing from
// this listing.
34 {
35  for (WriterMapType::iterator wp = writer_map.begin(), wp_limit = writer_map.end();
36  wp != wp_limit;
37  ++wp) {
38  const GUID_t& writerid = wp->first;
39  Writer& writer = wp->second;
40  for (ReaderMapType::iterator rp = reader_map.begin(), rp_limit = reader_map.end();
41  rp != rp_limit;
42  ++rp) {
43  const GUID_t& readerid = rp->first;
44  Reader& reader = rp->second;
45 
46  if (StaticDiscGuidDomainEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
47  !StaticDiscGuidPartEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
48  reader.topic_name == writer.topic_name) {
49  // Different participants, same topic.
// The status structs are only scratch output for compatibleQOS(); they are
// not read again in this function.
50  IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
51  IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
52  const TransportLocatorSeq& writer_trans_info = writer.trans_info;
53  const TransportLocatorSeq& reader_trans_info = reader.trans_info;
54  const DDS::DataWriterQos& writer_qos = writer.qos;
55  const DDS::DataReaderQos& reader_qos = reader.qos;
56  const DDS::PublisherQos& publisher_qos = writer.publisher_qos;
57  const DDS::SubscriberQos& subscriber_qos = reader.subscriber_qos;
58 
59  if (compatibleQOS(&writerStatus, &readerStatus, writer_trans_info, reader_trans_info,
60  &writer_qos, &reader_qos, &publisher_qos, &subscriber_qos)) {
// The reader's reliability kind decides which pair of sets the match goes in.
// NOTE(review): the two case labels of this switch (listing lines 62 and 66)
// are missing from this listing; presumably the best-effort and reliable
// reliability kinds, in that order -- confirm against the original file.
61  switch (reader.qos.reliability.kind) {
63  writer.best_effort_readers.insert(readerid);
64  reader.best_effort_writers.insert(writerid);
65  break;
67  writer.reliable_readers.insert(readerid);
68  reader.reliable_writers.insert(writerid);
69  break;
70  }
71  }
72  }
73  }
74  }
75 }
76 
78  ACE_Thread_Mutex& lock,
79  const EndpointRegistry& registry,
80  StaticParticipant& participant)
81  : lock_(lock)
82  , participant_id_(participant_id)
83  , topic_counter_(0)
84  , registry_(registry)
85 #ifndef DDS_HAS_MINIMUM_BIT
86  , participant_(participant)
87 #endif
88  , max_type_lookup_service_reply_period_(0)
89  , type_lookup_service_sequence_number_(0)
90 {
91 #ifdef DDS_HAS_MINIMUM_BIT
92  ACE_UNUSED_ARG(participant);
93 #endif
95 }
96 
98 {
100 }
101 
103 {
104  // Discover all remote publications and subscriptions.
105 
106  for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
107  limit = registry_.writer_map.end();
108  pos != limit;
109  ++pos) {
110  const GUID_t& remoteid = pos->first;
111  const EndpointRegistry::Writer& writer = pos->second;
112 
113  if (!equal_guid_prefixes(participant_id_, remoteid)) {
114  const DDS::BuiltinTopicKey_t key = guid_to_bit_key(remoteid);
115 
116  // pos represents a remote.
117  // Populate data.
119 
120  data.key = key;
121  OPENDDS_STRING topic_name = writer.topic_name;
122  data.topic_name = topic_name.c_str();
123  const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
124  data.type_name = topic.type_name.c_str();
125  data.durability = writer.qos.durability;
126  data.durability_service = writer.qos.durability_service;
127  data.deadline = writer.qos.deadline;
128  data.latency_budget = writer.qos.latency_budget;
129  data.liveliness = writer.qos.liveliness;
130  data.reliability = writer.qos.reliability;
131  data.lifespan = writer.qos.lifespan;
132  data.user_data = writer.qos.user_data;
133  data.ownership = writer.qos.ownership;
134  data.ownership_strength = writer.qos.ownership_strength;
135  data.destination_order = writer.qos.destination_order;
136  data.presentation = writer.publisher_qos.presentation;
137  data.partition = writer.publisher_qos.partition;
138  // If the TopicQos becomes available, this can be populated.
139  //data.topic_data = topic_details.qos_.topic_data;
140  data.group_data = writer.publisher_qos.group_data;
141  data.representation = writer.qos.representation;
142 
143 #ifndef DDS_HAS_MINIMUM_BIT
145  if (bit) { // bit may be null if the DomainParticipant is shutting down
147  }
148 #endif /* DDS_HAS_MINIMUM_BIT */
149  }
150  }
151 
152  for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
153  limit = registry_.reader_map.end();
154  pos != limit;
155  ++pos) {
156  const GUID_t& remoteid = pos->first;
157  const EndpointRegistry::Reader& reader = pos->second;
158 
159  if (!equal_guid_prefixes(participant_id_, remoteid)) {
160  const DDS::BuiltinTopicKey_t key = guid_to_bit_key(remoteid);
161 
162  // pos represents a remote.
163  // Populate data.
165 
166  data.key = key;
167  OPENDDS_STRING topic_name = reader.topic_name;
168  data.topic_name = topic_name.c_str();
169  const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
170  data.type_name = topic.type_name.c_str();
171  data.durability = reader.qos.durability;
172  data.deadline = reader.qos.deadline;
173  data.latency_budget = reader.qos.latency_budget;
174  data.liveliness = reader.qos.liveliness;
175  data.reliability = reader.qos.reliability;
176  data.ownership = reader.qos.ownership;
177  data.destination_order = reader.qos.destination_order;
178  data.user_data = reader.qos.user_data;
179  data.time_based_filter = reader.qos.time_based_filter;
180  data.presentation = reader.subscriber_qos.presentation;
181  data.partition = reader.subscriber_qos.partition;
182  // If the TopicQos becomes available, this can be populated.
183  //data.topic_data = topic_details.qos_.topic_data;
184  data.group_data = reader.subscriber_qos.group_data;
185  data.representation = reader.qos.representation;
186 
187 #ifndef DDS_HAS_MINIMUM_BIT
189  if (bit) { // bit may be null if the DomainParticipant is shutting down
191  }
192 #endif /* DDS_HAS_MINIMUM_BIT */
193  }
194  }
195 }
196 
198  const GUID_t& /*topicId*/,
199  const DDS::DataWriterQos& qos)
200 {
201  if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
202  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
203  return;
204  }
205 
206  rid.entityId.entityKey[0] = qos.user_data.value[0];
207  rid.entityId.entityKey[1] = qos.user_data.value[1];
208  rid.entityId.entityKey[2] = qos.user_data.value[2];
210 
211  if (DCPS_debug_level > 8) {
212  ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %C\n",
213  LogGuid(rid).c_str()));
214  }
215 
216  EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
217  if (pos == registry_.writer_map.end()) {
218  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %C\n"), LogGuid(rid).c_str()));
219  return;
220  }
221 
222  DDS::DataWriterQos qos2(qos);
223  // Qos in registry will not have the user data so overwrite.
224  qos2.user_data = pos->second.qos.user_data;
225 
226  DDS::DataWriterQos qos3(pos->second.qos);
227 
228  if (qos2 != qos3) {
229  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
230  }
231 }
232 
234  const GUID_t& /*topicId*/,
235  const DDS::DataReaderQos& qos)
236 {
237  if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
238  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
239  return;
240  }
241 
242  rid.entityId.entityKey[0] = qos.user_data.value[0];
243  rid.entityId.entityKey[1] = qos.user_data.value[1];
244  rid.entityId.entityKey[2] = qos.user_data.value[2];
246 
247  EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
248  if (pos == registry_.reader_map.end()) {
249  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %C\n"), LogGuid(rid).c_str()));
250  return;
251  }
252 
253  DDS::DataReaderQos qos2(qos);
254  // Qos in registry will not have the user data so overwrite.
255  qos2.user_data = pos->second.qos.user_data;
256 
257  DDS::DataReaderQos qos3(pos->second.qos);
258 
259  if (qos2 != qos3) {
260  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
261  }
262 }
263 
264 bool
266  const DDS::TopicQos& /*qos*/)
267 {
268  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
269  ACE_TEXT("Not allowed\n")));
270  return false;
271 }
272 
273 bool
275  const DDS::DataWriterQos& /*qos*/,
276  const DDS::PublisherQos& /*publisherQos*/)
277 {
278  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
279  ACE_TEXT("Not allowed\n")));
280  return false;
281 }
282 
283 bool
285  const DDS::DataReaderQos& /*qos*/,
286  const DDS::SubscriberQos& /*subscriberQos*/)
287 {
288  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
289  ACE_TEXT("Not allowed\n")));
290  return false;
291 }
292 
// Always rejects the request: logs an error and returns false.
// NOTE(review): the signature line (listing line 294) is missing from this
// listing. The remaining parameter is a DDS::StringSeq of params, yet the
// log text below names update_subscription_qos -- the same text the
// preceding function emits. The message was presumably copy-pasted and
// should name this function instead; confirm against the class declaration.
293 bool
295  const DDS::StringSeq& /*params*/)
296 {
297  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
298  ACE_TEXT("Not allowed\n")));
299  return false;
300 }
301 
302 bool
304 {
305  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
306  // TODO
307  return false;
308 }
309 
312  LocalPublication& pub)
313 {
314  /*
315  Find all matching remote readers.
316  If the reader is best effort, then associate immediately.
317  If the reader is reliable (we are reliable by implication), register with the transport to receive notification that the remote reader is up.
318  */
319  EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
320  if (pos == registry_.writer_map.end()) {
321  return DDS::RETCODE_ERROR;
322  }
323  const EndpointRegistry::Writer& writer = pos->second;
324 
325  for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
326  pos != limit;
327  ++pos) {
328  const GUID_t& readerid = *pos;
329  const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
330 
331  const ReaderAssociation ra =
332  {reader.trans_info, TransportLocator(), 0, readerid, reader.subscriber_qos, reader.qos, "", "", 0, 0, {0, 0}};
334  if (pl) {
335  pl->add_association(writerid, ra, true);
336  }
337  }
338 
339  for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
340  pos != limit;
341  ++pos) {
342  const GUID_t& readerid = *pos;
343  const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
345  if (pl) {
346  pl->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
347  }
348  }
349 
350  return DDS::RETCODE_OK;
351 }
352 
355 {
356  EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
357  if (pos == registry_.writer_map.end()) {
358  return DDS::RETCODE_ERROR;
359  }
360 
361  const EndpointRegistry::Writer& writer = pos->second;
362 
363  ReaderIdSeq ids;
364  ids.length((CORBA::ULong)writer.reliable_readers.size());
365  CORBA::ULong idx = 0;
366  for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
367  pos != limit;
368  ++pos, ++idx) {
369  const GUID_t& readerid = *pos;
370  ids[idx] = readerid;
372  if (pl) {
373  pl->unregister_for_reader(participant_id_, writerid, readerid);
374  }
375  }
376 
377  return DDS::RETCODE_OK;
378 }
379 
382  LocalSubscription& sub)
383 {
384  /*
385  Find all matching remote writers.
386  If we (the reader) are best effort, then associate immediately.
387  If we (the reader) are reliable, then register with the transport to receive notification that the remote writer is up.
388  */
389  EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
390  if (pos == registry_.reader_map.end()) {
391  return DDS::RETCODE_ERROR;
392  }
393  const EndpointRegistry::Reader& reader = pos->second;
394 
395  for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
396  pos != limit;
397  ++pos) {
398  const GUID_t& writerid = *pos;
399  const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
400 
401  DDS::OctetSeq type_info;
402  const WriterAssociation wa = {
403  writer.trans_info, TransportLocator(), 0, writerid, writer.publisher_qos, writer.qos, type_info, {0, 0}
404  };
406  if (sl) {
407  sl->add_association(readerid, wa, false);
408  }
409  }
410 
411  for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
412  pos != limit;
413  ++pos) {
414  const GUID_t& writerid = *pos;
415  const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
417  if (sl) {
418  sl->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
419  }
420  }
421 
422  return DDS::RETCODE_OK;
423 }
424 
426  const GUID_t& readerid, LocalSubscription& sub)
427 {
428  EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
429  if (pos == registry_.reader_map.end()) {
430  return DDS::RETCODE_ERROR;
431  }
432 
433  const EndpointRegistry::Reader& reader = pos->second;
434 
435  WriterIdSeq ids;
436  ids.length((CORBA::ULong)reader.reliable_writers.size());
437  CORBA::ULong idx = 0;
438  for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
439  pos != limit;
440  ++pos, ++idx) {
441  const GUID_t& writerid = *pos;
442  ids[idx] = writerid;
444  if (sl) {
445  sl->unregister_for_writer(participant_id_, readerid, writerid);
446  }
447  }
448 
449  return DDS::RETCODE_OK;
450 }
451 
452 bool
454 {
455  // We can't propagate associated writers via SEDP announcements if we're
456  // using static discovery, so nobody ought to be "expecting" them
457  return false;
458 }
459 
460 bool
462 {
463  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
464  // TODO
465  return false;
466 }
467 
468 void
470  DiscoveredSubscriptionIter& /*iter*/,
471  const GUID_t& /*reader*/)
472 {
473  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
474  // TODO
475 }
476 
477 void
479  DiscoveredPublicationIter& /*iter*/,
480  const GUID_t& /*reader*/)
481 {
482  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
483  // TODO
484 }
485 
486 void
487 StaticEndpointManager::reader_exists(const GUID_t& readerid, const GUID_t& writerid)
488 {
490  LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
491  EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
492  if (lp_pos != local_publications_.end() &&
493  reader_pos != registry_.reader_map.end()) {
494  DataWriterCallbacks_rch dwr = lp_pos->second.publication_.lock();
495  if (dwr) {
496  const ReaderAssociation ra =
497  {reader_pos->second.trans_info, TransportLocator(), 0, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos,
498  "", "", DDS::StringSeq(), DDS::OctetSeq(), {0, 0}};
499  dwr->add_association(writerid, ra, true);
500  }
501  }
502 }
503 
504 void
506 {
508  LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
509  EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
510  if (lp_pos != local_publications_.end() &&
511  reader_pos != registry_.reader_map.end()) {
512  DataWriterCallbacks_rch dwr = lp_pos->second.publication_.lock();
513  if (dwr) {
514  ReaderIdSeq ids;
515  ids.length(1);
516  ids[0] = readerid;
517  dwr->remove_associations(ids, true);
518  }
519  }
520 }
521 
522 void
523 StaticEndpointManager::writer_exists(const GUID_t& writerid, const GUID_t& readerid)
524 {
526  LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
527  EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
528  if (ls_pos != local_subscriptions_.end() &&
529  writer_pos != registry_.writer_map.end()) {
530  DataReaderCallbacks_rch drr = ls_pos->second.subscription_.lock();
531  if (drr) {
532  const WriterAssociation wa =
533  {writer_pos->second.trans_info, TransportLocator(), 0, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos, DDS::OctetSeq(), {0,0}};
534  drr->add_association(readerid, wa, false);
535  }
536  }
537 }
538 
539 void
541 {
543  LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
544  EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
545  if (ls_pos != local_subscriptions_.end() &&
546  writer_pos != registry_.writer_map.end()) {
547  DataReaderCallbacks_rch drr = ls_pos->second.subscription_.lock();
548  if (drr) {
549  WriterIdSeq ids;
550  ids.length(1);
551  ids[0] = writerid;
552  drr->remove_associations(ids, true);
553  }
554  }
555 }
556 
558  const XTypes::TypeIdentifier& /*ti*/,
559  bool /*secure*/)
560 {
561  // Do nothing.
562 }
563 
564 #ifndef DDS_HAS_MINIMUM_BIT
567 {
568  DDS::Subscriber_var sub = participant_.bit_subscriber();
569  if (!sub.in())
570  return 0;
571 
572  DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
573  return dynamic_cast<OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
574 }
575 
578 {
579  DDS::Subscriber_var sub = participant_.bit_subscriber();
580  if (!sub.in())
581  return 0;
582 
583  DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
584  return dynamic_cast<OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
585 }
586 #endif /* DDS_HAS_MINIMUM_BIT */
587 
589 {
592  DCPS::make_rch<StaticEndpointManagerSporadic>(TheServiceParticipant->time_source(), reactor_interceptor,
594  }
595 }
596 
598 {
602  }
603 }
604 
607 {
609 }
610 
// Removes the topic called topic_name from both lookup tables: first the
// id -> name entry in topic_names_, then the name -> details entry in topics_.
// NOTE(review): the signature line (listing line 611) is missing from this
// listing. The result of find() is dereferenced without an end() check; the
// calls to purge_dead_topic visible in this file all pass a name that was
// just used to index topics_ with operator[], so the entry exists there.
// Any new caller must guarantee the same.
612 {
613  TopicDetailsMap::iterator top_it = topics_.find(topic_name);
// Read the topic id before the details entry is erased on the next line.
614  topic_names_.erase(top_it->second.topic_id());
615  topics_.erase(top_it);
616 }
617 
618 void StaticEndpointManager::ignore(const GUID_t& to_ignore)
619 {
620  // Locked prior to call from Spdp.
621  ignored_guids_.insert(to_ignore);
622  {
623  const DiscoveredPublicationIter iter = discovered_publications_.find(to_ignore);
624  if (iter != discovered_publications_.end()) {
625  // clean up tracking info
626  const String topic_name = iter->second.get_topic_name();
627  TopicDetails& td = topics_[topic_name];
628  td.remove_discovered_publication(to_ignore);
629  remove_from_bit(iter->second);
630  discovered_publications_.erase(iter);
631  // break associations
632  match_endpoints(to_ignore, td, true /*remove*/);
633  if (td.is_dead()) {
634  purge_dead_topic(topic_name);
635  }
636  return;
637  }
638  }
639  {
640  const DiscoveredSubscriptionIter iter =
641  discovered_subscriptions_.find(to_ignore);
642  if (iter != discovered_subscriptions_.end()) {
643  // clean up tracking info
644  const String topic_name = iter->second.get_topic_name();
645  TopicDetails& td = topics_[topic_name];
646  td.remove_discovered_publication(to_ignore);
647  remove_from_bit(iter->second);
648  discovered_subscriptions_.erase(iter);
649  // break associations
650  match_endpoints(to_ignore, td, true /*remove*/);
651  if (td.is_dead()) {
652  purge_dead_topic(topic_name);
653  }
654  return;
655  }
656  }
657  {
659  iter = topic_names_.find(to_ignore);
660  if (iter != topic_names_.end()) {
661  ignored_topics_.insert(iter->second);
662  // Remove all publications and subscriptions on this topic
663  TopicDetails& td = topics_[iter->second];
664  {
665  const RepoIdSet ids = td.discovered_publications();
666  for (RepoIdSet::const_iterator ep = ids.begin(); ep!= ids.end(); ++ep) {
667  match_endpoints(*ep, td, true /*remove*/);
669  // TODO: Do we need to remove from discovered_subscriptions?
670  if (shutting_down()) { return; }
671  }
672  }
673  {
674  const RepoIdSet ids = td.discovered_subscriptions();
675  for (RepoIdSet::const_iterator ep = ids.begin(); ep!= ids.end(); ++ep) {
676  match_endpoints(*ep, td, true /*remove*/);
678  // TODO: Do we need to remove from discovered_publications?
679  if (shutting_down()) { return; }
680  }
681  }
682  if (td.is_dead()) {
683  purge_dead_topic(iter->second);
684  }
685  }
686  }
687 }
688 
690 {
691  return ignored_guids_.count(guid);
692 }
693 bool StaticEndpointManager::ignoring(const char* topic_name) const
694 {
695  return ignored_topics_.count(topic_name);
696 }
697 
699  GUID_t& topicId, const char* topicName,
700  const char* dataTypeName, const DDS::TopicQos& qos,
701  bool hasDcpsKey, TopicCallbacks* topic_callbacks)
702 {
704  TopicDetailsMap::iterator iter = topics_.find(topicName);
705  if (iter != topics_.end()) {
706  if (iter->second.local_is_set() && iter->second.local_data_type_name() != dataTypeName) {
707  return CONFLICTING_TYPENAME;
708  }
709  topicId = iter->second.topic_id();
710  iter->second.set_local(dataTypeName, qos, hasDcpsKey, topic_callbacks);
711  return FOUND;
712  }
713 
714  TopicDetails& td = topics_[topicName];
715  topicId = make_topic_guid();
716  td.init(topicName, topicId);
717  topic_names_[topicId] = topicName;
718  td.set_local(dataTypeName, qos, hasDcpsKey, topic_callbacks);
719 
720  return CREATED;
721 }
722 
724  const char* topicName,
725  CORBA::String_out dataTypeName,
726  DDS::TopicQos_out qos,
727  GUID_t& topicId)
728 {
730  TopicDetailsMap::const_iterator iter = topics_.find(topicName);
731  if (iter == topics_.end()) {
732  return NOT_FOUND;
733  }
734 
735  const TopicDetails& td = iter->second;
736 
737  dataTypeName = td.local_data_type_name().c_str();
738  qos = new DDS::TopicQos(td.local_qos());
739  topicId = td.topic_id();
740  return FOUND;
741 }
742 
744 {
746  TopicNameMap::iterator name_iter = topic_names_.find(topicId);
747  if (name_iter == topic_names_.end()) {
748  return NOT_FOUND;
749  }
750  const String& name = name_iter->second;
751  TopicDetails& td = topics_[name];
752  td.unset_local();
753  if (td.is_dead()) {
754  purge_dead_topic(name);
755  }
756 
757  return REMOVED;
758 }
759 
761  const GUID_t& topicId,
762  DataWriterCallbacks_rch publication,
763  const DDS::DataWriterQos& qos,
764  const TransportLocatorSeq& transInfo,
765  const DDS::PublisherQos& publisherQos,
766  const XTypes::TypeInformation& type_info)
767 {
769 
770  GUID_t rid = participant_id_;
771  assign_publication_key(rid, topicId, qos);
773  pb.topic_id_ = topicId;
774  pb.publication_ = publication;
775  pb.qos_ = qos;
776  pb.trans_info_ = transInfo;
777  pb.publisher_qos_ = publisherQos;
778  pb.type_info_ = type_info;
779  const OPENDDS_STRING& topic_name = topic_names_[topicId];
780 
781  TopicDetails& td = topics_[topic_name];
782  td.add_local_publication(rid);
783 
784  if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
785  return GUID_t();
786  }
787 
788  if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
789  return GUID_t();
790  }
791 
792  if (DCPS_debug_level > 3) {
793  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::add_publication - ")
794  ACE_TEXT("calling match_endpoints\n")));
795  }
796  match_endpoints(rid, td);
797 
798  return rid;
799 }
800 
802 {
804  LocalPublicationIter iter = local_publications_.find(publicationId);
805  if (iter != local_publications_.end()) {
806  if (DDS::RETCODE_OK == remove_publication_i(publicationId, iter->second)) {
807  OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
808  local_publications_.erase(publicationId);
809  TopicDetailsMap::iterator top_it = topics_.find(topic_name);
810  if (top_it != topics_.end()) {
811  match_endpoints(publicationId, top_it->second, true /*remove*/);
812  top_it->second.remove_local_publication(publicationId);
813  // Local, no need to check for dead topic.
814  }
815  } else {
817  ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::remove_publication - ")
818  ACE_TEXT("Failed to publish dispose msg\n")));
819  }
820  }
821 }
822 
824  const GUID_t& publicationId, const TransportLocatorSeq& transInfo)
825 {
827  LocalPublicationIter iter = local_publications_.find(publicationId);
828  if (iter != local_publications_.end()) {
829  if (DCPS_debug_level > 3) {
831  ACE_TEXT("(%P|%t) StaticEndpointManager::update_publication_locators - updating locators for %C\n"),
832  LogGuid(publicationId).c_str()));
833  }
834  iter->second.trans_info_ = transInfo;
835  write_publication_data(publicationId, iter->second);
836  }
837 }
838 
840  const GUID_t& topicId,
841  DataReaderCallbacks_rch subscription,
842  const DDS::DataReaderQos& qos,
843  const TransportLocatorSeq& transInfo,
844  const DDS::SubscriberQos& subscriberQos,
845  const char* filterClassName,
846  const char* filterExpr,
847  const DDS::StringSeq& params,
848  const XTypes::TypeInformation& type_info)
849 {
851 
852  GUID_t rid = participant_id_;
853  assign_subscription_key(rid, topicId, qos);
855  sb.topic_id_ = topicId;
856  sb.subscription_ = subscription;
857  sb.qos_ = qos;
858  sb.trans_info_ = transInfo;
859  sb.subscriber_qos_ = subscriberQos;
860  sb.filterProperties.filterClassName = filterClassName;
861  sb.filterProperties.filterExpression = filterExpr;
863  sb.type_info_ = type_info;
864  const OPENDDS_STRING& topic_name = topic_names_[topicId];
865 
866  TopicDetails& td = topics_[topic_name];
867  td.add_local_subscription(rid);
868 
869  if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
870  return GUID_t();
871  }
872 
873  if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
874  return GUID_t();
875  }
876 
877  if (DCPS_debug_level > 3) {
878  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::add_subscription - ")
879  ACE_TEXT("calling match_endpoints\n")));
880  }
881  match_endpoints(rid, td);
882 
883  return rid;
884 }
885 
887 {
889  LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
890  if (iter != local_subscriptions_.end()) {
891  if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId, iter->second)) {
892  OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
893  local_subscriptions_.erase(subscriptionId);
894  TopicDetailsMap::iterator top_it = topics_.find(topic_name);
895  if (top_it != topics_.end()) {
896  match_endpoints(subscriptionId, top_it->second, true /*remove*/);
897  top_it->second.remove_local_subscription(subscriptionId);
898  // Local, no need to check for dead topic.
899  }
900  } else {
902  ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::remove_subscription - ")
903  ACE_TEXT("Failed to publish dispose msg\n")));
904  }
905  }
906 }
907 
909  const GUID_t& subscriptionId,
910  const TransportLocatorSeq& transInfo)
911 {
913  LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
914  if (iter != local_subscriptions_.end()) {
915  if (DCPS_debug_level > 3) {
917  ACE_TEXT("(%P|%t) StaticEndpointManager::update_subscription_locators updating locators for %C\n"),
918  LogGuid(subscriptionId).c_str()));
919  }
920  iter->second.trans_info_ = transInfo;
921  write_subscription_data(subscriptionId, iter->second);
922  }
923 }
924 
// Pairs the endpoint repoId with the endpoints of the opposite kind on topic
// td: for each candidate it calls remove_assoc() when remove is true and
// match() otherwise.
// NOTE(review): the first line of the signature (listing line 926) is
// missing from this listing.
925 // TODO: This is perhaps too generic since the context probably has the details this function computes.
927  GUID_t repoId, const TopicDetails& td, bool remove)
928 {
929  if (DCPS_debug_level >= 4) {
930  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match_endpoints %C%C\n",
931  remove ? "remove " : "", LogGuid(repoId).c_str()));
932  }
933 
// A reader is paired with publications, a writer with subscriptions.
934  const bool reader = GuidConverter(repoId).isReader();
935  // Copy the endpoint set - lock can be released in match()
936  RepoIdSet local_endpoints;
937  RepoIdSet discovered_endpoints;
938  if (reader) {
939  local_endpoints = td.local_publications();
940  discovered_endpoints = td.discovered_publications();
941  } else {
942  local_endpoints = td.local_subscriptions();
943  discovered_endpoints = td.discovered_subscriptions();
944  }
945 
// repoId counts as remote when its GUID prefix differs from participant_id_.
946  const bool is_remote = !equal_guid_prefixes(repoId, participant_id_);
947  if (is_remote && local_endpoints.empty()) {
948  // Nothing to match.
949  return;
950  }
951 
// First pass: candidates that are local to this participant.
952  for (RepoIdSet::const_iterator iter = local_endpoints.begin();
953  iter != local_endpoints.end(); ++iter) {
954  // check to make sure it's a Reader/Writer or Writer/Reader match
955  if (GuidConverter(*iter).isReader() != reader) {
956  if (remove) {
957  remove_assoc(*iter, repoId);
958  } else {
// match() takes (writer, reader), so the argument order depends on which
// kind repoId is.
959  match(reader ? *iter : repoId, reader ? repoId : *iter);
960  }
961  }
962  }
963 
964  // Remote/remote matches are a waste of time
965  if (is_remote) {
966  return;
967  }
968 
// Second pass (local repoId only): candidates that were discovered.
969  for (RepoIdSet::const_iterator iter = discovered_endpoints.begin();
970  iter != discovered_endpoints.end(); ++iter) {
971  // check to make sure it's a Reader/Writer or Writer/Reader match
972  if (GuidConverter(*iter).isReader() != reader) {
973  if (remove) {
974  remove_assoc(*iter, repoId);
975  } else {
976  match(reader ? *iter : repoId, reader ? repoId : *iter);
977  }
978  }
979  }
980 }
981 
982 void StaticEndpointManager::remove_assoc(const GUID_t& remove_from, const GUID_t& removing)
983 {
984  if (GuidConverter(remove_from).isReader()) {
985  const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
986  if (lsi != local_subscriptions_.end()) {
987  lsi->second.matched_endpoints_.erase(removing);
988  const DiscoveredPublicationIter dpi = discovered_publications_.find(removing);
989  if (dpi != discovered_publications_.end()) {
990  dpi->second.matched_endpoints_.erase(remove_from);
991  }
992  WriterIdSeq writer_seq(1);
993  writer_seq.length(1);
994  writer_seq[0] = removing;
995  const size_t count = lsi->second.remote_expectant_opendds_associations_.erase(removing);
996  DataReaderCallbacks_rch drr = lsi->second.subscription_.lock();
997  if (drr) {
998  drr->remove_associations(writer_seq, false /*notify_lost*/);
999  }
1000  remove_assoc_i(remove_from, lsi->second, removing);
1001  // Update writer
1002  if (count) {
1003  write_subscription_data(remove_from, lsi->second);
1004  }
1005  }
1006 
1007  } else {
1008  const LocalPublicationIter lpi = local_publications_.find(remove_from);
1009  if (lpi != local_publications_.end()) {
1010  lpi->second.matched_endpoints_.erase(removing);
1011  const DiscoveredSubscriptionIter dsi = discovered_subscriptions_.find(removing);
1012  if (dsi != discovered_subscriptions_.end()) {
1013  dsi->second.matched_endpoints_.erase(remove_from);
1014  }
1015  ReaderIdSeq reader_seq(1);
1016  reader_seq.length(1);
1017  reader_seq[0] = removing;
1018  lpi->second.remote_expectant_opendds_associations_.erase(removing);
1019  DataWriterCallbacks_rch dwr = lpi->second.publication_.lock();
1020  if (dwr) {
1021  dwr->remove_associations(reader_seq, false /*notify_lost*/);
1022  }
1023  remove_assoc_i(remove_from, lpi->second, removing);
1024  }
1025  }
1026 }
1027 
1028 void StaticEndpointManager::match(const GUID_t& writer, const GUID_t& reader)
1029 {
1030  if (DCPS_debug_level >= 4) {
1031  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match: w: %C r: %C\n",
1032  LogGuid(writer).c_str(), LogGuid(reader).c_str()));
1033  }
1034 
1035  match_continue(writer, reader);
1036 }
1037 
// NOTE(review): the opening line of this signature and two lines at the top
// of the body are absent from this listing. The log message below names the
// function StaticEndpointManager::remove_expired_endpoints — confirm against
// the complete file.
//
// Visible behavior: walks orig_seq_numbers_ and, for every entry whose
// time_started is at least max_type_lookup_service_reply_period_ before
// `now`, calls cleanup_type_lookup_data() and erases the entry.
1039  const MonotonicTimePoint& /*now*/)
1040 {
1043 
// NOTE(review): `now` is read below although the parameter name is commented
// out; presumably a local `now` is declared on one of the missing lines —
// confirm.
1044  // Clean up internal data used by getTypeDependencies
1045  for (OrigSeqNumberMap::iterator it = orig_seq_numbers_.begin(); it != orig_seq_numbers_.end();) {
1046  if (now - it->second.time_started >= max_type_lookup_service_reply_period_) {
1047  if (DCPS_debug_level >= 4) {
1048  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::remove_expired_endpoints: "
1049  "clean up type lookup data for %C\n",
1050  LogGuid(it->second.participant).c_str()));
1051  }
1052  cleanup_type_lookup_data(it->second.participant, it->second.type_id, it->second.secure);
// Post-increment hands erase() the current position while `it` has already
// moved on to the next element.
1053  orig_seq_numbers_.erase(it++);
1054  } else {
1055  ++it;
1056  }
1057  }
1058 }
1059 
// Attempts to associate one writer with one reader.
//
// Visible steps:
//   0. set up temporary QoS structs used for discovered endpoints,
//   1. collect the writer's details (local or discovered),
//   2. collect the reader's details (local or discovered),
//   3. check type consistency,
//   4. check transport/QoS compatibility and then either add the
//      association, break an existing one, or report incompatible QoS.
// Returns early without doing anything for a discovered/discovered pair and
// when either endpoint is neither local nor discovered.
//
// NOTE(review): several lines of this function are absent from this listing
// (for example the declarations of dpi, dsi, bit, dwr and drr, and the
// openers of some logging macros); confirm details against the complete file.
1060 void StaticEndpointManager::match_continue(const GUID_t& writer, const GUID_t& reader)
1061 {
1062  if (DCPS_debug_level >= 4) {
1063  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match_continue: w: %C r: %C\n",
1064  LogGuid(writer).c_str(), LogGuid(reader).c_str()));
1065  }
1066 
1067  // 0. For discovered endpoints, we'll have the QoS info in the form of the
1068  // publication or subscription BIT data which doesn't use the same structures
1069  // for QoS. In those cases we can copy the individual QoS policies to temp
1070  // QoS structs:
1071  DDS::DataWriterQos tempDwQos;
1072  DDS::PublisherQos tempPubQos;
1073  DDS::DataReaderQos tempDrQos;
1074  DDS::SubscriberQos tempSubQos;
1075  ContentFilterProperty_t tempCfp;
1076 
// NOTE(review): dpi and dsi are used from here on; their declarations are not
// visible in this listing.
1079  if (dpi != discovered_publications_.end() && dsi != discovered_subscriptions_.end()) {
1080  // This is a discovered/discovered match, nothing for us to do
1081  return;
1082  }
1083 
1084  // 1. Collect details about the writer, which may be local or discovered
1085  const DDS::DataWriterQos* dwQos = 0;
1086  const DDS::PublisherQos* pubQos = 0;
1087  TransportLocatorSeq* wTls = 0;
1088  ACE_CDR::ULong wTransportContext = 0;
1089  XTypes::TypeInformation* writer_type_info = 0;
1090  OPENDDS_STRING topic_name;
1091  MonotonicTime_t writer_participant_discovered_at;
1092 
1093  const LocalPublicationIter lpi = local_publications_.find(writer);
1094  bool writer_local = false, already_matched = false;
1095  if (lpi != local_publications_.end()) {
1096  writer_local = true;
1097  dwQos = &lpi->second.qos_;
1098  pubQos = &lpi->second.publisher_qos_;
1099  wTls = &lpi->second.trans_info_;
1100  wTransportContext = lpi->second.transport_context_;
// count() is converted to bool: true when the reader is already recorded as
// matched on the writer side.
1101  already_matched = lpi->second.matched_endpoints_.count(reader);
1102  writer_type_info = &lpi->second.type_info_;
1103  topic_name = topic_names_[lpi->second.topic_id_];
1104  writer_participant_discovered_at = lpi->second.participant_discovered_at_;
1105  } else if (dpi != discovered_publications_.end()) {
1106  wTls = &dpi->second.writer_data_.writerProxy.allLocators;
1107  wTransportContext = dpi->second.transport_context_;
1108  writer_type_info = &dpi->second.type_info_;
1109  topic_name = dpi->second.get_topic_name();
1110  writer_participant_discovered_at = dpi->second.participant_discovered_at_;
1111 
// NOTE(review): the declaration that binds `bit` to the expression on the
// next line is missing from this listing.
1113  dpi->second.writer_data_.ddsPublicationData;
// Policies present in `bit` are copied; history, resource_limits,
// transport_priority and writer_data_lifecycle are filled with the service
// participant's initial values instead.
1114  tempDwQos.durability = bit.durability;
1115  tempDwQos.durability_service = bit.durability_service;
1116  tempDwQos.deadline = bit.deadline;
1117  tempDwQos.latency_budget = bit.latency_budget;
1118  tempDwQos.liveliness = bit.liveliness;
1119  tempDwQos.reliability = bit.reliability;
1120  tempDwQos.destination_order = bit.destination_order;
1121  tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
1122  tempDwQos.resource_limits =
1123  TheServiceParticipant->initial_ResourceLimitsQosPolicy();
1124  tempDwQos.transport_priority =
1125  TheServiceParticipant->initial_TransportPriorityQosPolicy();
1126  tempDwQos.lifespan = bit.lifespan;
1127  tempDwQos.user_data = bit.user_data;
1128  tempDwQos.ownership = bit.ownership;
1129  tempDwQos.ownership_strength = bit.ownership_strength;
1130  tempDwQos.writer_data_lifecycle =
1131  TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
1132  tempDwQos.representation = bit.representation;
1133  dwQos = &tempDwQos;
1134 
1135  tempPubQos.presentation = bit.presentation;
1136  tempPubQos.partition = bit.partition;
1137  tempPubQos.group_data = bit.group_data;
1138  tempPubQos.entity_factory =
1139  TheServiceParticipant->initial_EntityFactoryQosPolicy();
1140  pubQos = &tempPubQos;
1141 
1142  populate_transport_locator_sequence(wTls, dpi, writer);
1143  } else {
1144  return; // Possible and ok, since lock is released
1145  }
1146 
1147  // 2. Collect details about the reader, which may be local or discovered
1148  const DDS::DataReaderQos* drQos = 0;
1149  const DDS::SubscriberQos* subQos = 0;
1150  TransportLocatorSeq* rTls = 0;
1151  ACE_CDR::ULong rTransportContext = 0;
1152  const ContentFilterProperty_t* cfProp = 0;
1153  XTypes::TypeInformation* reader_type_info = 0;
1154  MonotonicTime_t reader_participant_discovered_at;
1155 
1156  const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
1157  bool reader_local = false;
1158  if (lsi != local_subscriptions_.end()) {
1159  reader_local = true;
1160  drQos = &lsi->second.qos_;
1161  subQos = &lsi->second.subscriber_qos_;
1162  rTls = &lsi->second.trans_info_;
1163  rTransportContext = lsi->second.transport_context_;
1164  reader_type_info = &lsi->second.type_info_;
// Only a non-empty filter expression is copied; otherwise tempCfp keeps its
// default-constructed contents.
1165  if (lsi->second.filterProperties.filterExpression[0] != 0) {
1166  tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
1167  tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
1168  }
1169  cfProp = &tempCfp;
1170  if (!already_matched) {
1171  already_matched = lsi->second.matched_endpoints_.count(writer);
1172  }
1173  reader_participant_discovered_at = lsi->second.participant_discovered_at_;
1174  } else if (dsi != discovered_subscriptions_.end()) {
1175  rTls = &dsi->second.reader_data_.readerProxy.allLocators;
1176 
1177  populate_transport_locator_sequence(rTls, dsi, reader);
1178  rTransportContext = dsi->second.transport_context_;
1179 
// NOTE(review): as in the writer branch, the declaration of `bit` for the
// expression on the next line is missing from this listing.
1181  dsi->second.reader_data_.ddsSubscriptionData;
1182  tempDrQos.durability = bit.durability;
1183  tempDrQos.deadline = bit.deadline;
1184  tempDrQos.latency_budget = bit.latency_budget;
1185  tempDrQos.liveliness = bit.liveliness;
1186  tempDrQos.reliability = bit.reliability;
1187  tempDrQos.destination_order = bit.destination_order;
1188  tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
1189  tempDrQos.resource_limits =
1190  TheServiceParticipant->initial_ResourceLimitsQosPolicy();
1191  tempDrQos.user_data = bit.user_data;
1192  tempDrQos.ownership = bit.ownership;
1193  tempDrQos.time_based_filter = bit.time_based_filter;
1194  tempDrQos.reader_data_lifecycle =
1195  TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
1196  tempDrQos.representation = bit.representation;
1197  tempDrQos.type_consistency = bit.type_consistency;
1198  drQos = &tempDrQos;
1199 
1200  tempSubQos.presentation = bit.presentation;
1201  tempSubQos.partition = bit.partition;
1202  tempSubQos.group_data = bit.group_data;
1203  tempSubQos.entity_factory =
1204  TheServiceParticipant->initial_EntityFactoryQosPolicy();
1205  subQos = &tempSubQos;
1206 
1207  cfProp = &dsi->second.reader_data_.contentFilterProperty;
1208  reader_type_info = &dsi->second.type_info_;
1209  reader_participant_discovered_at = dsi->second.participant_discovered_at_;
1210  } else {
1211  return; // Possible and ok, since lock is released
1212  }
1213 
1214  // 3. Perform type consistency check (XTypes 1.3, Section 7.6.3.4.2)
1215  bool consistent = false;
1216 
1217  TopicDetailsMap::iterator td_iter = topics_.find(topic_name);
1218  if (td_iter == topics_.end()) {
1220  ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ERROR ")
1221  ACE_TEXT("Didn't find topic for consistency check\n")));
1222  return;
1223  } else {
1224  const XTypes::TypeIdentifier& writer_type_id = writer_type_info->minimal.typeid_with_size.type_id;
1225  const XTypes::TypeIdentifier& reader_type_id = reader_type_info->minimal.typeid_with_size.type_id;
1226  if (writer_type_id.kind() != XTypes::TK_NONE && reader_type_id.kind() != XTypes::TK_NONE) {
1227  if (!writer_local || !reader_local) {
1228  Encoding::Kind encoding_kind;
// tempDwQos.representation is only assigned on the discovered-writer path in
// step 1.
1229  if (tempDwQos.representation.value.length() > 0 &&
1230  repr_to_encoding_kind(tempDwQos.representation.value[0], encoding_kind) &&
1231  encoding_kind == Encoding::KIND_XCDR1) {
1232  const XTypes::TypeFlag extensibility_mask = XTypes::IS_APPENDABLE;
1233  if (type_lookup_service_->extensibility(extensibility_mask, writer_type_id)) {
1234  if (DCPS_debug_level) {
1235  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
1236  ACE_TEXT("StaticEndpointManager::match_continue: ")
1237  ACE_TEXT("Encountered unsupported combination of XCDR1 encoding and appendable extensibility\n")));
1238  }
1239  }
1240  }
1241  }
1242 
// NOTE(review): several lines of the type-consistency setup and the
// conditions selecting between the branches below are missing from this
// listing.
1243  XTypes::TypeConsistencyAttributes type_consistency;
1246  type_consistency.ignore_member_names = drQos->type_consistency.ignore_member_names;
1248  XTypes::TypeAssignability ta(type_lookup_service_, type_consistency);
1249 
1251  consistent = ta.assignable(reader_type_id, writer_type_id);
1252  } else {
1253  // The two types must be equivalent for DISALLOW_TYPE_COERCION
1254  consistent = reader_type_id == writer_type_id;
1255  }
1256  } else {
1258  // Cannot do type validation since not both TypeObjects are available
1259  consistent = false;
1260  } else {
1261  // Fall back to matching type names
1262  OPENDDS_STRING writer_type_name;
1263  OPENDDS_STRING reader_type_name;
1264  if (writer_local) {
1265  writer_type_name = td_iter->second.local_data_type_name();
1266  } else {
1267  writer_type_name = dpi->second.get_type_name();
1268  }
1269  if (reader_local) {
1270  reader_type_name = td_iter->second.local_data_type_name();
1271  } else {
1272  reader_type_name = dsi->second.get_type_name();
1273  }
1274  consistent = writer_type_name == reader_type_name;
1275  }
1276  }
1277 
1278  if (!consistent) {
1279  td_iter->second.increment_inconsistent();
1280  if (DCPS::DCPS_debug_level) {
1282  ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - WARNING ")
1283  ACE_TEXT("Data types of topic %C does not match (inconsistent)\n"),
1284  topic_name.c_str()));
1285  }
1286  return;
1287  }
1288  }
1289 
1290  // Need to release lock, below, for callbacks into DCPS which could
1291  // call into Spdp/Sedp. Note that this doesn't unlock, it just constructs
1292  // an ACE object which will be used below for unlocking.
1294 
1295  // 4. Check transport and QoS compatibility
1296 
1297  // Copy entries from local publication and local subscription maps
1298  // prior to releasing lock
// NOTE(review): the declarations of dwr and drr are missing from this
// listing; both are later used through .lock().
1301  if (writer_local) {
1302  dwr = lpi->second.publication_;
1303  OPENDDS_ASSERT(lpi->second.publication_);
1304  OPENDDS_ASSERT(dwr);
1305  }
1306  if (reader_local) {
1307  drr = lsi->second.subscription_;
1308  OPENDDS_ASSERT(lsi->second.subscription_);
1309  OPENDDS_ASSERT(drr);
1310  }
1311 
1312  IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
1313  IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
1314 
1315  if (compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
1316  dwQos, drQos, pubQos, subQos)) {
1317 
1318  bool call_writer = false, call_reader = false;
1319 
// insert().second is true only when the endpoint was not already in the
// matched set, so the callbacks below fire for new matches only.
1320  if (writer_local) {
1321  call_writer = lpi->second.matched_endpoints_.insert(reader).second;
1322  dwr = lpi->second.publication_;
1323  if (!reader_local) {
1324  dsi->second.matched_endpoints_.insert(writer);
1325  }
1326  }
1327  if (reader_local) {
1328  call_reader = lsi->second.matched_endpoints_.insert(writer).second;
1329  drr = lsi->second.subscription_;
1330  if (!writer_local) {
1331  dpi->second.matched_endpoints_.insert(reader);
1332  }
1333  }
1334 
1335  if (writer_local && !reader_local) {
1336  add_assoc_i(writer, lpi->second, reader, dsi->second);
1337  }
1338  if (reader_local && !writer_local) {
1339  add_assoc_i(reader, lsi->second, writer, dpi->second);
1340  }
1341 
1342  if (!call_writer && !call_reader) {
1343  return; // nothing more to do
1344  }
1345 
1346  // Copy reader and writer association data prior to releasing lock
1347  DDS::OctetSeq octet_seq_type_info_reader;
1348  XTypes::serialize_type_info(*reader_type_info, octet_seq_type_info_reader);
1349  const ReaderAssociation ra = {
1350  *rTls, TransportLocator(), rTransportContext, reader, *subQos, *drQos,
1351 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1352  cfProp->filterClassName, cfProp->filterExpression,
1353 #else
1354  "", "",
1355 #endif
1356  cfProp->expressionParameters,
1357  octet_seq_type_info_reader,
1358  reader_participant_discovered_at
1359  };
1360 
1361  DDS::OctetSeq octet_seq_type_info_writer;
1362  XTypes::serialize_type_info(*writer_type_info, octet_seq_type_info_writer);
1363  const WriterAssociation wa = {
1364  *wTls, TransportLocator(), wTransportContext, writer, *pubQos, *dwQos,
1365  octet_seq_type_info_writer,
1366  writer_participant_discovered_at
1367  };
1368 
// The writer is passed `writer_active` and the reader `!writer_active` in the
// add_association calls below.
1370  static const bool writer_active = true;
1371 
1372  if (call_writer) {
1373  if (DCPS_debug_level > 3) {
1374  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ")
1375  ACE_TEXT("adding writer %C association for reader %C\n"), LogGuid(writer).c_str(),
1376  LogGuid(reader).c_str()));
1377  }
1378  DataWriterCallbacks_rch dwr_lock = dwr.lock();
1379  if (dwr_lock) {
1380  if (call_reader) {
1381  DataReaderCallbacks_rch drr_lock = drr.lock();
1382  if (drr_lock) {
// Both endpoints are local: the reader side is handed to a DcpsUpcalls object
// that is activated before the writer's add_association call.
1383  DcpsUpcalls thr(drr_lock, reader, wa, !writer_active, dwr_lock);
1384  thr.activate();
1385  dwr_lock->add_association(writer, ra, writer_active);
1386  thr.writer_done();
1387  }
1388  } else {
1389  dwr_lock->add_association(writer, ra, writer_active);
1390  }
1391  }
1392  } else if (call_reader) {
1393  if (DCPS_debug_level > 3) {
1394  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ")
1395  ACE_TEXT("adding reader %C association for writer %C\n"),
1396  LogGuid(reader).c_str(), LogGuid(writer).c_str()));
1397  }
1398  DataReaderCallbacks_rch drr_lock = drr.lock();
1399  if (drr_lock) {
1400  drr_lock->add_association(reader, wa, !writer_active);
1401  }
1402  }
1403 
1404  } else if (already_matched) { // break an existing association
1405  if (writer_local) {
1406  lpi->second.matched_endpoints_.erase(reader);
1407  lpi->second.remote_expectant_opendds_associations_.erase(reader);
1408  if (dsi != discovered_subscriptions_.end()) {
1409  dsi->second.matched_endpoints_.erase(writer);
1410  }
1411  }
1412  if (reader_local) {
1413  lsi->second.matched_endpoints_.erase(writer);
1414  lsi->second.remote_expectant_opendds_associations_.erase(writer);
1415  if (dpi != discovered_publications_.end()) {
1416  dpi->second.matched_endpoints_.erase(reader);
1417  }
1418  }
1419  if (writer_local && !reader_local) {
1420  remove_assoc_i(writer, lpi->second, reader);
1421  }
1422  if (reader_local && !writer_local) {
1423  remove_assoc_i(reader, lsi->second, writer);
1424  }
1426  if (writer_local) {
1427  ReaderIdSeq reader_seq(1);
1428  reader_seq.length(1);
1429  reader_seq[0] = reader;
1430  DataWriterCallbacks_rch dwr_lock = dwr.lock();
1431  if (dwr_lock) {
1432  dwr_lock->remove_associations(reader_seq, false /*notify_lost*/);
1433  }
1434  }
1435  if (reader_local) {
1436  WriterIdSeq writer_seq(1);
1437  writer_seq.length(1);
1438  writer_seq[0] = writer;
1439  DataReaderCallbacks_rch drr_lock = drr.lock();
1440  if (drr_lock) {
1441  drr_lock->remove_associations(writer_seq, false /*notify_lost*/);
1442  }
1443  }
1444  } else { // something was incompatible
// Each local side is notified only when its status has a non-zero
// count_since_last_send.
1446  if (writer_local && writerStatus.count_since_last_send) {
1447  if (DCPS_debug_level > 3) {
1448  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match - ")
1449  ACE_TEXT("writer incompatible\n")));
1450  }
1451  DataWriterCallbacks_rch dwr_lock = dwr.lock();
1452  if (dwr_lock) {
1453  dwr_lock->update_incompatible_qos(writerStatus);
1454  }
1455  }
1456  if (reader_local && readerStatus.count_since_last_send) {
1457  if (DCPS_debug_level > 3) {
1458  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match - ")
1459  ACE_TEXT("reader incompatible\n")));
1460  }
1461  DataReaderCallbacks_rch drr_lock = drr.lock();
1462  if (drr_lock) {
1463  drr_lock->update_incompatible_qos(readerStatus);
1464  }
1465  }
1466  }
1467 }
1468 
// NOTE(review): the signature line is absent from this listing; the error
// message below names the function StaticEndpointManager::make_topic_guid.
//
// Builds a GUID for a new topic from participant_id_ and an entity id whose
// key comes from the running topic_counter_.
1470 {
1471  EntityId_t entity_id;
1472  assign(entity_id.entityKey, topic_counter_);
1473  ++topic_counter_;
// NOTE(review): one line is missing here in the listing; nothing visible
// sets the remaining fields of entity_id — confirm against the complete file.
1475 
// 0x1000000 is 2^24: once the counter reaches it, the error is logged and the
// counter restarts at 0, so later keys repeat earlier ones.
1476  if (topic_counter_ == 0x1000000) {
1478  ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::make_topic_guid: ")
1479  ACE_TEXT("Exceeded Maximum number of topic entity keys!")
1480  ACE_TEXT("Next key will be a duplicate!\n")));
1481  topic_counter_ = 0;
1482  }
1483 
1484  return make_id(participant_id_, entity_id);
1485 }
1486 
// NOTE(review): the signature line and one body line are absent from this
// listing; `topicId` is presumably the function's parameter — confirm.
//
// Looks up the topic name for topicId in topic_names_, then that name's
// details in topics_, and returns the details' has_dcps_key(). Returns false
// when either lookup fails.
1488 {
1490  TNMap::const_iterator tn = topic_names_.find(topicId);
1491  if (tn == topic_names_.end()) return false;
1492 
1493  TopicDetailsMap::const_iterator td = topics_.find(tn->second);
1494  if (td == topics_.end()) return false;
1495 
1496  return td->second.has_dcps_key();
1497 }
1498 
// NOTE(review): the constructor's signature line is absent from this listing.
// Visible part: forwards `key` to the Discovery base class; the body is empty.
1500  : Discovery(key)
1501 {}
1502 
1503 namespace {
// Converts a single hexadecimal digit character ('0'-'9', 'a'-'f', 'A'-'F')
// to its numeric value 0-15. Any other character is returned unchanged.
unsigned char hextobyte(unsigned char c)
{
  unsigned char result = c;

  if ('0' <= c && c <= '9') {
    result = static_cast<unsigned char>(c - '0');
  } else if ('a' <= c && c <= 'f') {
    result = static_cast<unsigned char>(c - 'a' + 10);
  } else if ('A' <= c && c <= 'F') {
    result = static_cast<unsigned char>(c - 'A' + 10);
  }

  return result;
}
1517 
1518  unsigned char
1519  fromhex(const OPENDDS_STRING& x, size_t idx)
1520  {
1521  return (hextobyte(x[idx * 2]) << 4) | (hextobyte(x[idx * 2 + 1]));
1522  }
1523 }
1524 
1525 EntityId_t
1526 EndpointRegistry::build_id(const unsigned char* entity_key,
1527  const unsigned char entity_kind)
1528 {
1529  EntityId_t retval;
1530  retval.entityKey[0] = entity_key[0];
1531  retval.entityKey[1] = entity_key[1];
1532  retval.entityKey[2] = entity_key[2];
1533  retval.entityKind = entity_kind;
1534  return retval;
1535 }
1536 
// NOTE(review): the line carrying the function name and its first parameter
// is absent from this listing; `domain` used below is presumably that
// parameter — confirm.
//
// Assembles a GUID_t: guidPrefix[0..1] hold VENDORID_OCI, guidPrefix[2..5]
// the domain id converted with ACE_HTONL, guidPrefix[6..11] six bytes copied
// from participant_id, and entityId the supplied entity_id.
1537 GUID_t
1539  const unsigned char* participant_id,
1540  const EntityId_t& entity_id)
1541 {
1542  GUID_t id;
1543  id.guidPrefix[0] = VENDORID_OCI[0];
1544  id.guidPrefix[1] = VENDORID_OCI[1];
1545  // id.guidPrefix[2] = domain[0]
1546  // id.guidPrefix[3] = domain[1]
1547  // id.guidPrefix[4] = domain[2]
1548  // id.guidPrefix[5] = domain[3]
1549  DDS::DomainId_t netdom = ACE_HTONL(domain);
1550  ACE_OS::memcpy(&id.guidPrefix[2], &netdom, sizeof(DDS::DomainId_t));
1551  // id.guidPrefix[6] = participant[0]
1552  // id.guidPrefix[7] = participant[1]
1553  // id.guidPrefix[8] = participant[2]
1554  // id.guidPrefix[9] = participant[3]
1555  // id.guidPrefix[10] = participant[4]
1556  // id.guidPrefix[11] = participant[5]
// The literal 6 has the same value as BYTES_IN_PARTICIPANT defined at the top
// of this file; participant_id must point to at least 6 readable bytes.
1557  ACE_OS::memcpy(&id.guidPrefix[6], participant_id, 6);
1558  id.entityId = entity_id;
1559  return id;
1560 }
1561 
// NOTE(review): the signature is absent from this listing.
// Visible behavior: always returns GUID_UNKNOWN.
1564 {
1565  return GUID_UNKNOWN;
1566 }
1567 
// NOTE(review): parts of the signature and several body lines (logging macro
// openers, a build_id argument, a lock) are absent from this listing; the
// error messages name the function StaticDiscovery::add_domain_participant.
//
// Registers a statically configured participant. The participant is
// identified by qos.user_data, which must be exactly BYTES_IN_PARTICIPANT
// bytes long. Returns an AddDomainStatus whose id is the new participant's
// GUID on success; on either error path `ads` is returned with the
// default-constructed GUID_t it was initialized with.
1570  const DDS::DomainParticipantQos& qos,
1572 {
1573  AddDomainStatus ads = {GUID_t(), false /*federated*/};
1574 
// Any length other than BYTES_IN_PARTICIPANT is rejected, not just empty
// user data.
1575  if (qos.user_data.value.length() != BYTES_IN_PARTICIPANT) {
1577  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
1578  ACE_TEXT("No userdata to identify participant\n")));
1579  return ads;
1580  }
1581 
1582  GUID_t id = EndpointRegistry::build_id(domain,
1583  qos.user_data.value.get_buffer(),
// A participant with this id already registered in the domain is an error.
1585  if (!get_part(domain, id).is_nil()) {
1587  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
1588  ACE_TEXT("Duplicate participant\n")));
1589  return ads;
1590  }
1591 
1592  const RcHandle<StaticParticipant> participant (make_rch<StaticParticipant>(ref(id), qos, registry));
1593 
1594  {
1596  participants_[domain][id] = participant;
1597  }
1598 
1599  participant->type_lookup_service(tls);
1600 
1601  ads.id = id;
1602  return ads;
1603 }
1604 
// Only compiled when OPENDDS_SECURITY is defined.
// NOTE(review): parts of the signature are absent from this listing; the
// error message names the function
// StaticDiscovery::add_domain_participant_secure.
// Every visible parameter is unnamed; the function logs that security is not
// supported for static discovery and returns a status whose GUID is
// GUID_UNKNOWN.
1605 #if defined(OPENDDS_SECURITY)
1608  DDS::DomainId_t /*domain*/,
1609  const DDS::DomainParticipantQos& /*qos*/,
1611  const OpenDDS::DCPS::GUID_t& /*guid*/,
1615 {
1616  const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
1618  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant_secure ")
1619  ACE_TEXT("Security not supported for static discovery.\n")));
1620  return ads;
1621 }
1622 #endif
1623 
1624 namespace {
// Names of the top-level configuration sections read by the parse_*
// functions in this file; each one is expected to contain named subsections
// such as [topic/<name>].
1625  const ACE_TCHAR TOPIC_SECTION_NAME[] = ACE_TEXT("topic");
1626  const ACE_TCHAR DATAWRITERQOS_SECTION_NAME[] = ACE_TEXT("datawriterqos");
1627  const ACE_TCHAR DATAREADERQOS_SECTION_NAME[] = ACE_TEXT("datareaderqos");
1628  const ACE_TCHAR PUBLISHERQOS_SECTION_NAME[] = ACE_TEXT("publisherqos");
1629  const ACE_TCHAR SUBSCRIBERQOS_SECTION_NAME[] = ACE_TEXT("subscriberqos");
1630  const ACE_TCHAR ENDPOINT_SECTION_NAME[] = ACE_TEXT("endpoint");
1631 
// Parses the seconds part of a duration from a configuration value into x.
// The literal "DURATION_INFINITE_SEC" is handled specially; every other value
// is converted with atoi(), so non-numeric text is not reported as an error.
1632  void parse_second(CORBA::Long& x, const OPENDDS_STRING& value)
1633  {
1634  if (value == "DURATION_INFINITE_SEC") {
// NOTE(review): the statement for this branch is missing from this listing;
// presumably it assigns the infinite-seconds constant to x — confirm.
1636  } else {
1637  x = atoi(value.c_str());
1638  }
1639  }
1640 
// Parses the nanoseconds part of a duration from a configuration value into
// x. The literal "DURATION_INFINITE_NANOSEC" is handled specially; every
// other value is converted with atoi() (an int result assigned to an unsigned
// field), so non-numeric text is not reported as an error.
1641  void parse_nanosecond(CORBA::ULong& x, const OPENDDS_STRING& value)
1642  {
1643  if (value == "DURATION_INFINITE_NANOSEC") {
// NOTE(review): the statement for this branch is missing from this listing;
// presumably it assigns the infinite-nanoseconds constant to x — confirm.
1645  } else {
1646  x = atoi(value.c_str());
1647  }
1648  }
1649 
1650  bool parse_bool(CORBA::Boolean& x, const OPENDDS_STRING& value)
1651  {
1652  if (value == "true") {
1653  x = true;
1654  return true;
1655  } else if (value == "false") {
1656  x = false;
1657  return true;
1658  }
1659  return false;
1660  }
1661 
1662  void parse_list(DDS::PartitionQosPolicy& x, const OPENDDS_STRING& value)
1663  {
1664  // Value can be a comma-separated list
1665  const char* start = value.c_str();
1666  while (const char* next_comma = std::strchr(start, ',')) {
1667  const size_t size = next_comma - start;
1668  const OPENDDS_STRING temp(start, size);
1669  // Add to QOS
1670  x.name.length(x.name.length() + 1);
1671  x.name[x.name.length() - 1] = temp.c_str();
1672  // Advance pointer
1673  start = next_comma + 1;
1674  }
1675  // Append everything after last comma
1676  x.name.length(x.name.length() + 1);
1677  x.name[x.name.length() - 1] = start;
1678  }
1679 }
1680 
// NOTE(review): the line with the function name and parameter list is absent
// from this listing; `cf` is presumably the configuration object passed in —
// confirm.
//
// Runs the section parsers in the order shown. The || chain stops at the
// first parser that returns non-zero, in which case -1 is returned. When all
// succeed, registry.match() is called and 0 is returned.
1681 int
1683 {
1684  if (parse_topics(cf) ||
1685  parse_datawriterqos(cf) ||
1686  parse_datareaderqos(cf) ||
1687  parse_publisherqos(cf) ||
1688  parse_subscriberqos(cf) ||
1689  parse_endpoints(cf)) {
1690  return -1;
1691  }
1692 
1693  registry.match();
1694 
1695  return 0;
1696 }
1697 
// NOTE(review): the signature line, the declarations of `section` and
// `topic`, and the opening lines of the logging macros are absent from this
// listing; the log messages name the function StaticDiscovery::parse_topics.
//
// Reads every [topic/<name>] subsection and stores one entry per subsection
// in registry.topic_map, keyed by the subsection name. Returns 0 when there
// is no [topic] section at all or when every subsection parsed; the visible
// error branches end with -1.
1698 int
1700 {
1701  const ACE_Configuration_Section_Key& root = cf.root_section();
1703 
1704  if (cf.open_section(root, TOPIC_SECTION_NAME, false, section) != 0) {
1705  if (DCPS_debug_level > 0) {
1706  // This is not an error if the configuration file does not have
1707  // any topic (sub)section.
1709  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
1710  ACE_TEXT("no [%s] sections.\n"),
1711  TOPIC_SECTION_NAME));
1712  }
1713  return 0;
1714  }
1715 
1716  // Ensure there are no key/values in the [topic] section.
1717  // Every key/value must be in a [topic/*] sub-section.
1718  ValueMap vm;
1719  if (pullValues(cf, section, vm) > 0) {
1721  ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1722  ACE_TEXT("[topic] sections must have a subsection name\n")),
1723  -1);
1724  }
1725  // Process the subsections of this section
1726  KeyList keys;
1727  if (processSections(cf, section, keys) != 0) {
1729  ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1730  ACE_TEXT("too many nesting layers in the [topic] section.\n")),
1731  -1);
1732  }
1733 
1734  // Loop through the [topic/*] sections
1735  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
1736  OPENDDS_STRING topic_name = it->first;
1737 
1738  if (DCPS_debug_level > 0) {
1740  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
1741  ACE_TEXT("processing [topic/%C] section.\n"),
1742  topic_name.c_str()));
1743  }
1744 
1745  ValueMap values;
1746  pullValues(cf, it->second, values);
1747 
1749  bool name_specified = false,
1750  type_name_specified = false;
1751 
// This inner `it` shadows the outer loop's `it` for the duration of the loop.
1752  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
1753  OPENDDS_STRING name = it->first;
1754  OPENDDS_STRING value = it->second;
1755 
1756  if (name == "name") {
1757  topic.name = value;
1758  name_specified = true;
1759  } else if (name == "type_name") {
// TYPE_NAME_MAX is 128 (defined at the top of this file), which matches the
// limit quoted in the message below.
1760  if (value.size() >= TYPE_NAME_MAX) {
1762  ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1763  ACE_TEXT("type_name (%C) must be less than 128 characters in [topic/%C] section.\n"),
1764  value.c_str(), topic_name.c_str()),
1765  -1);
1766  }
1767  topic.type_name = value;
1768  type_name_specified = true;
1769  } else {
1770  // Typos are ignored to avoid parsing FACE-specific keys.
1771  }
1772  }
1773 
// Without an explicit "name" key the subsection name doubles as the topic
// name.
1774  if (!name_specified) {
1775  topic.name = topic_name;
1776  }
1777 
// "type_name" is mandatory.
1778  if (!type_name_specified) {
1780  ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1781  ACE_TEXT("No type_name specified for [topic/%C] section.\n"),
1782  topic_name.c_str()),
1783  -1);
1784  }
1785 
1786  registry.topic_map[topic_name] = topic;
1787  }
1788 
1789  return 0;
1790 }
1791 
// NOTE(review): the signature line, the declaration of `section`, and the
// opening lines of the logging macros are absent from this listing; the log
// messages name the function StaticDiscovery::parse_datawriterqos.
//
// Reads every [datawriterqos/<name>] subsection. Each one starts from
// TheServiceParticipant->initial_DataWriterQos(), is overridden key by key,
// and is stored in registry.datawriterqos_map under the subsection name.
// Returns 0 when there is no [datawriterqos] section or when every
// subsection parsed; the visible error branches (illegal enum value,
// unexpected key, malformed section layout) end with -1.
1792 int
1794 {
1795  const ACE_Configuration_Section_Key& root = cf.root_section();
1797 
1798  if (cf.open_section(root, DATAWRITERQOS_SECTION_NAME, false, section) != 0) {
1799  if (DCPS_debug_level > 0) {
1800  // This is not an error if the configuration file does not have
1801  // any datawriterqos (sub)section.
1803  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
1804  ACE_TEXT("no [%s] sections.\n"),
1805  DATAWRITERQOS_SECTION_NAME));
1806  }
1807  return 0;
1808  }
1809 
1810  // Ensure there are no key/values in the [datawriterqos] section.
1811  // Every key/value must be in a [datawriterqos/*] sub-section.
1812  ValueMap vm;
1813  if (pullValues(cf, section, vm) > 0) {
1815  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1816  ACE_TEXT("[datawriterqos] sections must have a subsection name\n")),
1817  -1);
1818  }
1819  // Process the subsections of this section
1820  KeyList keys;
1821  if (processSections(cf, section, keys) != 0) {
1823  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1824  ACE_TEXT("too many nesting layers in the [datawriterqos] section.\n")),
1825  -1);
1826  }
1827 
1828  // Loop through the [datawriterqos/*] sections
1829  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
1830  OPENDDS_STRING datawriterqos_name = it->first;
1831 
1832  if (DCPS_debug_level > 0) {
1834  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
1835  ACE_TEXT("processing [datawriterqos/%C] section.\n"),
1836  datawriterqos_name.c_str()));
1837  }
1838 
1839  ValueMap values;
1840  pullValues(cf, it->second, values);
1841 
1842  DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
1843 
// This inner `it` shadows the outer loop's `it` for the duration of the loop.
1844  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
1845  OPENDDS_STRING name = it->first;
1846  OPENDDS_STRING value = it->second;
1847 
// Enumerated policies accept only the listed spellings; duration fields go
// through parse_second/parse_nanosecond; plain numeric fields use atoi()
// with no visible validation.
1848  if (name == "durability.kind") {
1849  if (value == "VOLATILE") {
1850  datawriterqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
1851  } else if (value == "TRANSIENT_LOCAL") {
1852  datawriterqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
1853 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1854  } else if (value == "TRANSIENT") {
1855  datawriterqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
1856  } else if (value == "PERSISTENT") {
1857  datawriterqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
1858 #endif
1859  } else {
1861  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1862  ACE_TEXT("Illegal value for durability.kind (%C) in [datawriterqos/%C] section.\n"),
1863  value.c_str(), datawriterqos_name.c_str()),
1864  -1);
1865  }
1866  } else if (name == "deadline.period.sec") {
1867  parse_second(datawriterqos.deadline.period.sec, value);
1868  } else if (name == "deadline.period.nanosec") {
1869  parse_nanosecond(datawriterqos.deadline.period.nanosec, value);
1870  } else if (name == "latency_budget.duration.sec") {
1871  parse_second(datawriterqos.latency_budget.duration.sec, value);
1872  } else if (name == "latency_budget.duration.nanosec") {
1873  parse_nanosecond(datawriterqos.latency_budget.duration.nanosec, value);
1874  } else if (name == "liveliness.kind") {
1875  if (value == "AUTOMATIC") {
1876  datawriterqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
1877  } else if (value == "MANUAL_BY_TOPIC") {
1878  datawriterqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
1879  } else if (value == "MANUAL_BY_PARTICIPANT") {
1880  datawriterqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
1881  } else {
1883  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1884  ACE_TEXT("Illegal value for liveliness.kind (%C) in [datawriterqos/%C] section.\n"),
1885  value.c_str(), datawriterqos_name.c_str()),
1886  -1);
1887  }
1888  } else if (name == "liveliness.lease_duration.sec") {
1889  parse_second(datawriterqos.liveliness.lease_duration.sec, value);
1890  } else if (name == "liveliness.lease_duration.nanosec") {
1891  parse_nanosecond(datawriterqos.liveliness.lease_duration.nanosec, value);
1892  } else if (name == "reliability.kind") {
1893  if (value == "BEST_EFFORT") {
1894  datawriterqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
1895  } else if (value == "RELIABLE") {
1896  datawriterqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
1897  } else {
1899  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1900  ACE_TEXT("Illegal value for reliability.kind (%C) in [datawriterqos/%C] section.\n"),
1901  value.c_str(), datawriterqos_name.c_str()),
1902  -1);
1903  }
1904  } else if (name == "reliability.max_blocking_time.sec") {
1905  parse_second(datawriterqos.reliability.max_blocking_time.sec, value);
1906  } else if (name == "reliability.max_blocking_time.nanosec") {
1907  parse_nanosecond(datawriterqos.reliability.max_blocking_time.nanosec, value);
1908  } else if (name == "destination_order.kind") {
1909  if (value == "BY_RECEPTION_TIMESTAMP") {
1910  datawriterqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
1911  } else if (value == "BY_SOURCE_TIMESTAMP") {
1912  datawriterqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
1913  } else {
1915  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1916  ACE_TEXT("Illegal value for destination_order.kind (%C) in [datawriterqos/%C] section.\n"),
1917  value.c_str(), datawriterqos_name.c_str()),
1918  -1);
1919  }
1920  } else if (name == "history.kind") {
1921  if (value == "KEEP_ALL") {
1922  datawriterqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
1923  } else if (value == "KEEP_LAST") {
1924  datawriterqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
1925  } else {
1927  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1928  ACE_TEXT("Illegal value for history.kind (%C) in [datawriterqos/%C] section.\n"),
1929  value.c_str(), datawriterqos_name.c_str()),
1930  -1);
1931  }
1932  } else if (name == "history.depth") {
1933  datawriterqos.history.depth = atoi(value.c_str());
1934  } else if (name == "resource_limits.max_samples") {
1935  datawriterqos.resource_limits.max_samples = atoi(value.c_str());
1936  } else if (name == "resource_limits.max_instances") {
1937  datawriterqos.resource_limits.max_instances = atoi(value.c_str());
1938  } else if (name == "resource_limits.max_samples_per_instance") {
1939  datawriterqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
1940  } else if (name == "transport_priority.value") {
1941  datawriterqos.transport_priority.value = atoi(value.c_str());
1942  } else if (name == "lifespan.duration.sec") {
1943  parse_second(datawriterqos.lifespan.duration.sec, value);
1944  } else if (name == "lifespan.duration.nanosec") {
1945  parse_nanosecond(datawriterqos.lifespan.duration.nanosec, value);
1946  } else if (name == "ownership.kind") {
1947  if (value == "SHARED") {
1948  datawriterqos.ownership.kind = DDS::SHARED_OWNERSHIP_QOS;
1949  } else if (value == "EXCLUSIVE") {
1950  datawriterqos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
1951  } else {
1953  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1954  ACE_TEXT("Illegal value for ownership.kind (%C) in [datawriterqos/%C] section.\n"),
1955  value.c_str(), datawriterqos_name.c_str()),
1956  -1);
1957  }
1958  } else if (name == "ownership_strength.value") {
1959  datawriterqos.ownership_strength.value = atoi(value.c_str());
1960  } else {
// Unlike parse_topics, an unrecognized key here is treated as an error.
1962  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1963  ACE_TEXT("Unexpected entry (%C) in [datawriterqos/%C] section.\n"),
1964  name.c_str(), datawriterqos_name.c_str()),
1965  -1);
1966  }
1967  }
1968 
1969  registry.datawriterqos_map[datawriterqos_name] = datawriterqos;
1970  }
1971 
1972  return 0;
1973 }
1974 
// Parse every [datareaderqos/<name>] sub-section of the configuration `cf`
// into a DDS::DataReaderQos and store it in registry.datareaderqos_map under
// <name>.
//
// Each QoS starts as a copy of TheServiceParticipant->initial_DataReaderQos()
// and is then overridden key by key. Returns 0 on success, and also 0 when the
// configuration contains no such section at all. Every diagnostic below ends in
// a `-1` argument; the macro call those arguments belong to is not visible in
// this listing (presumably an error-and-return macro -- confirm).
//
// NOTE(review): this listing skips line numbers (e.g. 1976, 1979, 1985), so the
// function signature, the declaration of `section` and the opening line of
// each logging call are absent here.
1975 int
1977 {
1978  const ACE_Configuration_Section_Key& root = cf.root_section();
1980 
1981  if (cf.open_section(root, DATAREADERQOS_SECTION_NAME, false, section) != 0) {
1982  if (DCPS_debug_level > 0) {
1983  // This is not an error if the configuration file does not have
1984  // any datareaderqos (sub)section.
1986  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
1987  ACE_TEXT("no [%s] sections.\n"),
1988  DATAREADERQOS_SECTION_NAME));
1989  }
1990  return 0;
1991  }
1992 
1993  // Ensure there are no key/values in the [datareaderqos] section.
1994  // Every key/value must be in a [datareaderqos/*] sub-section.
1995  ValueMap vm;
1996  if (pullValues(cf, section, vm) > 0) {
1998  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
1999  ACE_TEXT("[datareaderqos] sections must have a subsection name\n")),
2000  -1);
2001  }
2002  // Process the subsections of this section
2003  KeyList keys;
2004  if (processSections(cf, section, keys) != 0) {
2006  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2007  ACE_TEXT("too many nesting layers in the [datareaderqos] section.\n")),
2008  -1);
2009  }
2010 
2011  // Loop through the [datareaderqos/*] sections
2012  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2013  OPENDDS_STRING datareaderqos_name = it->first;
2014 
2015  if (DCPS_debug_level > 0) {
2017  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
2018  ACE_TEXT("processing [datareaderqos/%C] section.\n"),
2019  datareaderqos_name.c_str()));
2020  }
2021 
2022  ValueMap values;
2023  pullValues(cf, it->second, values);
2024 
      // Start from the service participant's initial reader QoS; only keys
      // present in the sub-section are overridden below.
2025  DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
2026 
      // This inner `it` shadows the section iterator of the enclosing loop.
2027  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2028  OPENDDS_STRING name = it->first;
2029  OPENDDS_STRING value = it->second;
2030 
2031  if (name == "durability.kind") {
2032  if (value == "VOLATILE") {
2033  datareaderqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
2034  } else if (value == "TRANSIENT_LOCAL") {
2035  datareaderqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
      // TRANSIENT and PERSISTENT are only accepted when the persistence
      // profile is compiled in.
2036 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2037  } else if (value == "TRANSIENT") {
2038  datareaderqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
2039  } else if (value == "PERSISTENT") {
2040  datareaderqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
2041 #endif
2042  } else {
2044  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2045  ACE_TEXT("Illegal value for durability.kind (%C) in [datareaderqos/%C] section.\n"),
2046  value.c_str(), datareaderqos_name.c_str()),
2047  -1);
2048  }
2049  } else if (name == "deadline.period.sec") {
2050  parse_second(datareaderqos.deadline.period.sec, value);
2051  } else if (name == "deadline.period.nanosec") {
2052  parse_nanosecond(datareaderqos.deadline.period.nanosec, value);
2053  } else if (name == "latency_budget.duration.sec") {
2054  parse_second(datareaderqos.latency_budget.duration.sec, value);
2055  } else if (name == "latency_budget.duration.nanosec") {
2056  parse_nanosecond(datareaderqos.latency_budget.duration.nanosec, value);
2057  } else if (name == "liveliness.kind") {
2058  if (value == "AUTOMATIC") {
2059  datareaderqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
2060  } else if (value == "MANUAL_BY_TOPIC") {
2061  datareaderqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
2062  } else if (value == "MANUAL_BY_PARTICIPANT") {
2063  datareaderqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
2064  } else {
2066  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2067  ACE_TEXT("Illegal value for liveliness.kind (%C) in [datareaderqos/%C] section.\n"),
2068  value.c_str(), datareaderqos_name.c_str()),
2069  -1);
2070  }
2071  } else if (name == "liveliness.lease_duration.sec") {
2072  parse_second(datareaderqos.liveliness.lease_duration.sec, value);
2073  } else if (name == "liveliness.lease_duration.nanosec") {
2074  parse_nanosecond(datareaderqos.liveliness.lease_duration.nanosec, value);
2075  } else if (name == "reliability.kind") {
2076  if (value == "BEST_EFFORT") {
2077  datareaderqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
2078  } else if (value == "RELIABLE") {
2079  datareaderqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
2080  } else {
2082  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2083  ACE_TEXT("Illegal value for reliability.kind (%C) in [datareaderqos/%C] section.\n"),
2084  value.c_str(), datareaderqos_name.c_str()),
2085  -1);
2086  }
2087  } else if (name == "reliability.max_blocking_time.sec") {
2088  parse_second(datareaderqos.reliability.max_blocking_time.sec, value);
2089  } else if (name == "reliability.max_blocking_time.nanosec") {
2090  parse_nanosecond(datareaderqos.reliability.max_blocking_time.nanosec, value);
2091  } else if (name == "destination_order.kind") {
2092  if (value == "BY_RECEPTION_TIMESTAMP") {
2093  datareaderqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
2094  } else if (value == "BY_SOURCE_TIMESTAMP") {
2095  datareaderqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
2096  } else {
2098  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2099  ACE_TEXT("Illegal value for destination_order.kind (%C) in [datareaderqos/%C] section.\n"),
2100  value.c_str(), datareaderqos_name.c_str()),
2101  -1);
2102  }
2103  } else if (name == "history.kind") {
2104  if (value == "KEEP_ALL") {
2105  datareaderqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
2106  } else if (value == "KEEP_LAST") {
2107  datareaderqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
2108  } else {
2110  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2111  ACE_TEXT("Illegal value for history.kind (%C) in [datareaderqos/%C] section.\n"),
2112  value.c_str(), datareaderqos_name.c_str()),
2113  -1);
2114  }
      // NOTE(review): the atoi() conversions below perform no validation; a
      // value with no leading number silently becomes 0.
2115  } else if (name == "history.depth") {
2116  datareaderqos.history.depth = atoi(value.c_str());
2117  } else if (name == "resource_limits.max_samples") {
2118  datareaderqos.resource_limits.max_samples = atoi(value.c_str());
2119  } else if (name == "resource_limits.max_instances") {
2120  datareaderqos.resource_limits.max_instances = atoi(value.c_str());
2121  } else if (name == "resource_limits.max_samples_per_instance") {
2122  datareaderqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
2123  } else if (name == "time_based_filter.minimum_separation.sec") {
2124  parse_second(datareaderqos.time_based_filter.minimum_separation.sec, value);
2125  } else if (name == "time_based_filter.minimum_separation.nanosec") {
2126  parse_nanosecond(datareaderqos.time_based_filter.minimum_separation.nanosec, value);
2127  } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.sec") {
2128  parse_second(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec, value);
2129  } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec") {
2130  parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec, value);
2131  } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.sec") {
2132  parse_second(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec, value);
2133  } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec") {
2134  parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec, value);
2135  } else {
      // Any key not listed above is rejected rather than ignored.
2137  ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2138  ACE_TEXT("Unexpected entry (%C) in [datareaderqos/%C] section.\n"),
2139  name.c_str(), datareaderqos_name.c_str()),
2140  -1);
2141  }
2142  }
2143 
      // operator[] means a repeated sub-section name overwrites the earlier entry.
2144  registry.datareaderqos_map[datareaderqos_name] = datareaderqos;
2145  }
2146 
2147  return 0;
2148 }
2149 
2150 int
2152 {
2153  const ACE_Configuration_Section_Key& root = cf.root_section();
2155 
2156  if (cf.open_section(root, PUBLISHERQOS_SECTION_NAME, false, section) != 0) {
2157  if (DCPS_debug_level > 0) {
2158  // This is not an error if the configuration file does not have
2159  // any publisherqos (sub)section.
2161  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
2162  ACE_TEXT("no [%s] sections.\n"),
2163  PUBLISHERQOS_SECTION_NAME));
2164  }
2165  return 0;
2166  }
2167 
2168  // Ensure there are no key/values in the [publisherqos] section.
2169  // Every key/value must be in a [publisherqos/*] sub-section.
2170  ValueMap vm;
2171  if (pullValues(cf, section, vm) > 0) {
2173  ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2174  ACE_TEXT("[publisherqos] sections must have a subsection name\n")),
2175  -1);
2176  }
2177  // Process the subsections of this section
2178  KeyList keys;
2179  if (processSections(cf, section, keys) != 0) {
2181  ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2182  ACE_TEXT("too many nesting layers in the [publisherqos] section.\n")),
2183  -1);
2184  }
2185 
2186  // Loop through the [publisherqos/*] sections
2187  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2188  OPENDDS_STRING publisherqos_name = it->first;
2189 
2190  if (DCPS_debug_level > 0) {
2192  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
2193  ACE_TEXT("processing [publisherqos/%C] section.\n"),
2194  publisherqos_name.c_str()));
2195  }
2196 
2197  ValueMap values;
2198  pullValues(cf, it->second, values);
2199 
2200  DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
2201 
2202  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2203  OPENDDS_STRING name = it->first;
2204  OPENDDS_STRING value = it->second;
2205 
2206  if (name == "presentation.access_scope") {
2207  if (value == "INSTANCE") {
2208  publisherqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
2209  } else if (value == "TOPIC") {
2210  publisherqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
2211  } else if (value == "GROUP") {
2212  publisherqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
2213  } else {
2215  ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2216  ACE_TEXT("Illegal value for presentation.access_scope (%C) in [publisherqos/%C] section.\n"),
2217  value.c_str(), publisherqos_name.c_str()),
2218  -1);
2219  }
2220  } else if (name == "presentation.coherent_access") {
2221  if (parse_bool(publisherqos.presentation.coherent_access, value)) {
2222  } else {
2224  ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2225  ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [publisherqos/%C] section.\n"),
2226  value.c_str(), publisherqos_name.c_str()),
2227  -1);
2228  }
2229  } else if (name == "presentation.ordered_access") {
2230  if (parse_bool(publisherqos.presentation.ordered_access, value)) {
2231  } else {
2233  ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2234  ACE_TEXT("Illegal value for presentation.ordered_access (%C)")
2235  ACE_TEXT("in [publisherqos/%C] section.\n"),
2236  value.c_str(), publisherqos_name.c_str()),
2237  -1);
2238  }
2239  } else if (name == "partition.name") {
2240  try {
2241  parse_list(publisherqos.partition, value);
2242  }
2243  catch (const CORBA::Exception& ex) {
2245  ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2246  ACE_TEXT("Exception caught while parsing partition.name (%C) ")
2247  ACE_TEXT("in [publisherqos/%C] section: %C.\n"),
2248  value.c_str(), publisherqos_name.c_str(), ex._info().c_str()),
2249  -1);
2250  }
2251  } else {
2253  ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2254  ACE_TEXT("Unexpected entry (%C) in [publisherqos/%C] section.\n"),
2255  name.c_str(), publisherqos_name.c_str()),
2256  -1);
2257  }
2258  }
2259 
2260  registry.publisherqos_map[publisherqos_name] = publisherqos;
2261  }
2262 
2263  return 0;
2264 }
2265 
// Parse every [subscriberqos/<name>] sub-section of the configuration `cf`
// into a DDS::SubscriberQos and store it in registry.subscriberqos_map under
// <name>.
//
// Each QoS starts as a copy of TheServiceParticipant->initial_SubscriberQos().
// Recognised keys: presentation.access_scope (INSTANCE | TOPIC | GROUP),
// presentation.coherent_access and presentation.ordered_access (parsed by
// parse_bool), and partition.name (parsed by parse_list). Returns 0 on
// success, and also 0 when the configuration contains no such section at all.
// Every diagnostic below ends in a `-1` argument; the macro call those
// arguments belong to is not visible in this listing (presumably an
// error-and-return macro -- confirm).
//
// NOTE(review): this listing skips line numbers (e.g. 2267, 2270, 2276), so the
// function signature, the declaration of `section` and the opening line of
// each logging call are absent here.
2266 int
2268 {
2269  const ACE_Configuration_Section_Key& root = cf.root_section();
2271 
2272  if (cf.open_section(root, SUBSCRIBERQOS_SECTION_NAME, false, section) != 0) {
2273  if (DCPS_debug_level > 0) {
2274  // This is not an error if the configuration file does not have
2275  // any subscriberqos (sub)section.
2277  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
2278  ACE_TEXT("no [%s] sections.\n"),
2279  SUBSCRIBERQOS_SECTION_NAME));
2280  }
2281  return 0;
2282  }
2283 
2284  // Ensure there are no key/values in the [subscriberqos] section.
2285  // Every key/value must be in a [subscriberqos/*] sub-section.
2286  ValueMap vm;
2287  if (pullValues(cf, section, vm) > 0) {
2289  ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2290  ACE_TEXT("[subscriberqos] sections must have a subsection name\n")),
2291  -1);
2292  }
2293  // Process the subsections of this section
2294  KeyList keys;
2295  if (processSections(cf, section, keys) != 0) {
2297  ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2298  ACE_TEXT("too many nesting layers in the [subscriberqos] section.\n")),
2299  -1);
2300  }
2301 
2302  // Loop through the [subscriberqos/*] sections
2303  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2304  OPENDDS_STRING subscriberqos_name = it->first;
2305 
2306  if (DCPS_debug_level > 0) {
2308  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
2309  ACE_TEXT("processing [subscriberqos/%C] section.\n"),
2310  subscriberqos_name.c_str()));
2311  }
2312 
2313  ValueMap values;
2314  pullValues(cf, it->second, values);
2315 
      // Start from the service participant's initial subscriber QoS; only keys
      // present in the sub-section are overridden below.
2316  DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
2317 
      // This inner `it` shadows the section iterator of the enclosing loop.
2318  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2319  OPENDDS_STRING name = it->first;
2320  OPENDDS_STRING value = it->second;
2321 
2322  if (name == "presentation.access_scope") {
2323  if (value == "INSTANCE") {
2324  subscriberqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
2325  } else if (value == "TOPIC") {
2326  subscriberqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
2327  } else if (value == "GROUP") {
2328  subscriberqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
2329  } else {
2331  ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2332  ACE_TEXT("Illegal value for presentation.access_scope (%C) in [subscriberqos/%C] section.\n"),
2333  value.c_str(), subscriberqos_name.c_str()),
2334  -1);
2335  }
2336  } else if (name == "presentation.coherent_access") {
      // parse_bool() writes the flag itself; the empty branch is the success path.
2337  if (parse_bool(subscriberqos.presentation.coherent_access, value)) {
2338  } else {
2340  ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2341  ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [subscriberqos/%C] section.\n"),
2342  value.c_str(), subscriberqos_name.c_str()),
2343  -1);
2344  }
2345  } else if (name == "presentation.ordered_access") {
2346  if (parse_bool(subscriberqos.presentation.ordered_access, value)) {
2347  } else {
2349  ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2350  ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [subscriberqos/%C] section.\n"),
2351  value.c_str(), subscriberqos_name.c_str()),
2352  -1);
2353  }
2354  } else if (name == "partition.name") {
2355  try {
2356  parse_list(subscriberqos.partition, value);
2357  }
2358  catch (const CORBA::Exception& ex) {
2360  ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2361  ACE_TEXT("Exception caught while parsing partition.name (%C) ")
2362  ACE_TEXT("in [subscriberqos/%C] section: %C.\n"),
2363  value.c_str(), subscriberqos_name.c_str(), ex._info().c_str()),
2364  -1);
2365  }
2366  } else {
      // Any key not listed above is rejected rather than ignored.
2368  ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2369  ACE_TEXT("Unexpected entry (%C) in [subscriberqos/%C] section.\n"),
2370  name.c_str(), subscriberqos_name.c_str()),
2371  -1);
2372  }
2373  }
2374 
      // operator[] means a repeated sub-section name overwrites the earlier entry.
2375  registry.subscriberqos_map[subscriberqos_name] = subscriberqos;
2376  }
2377 
2378  return 0;
2379 }
2380 
// Parse every [endpoint/<name>] sub-section of the configuration `cf` and
// register the described reader or writer in registry.reader_map /
// registry.writer_map.
//
// Keys checked as mandatory after the key loop: domain, participant (12 hex
// digits), entity (6 hex digits), type ("reader" | "writer") and topic (must
// name an entry of registry.topic_map). Optional keys: datawriterqos,
// publisherqos, datareaderqos, subscriberqos (each must name an entry of the
// matching registry map) and config (a transport configuration name).
//
// Returns 0 on success, and also 0 when the configuration contains no such
// section at all. Every diagnostic below ends in a `-1` argument; the macro
// call those arguments belong to is not visible in this listing (presumably an
// error-and-return macro -- confirm).
//
// NOTE(review): this listing skips line numbers (e.g. 2382, 2385, 2391, 2657),
// so the function signature, the declaration of `section`, the opening line of
// each logging call and the second argument of the first build_id() call are
// absent here.
2381 int
2383 {
2384  const ACE_Configuration_Section_Key& root = cf.root_section();
2386 
2387  if (cf.open_section(root, ENDPOINT_SECTION_NAME, false, section) != 0) {
2388  if (DCPS_debug_level > 0) {
2389  // This is not an error if the configuration file does not have
2390  // any endpoint (sub)section.
2392  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
2393  ACE_TEXT("no [%s] sections.\n"),
2394  ENDPOINT_SECTION_NAME));
2395  }
2396  return 0;
2397  }
2398 
2399  // Ensure there are no key/values in the [endpoint] section.
2400  // Every key/value must be in a [endpoint/*] sub-section.
2401  ValueMap vm;
2402  if (pullValues(cf, section, vm) > 0) {
2404  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2405  ACE_TEXT("[endpoint] sections must have a subsection name\n")),
2406  -1);
2407  }
2408  // Process the subsections of this section
2409  KeyList keys;
2410  if (processSections(cf, section, keys) != 0) {
2412  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2413  ACE_TEXT("too many nesting layers in the [endpoint] section.\n")),
2414  -1);
2415  }
2416 
2417  // Loop through the [endpoint/*] sections
2418  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2419  OPENDDS_STRING endpoint_name = it->first;
2420 
2421  if (DCPS_debug_level > 0) {
2423  ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
2424  ACE_TEXT("processing [endpoint/%C] section.\n"),
2425  endpoint_name.c_str()));
2426  }
2427 
2428  ValueMap values;
2429  pullValues(cf, it->second, values);
      // Per-endpoint state, filled in by the key loop below. All four QoS
      // objects start from the service participant's initial values, whatever
      // the endpoint type turns out to be.
2430  int domain = 0;
2431  unsigned char participant[6] = { 0 };
2432  unsigned char entity[3] = { 0 };
2433  enum Type {
2434  Reader,
2435  Writer
2436  };
2437  Type type = Reader; // avoid warning
2438  OPENDDS_STRING topic_name;
2439  DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
2440  DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
2441  DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
2442  DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
2443  TransportLocatorSeq trans_info;
2444  OPENDDS_STRING config_name;
2445 
      // Track which keys were seen so the mandatory ones can be checked afterwards.
2446  bool domain_specified = false,
2447  participant_specified = false,
2448  entity_specified = false,
2449  type_specified = false,
2450  topic_name_specified = false,
2451  config_name_specified = false;
2452 
      // This inner `it` shadows the section iterator of the enclosing loop.
2453  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2454  OPENDDS_STRING name = it->first;
2455  OPENDDS_STRING value = it->second;
2456 
2457  if (name == "domain") {
2458  if (convertToInteger(value, domain)) {
2459  domain_specified = true;
2460  } else {
2462  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2463  ACE_TEXT("Illegal integer value for domain (%C) in [endpoint/%C] section.\n"),
2464  value.c_str(), endpoint_name.c_str()),
2465  -1);
2466  }
2467  } else if (name == "participant") {
      // The value must be exactly HEX_DIGITS_IN_PARTICIPANT characters, all hex digits.
      // NOTE(review): isxdigit is handed plain char elements here; its
      // behaviour is undefined for negative values -- consider converting
      // through unsigned char.
2468  const OPENDDS_STRING::difference_type count = std::count_if(value.begin(), value.end(), isxdigit);
2469  if (value.size() != HEX_DIGITS_IN_PARTICIPANT || static_cast<size_t>(count) != HEX_DIGITS_IN_PARTICIPANT) {
2471  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2472  ACE_TEXT("participant (%C) must be 12 hexadecimal digits in [endpoint/%C] section.\n"),
2473  value.c_str(), endpoint_name.c_str()),
2474  -1);
2475  }
2476 
2477  for (size_t idx = 0; idx != BYTES_IN_PARTICIPANT; ++idx) {
2478  participant[idx] = fromhex(value, idx);
2479  }
2480  participant_specified = true;
2481  } else if (name == "entity") {
      // Same validation as "participant", with HEX_DIGITS_IN_ENTITY characters.
2482  const OPENDDS_STRING::difference_type count = std::count_if(value.begin(), value.end(), isxdigit);
2483  if (value.size() != HEX_DIGITS_IN_ENTITY || static_cast<size_t>(count) != HEX_DIGITS_IN_ENTITY) {
2485  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2486  ACE_TEXT("entity (%C) must be 6 hexadecimal digits in [endpoint/%C] section.\n"),
2487  value.c_str(), endpoint_name.c_str()),
2488  -1);
2489  }
2490 
2491  for (size_t idx = 0; idx != BYTES_IN_ENTITY; ++idx) {
2492  entity[idx] = fromhex(value, idx);
2493  }
2494  entity_specified = true;
2495  } else if (name == "type") {
2496  if (value == "reader") {
2497  type = Reader;
2498  type_specified = true;
2499  } else if (value == "writer") {
2500  type = Writer;
2501  type_specified = true;
2502  } else {
2504  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2505  ACE_TEXT("Illegal string value for type (%C) in [endpoint/%C] section.\n"),
2506  value.c_str(), endpoint_name.c_str()),
2507  -1);
2508  }
2509  } else if (name == "topic") {
      // The value is a key into registry.topic_map; the endpoint stores the
      // `name` field of the entry found there.
2510  EndpointRegistry::TopicMapType::const_iterator pos = this->registry.topic_map.find(value);
2511  if (pos != this->registry.topic_map.end()) {
2512  topic_name = pos->second.name;
2513  topic_name_specified = true;
2514  } else {
2516  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2517  ACE_TEXT("Illegal topic reference (%C) in [endpoint/%C] section.\n"),
2518  value.c_str(), endpoint_name.c_str()),
2519  -1);
2520  }
2521  } else if (name == "datawriterqos") {
      // The four *qos keys each replace the default QoS with a copy of the
      // named entry from the corresponding registry map.
2522  EndpointRegistry::DataWriterQosMapType::const_iterator pos = this->registry.datawriterqos_map.find(value);
2523  if (pos != this->registry.datawriterqos_map.end()) {
2524  datawriterqos = pos->second;
2525  } else {
2527  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2528  ACE_TEXT("Illegal datawriterqos reference (%C) in [endpoint/%C] section.\n"),
2529  value.c_str(), endpoint_name.c_str()),
2530  -1);
2531  }
2532  } else if (name == "publisherqos") {
2533  EndpointRegistry::PublisherQosMapType::const_iterator pos = this->registry.publisherqos_map.find(value);
2534  if (pos != this->registry.publisherqos_map.end()) {
2535  publisherqos = pos->second;
2536  } else {
2538  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2539  ACE_TEXT("Illegal publisherqos reference (%C) in [endpoint/%C] section.\n"),
2540  value.c_str(), endpoint_name.c_str()),
2541  -1);
2542  }
2543  } else if (name == "datareaderqos") {
2544  EndpointRegistry::DataReaderQosMapType::const_iterator pos = this->registry.datareaderqos_map.find(value);
2545  if (pos != this->registry.datareaderqos_map.end()) {
2546  datareaderqos = pos->second;
2547  } else {
2549  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2550  ACE_TEXT("Illegal datareaderqos reference (%C) in [endpoint/%C] section.\n"),
2551  value.c_str(), endpoint_name.c_str()),
2552  -1);
2553  }
2554  } else if (name == "subscriberqos") {
2555  EndpointRegistry::SubscriberQosMapType::const_iterator pos = this->registry.subscriberqos_map.find(value);
2556  if (pos != this->registry.subscriberqos_map.end()) {
2557  subscriberqos = pos->second;
2558  } else {
2560  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2561  ACE_TEXT("Illegal subscriberqos reference (%C) in [endpoint/%C] section.\n"),
2562  value.c_str(), endpoint_name.c_str()),
2563  -1);
2564  }
2565  } else if (name == "config") {
      // Only recorded here; the name is resolved after the key loop.
2566  config_name = value;
2567  config_name_specified = true;
2568  } else {
2570  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2571  ACE_TEXT("Unexpected entry (%C) in [endpoint/%C] section.\n"),
2572  name.c_str(), endpoint_name.c_str()),
2573  -1);
2574  }
2575  }
2576 
      // Mandatory-key checks.
2577  if (!domain_specified) {
2579  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2580  ACE_TEXT("No domain specified for [endpoint/%C] section.\n"),
2581  endpoint_name.c_str()),
2582  -1);
2583  }
2584 
2585  if (!participant_specified) {
2587  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2588  ACE_TEXT("No participant specified for [endpoint/%C] section.\n"),
2589  endpoint_name.c_str()),
2590  -1);
2591  }
2592 
2593  if (!entity_specified) {
2595  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2596  ACE_TEXT("No entity specified for [endpoint/%C] section.\n"),
2597  endpoint_name.c_str()),
2598  -1);
2599  }
2600 
2601  if (!type_specified) {
      // NOTE(review): this message lacks the space after "ERROR:" that every
      // other message in this function has.
2603  ACE_TEXT("(%P|%t) ERROR:StaticDiscovery::parse_endpoints ")
2604  ACE_TEXT("No type specified for [endpoint/%C] section.\n"),
2605  endpoint_name.c_str()),
2606  -1);
2607  }
2608 
2609  if (!topic_name_specified) {
2611  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2612  ACE_TEXT("No topic specified for [endpoint/%C] section.\n"),
2613  endpoint_name.c_str()),
2614  -1);
2615  }
2616 
      // Pick the transport configuration: the explicitly named one (which must
      // exist), else the domain's default, else the global configuration.
2617  TransportConfig_rch config;
2618 
2619  if (config_name_specified) {
2620  config = TheTransportRegistry->get_config(config_name);
2621  if (config.is_nil()) {
2623  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2624  ACE_TEXT("Illegal config reference (%C) in [endpoint/%C] section.\n"),
2625  config_name.c_str(), endpoint_name.c_str()),
2626  -1);
2627  }
2628  }
2629 
2630  if (config.is_nil() && domain_specified) {
2631  config = TheTransportRegistry->domain_default_config(domain);
2632  }
2633 
2634  if (config.is_nil()) {
2635  config = TheTransportRegistry->global_config();
2636  }
2637 
      // The chosen configuration supplies the locators stored with the
      // endpoint; an endpoint with no locators is rejected.
2638  try {
2639  config->populate_locators(trans_info);
2640  }
2641  catch (const CORBA::Exception& ex) {
2643  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2644  ACE_TEXT("Exception caught while populating locators for [endpoint/%C] section. %C\n"),
2645  endpoint_name.c_str(), ex._info().c_str()),
2646  -1);
2647  }
2648  if (trans_info.length() == 0) {
2650  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2651  ACE_TEXT("No locators for [endpoint/%C] section.\n"),
2652  endpoint_name.c_str()),
2653  -1);
2654  }
2655 
      // The remaining argument(s) of this build_id() call are on a line that is
      // missing from this listing.
2656  EntityId_t entity_id = EndpointRegistry::build_id(entity,
2658 
2659  GUID_t id = EndpointRegistry::build_id(domain, participant, entity_id);
2660 
2661  if (DCPS_debug_level > 0) {
2662  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: StaticDiscovery::parse_endpoints adding entity with id %C\n"), LogGuid(id).c_str()));
2663  }
2664 
2665  switch (type) {
2666  case Reader:
2667  // Populate the userdata.
      // user_data carries the three entity-key bytes of the endpoint's id.
2668  datareaderqos.user_data.value.length(3);
2669  datareaderqos.user_data.value[0] = entity_id.entityKey[0];
2670  datareaderqos.user_data.value[1] = entity_id.entityKey[1];
2671  datareaderqos.user_data.value[2] = entity_id.entityKey[2];
2672  set_reader_effective_data_rep_qos(datareaderqos.representation.value);
      // insert() does not overwrite: an id that is already present is reported
      // as a duplicate.
2673  if (!registry.reader_map.insert(std::make_pair(id,
2674  EndpointRegistry::Reader(topic_name, datareaderqos, subscriberqos, config_name, trans_info))).second) {
2676  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2677  ACE_TEXT("Section [endpoint/%C] ignored - duplicate reader.\n"),
2678  endpoint_name.c_str()),
2679  -1);
2680  }
2681  break;
2682  case Writer:
2683  // Populate the userdata.
2684  datawriterqos.user_data.value.length(3);
2685  datawriterqos.user_data.value[0] = entity_id.entityKey[0];
2686  datawriterqos.user_data.value[1] = entity_id.entityKey[1];
2687  datawriterqos.user_data.value[2] = entity_id.entityKey[2];
      // encapsulated_only becomes true as soon as any locator's transport type
      // is "rtps_udp"; it is passed on to set_writer_effective_data_rep_qos().
2688  bool encapsulated_only = false;
2689  for (CORBA::ULong i = 0; i < trans_info.length(); ++i) {
2690  if (0 == std::strcmp(trans_info[i].transport_type, "rtps_udp")) {
2691  encapsulated_only = true;
2692  break;
2693  }
2694  }
2695  set_writer_effective_data_rep_qos(datawriterqos.representation.value, encapsulated_only);
2696 
2697  if (!registry.writer_map.insert(std::make_pair(id,
2698  EndpointRegistry::Writer(topic_name, datawriterqos, publisherqos, config_name, trans_info))).second) {
2700  ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2701  ACE_TEXT("Section [endpoint/%C] ignored - duplicate writer.\n"),
2702  endpoint_name.c_str()),
2703  -1);
2704  }
2705  break;
2706  }
2707  }
2708 
2709  return 0;
2710 }
2711 
// Bind the transport configuration named in the static endpoint registry to
// `writer`, if the registry has an entry for it.
//
// The registry key is rebuilt from the domain id, the participant's user_data
// and the writer's user_data. The function returns without doing anything when
// the participant user_data is shorter than 6 bytes or the writer user_data is
// shorter than 3 bytes, and binds nothing when no entry is found or the
// entry's trans_cfg is empty.
//
// NOTE(review): listing lines 2712 (presumably the signature) and 2731 (the
// initializer of `entId`) are missing from this extract.
2713 {
2714  const DDS::Publisher_var pub = writer->get_publisher();
2715  const DDS::DomainParticipant_var part = pub->get_participant();
2716  const DDS::DomainId_t dom = part->get_domain_id();
2717 
2718  DDS::DomainParticipantQos partQos;
2719  part->get_qos(partQos);
2720  if (partQos.user_data.value.length() < 6)
2721  return;
2722  const unsigned char* const partId = partQos.user_data.value.get_buffer();
2723 
2724  DDS::DataWriterQos qos;
2725  writer->get_qos(qos);
2726  if (qos.user_data.value.length() < 3)
2727  return;
2728  const unsigned char* const dwId = qos.user_data.value.get_buffer();
2729 
2730  const EntityId_t entId =
2732  const GUID_t rid = EndpointRegistry::build_id(dom, partId, entId);
2733 
2734  const EndpointRegistry::WriterMapType::const_iterator iter =
2735  registry.writer_map.find(rid);
2736 
      // Only bind when the registry entry actually names a transport config.
2737  if (iter != registry.writer_map.end() && !iter->second.trans_cfg.empty()) {
2738  TransportRegistry::instance()->bind_config(iter->second.trans_cfg, writer);
2739  }
2740 }
2741 
// Bind the transport configuration named in the static endpoint registry to
// `reader`, if the registry has an entry for it.
//
// The registry key is rebuilt from the domain id, the participant's user_data
// and the reader's user_data. The function returns without doing anything when
// the participant user_data is shorter than 6 bytes or the reader user_data is
// shorter than 3 bytes, and binds nothing when no entry is found or the
// entry's trans_cfg is empty.
//
// NOTE(review): listing lines 2742 (presumably the signature) and 2761 (the
// initializer of `entId`) are missing from this extract.
2743 {
2744  const DDS::Subscriber_var sub = reader->get_subscriber();
2745  const DDS::DomainParticipant_var part = sub->get_participant();
2746  const DDS::DomainId_t dom = part->get_domain_id();
2747 
2748  DDS::DomainParticipantQos partQos;
2749  part->get_qos(partQos);
2750  if (partQos.user_data.value.length() < 6)
2751  return;
2752  const unsigned char* const partId = partQos.user_data.value.get_buffer();
2753 
2754  DDS::DataReaderQos qos;
2755  reader->get_qos(qos);
2756  if (qos.user_data.value.length() < 3)
2757  return;
2758  const unsigned char* const drId = qos.user_data.value.get_buffer();
2759 
2760  const EntityId_t entId =
2762  const GUID_t rid = EndpointRegistry::build_id(dom, partId, entId);
2763 
2764  const EndpointRegistry::ReaderMapType::const_iterator iter =
2765  registry.reader_map.find(rid);
2766 
      // Only bind when the registry entry actually names a transport config.
2767  if (iter != registry.reader_map.end() && !iter->second.trans_cfg.empty()) {
2768  TransportRegistry::instance()->bind_config(iter->second.trans_cfg, reader);
2769  }
2770 }
2771 
2773 
// Set up the built-in-topic subscriber for `participant` and return it wrapped
// in a BitSubscriber handle.
//
// Unless DDS_HAS_MINIMUM_BIT is defined, the body creates the subscriber,
// configures a DataReaderQos (TRANSIENT_LOCAL durability, autopurge delays
// taken from the service participant), calls create_bit_dr() for the built-in
// topics and enables the subscriber. An empty RcHandle<BitSubscriber> is
// returned when built-in topics are disabled, when create_bit_topics() fails,
// when the subscriber is not a SubscriberImpl, or when enable() fails.
//
// NOTE(review): many listing lines are missing from this extract (2774, 2788,
// 2790, 2808-2809, 2813, 2818-2819, ...), including the signature, the
// create-subscriber call and most topic lookups.
// NOTE(review): the log messages below say "PeerDiscovery::init_bit" although
// this file is StaticDiscovery.cpp -- confirm whether that is intentional.
2775 {
2776  DDS::Subscriber_var bit_subscriber;
2777 #ifndef DDS_HAS_MINIMUM_BIT
2778  if (!TheServiceParticipant->get_BIT()) {
      // Built-in topics are turned off: the participant is still handed the
      // default-constructed `bit_subscriber`.
2779  get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
2780  return RcHandle<BitSubscriber>();
2781  }
2782 
2783  if (create_bit_topics(participant) != DDS::RETCODE_OK) {
2784  return RcHandle<BitSubscriber>();
2785  }
2786 
2787  bit_subscriber =
2789  DDS::SubscriberListener::_nil(),
2791  SubscriberImpl* sub = dynamic_cast<SubscriberImpl*>(bit_subscriber.in());
2792  if (sub == 0) {
2793  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
2794  ACE_TEXT(" - Could not cast Subscriber to SubscriberImpl\n")));
2795  return RcHandle<BitSubscriber>();
2796  }
2797 
      // One DataReaderQos is shared by every create_bit_dr() call below.
2798  DDS::DataReaderQos dr_qos;
2799  sub->get_default_datareader_qos(dr_qos);
2800  dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
2801 
2802  dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay =
2803  TheServiceParticipant->bit_autopurge_nowriter_samples_delay();
2804  dr_qos.reader_data_lifecycle.autopurge_disposed_samples_delay =
2805  TheServiceParticipant->bit_autopurge_disposed_samples_delay();
2806 
2807  DDS::TopicDescription_var bit_part_topic =
2810  sub, dr_qos);
2811 
2812  DDS::TopicDescription_var bit_topic_topic =
2814  create_bit_dr(bit_topic_topic, BUILT_IN_TOPIC_TOPIC_TYPE,
2815  sub, dr_qos);
2816 
2817  DDS::TopicDescription_var bit_pub_topic =
2820  sub, dr_qos);
2821 
2822  DDS::TopicDescription_var bit_sub_topic =
2825  sub, dr_qos);
2826 
2827  DDS::TopicDescription_var bit_part_loc_topic =
2830  sub, dr_qos);
2831 
2832  DDS::TopicDescription_var bit_connection_record_topic =
2834  create_bit_dr(bit_connection_record_topic, BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE,
2835  sub, dr_qos);
2836 
2837  DDS::TopicDescription_var bit_internal_thread_topic =
2839  create_bit_dr(bit_internal_thread_topic, BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE,
2840  sub, dr_qos);
2841 
2842  const DDS::ReturnCode_t ret = bit_subscriber->enable();
2843  if (ret != DDS::RETCODE_OK) {
2844  if (DCPS_debug_level) {
2845  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
2846  ACE_TEXT(" - Error %d enabling subscriber\n"), ret));
2847  }
2848  return RcHandle<BitSubscriber>();
2849  }
2850 #endif /* DDS_HAS_MINIMUM_BIT */
2851 
      // Reached on success; with DDS_HAS_MINIMUM_BIT defined this is the whole
      // body and `bit_subscriber` is still default-constructed.
2852  get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
2853 
2854  return make_rch<BitSubscriber>(bit_subscriber);
2855 }
2856 
2858 {
// Forwards BIT teardown to the participant that get_part() returns for this
// participant's domain id and GUID.
// NOTE(review): signature line missing from this listing; the symbol index
// lists `void fini_bit(DCPS::DomainParticipantImpl *participant)` - confirm.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
2859  get_part(participant->get_domain_id(), participant->get_id())->fini_bit();
2860 }
2861 
2863  DDS::DomainId_t /*domainId*/, const GUID_t& /*participantId*/)
2864 {
// Both parameters are unused; the result is unconditionally false.
// NOTE(review): the first signature line is missing from this listing; the
// symbol index lists `bool attach_participant(...)` with these parameters.
2865  return false; // This is just for DCPSInfoRepo?
2866 }
2867 
2869  DDS::DomainId_t domain_id, const GUID_t& participantId)
2870 {
// Unregisters participantId from participants_[domain_id], drops the domain
// entry once it becomes empty, then calls shutdown() on the participant.
// Returns false when the domain or the participant is not registered,
// true otherwise.
2871  // Use reference counting to ensure participant
2872  // does not get deleted until lock has been released.
2873  ParticipantHandle participant;
// NOTE(review): listing line 2874 is not visible here; presumably it acquires
// the lock that the comment above refers to - confirm.
2875  DomainParticipantMap::iterator domain = participants_.find(domain_id);
2876  if (domain == participants_.end()) {
2877  return false;
2878  }
2879  ParticipantMap::iterator part = domain->second.find(participantId);
2880  if (part == domain->second.end()) {
2881  return false;
2882  }
// Keep a handle before erasing so shutdown() can still be called below.
2883  participant = part->second;
2884  domain->second.erase(part);
2885  if (domain->second.empty()) {
2886  participants_.erase(domain);
2887  }
2888 
2889  participant->shutdown();
2890  return true;
2891 }
2892 
2894  DDS::DomainId_t domain, const GUID_t& myParticipantId, const GUID_t& ignoreId)
2895 {
// Asks the participant (domain, myParticipantId) to ignore the participant
// ignoreId; the result is always true.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
2896  get_part(domain, myParticipantId)->ignore_domain_participant(ignoreId);
2897  return true;
2898 }
2899 
2901  DDS::DomainId_t domain, const GUID_t& participant, const DDS::DomainParticipantQos& qos)
2902 {
// Forwards the new QoS to the participant found by get_part() and returns
// that call's result.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
2903  return get_part(domain, participant)->update_domain_participant_qos(qos);
2904 }
2905 
2907  GUID_t& topicId,
2908  DDS::DomainId_t domainId,
2909  const GUID_t& participantId,
2910  const char* topicName,
2911  const char* dataTypeName,
2912  const DDS::TopicQos& qos,
2913  bool hasDcpsKey,
2914  DCPS::TopicCallbacks* topic_callbacks)
2915 {
// Forwards to the participant's assert_topic() and returns its status.
// NOTE(review): listing line 2916 is not visible here; presumably it acquires
// the lock that the next comment refers to - confirm.
// NOTE(review): both lookups use operator[]; if these are std::map-like
// containers, an unknown domainId/participantId inserts an empty entry that is
// then dereferenced - confirm callers guarantee the participant exists.
2917  // Verified it's safe to hold lock during call to assert_topic
2918  return participants_[domainId][participantId]->assert_topic(topicId, topicName,
2919  dataTypeName, qos,
2920  hasDcpsKey, topic_callbacks);
2921 }
2922 
2924  DDS::DomainId_t domainId,
2925  const GUID_t& participantId,
2926  const char* topicName,
2927  CORBA::String_out dataTypeName,
2928  DDS::TopicQos_out qos,
2929  GUID_t& topicId)
2930 {
// Forwards to the participant's find_topic(); dataTypeName, qos and topicId
// are passed straight through as the callee's outputs.
// NOTE(review): listing line 2931 is not visible here (possibly a lock guard).
// NOTE(review): operator[] lookups insert an empty entry for unknown keys if
// these are std::map-like containers - confirm the participant always exists.
2932  return participants_[domainId][participantId]->find_topic(topicName, dataTypeName, qos, topicId);
2933 }
2934 
2936  DDS::DomainId_t domainId,
2937  const GUID_t& participantId,
2938  const GUID_t& topicId)
2939 {
// Forwards to the participant's remove_topic() and returns its status.
// NOTE(review): listing line 2940 is not visible here; presumably it acquires
// the lock that the next comment refers to - confirm.
// NOTE(review): operator[] lookups insert an empty entry for unknown keys if
// these are std::map-like containers - confirm the participant always exists.
2941  // Safe to hold lock while calling remove topic
2942  return participants_[domainId][participantId]->remove_topic(topicId);
2943 }
2944 
2945 bool StaticDiscovery::ignore_topic(DDS::DomainId_t domainId, const GUID_t& myParticipantId,
2946  const GUID_t& ignoreId)
2947 {
2948  get_part(domainId, myParticipantId)->ignore_topic(ignoreId);
2949  return true;
2950 }
2951 
2953  const GUID_t& participantId, const DDS::TopicQos& qos)
2954 {
// Forwards the new topic QoS to the participant and returns its result.
// NOTE(review): the first signature line and listing line 2955 are not
// visible here; line 2955 presumably acquires the lock mentioned below.
// NOTE(review): operator[] lookups insert an empty entry for unknown keys if
// these are std::map-like containers - confirm the participant always exists.
2956  // Safe to hold lock while calling update_topic_qos
2957  return participants_[domainId][participantId]->update_topic_qos(topicId, qos);
2958 }
2959 
2961  DDS::DomainId_t domainId,
2962  const GUID_t& participantId,
2963  const GUID_t& topicId,
2964  DCPS::DataWriterCallbacks_rch publication,
2965  const DDS::DataWriterQos& qos,
2966  const DCPS::TransportLocatorSeq& transInfo,
2967  const DDS::PublisherQos& publisherQos,
2968  const XTypes::TypeInformation& type_info)
2969 {
// Forwards every argument except the domain/participant ids to the
// participant's add_publication() and returns what it returns.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
2970  return get_part(domainId, participantId)->add_publication(
2971  topicId, publication, qos, transInfo, publisherQos, type_info);
2972 }
2973 
2975  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& publicationId)
2976 {
// Asks the participant to remove publicationId; the result is always true.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
2977  get_part(domainId, participantId)->remove_publication(publicationId);
2978  return true;
2979 }
2980 
2982  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& ignoreId)
2983 {
// Asks the participant to ignore the publication ignoreId; always true.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
2984  get_part(domainId, participantId)->ignore_publication(ignoreId);
2985  return true;
2986 }
2987 
2989  DDS::DomainId_t domainId,
2990  const GUID_t& partId,
2991  const GUID_t& dwId,
2992  const DDS::DataWriterQos& qos,
2993  const DDS::PublisherQos& publisherQos)
2994 {
// Forwards the writer and publisher QoS for dwId to the participant and
// returns that call's result.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
2995  return get_part(domainId, partId)->update_publication_qos(dwId, qos,
2996  publisherQos);
2997 }
2998 
3000  DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& dwId,
3001  const DCPS::TransportLocatorSeq& transInfo)
3002 {
// Forwards the new transport locators for writer dwId to the participant.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
3003  get_part(domainId, partId)->update_publication_locators(dwId, transInfo);
3004 }
3005 
3007  DDS::DomainId_t domainId,
3008  const GUID_t& participantId,
3009  const GUID_t& topicId,
3010  DCPS::DataReaderCallbacks_rch subscription,
3011  const DDS::DataReaderQos& qos,
3012  const DCPS::TransportLocatorSeq& transInfo,
3013  const DDS::SubscriberQos& subscriberQos,
3014  const char* filterClassName,
3015  const char* filterExpr,
3016  const DDS::StringSeq& params,
3017  const XTypes::TypeInformation& type_info)
3018 {
// Forwards every argument except the domain/participant ids to the
// participant's add_subscription() and returns what it returns.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
3019  return get_part(domainId, participantId)->add_subscription(
3020  topicId, subscription, qos, transInfo, subscriberQos, filterClassName,
3021  filterExpr, params, type_info);
3022 }
3023 
3025  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& subscriptionId)
3026 {
// Asks the participant to remove subscriptionId; the result is always true.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
3027  get_part(domainId, participantId)->remove_subscription(subscriptionId);
3028  return true;
3029 }
3030 
3032  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& ignoreId)
3033 {
// Asks the participant to ignore the subscription ignoreId; always true.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
3034  get_part(domainId, participantId)->ignore_subscription(ignoreId);
3035  return true;
3036 }
3037 
3039  DDS::DomainId_t domainId,
3040  const GUID_t& partId,
3041  const GUID_t& drId,
3042  const DDS::DataReaderQos& qos,
3043  const DDS::SubscriberQos& subQos)
3044 {
// Forwards the reader and subscriber QoS for drId to the participant and
// returns that call's result.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
3045  return get_part(domainId, partId)->update_subscription_qos(drId, qos, subQos);
3046 }
3047 
3049  DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& subId, const DDS::StringSeq& params)
3050 {
// Forwards the new parameter list for subscription subId to the participant
// and returns that call's result.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
3051  return get_part(domainId, partId)->update_subscription_params(subId, params);
3052 }
3053 
3055  DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& subId,
3056  const DCPS::TransportLocatorSeq& transInfo)
3057 {
// Forwards the new transport locators for reader subId to the participant.
// NOTE(review): the handle from get_part() is dereferenced without a nil check.
3058  get_part(domainId, partId)->update_subscription_locators(subId, transInfo);
3059 }
3060 
3062  const DDS::DomainId_t domain_id, const GUID_t& part_id) const
3063 {
// Looks up the participant registered under (domain_id, part_id).
// Returns a default-constructed ParticipantHandle when either the domain or
// the participant is unknown; find() is used, so nothing is inserted.
// NOTE(review): the first signature line and listing line 3064 (possibly a
// lock guard) are not visible in this listing.
3065  DomainParticipantMap::const_iterator domain = participants_.find(domain_id);
3066  if (domain == participants_.end()) {
3067  return ParticipantHandle();
3068  }
3069  ParticipantMap::const_iterator part = domain->second.find(part_id);
3070  if (part == domain->second.end()) {
3071  return ParticipantHandle();
3072  }
3073  return part->second;
3074 }
3075 
3076 void StaticDiscovery::create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
3077  SubscriberImpl* sub, const DDS::DataReaderQos& qos)
3078 {
3079  TopicDescriptionImpl* bit_topic_i =
3080  dynamic_cast<TopicDescriptionImpl*>(topic);
3081  if (bit_topic_i == 0) {
3082  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::create_bit_dr")
3083  ACE_TEXT(" - Could not cast TopicDescription to TopicDescriptionImpl\n")));
3084  return;
3085  }
3086 
3087  DDS::DomainParticipant_var participant = sub->get_participant();
3088  DomainParticipantImpl* participant_i =
3089  dynamic_cast<DomainParticipantImpl*>(participant.in());
3090  if (participant_i == 0) {
3091  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::create_bit_dr")
3092  ACE_TEXT(" - Could not cast DomainParticipant to DomainParticipantImpl\n")));
3093  return;
3094  }
3095 
3096  TypeSupport_var type_support =
3097  Registered_Data_Types->lookup(participant, type);
3098 
3099  DDS::DataReader_var dr = type_support->create_datareader();
3100  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(dr.in());
3101  if (dri == 0) {
3102  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::create_bit_dr")
3103  ACE_TEXT(" - Could not cast DataReader to DataReaderImpl\n")));
3104  return;
3105  }
3106 
3107  dri->init(bit_topic_i, qos, 0 /*listener*/, 0 /*mask*/, participant_i, sub);
3108  dri->disable_transport();
3109  dri->enable();
3110 }
3111 
3113 {
// Disassociates and erases the discovered participant that `iter` points to.
// Nothing is erased unless endpoint_manager().disassociate() returns true.
// `iter` is re-looked-up by GUID after each step that may invalidate it; the
// function returns early if the entry has disappeared in the meantime.
// NOTE(review): the signature line is missing from this listing; the symbol
// index lists `void remove_discovered_participant(DiscoveredParticipantIter
// &iter)` - confirm.
3114  if (iter == participants_.end()) {
3115  return;
3116  }
3117  GUID_t part_id = iter->first;
3118  bool removed = endpoint_manager().disassociate();
3119  iter = participants_.find(part_id); // refresh iter after disassociate, which can unlock
3120  if (iter == participants_.end()) {
3121  return;
3122  }
3123  if (removed) {
3124 #ifndef DDS_HAS_MINIMUM_BIT
// NOTE(review): the declaration of `bit` (listing line 3125) is not visible here.
3126  ParticipantLocationBuiltinTopicDataDataReaderImpl* loc_bit = part_loc_bit();
3127  // bit may be null if the DomainParticipant is shutting down
3128  if ((bit && iter->second.bit_ih_ != DDS::HANDLE_NIL) ||
3129  (loc_bit && iter->second.location_ih_ != DDS::HANDLE_NIL)) {
3130  {
// Copy both instance handles into locals before the readers are updated.
// NOTE(review): listing lines 3134-3135 and the state arguments passed to
// set_instance_state (lines 3138 and 3142) are not visible here.
3131  const DDS::InstanceHandle_t bit_ih = iter->second.bit_ih_;
3132  const DDS::InstanceHandle_t location_ih = iter->second.location_ih_;
3133 
3136  if (bit && bit_ih != DDS::HANDLE_NIL) {
3137  bit->set_instance_state(bit_ih,
3139  }
3140  if (loc_bit && location_ih != DDS::HANDLE_NIL) {
3141  loc_bit->set_instance_state(location_ih,
3143  }
3144  }
// Look the entry up again after the BIT updates; give up if it is gone.
3145  iter = participants_.find(part_id);
3146  if (iter == participants_.end()) {
3147  return;
3148  }
3149  }
3150 #endif /* DDS_HAS_MINIMUM_BIT */
3151  if (DCPS_debug_level > 3) {
3152  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) LocalParticipant::remove_discovered_participant")
3153  ACE_TEXT(" - erasing %C (%B)\n"), LogGuid(iter->first).c_str(), participants_.size()));
3154  }
3155 
// The hook runs first, while the map entry still exists; then it is erased.
3156  remove_discovered_participant_i(iter);
3157 
3158  participants_.erase(iter);
3159  }
3160 }
3161 
3162 } // namespace DCPS
3163 } // namespace OpenDDS
3164 
virtual void reader_exists(const GUID_t &readerid, const GUID_t &writerid)
const char *const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE
UserDataQosPolicy user_data
#define TheTransportRegistry
PartitionQosPolicy partition
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
HistoryQosPolicy history
TopicStatus assert_topic(GUID_t &topicId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, TopicCallbacks *topic_callbacks)
void remove_discovered_subscription(const DCPS::GUID_t &guid)
Definition: TopicDetails.h:118
ACE_CDR::Long Long
DiscoveredPublicationMap::iterator DiscoveredPublicationIter
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const char * c_str(void) const
DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const GUID_t &participantId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, GUID_t &topicId)
const LogLevel::Value value
Definition: debug.cpp:61
GUID_t add_subscription(const GUID_t &topicId, DataReaderCallbacks_rch subscription, const DDS::DataReaderQos &qos, const TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpr, const DDS::StringSeq &params, const XTypes::TypeInformation &type_info)
GUID_t add_publication(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &topicId, DCPS::DataWriterCallbacks_rch publication, const DDS::DataWriterQos &qos, const DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const XTypes::TypeInformation &type_info)
const InstanceHandle_t HANDLE_NIL
std::string String
#define ACE_HTONL(X)
DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter
void init(const OPENDDS_STRING &name, const DCPS::GUID_t &topic_id)
Definition: TopicDetails.h:34
sequence< QosPolicyCount > QosPolicyCountSeq
Definition: DdsDcpsCore.idl:62
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE
ReliabilityQosPolicy reliability
virtual GUID_t generate_participant_guid()
virtual DDS::ReturnCode_t remove_subscription_i(const GUID_t &, LocalSubscription &)
GroupDataQosPolicy group_data
LocalSubscriptionMap::iterator LocalSubscriptionIter
LM_INFO
OwnershipQosPolicy ownership
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC
LocalSubscriptionMap local_subscriptions_
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
DurabilityQosPolicy durability
virtual DDS::Subscriber_ptr get_subscriber()
const RepoIdSet & local_publications() const
Definition: TopicDetails.h:78
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
DDS::ReturnCode_t create_bit_topics(DomainParticipantImpl *participant)
Definition: Discovery.cpp:51
bool isReader() const
Returns true if the GUID represents a reader entity.
TimeBasedFilterQosPolicy time_based_filter
virtual void writer_does_not_exist(const GUID_t &writerid, const GUID_t &readerid)
sequence< octet > key
static StaticDiscovery_rch instance_
const RepoIdSet & discovered_subscriptions() const
Definition: TopicDetails.h:123
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
void * memcpy(void *t, const void *s, size_t len)
void match_continue(const GUID_t &writer, const GUID_t &reader)
virtual DDS::Subscriber_ptr create_subscriber(const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
SubscriptionBuiltinTopicDataDataReaderImpl * sub_bit()
GuidSet RepoIdSet
Definition: GuidUtils.h:113
OwnershipQosPolicy ownership
const char * c_str() const
const RepoIdSet & local_subscriptions() const
Definition: TopicDetails.h:93
void set_instance_state(DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
ACE_CDR::Octet kind() const
Definition: TypeObject.h:748
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
LivelinessQosPolicy liveliness
virtual const ACE_Configuration_Section_Key & root_section(void) const
DeadlineQosPolicy deadline
void remove_expired_endpoints(const MonotonicTimePoint &)
void add_local_publication(const DCPS::GUID_t &guid)
Definition: TopicDetails.h:68
bool ignore_topic(DDS::DomainId_t domainId, const GUID_t &myParticipantId, const GUID_t &ignoreId)
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
virtual void remove_assoc_i(const GUID_t &, const LocalPublication &, const GUID_t &)
DestinationOrderQosPolicy destination_order
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
sequence< TransportLocator > TransportLocatorSeq
EntityFactoryQosPolicy entity_factory
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
Definition: DCPS_Utils.cpp:237
DataRepresentationQosPolicy representation
void remove_publication(const GUID_t &publicationId)
const DCPS::GUID_t & topic_id() const
Definition: TopicDetails.h:136
RcHandle< StaticParticipant > ParticipantHandle
static EntityId_t build_id(const unsigned char *entity_key, const unsigned char entity_kind)
ACE_Guard< ACE_Thread_Mutex > lock_
int parse_topics(ACE_Configuration_Heap &cf)
LocalPublicationMap::iterator LocalPublicationIter
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
Definition: DCPS_Utils.cpp:517
EntityFactoryQosPolicy entity_factory
virtual DDS::ReturnCode_t add_publication_i(const GUID_t &, LocalPublication &)
virtual DDS::ReturnCode_t get_default_datareader_qos(DDS::DataReaderQos &qos)
virtual void assign_publication_key(GUID_t &rid, const GUID_t &topicId, const DDS::DataWriterQos &qos)
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
const DDS::StatusMask DEFAULT_STATUS_MASK
GUID_t add_publication(const GUID_t &topicId, DataWriterCallbacks_rch publication, const DDS::DataWriterQos &qos, const TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const XTypes::TypeInformation &type_info)
virtual AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls)
TopicStatus remove_topic(const GUID_t &topicId)
LatencyBudgetQosPolicy latency_budget
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
Definition: DCPS_Utils.cpp:508
DiscoveredSubscriptionMap discovered_subscriptions_
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
void type_lookup_init(ReactorInterceptor_rch reactor_interceptor)
const char *const BUILT_IN_PUBLICATION_TOPIC
const octet ENTITYKIND_USER_READER_WITH_KEY
Definition: DdsDcpsGuid.idl:43
#define OPENDDS_STRING
bool ignore_publication(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &ignoreId)
sequence< GUID_t > ReaderIdSeq
int parse_subscriberqos(ACE_Configuration_Heap &cf)
DurabilityServiceQosPolicy durability_service
TypeConsistencyEnforcementQosPolicy type_consistency
virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos &qos)
DOMAINID_TYPE_NATIVE DomainId_t
DataWriterQosMapType datawriterqos_map
DCPS::TopicStatus assert_topic(GUID_t &topicId, DDS::DomainId_t domainId, const GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, DCPS::TopicCallbacks *topic_callbacks)
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
const TypeConsistencyEnforcementQosPolicyKind_t ALLOW_TYPE_COERCION
PresentationQosPolicy presentation
const DDS::TopicQos local_qos() const
Definition: TopicDetails.h:135
void fini_bit(DCPS::DomainParticipantImpl *participant)
LM_DEBUG
#define Registered_Data_Types
DiscoveredParticipantMap::iterator DiscoveredParticipantIter
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
static const char * DEFAULT_STATIC
Definition: Discovery.h:87
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &topicId)
DurabilityQosPolicy durability
const octet ENTITYKIND_OPENDDS_TOPIC
Definition: DdsDcpsGuid.idl:49
PublicationBuiltinTopicDataDataReaderImpl * pub_bit()
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE
OwnershipStrengthQosPolicy ownership_strength
virtual bool update_subscription_params(const GUID_t &, const DDS::StringSeq &)
char ACE_TCHAR
void match(const GUID_t &writer, const GUID_t &reader)
void type_lookup_service(const XTypes::TypeLookupService_rch type_lookup_service)
DomainParticipantMap participants_
ACE_CDR::Boolean Boolean
DestinationOrderQosPolicy destination_order
virtual void populate_transport_locator_sequence(TransportLocatorSeq *&, DiscoveredSubscriptionIter &, const GUID_t &)
DataRepresentationQosPolicy representation
RcHandle< StaticEndpointManagerSporadic > type_lookup_reply_deadline_processor_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void pre_writer(DataWriterImpl *writer)
void serialize_type_info(const TypeInformation &type_info, T &seq, const DCPS::Encoding *encoding_option=0)
Definition: TypeObject.h:3382
void create_bit_dr(DDS::TopicDescription_ptr topic, const char *type, SubscriberImpl *sub, const DDS::DataReaderQos &qos)
key EntityKey_t entityKey
Definition: DdsDcpsGuid.idl:27
void pre_reader(DataReaderImpl *reader)
int parse_datawriterqos(ACE_Configuration_Heap &cf)
virtual DDS::ReturnCode_t remove_publication_i(const GUID_t &, LocalPublication &)
DataReaderQosMapType datareaderqos_map
bool update_domain_participant_qos(DDS::DomainId_t domain, const GUID_t &participant, const DDS::DomainParticipantQos &qos)
Implements the DDS::DataReader interface.
bool remove_subscription(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &subscriptionId)
const ViewStateKind NEW_VIEW_STATE
void populate_locators(OpenDDS::DCPS::TransportLocatorSeq &trans_info) const
LM_NOTICE
bool ignore_domain_participant(DDS::DomainId_t domain, const GUID_t &myParticipantId, const GUID_t &ignoreId)
void match_endpoints(GUID_t repoId, const TopicDetails &td, bool remove=false)
SubscriberQosMapType subscriberqos_map
void update_publication_locators(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &dwId, const DCPS::TransportLocatorSeq &transInfo)
void set_local(const OPENDDS_STRING &data_type_name, const DDS::TopicQos &qos, bool has_dcps_key, TopicCallbacks *topic_callbacks)
Definition: TopicDetails.h:40
virtual AddDomainStatus add_domain_participant_secure(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls, const GUID_t &guid, DDS::Security::IdentityHandle id, DDS::Security::PermissionsHandle perm, DDS::Security::ParticipantCryptoHandle part_crypto)
bool update_topic_qos(const GUID_t &topicId, DDS::DomainId_t domainId, const GUID_t &participantId, const DDS::TopicQos &qos)
ResourceLimitsQosPolicy resource_limits
bool ignoring(const GUID_t &guid) const
void update_subscription_locators(const GUID_t &subscriptionId, const TransportLocatorSeq &transInfo)
virtual ACE_CString _info(void) const=0
virtual void writer_exists(const GUID_t &writerid, const GUID_t &readerid)
const RepoIdSet & discovered_publications() const
Definition: TopicDetails.h:108
XTypes::TypeLookupService_rch type_lookup_service_
sequence< GUID_t > WriterIdSeq
void update_publication_locators(const GUID_t &publicationId, const TransportLocatorSeq &transInfo)
int parse_endpoints(ACE_Configuration_Heap &cf)
const char *const BUILT_IN_PARTICIPANT_TOPIC
LM_WARNING
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC
ACE_UINT32 ULong
virtual void assign_subscription_key(GUID_t &rid, const GUID_t &topicId, const DDS::DataReaderQos &qos)
virtual bool is_expectant_opendds(const GUID_t &endpoint) const
bool update_subscription_params(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &subId, const DDS::StringSeq &params)
bool assignable(const TypeObject &ta, const TypeObject &tb) const
Both input type objects must be minimal.
const char *const name
Definition: debug.cpp:60
ReaderDataLifecycleQosPolicy reader_data_lifecycle
void purge_dead_topic(const String &topic_name)
virtual void add_assoc_i(const GUID_t &, const LocalPublication &, const GUID_t &, const DiscoveredSubscription &)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
GUID_t add_subscription(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &topicId, DCPS::DataReaderCallbacks_rch subscription, const DDS::DataReaderQos &qos, const DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpr, const DDS::StringSeq &params, const XTypes::TypeInformation &type_info)
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
HistoryQosPolicy history
const char *const BUILT_IN_PARTICIPANT_TOPIC_TYPE
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
void remove_discovered_participant(DiscoveredParticipantIter &iter)
TransportPriorityQosPolicy transport_priority
#define SUBSCRIBER_QOS_DEFAULT
ACE_CDR::UShort TypeFlag
Definition: TypeObject.h:399
virtual DDS::ReturnCode_t write_publication_data(const GUID_t &, LocalPublication &, const GUID_t &reader=GUID_UNKNOWN)
void remove_from_bit(const DiscoveredPublication &pub)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
bool attach_participant(DDS::DomainId_t domainId, const GUID_t &participantId)
ReliabilityQosPolicy reliability
const octet ENTITYKIND_USER_WRITER_WITH_KEY
Definition: DdsDcpsGuid.idl:40
void add_local_subscription(const DCPS::GUID_t &guid)
Definition: TopicDetails.h:83
DDS::Subscriber_var bit_subscriber() const
virtual DDS::DomainParticipant_ptr get_participant()
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
Definition: DCPS_Utils.cpp:455
virtual DDS::Publisher_ptr get_publisher()
Implements the DDS::TopicDescription interface.
virtual bool update_topic_qos(const GUID_t &, const DDS::TopicQos &)
const char *const BUILT_IN_TOPIC_TOPIC
virtual int open_section(const ACE_Configuration_Section_Key &base, const ACE_TCHAR *sub_section, bool create, ACE_Configuration_Section_Key &result)
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
bool parse_bool(const XMLCh *in, bool &value)
Definition: XmlUtils.cpp:150
bool ignore_subscription(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &ignoreId)
const ReturnCode_t RETCODE_ERROR
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
virtual bool update_publication_qos(const GUID_t &, const DDS::DataWriterQos &, const DDS::PublisherQos &)
bool update_subscription_qos(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subQos)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
LifespanQosPolicy lifespan
void update_subscription_locators(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &subId, const DCPS::TransportLocatorSeq &transInfo)
void remove_discovered_publication(const DCPS::GUID_t &guid)
Definition: TopicDetails.h:103
Discovery Strategy interface class.
Definition: Discovery.h:76
virtual DDS::ReturnCode_t enable()
static TransportRegistry * instance()
Return a singleton instance of this class.
WriterDataLifecycleQosPolicy writer_data_lifecycle
int processSections(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, KeyList &subsections)
Definition: ConfigUtils.cpp:41
bool remove_publication(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &publicationId)
void remove_assoc(const GUID_t &remove_from, const GUID_t &removing)
bool update_publication_qos(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
const TypeFlag IS_APPENDABLE
Definition: TypeObject.h:401
const ReturnCode_t RETCODE_OK
DDS::InstanceHandle_t store_synthetic_data(const MessageType &sample, DDS::ViewStateKind view, const SystemTimePoint &timestamp=SystemTimePoint::now())
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
const OPENDDS_STRING local_data_type_name() const
Definition: TopicDetails.h:134
bool remove_domain_participant(DDS::DomainId_t domain_id, const GUID_t &participantId)
#define ACE_ERROR_RETURN(X, Y)
PublisherQosMapType publisherqos_map
virtual DDS::ReturnCode_t write_subscription_data(const GUID_t &, LocalSubscription &, const GUID_t &reader=GUID_UNKNOWN)
LivelinessQosPolicy liveliness
RcHandle< T > lock() const
Definition: RcObject.h:188
virtual DDS::ReturnCode_t get_qos(DDS::DataReaderQos &qos)
TypeConsistencyEnforcementQosPolicyKind_t kind
DataRepresentationIdSeq value
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
const EndpointRegistry & registry_
int pullValues(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, ValueMap &values)
Definition: ConfigUtils.cpp:17
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
UserDataQosPolicy user_data
UserDataQosPolicy user_data
DeadlineQosPolicy deadline
ParticipantHandle get_part(const DDS::DomainId_t domain_id, const GUID_t &part_id) const
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE
const GuidVendorId_t VENDORID_OCI
Vendor Id value specified for OCI is used for OpenDDS.
Definition: GuidUtils.h:29
GroupDataQosPolicy group_data
Defines the interface for Discovery callbacks into the Topic.
#define TheServiceParticipant
const TypeKind TK_NONE
Definition: TypeObject.h:213
int load_configuration(ACE_Configuration_Heap &config)
StaticEndpointManager(const GUID_t &participant_id, ACE_Thread_Mutex &lock, const EndpointRegistry &registry, StaticParticipant &participant)
virtual DDS::ReturnCode_t add_subscription_i(const GUID_t &, LocalSubscription &)
OPENDDS_STRING RepoKey
Definition: Discovery.h:80
OpenDDS_Dcps_Export DDS::BuiltinTopicKey_t guid_to_bit_key(const GUID_t &guid)
Definition: GuidUtils.h:243
LM_ERROR
PartitionQosPolicy partition
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int parse_datareaderqos(ACE_Configuration_Heap &cf)
virtual void reader_does_not_exist(const GUID_t &readerid, const GUID_t &writerid)
virtual DDS::TopicDescription_ptr lookup_topicdescription(const char *name)
void cleanup_type_lookup_data(const GuidPrefix_t &prefix, const XTypes::TypeIdentifier &ti, bool secure)
TypeIdentifierWithDependencies minimal
Definition: TypeObject.h:3373
PresentationQosPolicy presentation
int parse_publisherqos(ACE_Configuration_Heap &cf)
ResourceLimitsQosPolicy resource_limits
RcHandle< BitSubscriber > init_bit(DCPS::DomainParticipantImpl *participant)
void ignore(const GUID_t &to_ignore)
DiscoveredPublicationMap discovered_publications_
typedef OPENDDS_MAP_CMP(GUID_t, DiscoveredSubscription, GUID_tKeyLessThan) DiscoveredSubscriptionMap
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
TopicStatus find_topic(const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, GUID_t &topicId)
const char *const BUILT_IN_TOPIC_TOPIC_TYPE
const char *const BUILT_IN_PUBLICATION_TOPIC_TYPE
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
bool convertToInteger(const String &s, T &value)
bool has_dcps_key(const GUID_t &topicId) const
LatencyBudgetQosPolicy latency_budget
void remove_subscription(const GUID_t &subscriptionId)
virtual bool update_subscription_qos(const GUID_t &, const DDS::DataReaderQos &, const DDS::SubscriberQos &)