Parser.cpp

Go to the documentation of this file.
00001 #include "Parser.h"
00002 #include "dds/DCPS/Service_Participant.h"
00003 #include "dds/DCPS/StaticDiscovery.h"
00004 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00005 
00006 #include "ace/Configuration_Import_Export.h"
00007 #include "ace/OS_NS_stdio.h"
00008 #include "ace/Log_Priority.h"
00009 #include "ace/Log_Msg.h"
00010 
00011 #include <cstring>
00012 
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014 
00015 namespace OpenDDS { namespace FaceTSS { namespace config {
00016 
00017 namespace {
00018   const char* TOPIC_SECTION          = "topic";
00019   const char* CONNECTION_SECTION     = "connection";
00020   const char* DATAWRITER_QOS_SECTION = "datawriterqos";
00021   const char* DATAREADER_QOS_SECTION = "datareaderqos";
00022   const char* PUBLISHER_QOS_SECTION  = "publisherqos";
00023   const char* SUBSCRIBER_QOS_SECTION = "subscriberqos";
00024 
00025   OpenDDS::DCPS::RepoId build_id(const ConnectionSettings& conn)
00026   {
00027     unsigned char participant_key[6];
00028     participant_key[0] = (conn.participant_id_ >> 0) & 0xFF;
00029     participant_key[1] = (conn.participant_id_ >> 8) & 0xFF;
00030     participant_key[2] = (conn.participant_id_ >> 16) & 0xFF;
00031     participant_key[3] = (conn.participant_id_ >> 24) & 0xFF;
00032     participant_key[4] = 0; //(conn.domain_id_ >> 32) & 0xFF;
00033     participant_key[5] = 0; //(conn.domain_id_ >> 40) & 0xFF;
00034 
00035     unsigned char entity_key[3];
00036     entity_key[0] = (conn.connection_id_ >> 0) & 0xFF;
00037     entity_key[1] = (conn.connection_id_ >> 8) & 0xFF;
00038     entity_key[2] = (conn.connection_id_ >> 16) & 0xFF;
00039 
00040     unsigned char entity_kind = 0;
00041     switch (conn.direction_) {
00042     case FACE::SOURCE:
00043       entity_kind = DCPS::ENTITYKIND_USER_WRITER_WITH_KEY;
00044       break;
00045     case FACE::DESTINATION:
00046       entity_kind = DCPS::ENTITYKIND_USER_READER_WITH_KEY;
00047       break;
00048     case FACE::BI_DIRECTIONAL:
00049     case FACE::ONE_WAY_REQUEST_SOURCE:
00050     case FACE::ONE_WAY_REQUEST_DESTINATION:
00051     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00052     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00053     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00054     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00055     case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00056       break;
00057     }
00058     return OpenDDS::DCPS::EndpointRegistry::build_id(conn.domain_id_,
00059                                                      participant_key,
00060                                                      OpenDDS::DCPS::EndpointRegistry::build_id(entity_key,
00061                                                                                                entity_kind));
00062   }
00063 
00064 } // anon namespace
00065 
00066 
00067 ConnectionMap Parser::connection_map_;
00068 QosMap Parser::qos_map_;
00069 TopicMap Parser::topic_map_;
00070 
00071 int
00072 Parser::parse(const char* filename)
00073 {
00074   ACE_Configuration_Heap config;
00075   config.open();
00076   ACE_Ini_ImpExp import(config);
00077   int status = import.import_config(filename);
00078   if (status) {
00079     ACE_ERROR((LM_ERROR,
00080                ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00081                ACE_TEXT("import_config () returned %d\n"),
00082                status));
00083     return status;
00084   }
00085 
00086   status = parse_sections(config, DATAWRITER_QOS_SECTION, false) ||
00087     parse_sections(config, DATAREADER_QOS_SECTION, false) ||
00088     parse_sections(config, PUBLISHER_QOS_SECTION, false) ||
00089     parse_sections(config, SUBSCRIBER_QOS_SECTION, false) ||
00090     parse_sections(config, TOPIC_SECTION, true) ||
00091     parse_sections(config, CONNECTION_SECTION, true);
00092 
00093   if (status)
00094     return status;
00095 
00096   status = TheServiceParticipant->load_configuration(config,
00097                                                      filename);
00098 
00099   if (status)
00100     return status;
00101 
00102   for (ConnectionMap::const_iterator pos = connection_map_.begin(), limit = connection_map_.end();
00103        pos != limit;
00104        ++pos) {
00105     const ConnectionSettings& conn = pos->second;
00106     OpenDDS::DCPS::RepoId id = build_id(conn);
00107 
00108     switch (conn.direction_) {
00109     case FACE::SOURCE:
00110       {
00111         OPENDDS_STRING topic_name = conn.topic_name_;
00112 
00113         DDS::DataWriterQos qos(TheServiceParticipant->initial_DataWriterQos());
00114         if (conn.datawriter_qos_set()) {
00115           QosMap::const_iterator p = qos_map_.find(conn.datawriter_qos_name());
00116           if (p != qos_map_.end()) {
00117             p->second.apply_to(qos);
00118           } else {
00119             ACE_ERROR((LM_ERROR,
00120                        ACE_TEXT("(%P|%t) ERROR: Could not find datawriterqos/%s\n"),
00121                        conn.datawriter_qos_name()));
00122             return -1;
00123           }
00124         }
00125 
00126         DDS::PublisherQos publisher_qos(TheServiceParticipant->initial_PublisherQos());
00127         if (conn.publisher_qos_set()) {
00128           QosMap::const_iterator p = qos_map_.find(conn.publisher_qos_name());
00129           if (p != qos_map_.end()) {
00130             p->second.apply_to(publisher_qos);
00131           } else {
00132             ACE_ERROR((LM_ERROR,
00133                        ACE_TEXT("(%P|%t) ERROR: Could not find publisherqos/%s\n"),
00134                        conn.publisher_qos_name()));
00135             return -1;
00136           }
00137         }
00138 
00139         DCPS::TransportLocatorSeq trans_info;
00140 
00141         OpenDDS::DCPS::TransportConfig_rch config;
00142 
00143         if (conn.config_set()) {
00144           config = TheTransportRegistry->get_config(conn.config_name());
00145           if (config.is_nil()) {
00146             ACE_ERROR((LM_ERROR,
00147                        ACE_TEXT("(%P|%t) ERROR: Could not find config/%s\n"),
00148                        conn.config_name()));
00149             return -1;
00150           }
00151         }
00152 
00153         if (config.is_nil()) {
00154           config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00155         }
00156 
00157         if (config.is_nil()) {
00158           config = TheTransportRegistry->global_config();
00159         }
00160 
00161         config->populate_locators(trans_info);
00162 
00163         // Typically, we would ensure that trans_info is not empty.
00164         // However, when using RTPS, trans_info will be empty so don't check.
00165 
00166         // Populate the userdata.
00167         qos.user_data.value.length(3);
00168         qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00169         qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00170         qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00171 
00172         OpenDDS::DCPS::EndpointRegistry::Writer w(topic_name, qos, publisher_qos, conn.config_name(), trans_info);
00173         OpenDDS::DCPS::StaticDiscovery::instance()->registry.writer_map.insert(std::make_pair(id, w));
00174       }
00175       break;
00176     case FACE::DESTINATION:
00177       {
00178         OPENDDS_STRING topic_name = conn.topic_name_;
00179 
00180         DDS::DataReaderQos qos(TheServiceParticipant->initial_DataReaderQos());
00181         if (conn.datareader_qos_set()) {
00182           QosMap::const_iterator p = qos_map_.find(conn.datareader_qos_name());
00183           if (p != qos_map_.end()) {
00184             p->second.apply_to(qos);
00185           } else {
00186             ACE_ERROR((LM_ERROR,
00187                        ACE_TEXT("(%P|%t) ERROR: Could not find datareaderqos/%s\n"),
00188                        conn.datawriter_qos_name()));
00189             return -1;
00190           }
00191         }
00192 
00193         DDS::SubscriberQos subscriber_qos(TheServiceParticipant->initial_SubscriberQos());
00194         if (conn.subscriber_qos_set()) {
00195           QosMap::const_iterator p = qos_map_.find(conn.subscriber_qos_name());
00196           if (p != qos_map_.end()) {
00197             p->second.apply_to(subscriber_qos);
00198           } else {
00199             ACE_ERROR((LM_ERROR,
00200                        ACE_TEXT("(%P|%t) ERROR: Could not find subscriberqos/%s\n"),
00201                        conn.subscriber_qos_name()));
00202             return -1;
00203           }
00204         }
00205 
00206         DCPS::TransportLocatorSeq trans_info;
00207 
00208         OpenDDS::DCPS::TransportConfig_rch config;
00209 
00210         if (conn.config_set()) {
00211           config = TheTransportRegistry->get_config(conn.config_name());
00212           if (config.is_nil()) {
00213             ACE_ERROR((LM_ERROR,
00214                        ACE_TEXT("(%P|%t) ERROR: Could not find transport/%s\n"),
00215                        conn.config_name()));
00216             return -1;
00217           }
00218         }
00219 
00220         if (config.is_nil()) {
00221           config = TheTransportRegistry->domain_default_config(conn.domain_id_);
00222         }
00223 
00224         if (config.is_nil()) {
00225           config = TheTransportRegistry->global_config();
00226         }
00227 
00228         config->populate_locators(trans_info);
00229 
00230         // Typically, we would ensure that trans_info is not empty.
00231         // However, when using RTPS, trans_info will be empty so don't check.
00232 
00233         // Populate the userdata.
00234         qos.user_data.value.length(3);
00235         qos.user_data.value[0] = (conn.connection_id_ >> 0) & 0xFF;
00236         qos.user_data.value[1] = (conn.connection_id_ >> 8) & 0xFF;
00237         qos.user_data.value[2] = (conn.connection_id_ >> 16) & 0xFF;
00238 
00239         OpenDDS::DCPS::EndpointRegistry::Reader r(topic_name, qos, subscriber_qos, conn.config_name(), trans_info);
00240         OpenDDS::DCPS::StaticDiscovery::instance()->registry.reader_map.insert(std::make_pair(id, r));
00241       }
00242       break;
00243     case FACE::BI_DIRECTIONAL:
00244     case FACE::ONE_WAY_REQUEST_SOURCE:
00245     case FACE::ONE_WAY_REQUEST_DESTINATION:
00246     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_SOURCE:
00247     case FACE::TWO_WAY_REQUEST_SYNCHRONOUS_DESTINATION:
00248     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_SOURCE:
00249     case FACE::TWO_WAY_REQUEST_REPLY_ASYNCHRONOUS_DESTINATION:
00250     case FACE::NOT_DEFINED_CONNECTION_DIRECTION_TYPE:
00251       break;
00252     }
00253   }
00254 
00255   DCPS::StaticDiscovery::instance()->registry.match();
00256 
00257   return status;
00258 }
00259 
00260 int
00261 Parser::find_connection(const char* name,
00262                         ConnectionSettings& target)
00263 {
00264   int status = 1;
00265   ConnectionMap::iterator result;
00266   if ((result = connection_map_.find(name)) != connection_map_.end()) {
00267     status = 0;
00268     target = result->second;
00269   }
00270   return status;
00271 }
00272 
00273 int
00274 Parser::find_topic(const char* name,
00275                    TopicSettings& target)
00276 {
00277   int status = 1;
00278   TopicMap::iterator result;
00279   if ((result = topic_map_.find(name)) != topic_map_.end()) {
00280     status = 0;
00281     target = result->second;
00282   }
00283   return status;
00284 }
00285 
00286 int
00287 Parser::find_qos(const ConnectionSettings& conn, QosSettings& target)
00288 {
00289   int status = 0;
00290   QosMap::iterator result;
00291   // If thie is a SOURCE
00292   if (conn.direction_ == FACE::SOURCE) {
00293     if (conn.datawriter_qos_set()) {
00294       // Name specified, must be in map
00295       result = qos_map_.find(conn.datawriter_qos_name());
00296       if (result != qos_map_.end()) {
00297         result->second.apply_to(target.datawriter_qos());
00298       } else {
00299         // Failed
00300         status = 1;
00301       }
00302     }
00303     if (status == 0 && (conn.publisher_qos_set())) {
00304       // Name specified, must be in map
00305       result = qos_map_.find(conn.publisher_qos_name());
00306       if (result != qos_map_.end()) {
00307         result->second.apply_to(target.publisher_qos());
00308       } else {
00309         // Failed
00310         status = 1;
00311       }
00312     }
00313   // Else DESTINATION
00314   } else {
00315     if (conn.datareader_qos_set()) {
00316       // Name specified, must be in map
00317       result = qos_map_.find(conn.datareader_qos_name());
00318       if (result != qos_map_.end()) {
00319         result->second.apply_to(target.datareader_qos());
00320       } else {
00321         // Failed
00322         status = 1;
00323       }
00324     }
00325     if (status == 0 && (conn.subscriber_qos_set())) {
00326       // Name specified, must be in map
00327       result = qos_map_.find(conn.subscriber_qos_name());
00328       if (result != qos_map_.end()) {
00329         result->second.apply_to(target.subscriber_qos());
00330       } else {
00331         // Failed
00332         status = 1;
00333       }
00334     }
00335   }
00336 
00337   return status;
00338 }
00339 
00340 int
00341 Parser::parse_topic(ACE_Configuration_Heap& config,
00342                     ACE_Configuration_Section_Key& key,
00343                     const char* topic_name)
00344 {
00345   int status = 0;
00346   int value_index = 0;
00347   ACE_TString value_name, value;
00348   ACE_Configuration::VALUETYPE value_type;
00349 
00350   TopicSettings topic;
00351 
00352   while (!config.enumerate_values(key,
00353                                   value_index++,
00354                                   value_name,
00355                                   value_type)) {
00356     if (value_type == ACE_Configuration::STRING) {
00357       status = config.get_string_value(key, value_name.c_str(), value);
00358       if (!status) {
00359         status = status || topic.set(value_name.c_str(), value.c_str());
00360       }
00361     } else {
00362       ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00363       status = -1;
00364       break;
00365     }
00366   }
00367   if (!status) {
00368     topic_map_[topic_name] = topic;
00369   }
00370   return status;
00371 }
00372 
00373 int
00374 Parser::parse_connection(ACE_Configuration_Heap& config,
00375                          ACE_Configuration_Section_Key& key,
00376                          const char* connection_name)
00377 {
00378   int status = 0;
00379   int value_index = 0;
00380   ACE_TString value_name, value;
00381   ACE_Configuration::VALUETYPE value_type;
00382 
00383   ConnectionSettings connection;
00384 
00385   while (!config.enumerate_values(key,
00386                                   value_index++,
00387                                   value_name,
00388                                   value_type)) {
00389     if (value_type == ACE_Configuration::STRING) {
00390       status = config.get_string_value(key, value_name.c_str(), value);
00391       if (!status) {
00392         status = status || connection.set(value_name.c_str(), value.c_str());
00393       }
00394     } else {
00395       ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00396       status = -1;
00397       break;
00398     }
00399   }
00400   if (!status) {
00401     connection_map_[connection_name] = connection;
00402   }
00403   return status;
00404 }
00405 
00406 int
00407 Parser::parse_qos(ACE_Configuration_Heap& config,
00408                   ACE_Configuration_Section_Key& key,
00409                   const char* qos_name,
00410                   QosSettings::QosLevel level)
00411 {
00412   int status = 0;
00413   int value_index = 0;
00414   ACE_TString value_name, value;
00415   ACE_Configuration::VALUETYPE value_type;
00416 
00417   // Find existing or create new settings
00418   QosSettings& qos = qos_map_[qos_name];
00419 
00420   while (!config.enumerate_values(key,
00421                                   value_index++,
00422                                   value_name,
00423                                   value_type)) {
00424     if (value_type == ACE_Configuration::STRING) {
00425       status = config.get_string_value(key, value_name.c_str(), value);
00426       if (!status) {
00427         status = status ||
00428                  qos.set_qos(level, value_name.c_str(), value.c_str());
00429       }
00430     } else {
00431       ACE_ERROR((LM_ERROR, ACE_TEXT("unexpected value type %d\n"), value_type));
00432       status = -1;
00433       break;
00434     }
00435   }
00436   return status;
00437 }
00438 
00439 int
00440 Parser::parse_sections(ACE_Configuration_Heap& config,
00441                        const char* section_type,
00442                        bool required)
00443 {
00444   int status = 0;
00445   ACE_Configuration_Section_Key key;
00446   // If we can't open this section
00447   if (config.open_section(config.root_section(),
00448                           section_type,
00449                           0, // don't create if missing
00450                           key) != 0) {
00451     if (required) {
00452       ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open %C section in config file, status %d\n"), section_type, status));
00453       status = -1;
00454     }
00455   // Else, we can open this section
00456   } else {
00457     // Open subsections
00458     int section_index = 0;
00459     ACE_TString section_name;
00460 
00461     while (!config.enumerate_sections(key,
00462                                       section_index++,
00463                                       section_name)) {
00464       ACE_Configuration_Section_Key subkey;
00465       // Open subsection
00466       if (config.open_section(key,
00467                               section_name.c_str(),
00468                               0, // don't create if missing
00469                               subkey) != 0) {
00470         ACE_ERROR((LM_ERROR, ACE_TEXT("Could not open subsections of %C\n"), section_name.c_str()));
00471         break;
00472       }
00473 
00474       if (std::strcmp(section_type, CONNECTION_SECTION) == 0) {
00475         status = parse_connection(config, subkey, section_name.c_str());
00476       } else if (std::strcmp(section_type, TOPIC_SECTION) == 0) {
00477         status = parse_topic(config, subkey, section_name.c_str());
00478       } else if (std::strcmp(section_type, DATAWRITER_QOS_SECTION) == 0) {
00479         status = parse_qos(
00480             config, subkey, section_name.c_str(), QosSettings::datawriter);
00481       } else if (std::strcmp(section_type, DATAREADER_QOS_SECTION) == 0) {
00482         status = parse_qos(
00483             config, subkey, section_name.c_str(), QosSettings::datareader);
00484       } else if (std::strcmp(section_type, PUBLISHER_QOS_SECTION) == 0) {
00485         status = parse_qos(
00486             config, subkey, section_name.c_str(), QosSettings::publisher);
00487       } else if (std::strcmp(section_type, SUBSCRIBER_QOS_SECTION) == 0) {
00488         status = parse_qos(
00489             config, subkey, section_name.c_str(), QosSettings::subscriber);
00490       } else {
00491         ACE_ERROR((LM_ERROR, ACE_TEXT("unknown section %C\n"), section_type));
00492       }
00493     }
00494   }
00495   return status;
00496 }
00497 
00498 } } }
00499 
00500 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1