Parser.cpp

Go to the documentation of this file.
00001 #include "Parser.h"
00002 #include "dds/DCPS/Service_Participant.h"
00003 #include "dds/DCPS/StaticDiscovery.h"
00004 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00005 
00006 #include "ace/Configuration_Import_Export.h"
00007 #include "ace/OS_NS_stdio.h"
00008 #include "ace/Log_Priority.h"
00009 #include "ace/Log_Msg.h"
00010 
00011 #include <cstring>
00012 
00013 namespace OpenDDS { namespace FaceTSS { namespace config {
00014 
00015 namespace {
00016   const char* TOPIC_SECTION          = "topic";
00017   const char* CONNECTION_SECTION     = "connection";
00018   const char* DATAWRITER_QOS_SECTION = "datawriterqos";
00019   const char* DATAREADER_QOS_SECTION = "datareaderqos";
00020   const char* PUBLISHER_QOS_SECTION  = "publisherqos";
00021   const char* SUBSCRIBER_QOS_SECTION = "subscriberqos";
00022 
00023   OpenDDS::DCPS::RepoId build_id(const ConnectionSettings& conn)
00024   {
00025     unsigned char participant_key[6];
00026     participant_key[0] = (conn.participant_id_ >> 0) & 0xFF;
00027     participant_key[1] = (conn.participant_id_ >> 8) & 0xFF;
00028     participant_key[2] = (conn.participant_id_ >> 16) & 0xFF;
00029     participant_key[3] = (conn.participant_id_ >> 24) & 0xFF;
00030     participant_key[4] = 0; //(conn.domain_id_ >> 32) & 0xFF;
00031     participant_key[5] = 0; //(conn.domain_id_ >> 40) & 0xFF;
00032 
00033     unsigned char entity_key[3];
00034     entity_key[0] = (conn.connection_id_ >> 0) & 0xFF;
00035     entity_key[1] = (conn.connection_id_ >> 8) & 0xFF;
00036     entity_key[2] = (conn.connection_id_ >> 16) & 0xFF;
00037 
00038     unsigned char entity_kind = 0;
00039     switch (conn.direction_) {
00040     case FACE::SOURCE:
00041       entity_kind = DCPS::ENTITYKIND_USER_WRITER_WITH_KEY;
00042       break;
00043     case FACE::DESTINATION:
00044       entity_kind = DCPS::ENTITYKIND_USER_READER_WITH_KEY;
00045       break;
00046     case FACE::BI_DIRECTIONAL:
00047     case FACE::ONE_WAY_REQUEST_SOURCE:
00048     case FACE::ONE_WAY_REQUEST_DESTINATION:
00049     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00050     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00051     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00052     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00053     case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00054       break;
00055     }
00056     return OpenDDS::DCPS::EndpointRegistry::build_id(conn.domain_id_,
00057                                                      participant_key,
00058                                                      OpenDDS::DCPS::EndpointRegistry::build_id(entity_key,
00059                                                                                                entity_kind));
00060   }
00061 
00062 } // anon namespace
00063 
00064 
// Definitions of Parser's static lookup tables.
// Connections by name: written by parse_connection(), read by parse() and
// find_connection().
00065 ConnectionMap Parser::connection_map_;
// QoS settings by name: written by parse_qos(), read by parse() and find_qos().
00066 QosMap Parser::qos_map_;
// Topics by name: written by parse_topic(), read by find_topic().
00067 TopicMap Parser::topic_map_;
00068 
00069 int
00070 Parser::parse(const char* filename)
00071 {
00072   ACE_Configuration_Heap config;
00073   config.open();
00074   ACE_Ini_ImpExp import(config);
00075   int status = import.import_config(filename);
00076   if (status) {
00077     ACE_ERROR((LM_ERROR,
00078                ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00079                ACE_TEXT("import_config () returned %d\n"),
00080                status));
00081     return status;
00082   }
00083 
00084   status = parse_sections(config, DATAWRITER_QOS_SECTION, false) ||
00085     parse_sections(config, DATAREADER_QOS_SECTION, false) ||
00086     parse_sections(config, PUBLISHER_QOS_SECTION, false) ||
00087     parse_sections(config, SUBSCRIBER_QOS_SECTION, false) ||
00088     parse_sections(config, TOPIC_SECTION, true) ||
00089     parse_sections(config, CONNECTION_SECTION, true);
00090 
00091   if (status)
00092     return status;
00093 
00094   status = TheServiceParticipant->load_configuration(config,
00095                                                      filename);
00096 
00097   if (status)
00098     return status;
00099 
00100   for (ConnectionMap::const_iterator pos = connection_map_.begin(), limit = connection_map_.end();
00101        pos != limit;
00102        ++pos) {
00103     const ConnectionSettings& conn = pos->second;
00104     OpenDDS::DCPS::RepoId id = build_id(conn);
00105 
00106     switch (conn.direction_) {
00107     case FACE::SOURCE:
00108       {
00109         OPENDDS_STRING topic_name = conn.topic_name_;
00110 
00111         DDS::DataWriterQos qos(TheServiceParticipant->initial_DataWriterQos());
00112         if (conn.datawriter_qos_set()) {
00113           QosMap::const_iterator p = qos_map_.find(conn.datawriter_qos_name());
00114           if (p != qos_map_.end()) {
00115             p->second.apply_to(qos);
00116           } else {
00117             ACE_ERROR((LM_ERROR,
00118                        ACE_TEXT("(%P|%t) ERROR: Could not find datawriterqos/%s\n"),
00119                        conn.datawriter_qos_name()));
00120             return -1;
00121           }
00122         }
00123 
00124         DDS::PublisherQos publisher_qos(TheServiceParticipant->initial_PublisherQos());
00125         if (conn.publisher_qos_set()) {
00126           QosMap::const_iterator p = qos_map_.find(conn.publisher_qos_name());
00127           if (p != qos_map_.end()) {
00128             p->second.apply_to(publisher_qos);
00129           } else {
00130             ACE_ERROR((LM_ERROR,
00131                        ACE_TEXT("(%P|%t) ERROR: Could not find publisherqos/%s\n"),
00132                        conn.publisher_qos_name()));
00133             return -1;
00134           }
00135         }
00136 
00137         DCPS::TransportLocatorSeq trans_info;
00138 
00139         OpenDDS::DCPS::TransportConfig_rch config;
00140 
00141         if (conn.config_set()) {
00142           config = TheTransportRegistry->get_config(conn.config_name());
00143           if (config.is_nil()) {
00144             ACE_ERROR((LM_ERROR,
00145                        ACE_TEXT("(%P|%t) ERROR: Could not find config/%s\n"),
00146                        conn.config_name()));
00147             return -1;
00148           }
00149         }
00150 
00151         if (config.is_nil()) {
00152           config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00153         }
00154 
00155         if (config.is_nil()) {
00156           config = TheTransportRegistry->global_config();
00157         }
00158 
00159         config->populate_locators(trans_info);
00160 
00161         // Typically, we would ensure that trans_info is not empty.
00162         // However, when using RTPS, trans_info will be empty so don't check.
00163 
00164         // Populate the userdata.
00165         qos.user_data.value.length(3);
00166         qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00167         qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00168         qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00169 
00170         OpenDDS::DCPS::EndpointRegistry::Writer w(topic_name, qos, publisher_qos, conn.config_name(), trans_info);
00171         OpenDDS::DCPS::StaticDiscovery::instance()->registry.writer_map.insert(std::make_pair(id, w));
00172       }
00173       break;
00174     case FACE::DESTINATION:
00175       {
00176         OPENDDS_STRING topic_name = conn.topic_name_;
00177 
00178         DDS::DataReaderQos qos(TheServiceParticipant->initial_DataReaderQos());
00179         if (conn.datareader_qos_set()) {
00180           QosMap::const_iterator p = qos_map_.find(conn.datareader_qos_name());
00181           if (p != qos_map_.end()) {
00182             p->second.apply_to(qos);
00183           } else {
00184             ACE_ERROR((LM_ERROR,
00185                        ACE_TEXT("(%P|%t) ERROR: Could not find datareaderqos/%s\n"),
00186                        conn.datawriter_qos_name()));
00187             return -1;
00188           }
00189         }
00190 
00191         DDS::SubscriberQos subscriber_qos(TheServiceParticipant->initial_SubscriberQos());
00192         if (conn.subscriber_qos_set()) {
00193           QosMap::const_iterator p = qos_map_.find(conn.subscriber_qos_name());
00194           if (p != qos_map_.end()) {
00195             p->second.apply_to(subscriber_qos);
00196           } else {
00197             ACE_ERROR((LM_ERROR,
00198                        ACE_TEXT("(%P|%t) ERROR: Could not find subscriberqos/%s\n"),
00199                        conn.subscriber_qos_name()));
00200             return -1;
00201           }
00202         }
00203 
00204         DCPS::TransportLocatorSeq trans_info;
00205 
00206         OpenDDS::DCPS::TransportConfig_rch config;
00207 
00208         if (conn.config_set()) {
00209           config = TheTransportRegistry->get_config(conn.config_name());
00210           if (config.is_nil()) {
00211             ACE_ERROR((LM_ERROR,
00212                        ACE_TEXT("(%P|%t) ERROR: Could not find transport/%s\n"),
00213                        conn.config_name()));
00214             return -1;
00215           }
00216         }
00217 
00218         if (config.is_nil()) {
00219           config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00220         }
00221 
00222         if (config.is_nil()) {
00223           config = TheTransportRegistry->global_config();
00224         }
00225 
00226         config->populate_locators(trans_info);
00227 
00228         // Typically, we would ensure that trans_info is not empty.
00229         // However, when using RTPS, trans_info will be empty so don't check.
00230 
00231         // Populate the userdata.
00232         qos.user_data.value.length(3);
00233         qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00234         qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00235         qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00236 
00237         OpenDDS::DCPS::EndpointRegistry::Reader r(topic_name, qos, subscriber_qos, conn.config_name(), trans_info);
00238         OpenDDS::DCPS::StaticDiscovery::instance()->registry.reader_map.insert(std::make_pair(id, r));
00239       }
00240       break;
00241     case FACE::BI_DIRECTIONAL:
00242     case FACE::ONE_WAY_REQUEST_SOURCE:
00243     case FACE::ONE_WAY_REQUEST_DESTINATION:
00244     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00245     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00246     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00247     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00248     case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00249       break;
00250     }
00251   }
00252 
00253   DCPS::StaticDiscovery::instance()->registry.match();
00254 
00255   return status;
00256 }
00257 
00258 int
00259 Parser::find_connection(const char* name,
00260                         ConnectionSettings& target)
00261 {
00262   int status = 1;
00263   ConnectionMap::iterator result;
00264   if ((result = connection_map_.find(name)) != connection_map_.end()) {
00265     status = 0;
00266     target = result->second;
00267   }
00268   return status;
00269 }
00270 
00271 int
00272 Parser::find_topic(const char* name,
00273                    TopicSettings& target)
00274 {
00275   int status = 1;
00276   TopicMap::iterator result;
00277   if ((result = topic_map_.find(name)) != topic_map_.end()) {
00278     status = 0;
00279     target = result->second;
00280   }
00281   return status;
00282 }
00283 
00284 int
00285 Parser::find_qos(const ConnectionSettings& conn, QosSettings& target)
00286 {
00287   int status = 0;
00288   QosMap::iterator result;
00289   // If thie is a SOURCE
00290   if (conn.direction_ == FACE::SOURCE) {
00291     if (conn.datawriter_qos_set()) {
00292       // Name specified, must be in map
00293       result = qos_map_.find(conn.datawriter_qos_name());
00294       if (result != qos_map_.end()) {
00295         result->second.apply_to(target.datawriter_qos());
00296       } else {
00297         // Failed
00298         status = 1;
00299       }
00300     }
00301     if (status == 0 && (conn.publisher_qos_set())) {
00302       // Name specified, must be in map
00303       result = qos_map_.find(conn.publisher_qos_name());
00304       if (result != qos_map_.end()) {
00305         result->second.apply_to(target.publisher_qos());
00306       } else {
00307         // Failed
00308         status = 1;
00309       }
00310     }
00311   // Else DESTINATION
00312   } else {
00313     if (conn.datareader_qos_set()) {
00314       // Name specified, must be in map
00315       result = qos_map_.find(conn.datareader_qos_name());
00316       if (result != qos_map_.end()) {
00317         result->second.apply_to(target.datareader_qos());
00318       } else {
00319         // Failed
00320         status = 1;
00321       }
00322     }
00323     if (status == 0 && (conn.subscriber_qos_set())) {
00324       // Name specified, must be in map
00325       result = qos_map_.find(conn.subscriber_qos_name());
00326       if (result != qos_map_.end()) {
00327         result->second.apply_to(target.subscriber_qos());
00328       } else {
00329         // Failed
00330         status = 1;
00331       }
00332     }
00333   }
00334 
00335   return status;
00336 }
00337 
00338 int
00339 Parser::parse_topic(ACE_Configuration_Heap& config,
00340                     ACE_Configuration_Section_Key& key,
00341                     const char* topic_name)
00342 {
00343   int status = 0;
00344   int value_index = 0;
00345   ACE_TString value_name, value;
00346   ACE_Configuration::VALUETYPE value_type;
00347 
00348   TopicSettings topic;
00349 
00350   while (!config.enumerate_values(key,
00351                                   value_index++,
00352                                   value_name,
00353                                   value_type)) {
00354     if (value_type == ACE_Configuration::STRING) {
00355       status = config.get_string_value(key, value_name.c_str(), value);
00356       if (!status) {
00357         status = status || topic.set(value_name.c_str(), value.c_str());
00358       }
00359     } else {
00360       ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00361       status = -1;
00362       break;
00363     }
00364   }
00365   if (!status) {
00366     topic_map_[topic_name] = topic;
00367   }
00368   return status;
00369 }
00370 
00371 int
00372 Parser::parse_connection(ACE_Configuration_Heap& config,
00373                          ACE_Configuration_Section_Key& key,
00374                          const char* connection_name)
00375 {
00376   int status = 0;
00377   int value_index = 0;
00378   ACE_TString value_name, value;
00379   ACE_Configuration::VALUETYPE value_type;
00380 
00381   ConnectionSettings connection;
00382 
00383   while (!config.enumerate_values(key,
00384                                   value_index++,
00385                                   value_name,
00386                                   value_type)) {
00387     if (value_type == ACE_Configuration::STRING) {
00388       status = config.get_string_value(key, value_name.c_str(), value);
00389       if (!status) {
00390         status = status || connection.set(value_name.c_str(), value.c_str());
00391       }
00392     } else {
00393       ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00394       status = -1;
00395       break;
00396     }
00397   }
00398   if (!status) {
00399     connection_map_[connection_name] = connection;
00400   }
00401   return status;
00402 }
00403 
00404 int
00405 Parser::parse_qos(ACE_Configuration_Heap& config,
00406                   ACE_Configuration_Section_Key& key,
00407                   const char* qos_name,
00408                   QosSettings::QosLevel level)
00409 {
00410   int status = 0;
00411   int value_index = 0;
00412   ACE_TString value_name, value;
00413   ACE_Configuration::VALUETYPE value_type;
00414 
00415   // Find existing or create new settings
00416   QosSettings& qos = qos_map_[qos_name];
00417 
00418   while (!config.enumerate_values(key,
00419                                   value_index++,
00420                                   value_name,
00421                                   value_type)) {
00422     if (value_type == ACE_Configuration::STRING) {
00423       status = config.get_string_value(key, value_name.c_str(), value);
00424       if (!status) {
00425         status = status ||
00426                  qos.set_qos(level, value_name.c_str(), value.c_str());
00427       }
00428     } else {
00429       ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00430       status = -1;
00431       break;
00432     }
00433   }
00434   return status;
00435 }
00436 
00437 int
00438 Parser::parse_sections(ACE_Configuration_Heap& config,
00439                        const char* section_type,
00440                        bool required)
00441 {
00442   int status = 0;
00443   ACE_Configuration_Section_Key key;
00444   // If we can't open this section
00445   if (config.open_section(config.root_section(),
00446                           section_type,
00447                           0, // don't create if missing
00448                           key) != 0) {
00449     if (required) {
00450       ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open %C section in config file, status %d\n"), section_type, status));
00451       status = -1;
00452     }
00453   // Else, we can open this section
00454   } else {
00455     // Open subsections
00456     int section_index = 0;
00457     ACE_TString section_name;
00458 
00459     while (!config.enumerate_sections(key,
00460                                       section_index++,
00461                                       section_name)) {
00462       ACE_Configuration_Section_Key subkey;
00463       // Open subsection
00464       if (config.open_section(key,
00465                               section_name.c_str(),
00466                               0, // don't create if missing
00467                               subkey) != 0) {
00468         ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open subsections of %C\n"), section_name.c_str()));
00469         break;
00470       }
00471 
00472       if (std::strcmp(section_type, CONNECTION_SECTION) == 0) {
00473         status = parse_connection(config, subkey, section_name.c_str());
00474       } else if (std::strcmp(section_type, TOPIC_SECTION) == 0) {
00475         status = parse_topic(config, subkey, section_name.c_str());
00476       } else if (std::strcmp(section_type, DATAWRITER_QOS_SECTION) == 0) {
00477         status = parse_qos(
00478             config, subkey, section_name.c_str(), QosSettings::datawriter);
00479       } else if (std::strcmp(section_type, DATAREADER_QOS_SECTION) == 0) {
00480         status = parse_qos(
00481             config, subkey, section_name.c_str(), QosSettings::datareader);
00482       } else if (std::strcmp(section_type, PUBLISHER_QOS_SECTION) == 0) {
00483         status = parse_qos(
00484             config, subkey, section_name.c_str(), QosSettings::publisher);
00485       } else if (std::strcmp(section_type, SUBSCRIBER_QOS_SECTION) == 0) {
00486         status = parse_qos(
00487             config, subkey, section_name.c_str(), QosSettings::subscriber);
00488       } else {
00489         ACE_ERROR((LM_ERROR, ACE_TEXT("unknown section %C\n"), section_type));
00490       }
00491     }
00492   }
00493   return status;
00494 }
00495 
00496 } } }

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7