StaticDiscovery.cpp

Go to the documentation of this file.
00001 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00002 
00003 #include "StaticDiscovery.h"
00004 #include "dds/DCPS/debug.h"
00005 #include "dds/DCPS/ConfigUtils.h"
00006 #include "dds/DCPS/DomainParticipantImpl.h"
00007 #include "dds/DCPS/Marked_Default_Qos.h"
00008 #include "dds/DCPS/SubscriberImpl.h"
00009 #include "dds/DCPS/BuiltInTopicUtils.h"
00010 #include "dds/DCPS/Registered_Data_Types.h"
00011 #include "dds/DCPS/Qos_Helper.h"
00012 #include "dds/DCPS/DataWriterImpl.h"
00013 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00014 
00015 #include <ctype.h>
00016 
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 namespace OpenDDS {
00020 namespace DCPS {
00021 
00022 namespace {
00023   const size_t BYTES_IN_VENDOR = 2;
00024   const size_t HEX_DIGITS_IN_VENDOR = 2 * BYTES_IN_VENDOR;
00025   const size_t BYTES_IN_DOMAIN = 4;
00026   const size_t HEX_DIGITS_IN_DOMAIN = 2 * BYTES_IN_DOMAIN;
00027   const size_t BYTES_IN_PARTICIPANT = 6;
00028   const size_t HEX_DIGITS_IN_PARTICIPANT = 2 * BYTES_IN_PARTICIPANT;
00029   const size_t BYTES_IN_ENTITY = 3;
00030   const size_t HEX_DIGITS_IN_ENTITY = 2 * BYTES_IN_ENTITY;
00031   const size_t TYPE_NAME_MAX = 128;
00032 }
00033 
00034 void EndpointRegistry::match()
00035 {
00036   for (WriterMapType::iterator wp = writer_map.begin(), wp_limit = writer_map.end();
00037        wp != wp_limit;
00038        ++wp) {
00039     const RepoId& writerid = wp->first;
00040     Writer& writer = wp->second;
00041     for (ReaderMapType::iterator rp = reader_map.begin(), rp_limit = reader_map.end();
00042          rp != rp_limit;
00043          ++rp) {
00044       const RepoId& readerid = rp->first;
00045       Reader& reader = rp->second;
00046 
00047       if (StaticDiscGuidDomainEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00048           !StaticDiscGuidPartEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00049           reader.topic_name == writer.topic_name) {
00050         // Different participants, same topic.
00051         IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00052         IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00053         const TransportLocatorSeq& writer_trans_info = writer.trans_info;
00054         const TransportLocatorSeq& reader_trans_info = reader.trans_info;
00055         const DDS::DataWriterQos& writer_qos = writer.qos;
00056         const DDS::DataReaderQos& reader_qos = reader.qos;
00057         const DDS::PublisherQos& publisher_qos = writer.publisher_qos;
00058         const DDS::SubscriberQos& subscriber_qos = reader.subscriber_qos;
00059 
00060         if (compatibleQOS(&writerStatus, &readerStatus, writer_trans_info, reader_trans_info,
00061                           &writer_qos, &reader_qos, &publisher_qos, &subscriber_qos)) {
00062           switch (reader.qos.reliability.kind) {
00063           case DDS::BEST_EFFORT_RELIABILITY_QOS:
00064             writer.best_effort_readers.insert(readerid);
00065             reader.best_effort_writers.insert(writerid);
00066             break;
00067           case DDS::RELIABLE_RELIABILITY_QOS:
00068             writer.reliable_readers.insert(readerid);
00069             reader.reliable_writers.insert(writerid);
00070             break;
00071           }
00072         }
00073       }
00074     }
00075   }
00076 }
00077 
00078 StaticEndpointManager::StaticEndpointManager(const RepoId& participant_id,
00079                                              ACE_Thread_Mutex& lock,
00080                                              const EndpointRegistry& registry,
00081                                              StaticParticipant& participant)
00082   : EndpointManager<StaticDiscoveredParticipantData>(participant_id, lock)
00083   , registry_(registry)
00084   , participant_(participant)
00085 {
00086   pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00087   sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00088 }
00089 
00090 void StaticEndpointManager::init_bit()
00091 {
00092   // Discover all remote publications and subscriptions.
00093 
00094   for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
00095          limit = registry_.writer_map.end();
00096        pos != limit;
00097        ++pos) {
00098     const RepoId& remoteid = pos->first;
00099     const EndpointRegistry::Writer& writer = pos->second;
00100 
00101     if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00102       increment_key(pub_bit_key_);
00103       pub_key_to_id_[pub_bit_key_] = remoteid;
00104 
00105       // pos represents a remote.
00106       // Populate data.
00107       DDS::PublicationBuiltinTopicData data;
00108 
00109       data.key = pub_bit_key_;
00110       OPENDDS_STRING topic_name = writer.topic_name;
00111       data.topic_name = topic_name.c_str();
00112       const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00113       data.type_name = topic.type_name.c_str();
00114       data.durability = writer.qos.durability;
00115       data.durability_service = writer.qos.durability_service;
00116       data.deadline = writer.qos.deadline;
00117       data.latency_budget = writer.qos.latency_budget;
00118       data.liveliness = writer.qos.liveliness;
00119       data.reliability = writer.qos.reliability;
00120       data.lifespan = writer.qos.lifespan;
00121       data.user_data = writer.qos.user_data;
00122       data.ownership = writer.qos.ownership;
00123       data.ownership_strength = writer.qos.ownership_strength;
00124       data.destination_order = writer.qos.destination_order;
00125       data.presentation = writer.publisher_qos.presentation;
00126       data.partition = writer.publisher_qos.partition;
00127       // If the TopicQos becomes available, this can be populated.
00128       //data.topic_data = topic_details.qos_.topic_data;
00129       data.group_data = writer.publisher_qos.group_data;
00130 #ifndef DDS_HAS_MINIMUM_BIT
00131       OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
00132       if (bit) { // bit may be null if the DomainParticipant is shutting down
00133         bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00134       }
00135 #endif /* DDS_HAS_MINIMUM_BIT */
00136     }
00137   }
00138 
00139   for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
00140          limit = registry_.reader_map.end();
00141        pos != limit;
00142        ++pos) {
00143     const RepoId& remoteid = pos->first;
00144     const EndpointRegistry::Reader& reader = pos->second;
00145 
00146     if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00147       increment_key(sub_bit_key_);
00148       sub_key_to_id_[sub_bit_key_] = remoteid;
00149 
00150       // pos represents a remote.
00151       // Populate data.
00152       DDS::SubscriptionBuiltinTopicData data;
00153 
00154       data.key = sub_bit_key_;
00155       OPENDDS_STRING topic_name = reader.topic_name;
00156       data.topic_name = topic_name.c_str();
00157       const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00158       data.type_name = topic.type_name.c_str();
00159       data.durability = reader.qos.durability;
00160       data.deadline = reader.qos.deadline;
00161       data.latency_budget = reader.qos.latency_budget;
00162       data.liveliness = reader.qos.liveliness;
00163       data.reliability = reader.qos.reliability;
00164       data.ownership = reader.qos.ownership;
00165       data.destination_order = reader.qos.destination_order;
00166       data.user_data = reader.qos.user_data;
00167       data.time_based_filter = reader.qos.time_based_filter;
00168       data.presentation = reader.subscriber_qos.presentation;
00169       data.partition = reader.subscriber_qos.partition;
00170       // // If the TopicQos becomes available, this can be populated.
00171       //data.topic_data = topic_details.qos_.topic_data;
00172       data.group_data = reader.subscriber_qos.group_data;
00173 
00174 #ifndef DDS_HAS_MINIMUM_BIT
00175       OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
00176       if (bit) { // bit may be null if the DomainParticipant is shutting down
00177         bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00178       }
00179 #endif /* DDS_HAS_MINIMUM_BIT */
00180     }
00181   }
00182 }
00183 
00184 void StaticEndpointManager::assign_publication_key(RepoId& rid,
00185                                                    const RepoId& /*topicId*/,
00186                                                    const DDS::DataWriterQos& qos)
00187 {
00188   if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
00189     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
00190     return;
00191   }
00192 
00193   rid.entityId.entityKey[0] = qos.user_data.value[0];
00194   rid.entityId.entityKey[1] = qos.user_data.value[1];
00195   rid.entityId.entityKey[2] = qos.user_data.value[2];
00196   rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY;
00197 
00198   if (DCPS_debug_level > 8) {
00199     ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %C\n",
00200                LogGuid(rid).c_str()));
00201   }
00202 
00203   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
00204   if (pos == registry_.writer_map.end()) {
00205     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %C\n"), LogGuid(rid).c_str()));
00206     return;
00207   }
00208 
00209   DDS::DataWriterQos qos2(qos);
00210   // Qos in registry will not have the user data so overwrite.
00211   qos2.user_data = pos->second.qos.user_data;
00212 
00213   DDS::DataWriterQos qos3(pos->second.qos);
00214 
00215   if (qos2 != qos3) {
00216     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
00217   }
00218 }
00219 
00220 void StaticEndpointManager::assign_subscription_key(RepoId& rid,
00221                                                     const RepoId& /*topicId*/,
00222                                                     const DDS::DataReaderQos& qos)
00223 {
00224   if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
00225     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
00226     return;
00227   }
00228 
00229   rid.entityId.entityKey[0] = qos.user_data.value[0];
00230   rid.entityId.entityKey[1] = qos.user_data.value[1];
00231   rid.entityId.entityKey[2] = qos.user_data.value[2];
00232   rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY;
00233 
00234   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
00235   if (pos == registry_.reader_map.end()) {
00236     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %C\n"), LogGuid(rid).c_str()));
00237     return;
00238   }
00239 
00240   DDS::DataReaderQos qos2(qos);
00241   // Qos in registry will not have the user data so overwrite.
00242   qos2.user_data = pos->second.qos.user_data;
00243 
00244   if (qos2 != pos->second.qos) {
00245     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
00246   }
00247 }
00248 
00249 bool
00250 StaticEndpointManager::update_topic_qos(const RepoId& /*topicId*/,
00251                                         const DDS::TopicQos& /*qos*/,
00252                                         OPENDDS_STRING& /*name*/)
00253 {
00254   ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
00255              ACE_TEXT("Not allowed\n")));
00256   return false;
00257 }
00258 
00259 bool
00260 StaticEndpointManager::update_publication_qos(const RepoId& /*publicationId*/,
00261                                               const DDS::DataWriterQos& /*qos*/,
00262                                               const DDS::PublisherQos& /*publisherQos*/)
00263 {
00264   ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
00265              ACE_TEXT("Not allowed\n")));
00266   return false;
00267 }
00268 
00269 bool
00270 StaticEndpointManager::update_subscription_qos(const RepoId& /*subscriptionId*/,
00271                                                const DDS::DataReaderQos& /*qos*/,
00272                                                const DDS::SubscriberQos& /*subscriberQos*/)
00273 {
00274   ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00275              ACE_TEXT("Not allowed\n")));
00276   return false;
00277 }
00278 
00279 bool
00280 StaticEndpointManager::update_subscription_params(const RepoId& /*subId*/,
00281                                                   const DDS::StringSeq& /*params*/)
00282 {
00283   ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00284              ACE_TEXT("Not allowed\n")));
00285   return false;
00286 }
00287 
00288 void
00289 StaticEndpointManager::association_complete(const RepoId& /*localId*/,
00290                                             const RepoId& /*remoteId*/)
00291 {
00292   // Do nothing.
00293 }
00294 
00295 bool
00296 StaticEndpointManager::disassociate(const StaticDiscoveredParticipantData& /*pdata*/)
00297 {
00298   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
00299   // TODO
00300   return false;
00301 }
00302 
00303 DDS::ReturnCode_t
00304 StaticEndpointManager::add_publication_i(const RepoId& writerid,
00305                                          LocalPublication& pub)
00306 {
00307   /*
00308     Find all matching remote readers.
00309     If the reader is best effort, then associate immediately.
00310     If the reader is reliable (we are reliable by implication), register with the transport to receive notification that the remote reader is up.
00311     */
00312   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00313   if (pos == registry_.writer_map.end()) {
00314     return DDS::RETCODE_ERROR;
00315   }
00316   const EndpointRegistry::Writer& writer = pos->second;
00317 
00318   for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
00319        pos != limit;
00320        ++pos) {
00321     const RepoId& readerid = *pos;
00322     const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00323 
00324 #ifdef __SUNPRO_CC
00325     ReaderAssociation ra;
00326     ra.readerTransInfo = reader.trans_info;
00327     ra.readerId = readerid;
00328     ra.subQos = reader.subscriber_qos;
00329     ra.readerQos = reader.qos;
00330     ra.filterClassName = "";
00331     ra.filterExpression = "";
00332     ra.exprParams = 0;
00333 #else
00334     const ReaderAssociation ra =
00335       {reader.trans_info, readerid, reader.subscriber_qos, reader.qos, "", "", 0};
00336 #endif
00337     pub.publication_->add_association(writerid, ra, true);
00338     pub.publication_->association_complete(readerid);
00339   }
00340 
00341   for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00342        pos != limit;
00343        ++pos) {
00344     const RepoId& readerid = *pos;
00345     const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00346     pub.publication_->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
00347   }
00348 
00349   return DDS::RETCODE_OK;
00350 }
00351 
00352 DDS::ReturnCode_t
00353 StaticEndpointManager::remove_publication_i(const RepoId& writerid)
00354 {
00355   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00356   if (lp_pos == local_publications_.end()) {
00357     return DDS::RETCODE_ERROR;
00358   }
00359 
00360   const LocalPublication& pub = lp_pos->second;
00361 
00362   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00363   if (pos == registry_.writer_map.end()) {
00364     return DDS::RETCODE_ERROR;
00365   }
00366 
00367   const EndpointRegistry::Writer& writer = pos->second;
00368 
00369   ReaderIdSeq ids;
00370   ids.length((CORBA::ULong)writer.reliable_readers.size());
00371   CORBA::ULong idx = 0;
00372   for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00373         pos != limit;
00374         ++pos, ++idx) {
00375     const RepoId& readerid = *pos;
00376     ids[idx] = readerid;
00377     pub.publication_->unregister_for_reader(participant_id_, writerid, readerid);
00378   }
00379 
00380   return DDS::RETCODE_OK;
00381 }
00382 
00383 DDS::ReturnCode_t
00384 StaticEndpointManager::add_subscription_i(const RepoId& readerid,
00385                                           LocalSubscription& sub)
00386 {
00387   /*
00388     Find all matching remote writers.
00389     If we (the reader) is best effort, then associate immediately.
00390     If we (the reader) are reliable, then register with the transport to receive notification that the remote writer is up.
00391     */
00392   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00393   if (pos == registry_.reader_map.end()) {
00394     return DDS::RETCODE_ERROR;
00395   }
00396   const EndpointRegistry::Reader& reader = pos->second;
00397 
00398   for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
00399        pos != limit;
00400        ++pos) {
00401     const RepoId& writerid = *pos;
00402     const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00403 
00404 #ifdef __SUNPRO_CC
00405     WriterAssociation wa;
00406     wa.writerTransInfo = writer.trans_info;
00407     wa.writerId = writerid;
00408     wa.pubQos = writer.publisher_qos;
00409     wa.writerQos = writer.qos;
00410 #else
00411     const WriterAssociation wa =
00412       {writer.trans_info, writerid, writer.publisher_qos, writer.qos};
00413 #endif
00414     sub.subscription_->add_association(readerid, wa, false);
00415   }
00416 
00417   for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00418        pos != limit;
00419        ++pos) {
00420     const RepoId& writerid = *pos;
00421     const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00422     sub.subscription_->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
00423   }
00424 
00425   return DDS::RETCODE_OK;
00426 }
00427 
00428 DDS::ReturnCode_t
00429 StaticEndpointManager::remove_subscription_i(const RepoId& readerid)
00430 {
00431   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00432   if (ls_pos == local_subscriptions_.end()) {
00433     return DDS::RETCODE_ERROR;
00434   }
00435 
00436   const LocalSubscription& sub = ls_pos->second;
00437 
00438   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00439   if (pos == registry_.reader_map.end()) {
00440     return DDS::RETCODE_ERROR;
00441   }
00442 
00443   const EndpointRegistry::Reader& reader = pos->second;
00444 
00445   WriterIdSeq ids;
00446   ids.length((CORBA::ULong)reader.reliable_writers.size());
00447   CORBA::ULong idx = 0;
00448   for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00449         pos != limit;
00450         ++pos, ++idx) {
00451     const RepoId& writerid = *pos;
00452     ids[idx] = writerid;
00453     sub.subscription_->unregister_for_writer(participant_id_, readerid, writerid);
00454   }
00455 
00456   return DDS::RETCODE_OK;
00457 }
00458 
00459 bool
00460 StaticEndpointManager::shutting_down() const
00461 {
00462   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
00463   // TODO
00464   return false;
00465 }
00466 
00467 void
00468 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
00469                                                            DiscoveredSubscriptionIter& /*iter*/,
00470                                                            const RepoId& /*reader*/)
00471 {
00472   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00473   // TODO
00474 }
00475 
00476 void
00477 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
00478                                                            DiscoveredPublicationIter& /*iter*/,
00479                                                            const RepoId& /*reader*/)
00480 {
00481   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00482   // TODO
00483 }
00484 
00485 bool
00486 StaticEndpointManager::defer_writer(const RepoId& /*writer*/,
00487                                     const RepoId& /*writer_participant*/)
00488 {
00489   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_writer TODO\n")));
00490   // TODO
00491   return false;
00492 }
00493 
00494 bool
00495 StaticEndpointManager::defer_reader(const RepoId& /*writer*/,
00496                                     const RepoId& /*writer_participant*/)
00497 {
00498   // TODO
00499   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_reader TODO\n")));
00500   return false;
00501 }
00502 
00503 void
00504 StaticEndpointManager::reader_exists(const RepoId& readerid, const RepoId& writerid)
00505 {
00506   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00507   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00508   EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00509   if (lp_pos != local_publications_.end() &&
00510       reader_pos != registry_.reader_map.end()) {
00511     DataWriterCallbacks* dwr = lp_pos->second.publication_;
00512 #ifdef __SUNPRO_CC
00513     ReaderAssociation ra;
00514     ra.readerTransInfo = reader_pos->second.trans_info;
00515     ra.readerId = readerid;
00516     ra.subQos = reader_pos->second.subscriber_qos;
00517     ra.readerQos = reader_pos->second.qos;
00518     ra.filterClassName = "";
00519     ra.filterExpression = "";
00520     ra.exprParams = 0;
00521 #else
00522     const ReaderAssociation ra =
00523       {reader_pos->second.trans_info, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos, "", "", 0};
00524 
00525 #endif
00526     dwr->add_association(writerid, ra, true);
00527     dwr->association_complete(readerid);
00528   }
00529 }
00530 
00531 void
00532 StaticEndpointManager::reader_does_not_exist(const RepoId& readerid, const RepoId& writerid)
00533 {
00534   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00535   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00536   EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00537   if (lp_pos != local_publications_.end() &&
00538       reader_pos != registry_.reader_map.end()) {
00539     DataWriterCallbacks* dwr = lp_pos->second.publication_;
00540     ReaderIdSeq ids;
00541     ids.length(1);
00542     ids[0] = readerid;
00543     dwr->remove_associations(ids, true);
00544   }
00545 }
00546 
00547 void
00548 StaticEndpointManager::writer_exists(const RepoId& writerid, const RepoId& readerid)
00549 {
00550   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00551   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00552   EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00553   if (ls_pos != local_subscriptions_.end() &&
00554       writer_pos != registry_.writer_map.end()) {
00555     DataReaderCallbacks* drr = ls_pos->second.subscription_;
00556 #ifdef __SUNPRO_CC
00557     WriterAssociation wa;
00558     wa.writerTransInfo = writer_pos->second.trans_info;
00559     wa.writerId = writerid;
00560     wa.pubQos = writer_pos->second.publisher_qos;
00561     wa.writerQos = writer_pos->second.qos;
00562 #else
00563     const WriterAssociation wa =
00564       {writer_pos->second.trans_info, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos};
00565 #endif
00566     drr->add_association(readerid, wa, false);
00567   }
00568 }
00569 
00570 void
00571 StaticEndpointManager::writer_does_not_exist(const RepoId& writerid, const RepoId& readerid)
00572 {
00573   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00574   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00575   EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00576   if (ls_pos != local_subscriptions_.end() &&
00577       writer_pos != registry_.writer_map.end()) {
00578     DataReaderCallbacks* drr = ls_pos->second.subscription_;
00579     WriterIdSeq ids;
00580     ids.length(1);
00581     ids[0] = writerid;
00582     drr->remove_associations(ids, true);
00583   }
00584 }
00585 
00586 #ifndef DDS_HAS_MINIMUM_BIT
00587 OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*
00588 StaticEndpointManager::pub_bit()
00589 {
00590   DDS::Subscriber_var sub = participant_.bit_subscriber();
00591   if (!sub.in())
00592     return 0;
00593 
00594   DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
00595   return dynamic_cast<OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00596 }
00597 
00598 OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*
00599 StaticEndpointManager::sub_bit()
00600 {
00601   DDS::Subscriber_var sub = participant_.bit_subscriber();
00602   if (!sub.in())
00603     return 0;
00604 
00605   DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
00606   return dynamic_cast<OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00607 }
00608 #endif /* DDS_HAS_MINIMUM_BIT */
00609 
00610 StaticDiscovery::StaticDiscovery(const RepoKey& key)
00611   : PeerDiscovery<StaticParticipant>(key)
00612 {}
00613 
00614 namespace {
00615   unsigned char hextobyte(unsigned char c)
00616   {
00617     if (c >= '0' && c <= '9') {
00618       return c - '0';
00619     }
00620     if (c >= 'a' && c <= 'f') {
00621       return 10 + c - 'a';
00622     }
00623     if (c >= 'A' && c <= 'F') {
00624       return 10 + c - 'A';
00625     }
00626     return c;
00627   }
00628 
00629   unsigned char
00630   fromhex(const OPENDDS_STRING& x, size_t idx)
00631   {
00632     return (hextobyte(x[idx * 2]) << 4) | (hextobyte(x[idx * 2 + 1]));
00633   }
00634 }
00635 
00636 EntityId_t
00637 EndpointRegistry::build_id(const unsigned char* entity_key,
00638                            const unsigned char entity_kind)
00639 {
00640   EntityId_t retval;
00641   retval.entityKey[0] = entity_key[0];
00642   retval.entityKey[1] = entity_key[1];
00643   retval.entityKey[2] = entity_key[2];
00644   retval.entityKind = entity_kind;
00645   return retval;
00646 }
00647 
00648 RepoId
00649 EndpointRegistry::build_id(DDS::DomainId_t domain,
00650                            const unsigned char* participant_id,
00651                            const EntityId_t& entity_id)
00652 {
00653   RepoId id;
00654   id.guidPrefix[0] = VENDORID_OCI[0];
00655   id.guidPrefix[1] = VENDORID_OCI[1];
00656   // id.guidPrefix[2] = domain[0]
00657   // id.guidPrefix[3] = domain[1]
00658   // id.guidPrefix[4] = domain[2]
00659   // id.guidPrefix[5] = domain[3]
00660   DDS::DomainId_t netdom = ACE_HTONL(domain);
00661   ACE_OS::memcpy(&id.guidPrefix[2], &netdom, sizeof(DDS::DomainId_t));
00662   // id.guidPrefix[6] = participant[0]
00663   // id.guidPrefix[7] = participant[1]
00664   // id.guidPrefix[8] = participant[2]
00665   // id.guidPrefix[9] = participant[3]
00666   // id.guidPrefix[10] = participant[4]
00667   // id.guidPrefix[11] = participant[5]
00668   ACE_OS::memcpy(&id.guidPrefix[6], participant_id, 6);
00669   id.entityId = entity_id;
00670   return id;
00671 }
00672 
00673 OpenDDS::DCPS::RepoId
00674 StaticDiscovery::generate_participant_guid()
00675 {
00676   return GUID_UNKNOWN;
00677 }
00678 
00679 AddDomainStatus
00680 StaticDiscovery::add_domain_participant(DDS::DomainId_t domain,
00681                                         const DDS::DomainParticipantQos& qos)
00682 {
00683   AddDomainStatus ads = {RepoId(), false /*federated*/};
00684 
00685   if (qos.user_data.value.length() != BYTES_IN_PARTICIPANT) {
00686     ACE_ERROR((LM_ERROR,
00687                 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00688                 ACE_TEXT("No userdata to identify participant\n")));
00689     return ads;
00690   }
00691 
00692   RepoId id = EndpointRegistry::build_id(domain,
00693                                          qos.user_data.value.get_buffer(),
00694                                          ENTITYID_PARTICIPANT);
00695   if (!get_part(domain, id).is_nil()) {
00696     ACE_ERROR((LM_ERROR,
00697                 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00698                 ACE_TEXT("Duplicate participant\n")));
00699     return ads;
00700   }
00701 
00702   const RcHandle<StaticParticipant> participant (make_rch<StaticParticipant>(ref(id), qos, registry));
00703 
00704   {
00705     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00706     participants_[domain][id] = participant;
00707   }
00708 
00709   ads.id = id;
00710   return ads;
00711 }
00712 
00713 #if defined(OPENDDS_SECURITY)
00714 AddDomainStatus
00715 StaticDiscovery::add_domain_participant_secure(
00716   DDS::DomainId_t /*domain*/,
00717   const DDS::DomainParticipantQos& /*qos*/,
00718   const OpenDDS::DCPS::RepoId& /*guid*/,
00719   DDS::Security::IdentityHandle /*id*/,
00720   DDS::Security::PermissionsHandle /*perm*/,
00721   DDS::Security::ParticipantCryptoHandle /*part_crypto*/)
00722 {
00723   const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
00724   ACE_ERROR((LM_ERROR,
00725               ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant_secure ")
00726               ACE_TEXT("Security not supported for static discovery.\n")));
00727   return ads;
00728 }
00729 #endif
00730 
00731 namespace {
00732   const ACE_TCHAR TOPIC_SECTION_NAME[] = ACE_TEXT("topic");
00733   const ACE_TCHAR DATAWRITERQOS_SECTION_NAME[] = ACE_TEXT("datawriterqos");
00734   const ACE_TCHAR DATAREADERQOS_SECTION_NAME[] = ACE_TEXT("datareaderqos");
00735   const ACE_TCHAR PUBLISHERQOS_SECTION_NAME[]  = ACE_TEXT("publisherqos");
00736   const ACE_TCHAR SUBSCRIBERQOS_SECTION_NAME[] = ACE_TEXT("subscriberqos");
00737   const ACE_TCHAR ENDPOINT_SECTION_NAME[] = ACE_TEXT("endpoint");
00738 
00739   void parse_second(CORBA::Long& x, const OPENDDS_STRING& value)
00740   {
00741     if (value == "DURATION_INFINITE_SEC") {
00742       x = DDS::DURATION_INFINITE_SEC;
00743     } else {
00744       x = atoi(value.c_str());
00745     }
00746   }
00747 
00748   void parse_nanosecond(CORBA::ULong& x, const OPENDDS_STRING& value)
00749   {
00750     if (value == "DURATION_INFINITE_NANOSEC") {
00751       x = DDS::DURATION_INFINITE_NSEC;
00752     } else {
00753       x = atoi(value.c_str());
00754     }
00755   }
00756 
00757   bool parse_bool(CORBA::Boolean& x, const OPENDDS_STRING& value)
00758   {
00759     if (value == "true") {
00760       x = true;
00761       return true;
00762     } else if (value == "false") {
00763       x = false;
00764       return true;
00765     }
00766     return false;
00767   }
00768 
00769   void parse_list(DDS::PartitionQosPolicy& x, const OPENDDS_STRING& value)
00770   {
00771     // Value can be a comma-separated list
00772     const char* start = value.c_str();
00773     char buffer[128];
00774     std::memset(buffer, 0, sizeof(buffer));
00775     while (const char* next_comma = std::strchr(start, ',')) {
00776       // Copy into temp buffer, won't have null
00777       std::strncpy(buffer, start, next_comma - start);
00778       // Append null
00779       buffer[next_comma - start] = '\0';
00780       // Add to QOS
00781       x.name.length(x.name.length() + 1);
00782       x.name[x.name.length() - 1] = static_cast<const char*>(buffer);
00783       // Advance pointer
00784       start = next_comma + 1;
00785     }
00786     // Append everything after last comma
00787     x.name.length(x.name.length() + 1);
00788     x.name[x.name.length() - 1] = start;
00789   }
00790 }
00791 
00792 int
00793 StaticDiscovery::load_configuration(ACE_Configuration_Heap& cf)
00794 {
00795   if (parse_topics(cf) ||
00796       parse_datawriterqos(cf) ||
00797       parse_datareaderqos(cf) ||
00798       parse_publisherqos(cf) ||
00799       parse_subscriberqos(cf) ||
00800       parse_endpoints(cf)) {
00801     return -1;
00802   }
00803 
00804   registry.match();
00805 
00806   return 0;
00807 }
00808 
00809 int
00810 StaticDiscovery::parse_topics(ACE_Configuration_Heap& cf)
00811 {
00812   const ACE_Configuration_Section_Key& root = cf.root_section();
00813   ACE_Configuration_Section_Key section;
00814 
00815   if (cf.open_section(root, TOPIC_SECTION_NAME, 0, section) != 0) {
00816     if (DCPS_debug_level > 0) {
00817       // This is not an error if the configuration file does not have
00818       // any topic (sub)section.
00819       ACE_DEBUG((LM_NOTICE,
00820                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00821                   ACE_TEXT("no [%s] sections.\n"),
00822                   TOPIC_SECTION_NAME));
00823     }
00824     return 0;
00825   }
00826 
00827   // Ensure there are no key/values in the [topic] section.
00828   // Every key/value must be in a [topic/*] sub-section.
00829   ValueMap vm;
00830   if (pullValues(cf, section, vm) > 0) {
00831     ACE_ERROR_RETURN((LM_ERROR,
00832                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00833                       ACE_TEXT("[topic] sections must have a subsection name\n")),
00834                       -1);
00835   }
00836   // Process the subsections of this section
00837   KeyList keys;
00838   if (processSections(cf, section, keys) != 0) {
00839     ACE_ERROR_RETURN((LM_ERROR,
00840                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00841                       ACE_TEXT("too many nesting layers in the [topic] section.\n")),
00842                       -1);
00843   }
00844 
00845   // Loop through the [topic/*] sections
00846   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00847     OPENDDS_STRING topic_name = it->first;
00848 
00849     if (DCPS_debug_level > 0) {
00850       ACE_DEBUG((LM_NOTICE,
00851                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00852                   ACE_TEXT("processing [topic/%C] section.\n"),
00853                   topic_name.c_str()));
00854     }
00855 
00856     ValueMap values;
00857     pullValues(cf, it->second, values);
00858 
00859     EndpointRegistry::Topic topic;
00860     bool name_specified = false,
00861       type_name_specified = false;
00862 
00863     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00864       OPENDDS_STRING name = it->first;
00865       OPENDDS_STRING value = it->second;
00866 
00867       if (name == "name") {
00868         topic.name = value;
00869         name_specified = true;
00870       } else if (name == "type_name") {
00871         if (value.size() >= TYPE_NAME_MAX) {
00872           ACE_ERROR_RETURN((LM_ERROR,
00873                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00874                             ACE_TEXT("type_name (%C) must be less than 128 characters in [topic/%C] section.\n"),
00875                             value.c_str(), topic_name.c_str()),
00876                             -1);
00877         }
00878         topic.type_name = value;
00879         type_name_specified = true;
00880       } else {
00881         // Typos are ignored to avoid parsing FACE-specific keys.
00882       }
00883     }
00884 
00885     if (!name_specified) {
00886       topic.name = topic_name;
00887     }
00888 
00889     if (!type_name_specified) {
00890       ACE_ERROR_RETURN((LM_ERROR,
00891                         ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00892                         ACE_TEXT("No type_name specified for [topic/%C] section.\n"),
00893                         topic_name.c_str()),
00894                        -1);
00895     }
00896 
00897     registry.topic_map[topic_name] = topic;
00898   }
00899 
00900   return 0;
00901 }
00902 
00903 int
00904 StaticDiscovery::parse_datawriterqos(ACE_Configuration_Heap& cf)
00905 {
00906   const ACE_Configuration_Section_Key& root = cf.root_section();
00907   ACE_Configuration_Section_Key section;
00908 
00909   if (cf.open_section(root, DATAWRITERQOS_SECTION_NAME, 0, section) != 0) {
00910     if (DCPS_debug_level > 0) {
00911       // This is not an error if the configuration file does not have
00912       // any datawriterqos (sub)section.
00913       ACE_DEBUG((LM_NOTICE,
00914                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00915                   ACE_TEXT("no [%s] sections.\n"),
00916                   DATAWRITERQOS_SECTION_NAME));
00917     }
00918     return 0;
00919   }
00920 
00921   // Ensure there are no key/values in the [datawriterqos] section.
00922   // Every key/value must be in a [datawriterqos/*] sub-section.
00923   ValueMap vm;
00924   if (pullValues(cf, section, vm) > 0) {
00925     ACE_ERROR_RETURN((LM_ERROR,
00926                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00927                       ACE_TEXT("[datawriterqos] sections must have a subsection name\n")),
00928                       -1);
00929   }
00930   // Process the subsections of this section
00931   KeyList keys;
00932   if (processSections(cf, section, keys) != 0) {
00933     ACE_ERROR_RETURN((LM_ERROR,
00934                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00935                       ACE_TEXT("too many nesting layers in the [datawriterqos] section.\n")),
00936                       -1);
00937   }
00938 
00939   // Loop through the [datawriterqos/*] sections
00940   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00941     OPENDDS_STRING datawriterqos_name = it->first;
00942 
00943     if (DCPS_debug_level > 0) {
00944       ACE_DEBUG((LM_NOTICE,
00945                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00946                   ACE_TEXT("processing [datawriterqos/%C] section.\n"),
00947                   datawriterqos_name.c_str()));
00948     }
00949 
00950     ValueMap values;
00951     pullValues(cf, it->second, values);
00952 
00953     DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
00954 
00955     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00956       OPENDDS_STRING name = it->first;
00957       OPENDDS_STRING value = it->second;
00958 
00959       if (name == "durability.kind") {
00960         if (value == "VOLATILE") {
00961           datawriterqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
00962         } else if (value == "TRANSIENT_LOCAL") {
00963           datawriterqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00964 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00965         } else if (value == "TRANSIENT") {
00966           datawriterqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
00967         } else if (value == "PERSISTENT") {
00968           datawriterqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
00969 #endif
00970         } else {
00971           ACE_ERROR_RETURN((LM_ERROR,
00972                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00973                             ACE_TEXT("Illegal value for durability.kind (%C) in [datawriterqos/%C] section.\n"),
00974                             value.c_str(), datawriterqos_name.c_str()),
00975                             -1);
00976         }
00977       } else if (name == "deadline.period.sec") {
00978         parse_second(datawriterqos.deadline.period.sec, value);
00979       } else if (name == "deadline.period.nanosec") {
00980         parse_nanosecond(datawriterqos.deadline.period.nanosec, value);
00981       } else if (name == "latency_budget.duration.sec") {
00982         parse_second(datawriterqos.latency_budget.duration.sec, value);
00983       } else if (name == "latency_budget.duration.nanosec") {
00984         parse_nanosecond(datawriterqos.latency_budget.duration.nanosec, value);
00985       } else if (name == "liveliness.kind") {
00986         if (value == "AUTOMATIC") {
00987           datawriterqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
00988         } else if (value == "MANUAL_BY_TOPIC") {
00989           datawriterqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
00990         } else if (value == "MANUAL_BY_PARTICIPANT") {
00991           datawriterqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
00992         } else {
00993           ACE_ERROR_RETURN((LM_ERROR,
00994                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00995                             ACE_TEXT("Illegal value for liveliness.kind (%C) in [datawriterqos/%C] section.\n"),
00996                             value.c_str(), datawriterqos_name.c_str()),
00997                             -1);
00998         }
00999       } else if (name == "liveliness.lease_duration.sec") {
01000         parse_second(datawriterqos.liveliness.lease_duration.sec, value);
01001       } else if (name == "liveliness.lease_duration.nanosec") {
01002         parse_nanosecond(datawriterqos.liveliness.lease_duration.nanosec, value);
01003       } else if (name == "reliability.kind") {
01004         if (value == "BEST_EFFORT") {
01005           datawriterqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
01006         } else if (value == "RELIABLE") {
01007           datawriterqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
01008         } else {
01009           ACE_ERROR_RETURN((LM_ERROR,
01010                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01011                             ACE_TEXT("Illegal value for reliability.kind (%C) in [datawriterqos/%C] section.\n"),
01012                             value.c_str(), datawriterqos_name.c_str()),
01013                             -1);
01014         }
01015       } else if (name == "reliability.max_blocking_time.sec") {
01016         parse_second(datawriterqos.reliability.max_blocking_time.sec, value);
01017       } else if (name == "reliability.max_blocking_time.nanosec") {
01018         parse_nanosecond(datawriterqos.reliability.max_blocking_time.nanosec, value);
01019       } else if (name == "destination_order.kind") {
01020         if (value == "BY_RECEPTION_TIMESTAMP") {
01021           datawriterqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
01022         } else if (value == "BY_SOURCE_TIMESTAMP") {
01023           datawriterqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
01024         } else {
01025           ACE_ERROR_RETURN((LM_ERROR,
01026                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01027                             ACE_TEXT("Illegal value for destination_order.kind (%C) in [datawriterqos/%C] section.\n"),
01028                             value.c_str(), datawriterqos_name.c_str()),
01029                             -1);
01030         }
01031       } else if (name == "history.kind") {
01032         if (value == "KEEP_ALL") {
01033           datawriterqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
01034         } else if (value == "KEEP_LAST") {
01035           datawriterqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
01036         } else {
01037           ACE_ERROR_RETURN((LM_ERROR,
01038                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01039                             ACE_TEXT("Illegal value for history.kind (%C) in [datawriterqos/%C] section.\n"),
01040                             value.c_str(), datawriterqos_name.c_str()),
01041                             -1);
01042         }
01043       } else if (name == "history.depth") {
01044         datawriterqos.history.depth = atoi(value.c_str());
01045       } else if (name == "resource_limits.max_samples") {
01046         datawriterqos.resource_limits.max_samples = atoi(value.c_str());
01047       } else if (name == "resource_limits.max_instances") {
01048         datawriterqos.resource_limits.max_instances = atoi(value.c_str());
01049       } else if (name == "resource_limits.max_samples_per_instance") {
01050         datawriterqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01051       } else if (name == "transport_priority.value") {
01052         datawriterqos.transport_priority.value = atoi(value.c_str());
01053       } else if (name == "lifespan.duration.sec") {
01054         parse_second(datawriterqos.lifespan.duration.sec, value);
01055       } else if (name == "lifespan.duration.nanosec") {
01056         parse_nanosecond(datawriterqos.lifespan.duration.nanosec, value);
01057       } else if (name == "ownership.kind") {
01058         if (value == "SHARED") {
01059           datawriterqos.ownership.kind = DDS::SHARED_OWNERSHIP_QOS;
01060         } else if (value == "EXCLUSIVE") {
01061           datawriterqos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
01062         } else {
01063           ACE_ERROR_RETURN((LM_ERROR,
01064                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01065                             ACE_TEXT("Illegal value for ownership.kind (%C) in [datawriterqos/%C] section.\n"),
01066                             value.c_str(), datawriterqos_name.c_str()),
01067                             -1);
01068         }
01069       } else if (name == "ownership_strength.value") {
01070         datawriterqos.ownership_strength.value = atoi(value.c_str());
01071       } else {
01072         ACE_ERROR_RETURN((LM_ERROR,
01073                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01074                           ACE_TEXT("Unexpected entry (%C) in [datawriterqos/%C] section.\n"),
01075                           name.c_str(), datawriterqos_name.c_str()),
01076                           -1);
01077       }
01078     }
01079 
01080     registry.datawriterqos_map[datawriterqos_name] = datawriterqos;
01081   }
01082 
01083   return 0;
01084 }
01085 
01086 int
01087 StaticDiscovery::parse_datareaderqos(ACE_Configuration_Heap& cf)
01088 {
01089   const ACE_Configuration_Section_Key& root = cf.root_section();
01090   ACE_Configuration_Section_Key section;
01091 
01092   if (cf.open_section(root, DATAREADERQOS_SECTION_NAME, 0, section) != 0) {
01093     if (DCPS_debug_level > 0) {
01094       // This is not an error if the configuration file does not have
01095       // any datareaderqos (sub)section.
01096       ACE_DEBUG((LM_NOTICE,
01097                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01098                   ACE_TEXT("no [%s] sections.\n"),
01099                   DATAREADERQOS_SECTION_NAME));
01100     }
01101     return 0;
01102   }
01103 
01104   // Ensure there are no key/values in the [datareaderqos] section.
01105   // Every key/value must be in a [datareaderqos/*] sub-section.
01106   ValueMap vm;
01107   if (pullValues(cf, section, vm) > 0) {
01108     ACE_ERROR_RETURN((LM_ERROR,
01109                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01110                       ACE_TEXT("[datareaderqos] sections must have a subsection name\n")),
01111                       -1);
01112   }
01113   // Process the subsections of this section
01114   KeyList keys;
01115   if (processSections(cf, section, keys) != 0) {
01116     ACE_ERROR_RETURN((LM_ERROR,
01117                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01118                       ACE_TEXT("too many nesting layers in the [datareaderqos] section.\n")),
01119                       -1);
01120   }
01121 
01122   // Loop through the [datareaderqos/*] sections
01123   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01124     OPENDDS_STRING datareaderqos_name = it->first;
01125 
01126     if (DCPS_debug_level > 0) {
01127       ACE_DEBUG((LM_NOTICE,
01128                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01129                   ACE_TEXT("processing [datareaderqos/%C] section.\n"),
01130                   datareaderqos_name.c_str()));
01131     }
01132 
01133     ValueMap values;
01134     pullValues(cf, it->second, values);
01135 
01136     DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01137 
01138     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01139       OPENDDS_STRING name = it->first;
01140       OPENDDS_STRING value = it->second;
01141 
01142       if (name == "durability.kind") {
01143         if (value == "VOLATILE") {
01144           datareaderqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
01145         } else if (value == "TRANSIENT_LOCAL") {
01146           datareaderqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01147 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01148         } else if (value == "TRANSIENT") {
01149           datareaderqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
01150         } else if (value == "PERSISTENT") {
01151           datareaderqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
01152 #endif
01153         } else {
01154           ACE_ERROR_RETURN((LM_ERROR,
01155                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01156                             ACE_TEXT("Illegal value for durability.kind (%C) in [datareaderqos/%C] section.\n"),
01157                             value.c_str(), datareaderqos_name.c_str()),
01158                             -1);
01159         }
01160       } else if (name == "deadline.period.sec") {
01161         parse_second(datareaderqos.deadline.period.sec, value);
01162       } else if (name == "deadline.period.nanosec") {
01163         parse_nanosecond(datareaderqos.deadline.period.nanosec, value);
01164       } else if (name == "latency_budget.duration.sec") {
01165         parse_second(datareaderqos.latency_budget.duration.sec, value);
01166       } else if (name == "latency_budget.duration.nanosec") {
01167         parse_nanosecond(datareaderqos.latency_budget.duration.nanosec, value);
01168       } else if (name == "liveliness.kind") {
01169         if (value == "AUTOMATIC") {
01170           datareaderqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
01171         } else if (value == "MANUAL_BY_TOPIC") {
01172           datareaderqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
01173         } else if (value == "MANUAL_BY_PARTICIPANT") {
01174           datareaderqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
01175         } else {
01176           ACE_ERROR_RETURN((LM_ERROR,
01177                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01178                             ACE_TEXT("Illegal value for liveliness.kind (%C) in [datareaderqos/%C] section.\n"),
01179                             value.c_str(), datareaderqos_name.c_str()),
01180                             -1);
01181         }
01182       } else if (name == "liveliness.lease_duration.sec") {
01183         parse_second(datareaderqos.liveliness.lease_duration.sec, value);
01184       } else if (name == "liveliness.lease_duration.nanosec") {
01185         parse_nanosecond(datareaderqos.liveliness.lease_duration.nanosec, value);
01186       } else if (name == "reliability.kind") {
01187         if (value == "BEST_EFFORT") {
01188           datareaderqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
01189         } else if (value == "RELIABLE") {
01190           datareaderqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
01191         } else {
01192           ACE_ERROR_RETURN((LM_ERROR,
01193                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01194                             ACE_TEXT("Illegal value for reliability.kind (%C) in [datareaderqos/%C] section.\n"),
01195                             value.c_str(), datareaderqos_name.c_str()),
01196                             -1);
01197         }
01198       } else if (name == "reliability.max_blocking_time.sec") {
01199         parse_second(datareaderqos.reliability.max_blocking_time.sec, value);
01200       } else if (name == "reliability.max_blocking_time.nanosec") {
01201         parse_nanosecond(datareaderqos.reliability.max_blocking_time.nanosec, value);
01202       } else if (name == "destination_order.kind") {
01203         if (value == "BY_RECEPTION_TIMESTAMP") {
01204           datareaderqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
01205         } else if (value == "BY_SOURCE_TIMESTAMP") {
01206           datareaderqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
01207         } else {
01208           ACE_ERROR_RETURN((LM_ERROR,
01209                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01210                             ACE_TEXT("Illegal value for destination_order.kind (%C) in [datareaderqos/%C] section.\n"),
01211                             value.c_str(), datareaderqos_name.c_str()),
01212                             -1);
01213         }
01214       } else if (name == "history.kind") {
01215         if (value == "KEEP_ALL") {
01216           datareaderqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
01217         } else if (value == "KEEP_LAST") {
01218           datareaderqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
01219         } else {
01220           ACE_ERROR_RETURN((LM_ERROR,
01221                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01222                             ACE_TEXT("Illegal value for history.kind (%C) in [datareaderqos/%C] section.\n"),
01223                             value.c_str(), datareaderqos_name.c_str()),
01224                             -1);
01225         }
01226       } else if (name == "history.depth") {
01227         datareaderqos.history.depth = atoi(value.c_str());
01228       } else if (name == "resource_limits.max_samples") {
01229         datareaderqos.resource_limits.max_samples = atoi(value.c_str());
01230       } else if (name == "resource_limits.max_instances") {
01231         datareaderqos.resource_limits.max_instances = atoi(value.c_str());
01232       } else if (name == "resource_limits.max_samples_per_instance") {
01233         datareaderqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01234       } else if (name == "time_based_filter.minimum_separation.sec") {
01235         parse_second(datareaderqos.time_based_filter.minimum_separation.sec, value);
01236       } else if (name == "time_based_filter.minimum_separation.nanosec") {
01237         parse_nanosecond(datareaderqos.time_based_filter.minimum_separation.nanosec, value);
01238       } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.sec") {
01239         parse_second(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec, value);
01240       } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec") {
01241         parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec, value);
01242       } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.sec") {
01243         parse_second(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec, value);
01244       } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec") {
01245         parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec, value);
01246       } else {
01247         ACE_ERROR_RETURN((LM_ERROR,
01248                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01249                           ACE_TEXT("Unexpected entry (%C) in [datareaderqos/%C] section.\n"),
01250                           name.c_str(), datareaderqos_name.c_str()),
01251                           -1);
01252       }
01253     }
01254 
01255     registry.datareaderqos_map[datareaderqos_name] = datareaderqos;
01256   }
01257 
01258   return 0;
01259 }
01260 
01261 int
01262 StaticDiscovery::parse_publisherqos(ACE_Configuration_Heap& cf)
01263 {
01264   const ACE_Configuration_Section_Key& root = cf.root_section();
01265   ACE_Configuration_Section_Key section;
01266 
01267   if (cf.open_section(root, PUBLISHERQOS_SECTION_NAME, 0, section) != 0) {
01268     if (DCPS_debug_level > 0) {
01269       // This is not an error if the configuration file does not have
01270       // any publisherqos (sub)section.
01271       ACE_DEBUG((LM_NOTICE,
01272                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01273                   ACE_TEXT("no [%s] sections.\n"),
01274                   PUBLISHERQOS_SECTION_NAME));
01275     }
01276     return 0;
01277   }
01278 
01279   // Ensure there are no key/values in the [publisherqos] section.
01280   // Every key/value must be in a [publisherqos/*] sub-section.
01281   ValueMap vm;
01282   if (pullValues(cf, section, vm) > 0) {
01283     ACE_ERROR_RETURN((LM_ERROR,
01284                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01285                       ACE_TEXT("[publisherqos] sections must have a subsection name\n")),
01286                       -1);
01287   }
01288   // Process the subsections of this section
01289   KeyList keys;
01290   if (processSections(cf, section, keys) != 0) {
01291     ACE_ERROR_RETURN((LM_ERROR,
01292                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01293                       ACE_TEXT("too many nesting layers in the [publisherqos] section.\n")),
01294                       -1);
01295   }
01296 
01297   // Loop through the [publisherqos/*] sections
01298   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01299     OPENDDS_STRING publisherqos_name = it->first;
01300 
01301     if (DCPS_debug_level > 0) {
01302       ACE_DEBUG((LM_NOTICE,
01303                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01304                   ACE_TEXT("processing [publisherqos/%C] section.\n"),
01305                   publisherqos_name.c_str()));
01306     }
01307 
01308     ValueMap values;
01309     pullValues(cf, it->second, values);
01310 
01311     DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01312 
01313     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01314       OPENDDS_STRING name = it->first;
01315       OPENDDS_STRING value = it->second;
01316 
01317       if (name == "presentation.access_scope") {
01318         if (value == "INSTANCE") {
01319           publisherqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01320         } else if (value == "TOPIC") {
01321           publisherqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01322         } else if (value == "GROUP") {
01323           publisherqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01324         } else {
01325           ACE_ERROR_RETURN((LM_ERROR,
01326                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01327                             ACE_TEXT("Illegal value for presentation.access_scope (%C) in [publisherqos/%C] section.\n"),
01328                             value.c_str(), publisherqos_name.c_str()),
01329                             -1);
01330         }
01331       } else if (name == "presentation.coherent_access") {
01332         if (parse_bool(publisherqos.presentation.coherent_access, value)) {
01333         } else {
01334           ACE_ERROR_RETURN((LM_ERROR,
01335                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01336                             ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [publisherqos/%C] section.\n"),
01337                             value.c_str(), publisherqos_name.c_str()),
01338                             -1);
01339         }
01340       } else if (name == "presentation.ordered_access") {
01341         if (parse_bool(publisherqos.presentation.ordered_access, value)) {
01342         } else {
01343           ACE_ERROR_RETURN((LM_ERROR,
01344                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01345                             ACE_TEXT("Illegal value for presentation.ordered_access (%C)")
01346                             ACE_TEXT("in [publisherqos/%C] section.\n"),
01347                             value.c_str(), publisherqos_name.c_str()),
01348                             -1);
01349         }
01350       } else if (name == "partition.name") {
01351         try {
01352           parse_list(publisherqos.partition, value);
01353         }
01354         catch (const CORBA::Exception& ex) {
01355           ACE_ERROR_RETURN((LM_ERROR,
01356             ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01357             ACE_TEXT("Exception caught while parsing partition.name (%C) ")
01358             ACE_TEXT("in [publisherqos/%C] section: %C.\n"),
01359             value.c_str(), publisherqos_name.c_str(), ex._info().c_str()),
01360             -1);
01361         }
01362       } else {
01363         ACE_ERROR_RETURN((LM_ERROR,
01364                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01365                           ACE_TEXT("Unexpected entry (%C) in [publisherqos/%C] section.\n"),
01366                           name.c_str(), publisherqos_name.c_str()),
01367                           -1);
01368       }
01369     }
01370 
01371     registry.publisherqos_map[publisherqos_name] = publisherqos;
01372   }
01373 
01374   return 0;
01375 }
01376 
01377 int
01378 StaticDiscovery::parse_subscriberqos(ACE_Configuration_Heap& cf)
01379 {
01380   const ACE_Configuration_Section_Key& root = cf.root_section();
01381   ACE_Configuration_Section_Key section;
01382 
01383   if (cf.open_section(root, SUBSCRIBERQOS_SECTION_NAME, 0, section) != 0) {
01384     if (DCPS_debug_level > 0) {
01385       // This is not an error if the configuration file does not have
01386       // any subscriberqos (sub)section.
01387       ACE_DEBUG((LM_NOTICE,
01388                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01389                   ACE_TEXT("no [%s] sections.\n"),
01390                   SUBSCRIBERQOS_SECTION_NAME));
01391     }
01392     return 0;
01393   }
01394 
01395   // Ensure there are no key/values in the [subscriberqos] section.
01396   // Every key/value must be in a [subscriberqos/*] sub-section.
01397   ValueMap vm;
01398   if (pullValues(cf, section, vm) > 0) {
01399     ACE_ERROR_RETURN((LM_ERROR,
01400                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01401                       ACE_TEXT("[subscriberqos] sections must have a subsection name\n")),
01402                       -1);
01403   }
01404   // Process the subsections of this section
01405   KeyList keys;
01406   if (processSections(cf, section, keys) != 0) {
01407     ACE_ERROR_RETURN((LM_ERROR,
01408                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01409                       ACE_TEXT("too many nesting layers in the [subscriberqos] section.\n")),
01410                       -1);
01411   }
01412 
01413   // Loop through the [subscriberqos/*] sections
01414   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01415     OPENDDS_STRING subscriberqos_name = it->first;
01416 
01417     if (DCPS_debug_level > 0) {
01418       ACE_DEBUG((LM_NOTICE,
01419                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01420                   ACE_TEXT("processing [subscriberqos/%C] section.\n"),
01421                   subscriberqos_name.c_str()));
01422     }
01423 
01424     ValueMap values;
01425     pullValues(cf, it->second, values);
01426 
01427     DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01428 
01429     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01430       OPENDDS_STRING name = it->first;
01431       OPENDDS_STRING value = it->second;
01432 
01433       if (name == "presentation.access_scope") {
01434         if (value == "INSTANCE") {
01435           subscriberqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01436         } else if (value == "TOPIC") {
01437           subscriberqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01438         } else if (value == "GROUP") {
01439           subscriberqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01440         } else {
01441           ACE_ERROR_RETURN((LM_ERROR,
01442                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01443                             ACE_TEXT("Illegal value for presentation.access_scope (%C) in [subscriberqos/%C] section.\n"),
01444                             value.c_str(), subscriberqos_name.c_str()),
01445                             -1);
01446         }
01447       } else if (name == "presentation.coherent_access") {
01448         if (parse_bool(subscriberqos.presentation.coherent_access, value)) {
01449         } else {
01450           ACE_ERROR_RETURN((LM_ERROR,
01451                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01452                             ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [subscriberqos/%C] section.\n"),
01453                             value.c_str(), subscriberqos_name.c_str()),
01454                             -1);
01455         }
01456       } else if (name == "presentation.ordered_access") {
01457         if (parse_bool(subscriberqos.presentation.ordered_access, value)) {
01458         } else {
01459           ACE_ERROR_RETURN((LM_ERROR,
01460                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01461                             ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [subscriberqos/%C] section.\n"),
01462                             value.c_str(), subscriberqos_name.c_str()),
01463                             -1);
01464         }
01465       } else if (name == "partition.name") {
01466         try {
01467           parse_list(subscriberqos.partition, value);
01468         }
01469         catch (const CORBA::Exception& ex) {
01470           ACE_ERROR_RETURN((LM_ERROR,
01471             ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01472             ACE_TEXT("Exception caught while parsing partition.name (%C) ")
01473             ACE_TEXT("in [subscriberqos/%C] section: %C.\n"),
01474             value.c_str(), subscriberqos_name.c_str(), ex._info().c_str()),
01475             -1);
01476         }
01477       } else {
01478         ACE_ERROR_RETURN((LM_ERROR,
01479                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01480                           ACE_TEXT("Unexpected entry (%C) in [subscriberqos/%C] section.\n"),
01481                           name.c_str(), subscriberqos_name.c_str()),
01482                           -1);
01483       }
01484     }
01485 
01486    registry.subscriberqos_map[subscriberqos_name] = subscriberqos;
01487   }
01488 
01489   return 0;
01490 }
01491 
01492 int
01493 StaticDiscovery::parse_endpoints(ACE_Configuration_Heap& cf)
01494 {
01495   const ACE_Configuration_Section_Key& root = cf.root_section();
01496   ACE_Configuration_Section_Key section;
01497 
01498   if (cf.open_section(root, ENDPOINT_SECTION_NAME, 0, section) != 0) {
01499     if (DCPS_debug_level > 0) {
01500       // This is not an error if the configuration file does not have
01501       // any endpoint (sub)section.
01502       ACE_DEBUG((LM_NOTICE,
01503                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01504                   ACE_TEXT("no [%s] sections.\n"),
01505                   ENDPOINT_SECTION_NAME));
01506     }
01507     return 0;
01508   }
01509 
01510   // Ensure there are no key/values in the [endpoint] section.
01511   // Every key/value must be in a [endpoint/*] sub-section.
01512   ValueMap vm;
01513   if (pullValues(cf, section, vm) > 0) {
01514     ACE_ERROR_RETURN((LM_ERROR,
01515                       ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01516                       ACE_TEXT("[endpoint] sections must have a subsection name\n")),
01517                       -1);
01518   }
01519   // Process the subsections of this section
01520   KeyList keys;
01521   if (processSections(cf, section, keys) != 0) {
01522     ACE_ERROR_RETURN((LM_ERROR,
01523                       ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01524                       ACE_TEXT("too many nesting layers in the [endpoint] section.\n")),
01525                       -1);
01526   }
01527 
01528   // Loop through the [endpoint/*] sections
01529   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01530     OPENDDS_STRING endpoint_name = it->first;
01531 
01532     if (DCPS_debug_level > 0) {
01533       ACE_DEBUG((LM_NOTICE,
01534                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01535                   ACE_TEXT("processing [endpoint/%C] section.\n"),
01536                   endpoint_name.c_str()));
01537     }
01538 
01539     ValueMap values;
01540     pullValues(cf, it->second, values);
01541     int domain = 0;
01542     unsigned char participant[6];
01543     unsigned char entity[3];
01544     enum Type {
01545       Reader,
01546       Writer
01547     };
01548     Type type = Reader; // avoid warning
01549     OPENDDS_STRING topic_name;
01550     DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
01551     DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01552     DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01553     DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01554     TransportLocatorSeq trans_info;
01555     OPENDDS_STRING config_name;
01556 
01557     bool domain_specified = false,
01558       participant_specified = false,
01559       entity_specified = false,
01560       type_specified = false,
01561       topic_name_specified = false,
01562       config_name_specified = false;
01563 
01564     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01565       OPENDDS_STRING name = it->first;
01566       OPENDDS_STRING value = it->second;
01567 
01568       if (name == "domain") {
01569         if (convertToInteger(value, domain)) {
01570           domain_specified = true;
01571         } else {
01572           ACE_ERROR_RETURN((LM_ERROR,
01573                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01574                             ACE_TEXT("Illegal integer value for domain (%C) in [endpoint/%C] section.\n"),
01575                             value.c_str(), endpoint_name.c_str()),
01576                             -1);
01577         }
01578       } else if (name == "participant") {
01579 #ifdef __SUNPRO_CC
01580         int count = 0; std::count_if(value.begin(), value.end(), isxdigit, count);
01581 #else
01582         int count = std::count_if(value.begin(), value.end(), isxdigit);
01583 #endif
01584         if (value.size() != HEX_DIGITS_IN_PARTICIPANT || static_cast<size_t>(count) != HEX_DIGITS_IN_PARTICIPANT) {
01585           ACE_ERROR_RETURN((LM_ERROR,
01586                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01587                             ACE_TEXT("participant (%C) must be 12 hexadecimal digits in [endpoint/%C] section.\n"),
01588                             value.c_str(), endpoint_name.c_str()),
01589                             -1);
01590         }
01591 
01592         for (size_t idx = 0; idx != BYTES_IN_PARTICIPANT; ++idx) {
01593           participant[idx] = fromhex(value, idx);
01594         }
01595         participant_specified = true;
01596       } else if (name == "entity") {
01597 #ifdef __SUNPRO_CC
01598         int count = 0; std::count_if(value.begin(), value.end(), isxdigit, count);
01599 #else
01600         int count = std::count_if(value.begin(), value.end(), isxdigit);
01601 #endif
01602         if (value.size() != HEX_DIGITS_IN_ENTITY || static_cast<size_t>(count) != HEX_DIGITS_IN_ENTITY) {
01603           ACE_ERROR_RETURN((LM_ERROR,
01604                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01605                             ACE_TEXT("entity (%C) must be 6 hexadecimal digits in [endpoint/%C] section.\n"),
01606                             value.c_str(), endpoint_name.c_str()),
01607                             -1);
01608         }
01609 
01610         for (size_t idx = 0; idx != BYTES_IN_ENTITY; ++idx) {
01611           entity[idx] = fromhex(value, idx);
01612         }
01613         entity_specified = true;
01614       } else if (name == "type") {
01615         if (value == "reader") {
01616           type = Reader;
01617           type_specified = true;
01618         } else if (value == "writer") {
01619           type = Writer;
01620           type_specified = true;
01621         } else {
01622           ACE_ERROR_RETURN((LM_ERROR,
01623                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01624                             ACE_TEXT("Illegal string value for type (%C) in [endpoint/%C] section.\n"),
01625                             value.c_str(), endpoint_name.c_str()),
01626                             -1);
01627         }
01628       } else if (name == "topic") {
01629         EndpointRegistry::TopicMapType::const_iterator pos = this->registry.topic_map.find(value);
01630         if (pos != this->registry.topic_map.end()) {
01631           topic_name = pos->second.name;
01632           topic_name_specified = true;
01633         } else {
01634           ACE_ERROR_RETURN((LM_ERROR,
01635                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01636                             ACE_TEXT("Illegal topic reference (%C) in [endpoint/%C] section.\n"),
01637                             value.c_str(), endpoint_name.c_str()),
01638                             -1);
01639         }
01640       } else if (name == "datawriterqos") {
01641         EndpointRegistry::DataWriterQosMapType::const_iterator pos = this->registry.datawriterqos_map.find(value);
01642         if (pos != this->registry.datawriterqos_map.end()) {
01643           datawriterqos = pos->second;
01644         } else {
01645           ACE_ERROR_RETURN((LM_ERROR,
01646                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01647                             ACE_TEXT("Illegal datawriterqos reference (%C) in [endpoint/%C] section.\n"),
01648                             value.c_str(), endpoint_name.c_str()),
01649                             -1);
01650         }
01651       } else if (name == "publisherqos") {
01652         EndpointRegistry::PublisherQosMapType::const_iterator pos = this->registry.publisherqos_map.find(value);
01653         if (pos != this->registry.publisherqos_map.end()) {
01654           publisherqos = pos->second;
01655         } else {
01656           ACE_ERROR_RETURN((LM_ERROR,
01657                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01658                             ACE_TEXT("Illegal publisherqos reference (%C) in [endpoint/%C] section.\n"),
01659                             value.c_str(), endpoint_name.c_str()),
01660                             -1);
01661         }
01662       } else if (name == "datareaderqos") {
01663         EndpointRegistry::DataReaderQosMapType::const_iterator pos = this->registry.datareaderqos_map.find(value);
01664         if (pos != this->registry.datareaderqos_map.end()) {
01665           datareaderqos = pos->second;
01666         } else {
01667           ACE_ERROR_RETURN((LM_ERROR,
01668                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01669                             ACE_TEXT("Illegal datareaderqos reference (%C) in [endpoint/%C] section.\n"),
01670                             value.c_str(), endpoint_name.c_str()),
01671                             -1);
01672         }
01673       } else if (name == "subscriberqos") {
01674         EndpointRegistry::SubscriberQosMapType::const_iterator pos = this->registry.subscriberqos_map.find(value);
01675         if (pos != this->registry.subscriberqos_map.end()) {
01676           subscriberqos = pos->second;
01677         } else {
01678           ACE_ERROR_RETURN((LM_ERROR,
01679                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01680                             ACE_TEXT("Illegal subscriberqos reference (%C) in [endpoint/%C] section.\n"),
01681                             value.c_str(), endpoint_name.c_str()),
01682                             -1);
01683         }
01684       } else if (name == "config") {
01685         config_name = value;
01686         config_name_specified = true;
01687       } else {
01688         ACE_ERROR_RETURN((LM_ERROR,
01689                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01690                           ACE_TEXT("Unexpected entry (%C) in [endpoint/%C] section.\n"),
01691                           name.c_str(), endpoint_name.c_str()),
01692                           -1);
01693       }
01694     }
01695 
01696     if (!domain_specified) {
01697       ACE_ERROR_RETURN((LM_ERROR,
01698                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01699                         ACE_TEXT("No domain specified for [endpoint/%C] section.\n"),
01700                         endpoint_name.c_str()),
01701                         -1);
01702     }
01703 
01704     if (!participant_specified) {
01705       ACE_ERROR_RETURN((LM_ERROR,
01706                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01707                         ACE_TEXT("No participant specified for [endpoint/%C] section.\n"),
01708                         endpoint_name.c_str()),
01709                         -1);
01710     }
01711 
01712     if (!entity_specified) {
01713       ACE_ERROR_RETURN((LM_ERROR,
01714                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01715                         ACE_TEXT("No entity specified for [endpoint/%C] section.\n"),
01716                         endpoint_name.c_str()),
01717                         -1);
01718     }
01719 
01720     if (!type_specified) {
01721       ACE_ERROR_RETURN((LM_ERROR,
01722                         ACE_TEXT("(%P|%t) ERROR:StaticDiscovery::parse_endpoints ")
01723                         ACE_TEXT("No type specified for [endpoint/%C] section.\n"),
01724                         endpoint_name.c_str()),
01725                         -1);
01726     }
01727 
01728     if (!topic_name_specified) {
01729       ACE_ERROR_RETURN((LM_ERROR,
01730                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01731                         ACE_TEXT("No topic specified for [endpoint/%C] section.\n"),
01732                         endpoint_name.c_str()),
01733                         -1);
01734     }
01735 
01736     TransportConfig_rch config;
01737 
01738     if (config_name_specified) {
01739       config = TheTransportRegistry->get_config(config_name);
01740       if (config.is_nil()) {
01741         ACE_ERROR_RETURN((LM_ERROR,
01742                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01743                           ACE_TEXT("Illegal config reference (%C) in [endpoint/%C] section.\n"),
01744                           config_name.c_str(), endpoint_name.c_str()),
01745                           -1);
01746       }
01747     }
01748 
01749     if (config.is_nil() && domain_specified) {
01750       config = TheTransportRegistry->domain_default_config(domain);
01751     }
01752 
01753     if (config.is_nil()) {
01754       config = TheTransportRegistry->global_config();
01755     }
01756 
01757     try {
01758       config->populate_locators(trans_info);
01759     }
01760     catch (const CORBA::Exception& ex) {
01761       ACE_ERROR_RETURN((LM_ERROR,
01762                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01763                         ACE_TEXT("Exception caught while populating locators for [endpoint/%C] section. %C\n"),
01764                         endpoint_name.c_str(), ex._info().c_str()),
01765                         -1);
01766     }
01767     if (trans_info.length() == 0) {
01768         ACE_ERROR_RETURN((LM_ERROR,
01769                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01770                           ACE_TEXT("No locators for [endpoint/%C] section.\n"),
01771                           endpoint_name.c_str()),
01772                           -1);
01773     }
01774 
01775     EntityId_t entity_id = EndpointRegistry::build_id(entity,
01776       (type == Reader) ? ENTITYKIND_USER_READER_WITH_KEY : ENTITYKIND_USER_WRITER_WITH_KEY);
01777 
01778     RepoId id = EndpointRegistry::build_id(domain, participant, entity_id);
01779 
01780     if (DCPS_debug_level > 0) {
01781       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: StaticDiscovery::parse_endpoints adding entity with id %C\n"), LogGuid(id).c_str()));
01782     }
01783 
01784     switch (type) {
01785     case Reader:
01786       // Populate the userdata.
01787       datareaderqos.user_data.value.length(3);
01788       datareaderqos.user_data.value[0] = entity_id.entityKey[0];
01789       datareaderqos.user_data.value[1] = entity_id.entityKey[1];
01790       datareaderqos.user_data.value[2] = entity_id.entityKey[2];
01791 
01792       if (!registry.reader_map.insert(std::make_pair(id,
01793             EndpointRegistry::Reader(topic_name, datareaderqos, subscriberqos, config_name, trans_info))).second) {
01794         ACE_ERROR_RETURN((LM_ERROR,
01795                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01796                           ACE_TEXT("Section [endpoint/%C] ignored - duplicate reader.\n"),
01797                           endpoint_name.c_str()),
01798                           -1);
01799       }
01800       break;
01801     case Writer:
01802       // Populate the userdata.
01803       datawriterqos.user_data.value.length(3);
01804       datawriterqos.user_data.value[0] = entity_id.entityKey[0];
01805       datawriterqos.user_data.value[1] = entity_id.entityKey[1];
01806       datawriterqos.user_data.value[2] = entity_id.entityKey[2];
01807 
01808       if (!registry.writer_map.insert(std::make_pair(id,
01809             EndpointRegistry::Writer(topic_name, datawriterqos, publisherqos, config_name, trans_info))).second) {
01810         ACE_ERROR_RETURN((LM_ERROR,
01811                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01812                           ACE_TEXT("Section [endpoint/%C] ignored - duplicate writer.\n"),
01813                           endpoint_name.c_str()),
01814                           -1);
01815       }
01816       break;
01817     }
01818   }
01819 
01820   return 0;
01821 }
01822 
01823 void StaticDiscovery::pre_writer(DataWriterImpl* writer)
01824 {
01825   const DDS::Publisher_var pub = writer->get_publisher();
01826   const DDS::DomainParticipant_var part = pub->get_participant();
01827   const DDS::DomainId_t dom = part->get_domain_id();
01828 
01829   DDS::DomainParticipantQos partQos;
01830   part->get_qos(partQos);
01831   if (partQos.user_data.value.length() < 6)
01832     return;
01833   const unsigned char* const partId = partQos.user_data.value.get_buffer();
01834 
01835   DDS::DataWriterQos qos;
01836   writer->get_qos(qos);
01837   if (qos.user_data.value.length() < 3)
01838     return;
01839   const unsigned char* const dwId = qos.user_data.value.get_buffer();
01840 
01841   const EntityId_t entId =
01842     EndpointRegistry::build_id(dwId, ENTITYKIND_USER_WRITER_WITH_KEY);
01843   const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01844 
01845   const EndpointRegistry::WriterMapType::const_iterator iter =
01846     registry.writer_map.find(rid);
01847 
01848   if (iter != registry.writer_map.end() && !iter->second.trans_cfg.empty()) {
01849     TransportRegistry::instance()->bind_config(iter->second.trans_cfg, writer);
01850   }
01851 }
01852 
01853 void StaticDiscovery::pre_reader(DataReaderImpl* reader)
01854 {
01855   const DDS::Subscriber_var sub = reader->get_subscriber();
01856   const DDS::DomainParticipant_var part = sub->get_participant();
01857   const DDS::DomainId_t dom = part->get_domain_id();
01858 
01859   DDS::DomainParticipantQos partQos;
01860   part->get_qos(partQos);
01861   if (partQos.user_data.value.length() < 6)
01862     return;
01863   const unsigned char* const partId = partQos.user_data.value.get_buffer();
01864 
01865   DDS::DataReaderQos qos;
01866   reader->get_qos(qos);
01867   if (qos.user_data.value.length() < 3)
01868     return;
01869   const unsigned char* const drId = qos.user_data.value.get_buffer();
01870 
01871   const EntityId_t entId =
01872     EndpointRegistry::build_id(drId, ENTITYKIND_USER_READER_WITH_KEY);
01873   const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01874 
01875   const EndpointRegistry::ReaderMapType::const_iterator iter =
01876     registry.reader_map.find(rid);
01877 
01878   if (iter != registry.reader_map.end() && !iter->second.trans_cfg.empty()) {
01879     TransportRegistry::instance()->bind_config(iter->second.trans_cfg, reader);
01880   }
01881 }
01882 
01883 StaticDiscovery_rch StaticDiscovery::instance_(make_rch<StaticDiscovery>(Discovery::DEFAULT_STATIC));
01884 
01885 }
01886 }
01887 
01888 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1