00001 #include "Parser.h"
00002 #include "dds/DCPS/Service_Participant.h"
00003 #include "dds/DCPS/StaticDiscovery.h"
00004 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00005
00006 #include "ace/Configuration_Import_Export.h"
00007 #include "ace/OS_NS_stdio.h"
00008 #include "ace/Log_Priority.h"
00009 #include "ace/Log_Msg.h"
00010
00011 #include <cstring>
00012
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace OpenDDS { namespace FaceTSS { namespace config {
00016
00017 namespace {
00018 const char* TOPIC_SECTION = "topic";
00019 const char* CONNECTION_SECTION = "connection";
00020 const char* DATAWRITER_QOS_SECTION = "datawriterqos";
00021 const char* DATAREADER_QOS_SECTION = "datareaderqos";
00022 const char* PUBLISHER_QOS_SECTION = "publisherqos";
00023 const char* SUBSCRIBER_QOS_SECTION = "subscriberqos";
00024
00025 OpenDDS::DCPS::RepoId build_id(const ConnectionSettings& conn)
00026 {
00027 unsigned char participant_key[6];
00028 participant_key[0] = (conn.participant_id_ >> 0) & 0xFF;
00029 participant_key[1] = (conn.participant_id_ >> 8) & 0xFF;
00030 participant_key[2] = (conn.participant_id_ >> 16) & 0xFF;
00031 participant_key[3] = (conn.participant_id_ >> 24) & 0xFF;
00032 participant_key[4] = 0;
00033 participant_key[5] = 0;
00034
00035 unsigned char entity_key[3];
00036 entity_key[0] = (conn.connection_id_ >> 0) & 0xFF;
00037 entity_key[1] = (conn.connection_id_ >> 8) & 0xFF;
00038 entity_key[2] = (conn.connection_id_ >> 16) & 0xFF;
00039
00040 unsigned char entity_kind = 0;
00041 switch (conn.direction_) {
00042 case FACE::SOURCE:
00043 entity_kind = DCPS::ENTITYKIND_USER_WRITER_WITH_KEY;
00044 break;
00045 case FACE::DESTINATION:
00046 entity_kind = DCPS::ENTITYKIND_USER_READER_WITH_KEY;
00047 break;
00048 case FACE::BI_DIRECTIONAL:
00049 case FACE::ONE_WAY_REQUEST_SOURCE:
00050 case FACE::ONE_WAY_REQUEST_DESTINATION:
00051 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00052 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00053 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00054 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00055 case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00056 break;
00057 }
00058 return OpenDDS::DCPS::EndpointRegistry::build_id(conn.domain_id_,
00059 participant_key,
00060 OpenDDS::DCPS::EndpointRegistry::build_id(entity_key,
00061 entity_kind));
00062 }
00063
00064 }
00065
00066
00067 ConnectionMap Parser::connection_map_;
00068 QosMap Parser::qos_map_;
00069 TopicMap Parser::topic_map_;
00070
00071 int
00072 Parser::parse(const char* filename)
00073 {
00074 ACE_Configuration_Heap config;
00075 config.open();
00076 ACE_Ini_ImpExp import(config);
00077 int status = import.import_config(filename);
00078 if (status) {
00079 ACE_ERROR((LM_ERROR,
00080 ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00081 ACE_TEXT("import_config () returned %d\n"),
00082 status));
00083 return status;
00084 }
00085
00086 status = parse_sections(config, DATAWRITER_QOS_SECTION, false) ||
00087 parse_sections(config, DATAREADER_QOS_SECTION, false) ||
00088 parse_sections(config, PUBLISHER_QOS_SECTION, false) ||
00089 parse_sections(config, SUBSCRIBER_QOS_SECTION, false) ||
00090 parse_sections(config, TOPIC_SECTION, true) ||
00091 parse_sections(config, CONNECTION_SECTION, true);
00092
00093 if (status)
00094 return status;
00095
00096 status = TheServiceParticipant->load_configuration(config,
00097 filename);
00098
00099 if (status)
00100 return status;
00101
00102 for (ConnectionMap::const_iterator pos = connection_map_.begin(), limit = connection_map_.end();
00103 pos != limit;
00104 ++pos) {
00105 const ConnectionSettings& conn = pos->second;
00106 OpenDDS::DCPS::RepoId id = build_id(conn);
00107
00108 switch (conn.direction_) {
00109 case FACE::SOURCE:
00110 {
00111 OPENDDS_STRING topic_name = conn.topic_name_;
00112
00113 DDS::DataWriterQos qos(TheServiceParticipant->initial_DataWriterQos());
00114 if (conn.datawriter_qos_set()) {
00115 QosMap::const_iterator p = qos_map_.find(conn.datawriter_qos_name());
00116 if (p != qos_map_.end()) {
00117 p->second.apply_to(qos);
00118 } else {
00119 ACE_ERROR((LM_ERROR,
00120 ACE_TEXT("(%P|%t) ERROR: Could not find datawriterqos/%s\n"),
00121 conn.datawriter_qos_name()));
00122 return -1;
00123 }
00124 }
00125
00126 DDS::PublisherQos publisher_qos(TheServiceParticipant->initial_PublisherQos());
00127 if (conn.publisher_qos_set()) {
00128 QosMap::const_iterator p = qos_map_.find(conn.publisher_qos_name());
00129 if (p != qos_map_.end()) {
00130 p->second.apply_to(publisher_qos);
00131 } else {
00132 ACE_ERROR((LM_ERROR,
00133 ACE_TEXT("(%P|%t) ERROR: Could not find publisherqos/%s\n"),
00134 conn.publisher_qos_name()));
00135 return -1;
00136 }
00137 }
00138
00139 DCPS::TransportLocatorSeq trans_info;
00140
00141 OpenDDS::DCPS::TransportConfig_rch config;
00142
00143 if (conn.config_set()) {
00144 config = TheTransportRegistry->get_config(conn.config_name());
00145 if (config.is_nil()) {
00146 ACE_ERROR((LM_ERROR,
00147 ACE_TEXT("(%P|%t) ERROR: Could not find config/%s\n"),
00148 conn.config_name()));
00149 return -1;
00150 }
00151 }
00152
00153 if (config.is_nil()) {
00154 config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00155 }
00156
00157 if (config.is_nil()) {
00158 config = TheTransportRegistry->global_config();
00159 }
00160
00161 config->populate_locators(trans_info);
00162
00163
00164
00165
00166
00167 qos.user_data.value.length(3);
00168 qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00169 qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00170 qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00171
00172 OpenDDS::DCPS::EndpointRegistry::Writer w(topic_name, qos, publisher_qos, conn.config_name(), trans_info);
00173 OpenDDS::DCPS::StaticDiscovery::instance()->registry.writer_map.insert(std::make_pair(id, w));
00174 }
00175 break;
00176 case FACE::DESTINATION:
00177 {
00178 OPENDDS_STRING topic_name = conn.topic_name_;
00179
00180 DDS::DataReaderQos qos(TheServiceParticipant->initial_DataReaderQos());
00181 if (conn.datareader_qos_set()) {
00182 QosMap::const_iterator p = qos_map_.find(conn.datareader_qos_name());
00183 if (p != qos_map_.end()) {
00184 p->second.apply_to(qos);
00185 } else {
00186 ACE_ERROR((LM_ERROR,
00187 ACE_TEXT("(%P|%t) ERROR: Could not find datareaderqos/%s\n"),
00188 conn.datawriter_qos_name()));
00189 return -1;
00190 }
00191 }
00192
00193 DDS::SubscriberQos subscriber_qos(TheServiceParticipant->initial_SubscriberQos());
00194 if (conn.subscriber_qos_set()) {
00195 QosMap::const_iterator p = qos_map_.find(conn.subscriber_qos_name());
00196 if (p != qos_map_.end()) {
00197 p->second.apply_to(subscriber_qos);
00198 } else {
00199 ACE_ERROR((LM_ERROR,
00200 ACE_TEXT("(%P|%t) ERROR: Could not find subscriberqos/%s\n"),
00201 conn.subscriber_qos_name()));
00202 return -1;
00203 }
00204 }
00205
00206 DCPS::TransportLocatorSeq trans_info;
00207
00208 OpenDDS::DCPS::TransportConfig_rch config;
00209
00210 if (conn.config_set()) {
00211 config = TheTransportRegistry->get_config(conn.config_name());
00212 if (config.is_nil()) {
00213 ACE_ERROR((LM_ERROR,
00214 ACE_TEXT("(%P|%t) ERROR: Could not find transport/%s\n"),
00215 conn.config_name()));
00216 return -1;
00217 }
00218 }
00219
00220 if (config.is_nil()) {
00221 config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00222 }
00223
00224 if (config.is_nil()) {
00225 config = TheTransportRegistry->global_config();
00226 }
00227
00228 config->populate_locators(trans_info);
00229
00230
00231
00232
00233
00234 qos.user_data.value.length(3);
00235 qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00236 qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00237 qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00238
00239 OpenDDS::DCPS::EndpointRegistry::Reader r(topic_name, qos, subscriber_qos, conn.config_name(), trans_info);
00240 OpenDDS::DCPS::StaticDiscovery::instance()->registry.reader_map.insert(std::make_pair(id, r));
00241 }
00242 break;
00243 case FACE::BI_DIRECTIONAL:
00244 case FACE::ONE_WAY_REQUEST_SOURCE:
00245 case FACE::ONE_WAY_REQUEST_DESTINATION:
00246 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00247 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00248 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00249 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00250 case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00251 break;
00252 }
00253 }
00254
00255 DCPS::StaticDiscovery::instance()->registry.match();
00256
00257 return status;
00258 }
00259
00260 int
00261 Parser::find_connection(const char* name,
00262 ConnectionSettings& target)
00263 {
00264 int status = 1;
00265 ConnectionMap::iterator result;
00266 if ((result = connection_map_.find(name)) != connection_map_.end()) {
00267 status = 0;
00268 target = result->second;
00269 }
00270 return status;
00271 }
00272
00273 int
00274 Parser::find_topic(const char* name,
00275 TopicSettings& target)
00276 {
00277 int status = 1;
00278 TopicMap::iterator result;
00279 if ((result = topic_map_.find(name)) != topic_map_.end()) {
00280 status = 0;
00281 target = result->second;
00282 }
00283 return status;
00284 }
00285
00286 int
00287 Parser::find_qos(const ConnectionSettings& conn, QosSettings& target)
00288 {
00289 int status = 0;
00290 QosMap::iterator result;
00291
00292 if (conn.direction_ == FACE::SOURCE) {
00293 if (conn.datawriter_qos_set()) {
00294
00295 result = qos_map_.find(conn.datawriter_qos_name());
00296 if (result != qos_map_.end()) {
00297 result->second.apply_to(target.datawriter_qos());
00298 } else {
00299
00300 status = 1;
00301 }
00302 }
00303 if (status == 0 && (conn.publisher_qos_set())) {
00304
00305 result = qos_map_.find(conn.publisher_qos_name());
00306 if (result != qos_map_.end()) {
00307 result->second.apply_to(target.publisher_qos());
00308 } else {
00309
00310 status = 1;
00311 }
00312 }
00313
00314 } else {
00315 if (conn.datareader_qos_set()) {
00316
00317 result = qos_map_.find(conn.datareader_qos_name());
00318 if (result != qos_map_.end()) {
00319 result->second.apply_to(target.datareader_qos());
00320 } else {
00321
00322 status = 1;
00323 }
00324 }
00325 if (status == 0 && (conn.subscriber_qos_set())) {
00326
00327 result = qos_map_.find(conn.subscriber_qos_name());
00328 if (result != qos_map_.end()) {
00329 result->second.apply_to(target.subscriber_qos());
00330 } else {
00331
00332 status = 1;
00333 }
00334 }
00335 }
00336
00337 return status;
00338 }
00339
00340 int
00341 Parser::parse_topic(ACE_Configuration_Heap& config,
00342 ACE_Configuration_Section_Key& key,
00343 const char* topic_name)
00344 {
00345 int status = 0;
00346 int value_index = 0;
00347 ACE_TString value_name, value;
00348 ACE_Configuration::VALUETYPE value_type;
00349
00350 TopicSettings topic;
00351
00352 while (!config.enumerate_values(key,
00353 value_index++,
00354 value_name,
00355 value_type)) {
00356 if (value_type == ACE_Configuration::STRING) {
00357 status = config.get_string_value(key, value_name.c_str(), value);
00358 if (!status) {
00359 status = status || topic.set(value_name.c_str(), value.c_str());
00360 }
00361 } else {
00362 ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00363 status = -1;
00364 break;
00365 }
00366 }
00367 if (!status) {
00368 topic_map_[topic_name] = topic;
00369 }
00370 return status;
00371 }
00372
00373 int
00374 Parser::parse_connection(ACE_Configuration_Heap& config,
00375 ACE_Configuration_Section_Key& key,
00376 const char* connection_name)
00377 {
00378 int status = 0;
00379 int value_index = 0;
00380 ACE_TString value_name, value;
00381 ACE_Configuration::VALUETYPE value_type;
00382
00383 ConnectionSettings connection;
00384
00385 while (!config.enumerate_values(key,
00386 value_index++,
00387 value_name,
00388 value_type)) {
00389 if (value_type == ACE_Configuration::STRING) {
00390 status = config.get_string_value(key, value_name.c_str(), value);
00391 if (!status) {
00392 status = status || connection.set(value_name.c_str(), value.c_str());
00393 }
00394 } else {
00395 ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00396 status = -1;
00397 break;
00398 }
00399 }
00400 if (!status) {
00401 connection_map_[connection_name] = connection;
00402 }
00403 return status;
00404 }
00405
00406 int
00407 Parser::parse_qos(ACE_Configuration_Heap& config,
00408 ACE_Configuration_Section_Key& key,
00409 const char* qos_name,
00410 QosSettings::QosLevel level)
00411 {
00412 int status = 0;
00413 int value_index = 0;
00414 ACE_TString value_name, value;
00415 ACE_Configuration::VALUETYPE value_type;
00416
00417
00418 QosSettings& qos = qos_map_[qos_name];
00419
00420 while (!config.enumerate_values(key,
00421 value_index++,
00422 value_name,
00423 value_type)) {
00424 if (value_type == ACE_Configuration::STRING) {
00425 status = config.get_string_value(key, value_name.c_str(), value);
00426 if (!status) {
00427 status = status ||
00428 qos.set_qos(level, value_name.c_str(), value.c_str());
00429 }
00430 } else {
00431 ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00432 status = -1;
00433 break;
00434 }
00435 }
00436 return status;
00437 }
00438
00439 int
00440 Parser::parse_sections(ACE_Configuration_Heap& config,
00441 const char* section_type,
00442 bool required)
00443 {
00444 int status = 0;
00445 ACE_Configuration_Section_Key key;
00446
00447 if (config.open_section(config.root_section(),
00448 section_type,
00449 0,
00450 key) != 0) {
00451 if (required) {
00452 ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open %C section in config file, status %d\n"), section_type, status));
00453 status = -1;
00454 }
00455
00456 } else {
00457
00458 int section_index = 0;
00459 ACE_TString section_name;
00460
00461 while (!config.enumerate_sections(key,
00462 section_index++,
00463 section_name)) {
00464 ACE_Configuration_Section_Key subkey;
00465
00466 if (config.open_section(key,
00467 section_name.c_str(),
00468 0,
00469 subkey) != 0) {
00470 ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open subsections of %C\n"), section_name.c_str()));
00471 break;
00472 }
00473
00474 if (std::strcmp(section_type, CONNECTION_SECTION) == 0) {
00475 status = parse_connection(config, subkey, section_name.c_str());
00476 } else if (std::strcmp(section_type, TOPIC_SECTION) == 0) {
00477 status = parse_topic(config, subkey, section_name.c_str());
00478 } else if (std::strcmp(section_type, DATAWRITER_QOS_SECTION) == 0) {
00479 status = parse_qos(
00480 config, subkey, section_name.c_str(), QosSettings::datawriter);
00481 } else if (std::strcmp(section_type, DATAREADER_QOS_SECTION) == 0) {
00482 status = parse_qos(
00483 config, subkey, section_name.c_str(), QosSettings::datareader);
00484 } else if (std::strcmp(section_type, PUBLISHER_QOS_SECTION) == 0) {
00485 status = parse_qos(
00486 config, subkey, section_name.c_str(), QosSettings::publisher);
00487 } else if (std::strcmp(section_type, SUBSCRIBER_QOS_SECTION) == 0) {
00488 status = parse_qos(
00489 config, subkey, section_name.c_str(), QosSettings::subscriber);
00490 } else {
00491 ACE_ERROR((LM_ERROR, ACE_TEXT("unknown section %C\n"), section_type));
00492 }
00493 }
00494 }
00495 return status;
00496 }
00497
00498 } } }
00499
00500 OPENDDS_END_VERSIONED_NAMESPACE_DECL