OpenDDS  Snapshot(2023/04/28-20:55)
Parser.cpp
Go to the documentation of this file.
1 #include "Parser.h"
5 
7 #include "ace/OS_NS_stdio.h"
8 #include "ace/Log_Priority.h"
9 #include "ace/Log_Msg.h"
10 
11 #include <cstring>
12 
14 
15 namespace OpenDDS { namespace FaceTSS { namespace config {
16 
17 namespace {
18  const char* TOPIC_SECTION = "topic";
19  const char* CONNECTION_SECTION = "connection";
20  const char* DATAWRITER_QOS_SECTION = "datawriterqos";
21  const char* DATAREADER_QOS_SECTION = "datareaderqos";
22  const char* PUBLISHER_QOS_SECTION = "publisherqos";
23  const char* SUBSCRIBER_QOS_SECTION = "subscriberqos";
24 
25  OpenDDS::DCPS::GUID_t build_id(const ConnectionSettings& conn)
26  {
27  unsigned char participant_key[6];
28  participant_key[0] = (conn.participant_id_ >> 0) & 0xFF;
29  participant_key[1] = (conn.participant_id_ >> 8) & 0xFF;
30  participant_key[2] = (conn.participant_id_ >> 16) & 0xFF;
31  participant_key[3] = (conn.participant_id_ >> 24) & 0xFF;
32  participant_key[4] = 0; //(conn.domain_id_ >> 32) & 0xFF;
33  participant_key[5] = 0; //(conn.domain_id_ >> 40) & 0xFF;
34 
35  unsigned char entity_key[3];
36  entity_key[0] = (conn.connection_id_ >> 0) & 0xFF;
37  entity_key[1] = (conn.connection_id_ >> 8) & 0xFF;
38  entity_key[2] = (conn.connection_id_ >> 16) & 0xFF;
39 
40  unsigned char entity_kind = 0;
41  switch (conn.direction_) {
42  case FACE::SOURCE:
44  break;
45  case FACE::DESTINATION:
47  break;
48  case FACE::BI_DIRECTIONAL:
49  case FACE::ONE_WAY_REQUEST_SOURCE:
50  case FACE::ONE_WAY_REQUEST_DESTINATION:
51  case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
52  case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
53  case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
54  case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
55  case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
56  break;
57  }
58  return OpenDDS::DCPS::EndpointRegistry::build_id(conn.domain_id_,
59  participant_key,
61  entity_kind));
62  }
63 
64 } // anon namespace
65 
66 
67 ConnectionMap Parser::connection_map_;
68 QosMap Parser::qos_map_;
69 TopicMap Parser::topic_map_;
70 
71 int
72 Parser::parse(const char* filename)
73 {
75  config.open();
76  ACE_Ini_ImpExp import(config);
77  int status = import.import_config(filename);
78  if (status) {
80  ACE_TEXT("(%P|%t) ERROR: Initialize() ")
81  ACE_TEXT("import_config () returned %d\n"),
82  status));
83  return status;
84  }
85 
86  status = parse_sections(config, DATAWRITER_QOS_SECTION, false) ||
87  parse_sections(config, DATAREADER_QOS_SECTION, false) ||
88  parse_sections(config, PUBLISHER_QOS_SECTION, false) ||
89  parse_sections(config, SUBSCRIBER_QOS_SECTION, false) ||
90  parse_sections(config, TOPIC_SECTION, true) ||
91  parse_sections(config, CONNECTION_SECTION, true);
92 
93  if (status)
94  return status;
95 
96  status = TheServiceParticipant->load_configuration(config,
97  filename);
98 
99  if (status)
100  return status;
101 
102  for (ConnectionMap::const_iterator pos = connection_map_.begin(), limit = connection_map_.end();
103  pos != limit;
104  ++pos) {
105  const ConnectionSettings& conn = pos->second;
106  OpenDDS::DCPS::GUID_t id = build_id(conn);
107 
108  switch (conn.direction_) {
109  case FACE::SOURCE:
110  {
111  OPENDDS_STRING topic_name = conn.topic_name_;
112 
113  DDS::DataWriterQos qos(TheServiceParticipant->initial_DataWriterQos());
114  if (conn.datawriter_qos_set()) {
115  QosMap::const_iterator p = qos_map_.find(conn.datawriter_qos_name());
116  if (p != qos_map_.end()) {
117  p->second.apply_to(qos);
118  } else {
120  ACE_TEXT("(%P|%t) ERROR: Could not find datawriterqos/%s\n"),
121  conn.datawriter_qos_name()));
122  return -1;
123  }
124  }
125 
126  DDS::PublisherQos publisher_qos(TheServiceParticipant->initial_PublisherQos());
127  if (conn.publisher_qos_set()) {
128  QosMap::const_iterator p = qos_map_.find(conn.publisher_qos_name());
129  if (p != qos_map_.end()) {
130  p->second.apply_to(publisher_qos);
131  } else {
133  ACE_TEXT("(%P|%t) ERROR: Could not find publisherqos/%s\n"),
134  conn.publisher_qos_name()));
135  return -1;
136  }
137  }
138 
139  DCPS::TransportLocatorSeq trans_info;
140 
142 
143  if (conn.config_set()) {
144  config = TheTransportRegistry->get_config(conn.config_name());
145  if (config.is_nil()) {
147  ACE_TEXT("(%P|%t) ERROR: Could not find config/%s\n"),
148  conn.config_name()));
149  return -1;
150  }
151  }
152 
153  if (config.is_nil()) {
154  config = TheTransportRegistry->domain_default_config(conn.domain_id_);
155  }
156 
157  if (config.is_nil()) {
158  config = TheTransportRegistry->global_config();
159  }
160 
161  config->populate_locators(trans_info);
162 
163  // Typically, we would ensure that trans_info is not empty.
164  // However, when using RTPS, trans_info will be empty so don't check.
165 
166  // Populate the userdata.
167  qos.user_data.value.length(3);
168  qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
169  qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
170  qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
171  bool encapsulated_only = false;
172  for (CORBA::ULong i = 0; i < trans_info.length(); ++i) {
173  if (0 == std::strcmp(trans_info[i].transport_type, "rtps_udp")) {
174  encapsulated_only = true;
175  break;
176  }
177  }
178  DCPS::set_writer_effective_data_rep_qos(qos.representation.value, encapsulated_only);
179 
180  OpenDDS::DCPS::EndpointRegistry::Writer w(topic_name, qos, publisher_qos, conn.config_name(), trans_info);
181  OpenDDS::DCPS::StaticDiscovery::instance()->registry.writer_map.insert(std::make_pair(id, w));
182  }
183  break;
184  case FACE::DESTINATION:
185  {
186  OPENDDS_STRING topic_name = conn.topic_name_;
187 
188  DDS::DataReaderQos qos(TheServiceParticipant->initial_DataReaderQos());
189  if (conn.datareader_qos_set()) {
190  QosMap::const_iterator p = qos_map_.find(conn.datareader_qos_name());
191  if (p != qos_map_.end()) {
192  p->second.apply_to(qos);
193  } else {
195  ACE_TEXT("(%P|%t) ERROR: Could not find datareaderqos/%s\n"),
196  conn.datawriter_qos_name()));
197  return -1;
198  }
199  }
200 
201  DDS::SubscriberQos subscriber_qos(TheServiceParticipant->initial_SubscriberQos());
202  if (conn.subscriber_qos_set()) {
203  QosMap::const_iterator p = qos_map_.find(conn.subscriber_qos_name());
204  if (p != qos_map_.end()) {
205  p->second.apply_to(subscriber_qos);
206  } else {
208  ACE_TEXT("(%P|%t) ERROR: Could not find subscriberqos/%s\n"),
209  conn.subscriber_qos_name()));
210  return -1;
211  }
212  }
213 
214  DCPS::TransportLocatorSeq trans_info;
215 
217 
218  if (conn.config_set()) {
219  config = TheTransportRegistry->get_config(conn.config_name());
220  if (config.is_nil()) {
222  ACE_TEXT("(%P|%t) ERROR: Could not find transport/%s\n"),
223  conn.config_name()));
224  return -1;
225  }
226  }
227 
228  if (config.is_nil()) {
229  config = TheTransportRegistry->domain_default_config(conn.domain_id_);
230  }
231 
232  if (config.is_nil()) {
233  config = TheTransportRegistry->global_config();
234  }
235 
236  config->populate_locators(trans_info);
237 
238  // Typically, we would ensure that trans_info is not empty.
239  // However, when using RTPS, trans_info will be empty so don't check.
240 
241  // Populate the userdata.
242  qos.user_data.value.length(3);
243  qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
244  qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
245  qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
246 
247  DCPS::set_reader_effective_data_rep_qos(qos.representation.value);
248  OpenDDS::DCPS::EndpointRegistry::Reader r(topic_name, qos, subscriber_qos, conn.config_name(), trans_info);
249  OpenDDS::DCPS::StaticDiscovery::instance()->registry.reader_map.insert(std::make_pair(id, r));
250  }
251  break;
252  case FACE::BI_DIRECTIONAL:
253  case FACE::ONE_WAY_REQUEST_SOURCE:
254  case FACE::ONE_WAY_REQUEST_DESTINATION:
255  case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
256  case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
257  case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
258  case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
259  case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
260  break;
261  }
262  }
263 
264  DCPS::StaticDiscovery::instance()->registry.match();
265 
266  return status;
267 }
268 
269 int
271  ConnectionSettings& target)
272 {
273  int status = 1;
274  ConnectionMap::iterator result;
275  if ((result = connection_map_.find(name)) != connection_map_.end()) {
276  status = 0;
277  target = result->second;
278  }
279  return status;
280 }
281 
282 int
284  TopicSettings& target)
285 {
286  int status = 1;
287  TopicMap::iterator result;
288  if ((result = topic_map_.find(name)) != topic_map_.end()) {
289  status = 0;
290  target = result->second;
291  }
292  return status;
293 }
294 
295 int
297 {
298  int status = 0;
299  QosMap::iterator result;
300  // If thie is a SOURCE
301  if (conn.direction_ == FACE::SOURCE) {
302  if (conn.datawriter_qos_set()) {
303  // Name specified, must be in map
304  result = qos_map_.find(conn.datawriter_qos_name());
305  if (result != qos_map_.end()) {
306  result->second.apply_to(target.datawriter_qos());
307  } else {
308  // Failed
309  status = 1;
310  }
311  }
312  if (status == 0 && (conn.publisher_qos_set())) {
313  // Name specified, must be in map
314  result = qos_map_.find(conn.publisher_qos_name());
315  if (result != qos_map_.end()) {
316  result->second.apply_to(target.publisher_qos());
317  } else {
318  // Failed
319  status = 1;
320  }
321  }
322  // Else DESTINATION
323  } else {
324  if (conn.datareader_qos_set()) {
325  // Name specified, must be in map
326  result = qos_map_.find(conn.datareader_qos_name());
327  if (result != qos_map_.end()) {
328  result->second.apply_to(target.datareader_qos());
329  } else {
330  // Failed
331  status = 1;
332  }
333  }
334  if (status == 0 && (conn.subscriber_qos_set())) {
335  // Name specified, must be in map
336  result = qos_map_.find(conn.subscriber_qos_name());
337  if (result != qos_map_.end()) {
338  result->second.apply_to(target.subscriber_qos());
339  } else {
340  // Failed
341  status = 1;
342  }
343  }
344  }
345 
346  return status;
347 }
348 
349 int
352  const char* topic_name)
353 {
354  int status = 0;
355  int value_index = 0;
356  ACE_TString value_name, value;
357  ACE_Configuration::VALUETYPE value_type;
358 
359  TopicSettings topic;
360 
361  while (!config.enumerate_values(key,
362  value_index++,
363  value_name,
364  value_type)) {
365  if (value_type == ACE_Configuration::STRING) {
366  status = config.get_string_value(key, value_name.c_str(), value);
367  if (!status) {
368  status = status || topic.set(value_name.c_str(), value.c_str());
369  }
370  } else {
371  ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
372  status = -1;
373  break;
374  }
375  }
376  if (!status) {
377  topic_map_[topic_name] = topic;
378  }
379  return status;
380 }
381 
382 int
385  const char* connection_name)
386 {
387  int status = 0;
388  int value_index = 0;
389  ACE_TString value_name, value;
390  ACE_Configuration::VALUETYPE value_type;
391 
392  ConnectionSettings connection;
393 
394  while (!config.enumerate_values(key,
395  value_index++,
396  value_name,
397  value_type)) {
398  if (value_type == ACE_Configuration::STRING) {
399  status = config.get_string_value(key, value_name.c_str(), value);
400  if (!status) {
401  status = status || connection.set(value_name.c_str(), value.c_str());
402  }
403  } else {
404  ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
405  status = -1;
406  break;
407  }
408  }
409  if (!status) {
410  connection_map_[connection_name] = connection;
411  }
412  return status;
413 }
414 
415 int
418  const char* qos_name,
419  QosSettings::QosLevel level)
420 {
421  int status = 0;
422  int value_index = 0;
423  ACE_TString value_name, value;
424  ACE_Configuration::VALUETYPE value_type;
425 
426  // Find existing or create new settings
427  QosSettings& qos = qos_map_[qos_name];
428 
429  while (!config.enumerate_values(key,
430  value_index++,
431  value_name,
432  value_type)) {
433  if (value_type == ACE_Configuration::STRING) {
434  status = config.get_string_value(key, value_name.c_str(), value);
435  if (!status) {
436  status = status ||
437  qos.set_qos(level, value_name.c_str(), value.c_str());
438  }
439  } else {
440  ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
441  status = -1;
442  break;
443  }
444  }
445  return status;
446 }
447 
448 int
450  const char* section_type,
451  bool required)
452 {
453  int status = 0;
455  // If we can't open this section
456  if (config.open_section(config.root_section(),
457  section_type,
458  0, // don't create if missing
459  key) != 0) {
460  if (required) {
461  ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open %C section in config file, status %d\n"), section_type, status));
462  status = -1;
463  }
464  // Else, we can open this section
465  } else {
466  // Open subsections
467  int section_index = 0;
468  ACE_TString section_name;
469 
470  while (!config.enumerate_sections(key,
471  section_index++,
472  section_name)) {
474  // Open subsection
475  if (config.open_section(key,
476  section_name.c_str(),
477  0, // don't create if missing
478  subkey) != 0) {
479  ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open subsections of %C\n"), section_name.c_str()));
480  break;
481  }
482 
483  if (std::strcmp(section_type, CONNECTION_SECTION) == 0) {
484  status = parse_connection(config, subkey, section_name.c_str());
485  } else if (std::strcmp(section_type, TOPIC_SECTION) == 0) {
486  status = parse_topic(config, subkey, section_name.c_str());
487  } else if (std::strcmp(section_type, DATAWRITER_QOS_SECTION) == 0) {
488  status = parse_qos(
489  config, subkey, section_name.c_str(), QosSettings::datawriter);
490  } else if (std::strcmp(section_type, DATAREADER_QOS_SECTION) == 0) {
491  status = parse_qos(
492  config, subkey, section_name.c_str(), QosSettings::datareader);
493  } else if (std::strcmp(section_type, PUBLISHER_QOS_SECTION) == 0) {
494  status = parse_qos(
495  config, subkey, section_name.c_str(), QosSettings::publisher);
496  } else if (std::strcmp(section_type, SUBSCRIBER_QOS_SECTION) == 0) {
497  status = parse_qos(
498  config, subkey, section_name.c_str(), QosSettings::subscriber);
499  } else {
500  ACE_ERROR((LM_ERROR, ACE_TEXT("unknown section %C\n"), section_type));
501  }
502  }
503  }
504  return status;
505 }
506 
507 } } }
508 
int set(const char *name, const char *value)
#define TheTransportRegistry
DDS::SubscriberQos & subscriber_qos()
Definition: QosSettings.h:32
#define ACE_ERROR(X)
const char * c_str(void) const
const LogLevel::Value value
Definition: debug.cpp:61
virtual int get_string_value(const ACE_Configuration_Section_Key &key, const ACE_TCHAR *name, ACE_TString &value)
sequence< octet > key
DDS::PublisherQos & publisher_qos()
Definition: QosSettings.h:31
virtual int import_config(const ACE_TCHAR *filename)
int find_qos(const ConnectionSettings &conn, QosSettings &target)
Definition: Parser.cpp:296
virtual const ACE_Configuration_Section_Key & root_section(void) const
int set_qos(QosLevel level, const char *name, const char *value)
Definition: QosSettings.cpp:44
sequence< TransportLocator > TransportLocatorSeq
static EntityId_t build_id(const unsigned char *entity_key, const unsigned char entity_kind)
virtual int enumerate_values(const ACE_Configuration_Section_Key &key, int index, ACE_TString &name, VALUETYPE &type)
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
Definition: DCPS_Utils.cpp:517
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
Definition: DCPS_Utils.cpp:508
int parse_topic(ACE_Configuration_Heap &config, ACE_Configuration_Section_Key &key, const char *topic_name)
Definition: Parser.cpp:350
const octet ENTITYKIND_USER_READER_WITH_KEY
Definition: DdsDcpsGuid.idl:43
#define OPENDDS_STRING
ACE_CDR::ULong ULong
FACE::CONNECTION_DIRECTION_TYPE direction_
static ConnectionMap connection_map_
Definition: Parser.h:27
void populate_locators(OpenDDS::DCPS::TransportLocatorSeq &trans_info) const
int parse_connection(ACE_Configuration_Heap &config, ACE_Configuration_Section_Key &key, const char *connection_name)
Definition: Parser.cpp:383
int parse_sections(ACE_Configuration_Heap &config, const char *section_type, bool required)
Definition: Parser.cpp:449
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
const octet ENTITYKIND_USER_WRITER_WITH_KEY
Definition: DdsDcpsGuid.idl:40
DDS::DataReaderQos & datareader_qos()
Definition: QosSettings.h:34
virtual int open_section(const ACE_Configuration_Section_Key &base, const ACE_TCHAR *sub_section, bool create, ACE_Configuration_Section_Key &result)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int find_connection(const char *name, ConnectionSettings &target)
Definition: Parser.cpp:270
int open(const ACE_TCHAR *file_name, void *base_address=ACE_DEFAULT_BASE_ADDR, size_t default_map_size=ACE_DEFAULT_CONFIG_SECTION_SIZE)
DDS::DataWriterQos & datawriter_qos()
Definition: QosSettings.h:33
virtual int enumerate_sections(const ACE_Configuration_Section_Key &key, int index, ACE_TString &name)
int find_topic(const char *name, TopicSettings &target)
Definition: Parser.cpp:283
int parse(const char *filename)
Definition: Parser.cpp:72
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int set(const char *name, const char *value)
static TopicMap topic_map_
Definition: Parser.h:29
int parse_qos(ACE_Configuration_Heap &config, ACE_Configuration_Section_Key &key, const char *qos_name, QosSettings::QosLevel level)
Definition: Parser.cpp:416
static StaticDiscovery_rch instance()