00001 #include "Parser.h"
00002 #include "dds/DCPS/Service_Participant.h"
00003 #include "dds/DCPS/StaticDiscovery.h"
00004 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00005
00006 #include "ace/Configuration_Import_Export.h"
00007 #include "ace/OS_NS_stdio.h"
00008 #include "ace/Log_Priority.h"
00009 #include "ace/Log_Msg.h"
00010
00011 #include <cstring>
00012
00013 namespace OpenDDS { namespace FaceTSS { namespace config {
00014
00015 namespace {
00016 const char* TOPIC_SECTION = "topic";
00017 const char* CONNECTION_SECTION = "connection";
00018 const char* DATAWRITER_QOS_SECTION = "datawriterqos";
00019 const char* DATAREADER_QOS_SECTION = "datareaderqos";
00020 const char* PUBLISHER_QOS_SECTION = "publisherqos";
00021 const char* SUBSCRIBER_QOS_SECTION = "subscriberqos";
00022
00023 OpenDDS::DCPS::RepoId build_id(const ConnectionSettings& conn)
00024 {
00025 unsigned char participant_key[6];
00026 participant_key[0] = (conn.participant_id_ >> 0) & 0xFF;
00027 participant_key[1] = (conn.participant_id_ >> 8) & 0xFF;
00028 participant_key[2] = (conn.participant_id_ >> 16) & 0xFF;
00029 participant_key[3] = (conn.participant_id_ >> 24) & 0xFF;
00030 participant_key[4] = 0;
00031 participant_key[5] = 0;
00032
00033 unsigned char entity_key[3];
00034 entity_key[0] = (conn.connection_id_ >> 0) & 0xFF;
00035 entity_key[1] = (conn.connection_id_ >> 8) & 0xFF;
00036 entity_key[2] = (conn.connection_id_ >> 16) & 0xFF;
00037
00038 unsigned char entity_kind = 0;
00039 switch (conn.direction_) {
00040 case FACE::SOURCE:
00041 entity_kind = DCPS::ENTITYKIND_USER_WRITER_WITH_KEY;
00042 break;
00043 case FACE::DESTINATION:
00044 entity_kind = DCPS::ENTITYKIND_USER_READER_WITH_KEY;
00045 break;
00046 case FACE::BI_DIRECTIONAL:
00047 case FACE::ONE_WAY_REQUEST_SOURCE:
00048 case FACE::ONE_WAY_REQUEST_DESTINATION:
00049 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00050 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00051 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00052 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00053 case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00054 break;
00055 }
00056 return OpenDDS::DCPS::EndpointRegistry::build_id(conn.domain_id_,
00057 participant_key,
00058 OpenDDS::DCPS::EndpointRegistry::build_id(entity_key,
00059 entity_kind));
00060 }
00061
00062 }
00063
00064
00065 ConnectionMap Parser::connection_map_;
00066 QosMap Parser::qos_map_;
00067 TopicMap Parser::topic_map_;
00068
00069 int
00070 Parser::parse(const char* filename)
00071 {
00072 ACE_Configuration_Heap config;
00073 config.open();
00074 ACE_Ini_ImpExp import(config);
00075 int status = import.import_config(filename);
00076 if (status) {
00077 ACE_ERROR((LM_ERROR,
00078 ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00079 ACE_TEXT("import_config () returned %d\n"),
00080 status));
00081 return status;
00082 }
00083
00084 status = parse_sections(config, DATAWRITER_QOS_SECTION, false) ||
00085 parse_sections(config, DATAREADER_QOS_SECTION, false) ||
00086 parse_sections(config, PUBLISHER_QOS_SECTION, false) ||
00087 parse_sections(config, SUBSCRIBER_QOS_SECTION, false) ||
00088 parse_sections(config, TOPIC_SECTION, true) ||
00089 parse_sections(config, CONNECTION_SECTION, true);
00090
00091 if (status)
00092 return status;
00093
00094 status = TheServiceParticipant->load_configuration(config,
00095 filename);
00096
00097 if (status)
00098 return status;
00099
00100 for (ConnectionMap::const_iterator pos = connection_map_.begin(), limit = connection_map_.end();
00101 pos != limit;
00102 ++pos) {
00103 const ConnectionSettings& conn = pos->second;
00104 OpenDDS::DCPS::RepoId id = build_id(conn);
00105
00106 switch (conn.direction_) {
00107 case FACE::SOURCE:
00108 {
00109 OPENDDS_STRING topic_name = conn.topic_name_;
00110
00111 DDS::DataWriterQos qos(TheServiceParticipant->initial_DataWriterQos());
00112 if (conn.datawriter_qos_set()) {
00113 QosMap::const_iterator p = qos_map_.find(conn.datawriter_qos_name());
00114 if (p != qos_map_.end()) {
00115 p->second.apply_to(qos);
00116 } else {
00117 ACE_ERROR((LM_ERROR,
00118 ACE_TEXT("(%P|%t) ERROR: Could not find datawriterqos/%s\n"),
00119 conn.datawriter_qos_name()));
00120 return -1;
00121 }
00122 }
00123
00124 DDS::PublisherQos publisher_qos(TheServiceParticipant->initial_PublisherQos());
00125 if (conn.publisher_qos_set()) {
00126 QosMap::const_iterator p = qos_map_.find(conn.publisher_qos_name());
00127 if (p != qos_map_.end()) {
00128 p->second.apply_to(publisher_qos);
00129 } else {
00130 ACE_ERROR((LM_ERROR,
00131 ACE_TEXT("(%P|%t) ERROR: Could not find publisherqos/%s\n"),
00132 conn.publisher_qos_name()));
00133 return -1;
00134 }
00135 }
00136
00137 DCPS::TransportLocatorSeq trans_info;
00138
00139 OpenDDS::DCPS::TransportConfig_rch config;
00140
00141 if (conn.config_set()) {
00142 config = TheTransportRegistry->get_config(conn.config_name());
00143 if (config.is_nil()) {
00144 ACE_ERROR((LM_ERROR,
00145 ACE_TEXT("(%P|%t) ERROR: Could not find config/%s\n"),
00146 conn.config_name()));
00147 return -1;
00148 }
00149 }
00150
00151 if (config.is_nil()) {
00152 config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00153 }
00154
00155 if (config.is_nil()) {
00156 config = TheTransportRegistry->global_config();
00157 }
00158
00159 config->populate_locators(trans_info);
00160
00161
00162
00163
00164
00165 qos.user_data.value.length(3);
00166 qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00167 qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00168 qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00169
00170 OpenDDS::DCPS::EndpointRegistry::Writer w(topic_name, qos, publisher_qos, conn.config_name(), trans_info);
00171 OpenDDS::DCPS::StaticDiscovery::instance()->registry.writer_map.insert(std::make_pair(id, w));
00172 }
00173 break;
00174 case FACE::DESTINATION:
00175 {
00176 OPENDDS_STRING topic_name = conn.topic_name_;
00177
00178 DDS::DataReaderQos qos(TheServiceParticipant->initial_DataReaderQos());
00179 if (conn.datareader_qos_set()) {
00180 QosMap::const_iterator p = qos_map_.find(conn.datareader_qos_name());
00181 if (p != qos_map_.end()) {
00182 p->second.apply_to(qos);
00183 } else {
00184 ACE_ERROR((LM_ERROR,
00185 ACE_TEXT("(%P|%t) ERROR: Could not find datareaderqos/%s\n"),
00186 conn.datawriter_qos_name()));
00187 return -1;
00188 }
00189 }
00190
00191 DDS::SubscriberQos subscriber_qos(TheServiceParticipant->initial_SubscriberQos());
00192 if (conn.subscriber_qos_set()) {
00193 QosMap::const_iterator p = qos_map_.find(conn.subscriber_qos_name());
00194 if (p != qos_map_.end()) {
00195 p->second.apply_to(subscriber_qos);
00196 } else {
00197 ACE_ERROR((LM_ERROR,
00198 ACE_TEXT("(%P|%t) ERROR: Could not find subscriberqos/%s\n"),
00199 conn.subscriber_qos_name()));
00200 return -1;
00201 }
00202 }
00203
00204 DCPS::TransportLocatorSeq trans_info;
00205
00206 OpenDDS::DCPS::TransportConfig_rch config;
00207
00208 if (conn.config_set()) {
00209 config = TheTransportRegistry->get_config(conn.config_name());
00210 if (config.is_nil()) {
00211 ACE_ERROR((LM_ERROR,
00212 ACE_TEXT("(%P|%t) ERROR: Could not find transport/%s\n"),
00213 conn.config_name()));
00214 return -1;
00215 }
00216 }
00217
00218 if (config.is_nil()) {
00219 config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00220 }
00221
00222 if (config.is_nil()) {
00223 config = TheTransportRegistry->global_config();
00224 }
00225
00226 config->populate_locators(trans_info);
00227
00228
00229
00230
00231
00232 qos.user_data.value.length(3);
00233 qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00234 qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00235 qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00236
00237 OpenDDS::DCPS::EndpointRegistry::Reader r(topic_name, qos, subscriber_qos, conn.config_name(), trans_info);
00238 OpenDDS::DCPS::StaticDiscovery::instance()->registry.reader_map.insert(std::make_pair(id, r));
00239 }
00240 break;
00241 case FACE::BI_DIRECTIONAL:
00242 case FACE::ONE_WAY_REQUEST_SOURCE:
00243 case FACE::ONE_WAY_REQUEST_DESTINATION:
00244 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00245 case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00246 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00247 case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00248 case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00249 break;
00250 }
00251 }
00252
00253 DCPS::StaticDiscovery::instance()->registry.match();
00254
00255 return status;
00256 }
00257
00258 int
00259 Parser::find_connection(const char* name,
00260 ConnectionSettings& target)
00261 {
00262 int status = 1;
00263 ConnectionMap::iterator result;
00264 if ((result = connection_map_.find(name)) != connection_map_.end()) {
00265 status = 0;
00266 target = result->second;
00267 }
00268 return status;
00269 }
00270
00271 int
00272 Parser::find_topic(const char* name,
00273 TopicSettings& target)
00274 {
00275 int status = 1;
00276 TopicMap::iterator result;
00277 if ((result = topic_map_.find(name)) != topic_map_.end()) {
00278 status = 0;
00279 target = result->second;
00280 }
00281 return status;
00282 }
00283
00284 int
00285 Parser::find_qos(const ConnectionSettings& conn, QosSettings& target)
00286 {
00287 int status = 0;
00288 QosMap::iterator result;
00289
00290 if (conn.direction_ == FACE::SOURCE) {
00291 if (conn.datawriter_qos_set()) {
00292
00293 result = qos_map_.find(conn.datawriter_qos_name());
00294 if (result != qos_map_.end()) {
00295 result->second.apply_to(target.datawriter_qos());
00296 } else {
00297
00298 status = 1;
00299 }
00300 }
00301 if (status == 0 && (conn.publisher_qos_set())) {
00302
00303 result = qos_map_.find(conn.publisher_qos_name());
00304 if (result != qos_map_.end()) {
00305 result->second.apply_to(target.publisher_qos());
00306 } else {
00307
00308 status = 1;
00309 }
00310 }
00311
00312 } else {
00313 if (conn.datareader_qos_set()) {
00314
00315 result = qos_map_.find(conn.datareader_qos_name());
00316 if (result != qos_map_.end()) {
00317 result->second.apply_to(target.datareader_qos());
00318 } else {
00319
00320 status = 1;
00321 }
00322 }
00323 if (status == 0 && (conn.subscriber_qos_set())) {
00324
00325 result = qos_map_.find(conn.subscriber_qos_name());
00326 if (result != qos_map_.end()) {
00327 result->second.apply_to(target.subscriber_qos());
00328 } else {
00329
00330 status = 1;
00331 }
00332 }
00333 }
00334
00335 return status;
00336 }
00337
00338 int
00339 Parser::parse_topic(ACE_Configuration_Heap& config,
00340 ACE_Configuration_Section_Key& key,
00341 const char* topic_name)
00342 {
00343 int status = 0;
00344 int value_index = 0;
00345 ACE_TString value_name, value;
00346 ACE_Configuration::VALUETYPE value_type;
00347
00348 TopicSettings topic;
00349
00350 while (!config.enumerate_values(key,
00351 value_index++,
00352 value_name,
00353 value_type)) {
00354 if (value_type == ACE_Configuration::STRING) {
00355 status = config.get_string_value(key, value_name.c_str(), value);
00356 if (!status) {
00357 status = status || topic.set(value_name.c_str(), value.c_str());
00358 }
00359 } else {
00360 ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00361 status = -1;
00362 break;
00363 }
00364 }
00365 if (!status) {
00366 topic_map_[topic_name] = topic;
00367 }
00368 return status;
00369 }
00370
00371 int
00372 Parser::parse_connection(ACE_Configuration_Heap& config,
00373 ACE_Configuration_Section_Key& key,
00374 const char* connection_name)
00375 {
00376 int status = 0;
00377 int value_index = 0;
00378 ACE_TString value_name, value;
00379 ACE_Configuration::VALUETYPE value_type;
00380
00381 ConnectionSettings connection;
00382
00383 while (!config.enumerate_values(key,
00384 value_index++,
00385 value_name,
00386 value_type)) {
00387 if (value_type == ACE_Configuration::STRING) {
00388 status = config.get_string_value(key, value_name.c_str(), value);
00389 if (!status) {
00390 status = status || connection.set(value_name.c_str(), value.c_str());
00391 }
00392 } else {
00393 ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00394 status = -1;
00395 break;
00396 }
00397 }
00398 if (!status) {
00399 connection_map_[connection_name] = connection;
00400 }
00401 return status;
00402 }
00403
00404 int
00405 Parser::parse_qos(ACE_Configuration_Heap& config,
00406 ACE_Configuration_Section_Key& key,
00407 const char* qos_name,
00408 QosSettings::QosLevel level)
00409 {
00410 int status = 0;
00411 int value_index = 0;
00412 ACE_TString value_name, value;
00413 ACE_Configuration::VALUETYPE value_type;
00414
00415
00416 QosSettings& qos = qos_map_[qos_name];
00417
00418 while (!config.enumerate_values(key,
00419 value_index++,
00420 value_name,
00421 value_type)) {
00422 if (value_type == ACE_Configuration::STRING) {
00423 status = config.get_string_value(key, value_name.c_str(), value);
00424 if (!status) {
00425 status = status ||
00426 qos.set_qos(level, value_name.c_str(), value.c_str());
00427 }
00428 } else {
00429 ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00430 status = -1;
00431 break;
00432 }
00433 }
00434 return status;
00435 }
00436
00437 int
00438 Parser::parse_sections(ACE_Configuration_Heap& config,
00439 const char* section_type,
00440 bool required)
00441 {
00442 int status = 0;
00443 ACE_Configuration_Section_Key key;
00444
00445 if (config.open_section(config.root_section(),
00446 section_type,
00447 0,
00448 key) != 0) {
00449 if (required) {
00450 ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open %C section in config file, status %d\n"), section_type, status));
00451 status = -1;
00452 }
00453
00454 } else {
00455
00456 int section_index = 0;
00457 ACE_TString section_name;
00458
00459 while (!config.enumerate_sections(key,
00460 section_index++,
00461 section_name)) {
00462 ACE_Configuration_Section_Key subkey;
00463
00464 if (config.open_section(key,
00465 section_name.c_str(),
00466 0,
00467 subkey) != 0) {
00468 ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open subsections of %C\n"), section_name.c_str()));
00469 break;
00470 }
00471
00472 if (std::strcmp(section_type, CONNECTION_SECTION) == 0) {
00473 status = parse_connection(config, subkey, section_name.c_str());
00474 } else if (std::strcmp(section_type, TOPIC_SECTION) == 0) {
00475 status = parse_topic(config, subkey, section_name.c_str());
00476 } else if (std::strcmp(section_type, DATAWRITER_QOS_SECTION) == 0) {
00477 status = parse_qos(
00478 config, subkey, section_name.c_str(), QosSettings::datawriter);
00479 } else if (std::strcmp(section_type, DATAREADER_QOS_SECTION) == 0) {
00480 status = parse_qos(
00481 config, subkey, section_name.c_str(), QosSettings::datareader);
00482 } else if (std::strcmp(section_type, PUBLISHER_QOS_SECTION) == 0) {
00483 status = parse_qos(
00484 config, subkey, section_name.c_str(), QosSettings::publisher);
00485 } else if (std::strcmp(section_type, SUBSCRIBER_QOS_SECTION) == 0) {
00486 status = parse_qos(
00487 config, subkey, section_name.c_str(), QosSettings::subscriber);
00488 } else {
00489 ACE_ERROR((LM_ERROR, ACE_TEXT("unknown section %C\n"), section_type));
00490 }
00491 }
00492 }
00493 return status;
00494 }
00495
00496 } } }