StaticDiscovery.cpp

Go to the documentation of this file.
00001 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00002 
00003 #include "StaticDiscovery.h"
00004 #include "dds/DCPS/debug.h"
00005 #include "dds/DCPS/ConfigUtils.h"
00006 #include "dds/DCPS/DomainParticipantImpl.h"
00007 #include "dds/DCPS/Marked_Default_Qos.h"
00008 #include "dds/DCPS/SubscriberImpl.h"
00009 #include "dds/DCPS/BuiltInTopicUtils.h"
00010 #include "dds/DCPS/Registered_Data_Types.h"
00011 #include "dds/DCPS/Qos_Helper.h"
00012 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00013 
00014 #include <ctype.h>
00015 
00016 namespace OpenDDS {
00017 namespace DCPS {
00018 
00019 void EndpointRegistry::match()
00020 {
00021   for (WriterMapType::iterator wp = writer_map.begin(), wp_limit = writer_map.end();
00022        wp != wp_limit;
00023        ++wp) {
00024     const RepoId& writerid = wp->first;
00025     Writer& writer = wp->second;
00026     for (ReaderMapType::iterator rp = reader_map.begin(), rp_limit = reader_map.end();
00027          rp != rp_limit;
00028          ++rp) {
00029       const RepoId& readerid = rp->first;
00030       Reader& reader = rp->second;
00031 
00032       if (StaticDiscGuidDomainEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00033           !StaticDiscGuidPartEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00034           reader.topic_name == writer.topic_name) {
00035         // Different participants, same topic.
00036         IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00037         IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00038         const TransportLocatorSeq& writer_trans_info = writer.trans_info;
00039         const TransportLocatorSeq& reader_trans_info = reader.trans_info;
00040         const DDS::DataWriterQos& writer_qos = writer.qos;
00041         const DDS::DataReaderQos& reader_qos = reader.qos;
00042         const DDS::PublisherQos& publisher_qos = writer.publisher_qos;
00043         const DDS::SubscriberQos& subscriber_qos = reader.subscriber_qos;
00044 
00045         if (compatibleQOS(&writerStatus, &readerStatus, writer_trans_info, reader_trans_info,
00046                           &writer_qos, &reader_qos, &publisher_qos, &subscriber_qos)) {
00047           switch (reader.qos.reliability.kind) {
00048           case DDS::BEST_EFFORT_RELIABILITY_QOS:
00049             writer.best_effort_readers.insert(readerid);
00050             reader.best_effort_writers.insert(writerid);
00051             break;
00052           case DDS::RELIABLE_RELIABILITY_QOS:
00053             writer.reliable_readers.insert(readerid);
00054             reader.reliable_writers.insert(writerid);
00055             break;
00056           }
00057         }
00058       }
00059     }
00060   }
00061 }
00062 
00063 StaticEndpointManager::StaticEndpointManager(const RepoId& participant_id,
00064                                              ACE_Thread_Mutex& lock,
00065                                              const EndpointRegistry& registry,
00066                                              StaticParticipant& participant)
00067   : EndpointManager<StaticDiscoveredParticipantData>(participant_id, lock)
00068   , registry_(registry)
00069   , participant_(participant)
00070 {
00071   pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00072   sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00073 }
00074 
00075 void StaticEndpointManager::init_bit()
00076 {
00077   // Discover all remote publications and subscriptions.
00078 
00079   for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
00080          limit = registry_.writer_map.end();
00081        pos != limit;
00082        ++pos) {
00083     const RepoId& remoteid = pos->first;
00084     const EndpointRegistry::Writer& writer = pos->second;
00085 
00086     if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00087       increment_key(pub_bit_key_);
00088       pub_key_to_id_[pub_bit_key_] = remoteid;
00089 
00090       // pos represents a remote.
00091       // Populate data.
00092       DDS::PublicationBuiltinTopicData data;
00093 
00094       data.key = pub_bit_key_;
00095       OPENDDS_STRING topic_name = writer.topic_name;
00096       data.topic_name = topic_name.c_str();
00097       const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00098       data.type_name = topic.type_name.c_str();
00099       data.durability = writer.qos.durability;
00100       data.durability_service = writer.qos.durability_service;
00101       data.deadline = writer.qos.deadline;
00102       data.latency_budget = writer.qos.latency_budget;
00103       data.liveliness = writer.qos.liveliness;
00104       data.reliability = writer.qos.reliability;
00105       data.lifespan = writer.qos.lifespan;
00106       data.user_data = writer.qos.user_data;
00107       data.ownership = writer.qos.ownership;
00108       data.ownership_strength = writer.qos.ownership_strength;
00109       data.destination_order = writer.qos.destination_order;
00110       data.presentation = writer.publisher_qos.presentation;
00111       data.partition = writer.publisher_qos.partition;
00112       // If the TopicQos becomes available, this can be populated.
00113       //data.topic_data = topic_details.qos_.topic_data;
00114       data.group_data = writer.publisher_qos.group_data;
00115 #ifndef DDS_HAS_MINIMUM_BIT
00116       DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
00117       if (bit) { // bit may be null if the DomainParticipant is shutting down
00118         bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00119       }
00120 #endif /* DDS_HAS_MINIMUM_BIT */
00121     }
00122   }
00123 
00124   for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
00125          limit = registry_.reader_map.end();
00126        pos != limit;
00127        ++pos) {
00128     const RepoId& remoteid = pos->first;
00129     const EndpointRegistry::Reader& reader = pos->second;
00130 
00131     if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00132       increment_key(sub_bit_key_);
00133       sub_key_to_id_[sub_bit_key_] = remoteid;
00134 
00135       // pos represents a remote.
00136       // Populate data.
00137       DDS::SubscriptionBuiltinTopicData data;
00138 
00139       data.key = sub_bit_key_;
00140       OPENDDS_STRING topic_name = reader.topic_name;
00141       data.topic_name = topic_name.c_str();
00142       const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00143       data.type_name = topic.type_name.c_str();
00144       data.durability = reader.qos.durability;
00145       data.deadline = reader.qos.deadline;
00146       data.latency_budget = reader.qos.latency_budget;
00147       data.liveliness = reader.qos.liveliness;
00148       data.reliability = reader.qos.reliability;
00149       data.ownership = reader.qos.ownership;
00150       data.destination_order = reader.qos.destination_order;
00151       data.user_data = reader.qos.user_data;
00152       data.time_based_filter = reader.qos.time_based_filter;
00153       data.presentation = reader.subscriber_qos.presentation;
00154       data.partition = reader.subscriber_qos.partition;
00155       // // If the TopicQos becomes available, this can be populated.
00156       //data.topic_data = topic_details.qos_.topic_data;
00157       data.group_data = reader.subscriber_qos.group_data;
00158 
00159 #ifndef DDS_HAS_MINIMUM_BIT
00160       DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
00161       if (bit) { // bit may be null if the DomainParticipant is shutting down
00162         bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00163       }
00164 #endif /* DDS_HAS_MINIMUM_BIT */
00165     }
00166   }
00167 }
00168 
00169 void StaticEndpointManager::assign_publication_key(RepoId& rid,
00170                                                    const RepoId& /*topicId*/,
00171                                                    const DDS::DataWriterQos& qos)
00172 {
00173   if (qos.user_data.value.length() != 3) {
00174     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
00175     return;
00176   }
00177 
00178   rid.entityId.entityKey[0] = qos.user_data.value[0];
00179   rid.entityId.entityKey[1] = qos.user_data.value[1];
00180   rid.entityId.entityKey[2] = qos.user_data.value[2];
00181   rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY;
00182 
00183   if (DCPS_debug_level > 8) {
00184     ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %s\n",
00185                LogGuid(rid).c_str()));
00186   }
00187 
00188   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
00189   if (pos == registry_.writer_map.end()) {
00190     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %s\n"), LogGuid(rid).c_str()));
00191     return;
00192   }
00193 
00194   DDS::DataWriterQos qos2(qos);
00195   // Qos in registry will not have the user data so overwrite.
00196   qos2.user_data = pos->second.qos.user_data;
00197 
00198   DDS::DataWriterQos qos3(pos->second.qos);
00199 
00200   if (qos2 != qos3) {
00201     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
00202   }
00203 }
00204 
00205 void StaticEndpointManager::assign_subscription_key(RepoId& rid,
00206                                                     const RepoId& /*topicId*/,
00207                                                     const DDS::DataReaderQos& qos)
00208 {
00209   if (qos.user_data.value.length() != 3) {
00210     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
00211     return;
00212   }
00213 
00214   rid.entityId.entityKey[0] = qos.user_data.value[0];
00215   rid.entityId.entityKey[1] = qos.user_data.value[1];
00216   rid.entityId.entityKey[2] = qos.user_data.value[2];
00217   rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY;
00218 
00219   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
00220   if (pos == registry_.reader_map.end()) {
00221     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %s\n"), LogGuid(rid).c_str()));
00222     return;
00223   }
00224 
00225   DDS::DataReaderQos qos2(qos);
00226   // Qos in registry will not have the user data so overwrite.
00227   qos2.user_data = pos->second.qos.user_data;
00228 
00229   if (qos2 != pos->second.qos) {
00230     ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
00231   }
00232 }
00233 
00234 bool
00235 StaticEndpointManager::update_topic_qos(const RepoId& /*topicId*/,
00236                                         const DDS::TopicQos& /*qos*/,
00237                                         OPENDDS_STRING& /*name*/)
00238 {
00239   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
00240              ACE_TEXT("Not allowed\n")));
00241   return false;
00242 }
00243 
00244 bool
00245 StaticEndpointManager::update_publication_qos(const RepoId& /*publicationId*/,
00246                                               const DDS::DataWriterQos& /*qos*/,
00247                                               const DDS::PublisherQos& /*publisherQos*/)
00248 {
00249   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
00250              ACE_TEXT("Not allowed\n")));
00251   return false;
00252 }
00253 
00254 bool
00255 StaticEndpointManager::update_subscription_qos(const RepoId& /*subscriptionId*/,
00256                                                const DDS::DataReaderQos& /*qos*/,
00257                                                const DDS::SubscriberQos& /*subscriberQos*/)
00258 {
00259   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00260              ACE_TEXT("Not allowed\n")));
00261   return false;
00262 }
00263 
00264 bool
00265 StaticEndpointManager::update_subscription_params(const RepoId& /*subId*/,
00266                                                   const DDS::StringSeq& /*params*/)
00267 {
00268   ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00269              ACE_TEXT("Not allowed\n")));
00270   return false;
00271 }
00272 
00273 void
00274 StaticEndpointManager::association_complete(const RepoId& /*localId*/,
00275                                             const RepoId& /*remoteId*/)
00276 {
00277   // Do nothing.
00278 }
00279 
00280 bool
00281 StaticEndpointManager::disassociate(const StaticDiscoveredParticipantData& /*pdata*/)
00282 {
00283   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
00284   // TODO
00285   return false;
00286 }
00287 
00288 DDS::ReturnCode_t
00289 StaticEndpointManager::add_publication_i(const RepoId& writerid,
00290                                          LocalPublication& pub)
00291 {
00292   /*
00293     Find all matching remote readers.
00294     If the reader is best effort, then associate immediately.
00295     If the reader is reliable (we are reliable by implication), register with the transport to receive notification that the remote reader is up.
00296     */
00297   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00298   if (pos == registry_.writer_map.end()) {
00299     return DDS::RETCODE_ERROR;
00300   }
00301   const EndpointRegistry::Writer& writer = pos->second;
00302 
00303   for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
00304        pos != limit;
00305        ++pos) {
00306     const RepoId& readerid = *pos;
00307     const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00308 
00309 #ifdef __SUNPRO_CC
00310     ReaderAssociation ra;
00311     ra.readerTransInfo = reader.trans_info;
00312     ra.readerId = readerid;
00313     ra.subQos = reader.subscriber_qos;
00314     ra.readerQos = reader.qos;
00315     ra.filterClassName = "";
00316     ra.filterExpression = "";
00317     ra.exprParams = 0;
00318 #else
00319     const ReaderAssociation ra =
00320       {reader.trans_info, readerid, reader.subscriber_qos, reader.qos, "", "", 0};
00321 #endif
00322     pub.publication_->add_association(writerid, ra, true);
00323     pub.publication_->association_complete(readerid);
00324   }
00325 
00326   for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00327        pos != limit;
00328        ++pos) {
00329     const RepoId& readerid = *pos;
00330     const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00331     pub.publication_->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
00332   }
00333 
00334   return DDS::RETCODE_OK;
00335 }
00336 
00337 DDS::ReturnCode_t
00338 StaticEndpointManager::remove_publication_i(const RepoId& writerid)
00339 {
00340   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00341   if (lp_pos == local_publications_.end()) {
00342     return DDS::RETCODE_ERROR;
00343   }
00344 
00345   const LocalPublication& pub = lp_pos->second;
00346 
00347   EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00348   if (pos == registry_.writer_map.end()) {
00349     return DDS::RETCODE_ERROR;
00350   }
00351 
00352   const EndpointRegistry::Writer& writer = pos->second;
00353 
00354   ReaderIdSeq ids;
00355   ids.length((CORBA::ULong)writer.reliable_readers.size());
00356   CORBA::ULong idx = 0;
00357   for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00358         pos != limit;
00359         ++pos, ++idx) {
00360     const RepoId& readerid = *pos;
00361     ids[idx] = readerid;
00362     pub.publication_->unregister_for_reader(participant_id_, writerid, readerid);
00363   }
00364 
00365   return DDS::RETCODE_OK;
00366 }
00367 
00368 DDS::ReturnCode_t
00369 StaticEndpointManager::add_subscription_i(const RepoId& readerid,
00370                                           LocalSubscription& sub)
00371 {
00372   /*
00373     Find all matching remote writers.
00374     If we (the reader) is best effort, then associate immediately.
00375     If we (the reader) are reliable, then register with the transport to receive notification that the remote writer is up.
00376     */
00377   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00378   if (pos == registry_.reader_map.end()) {
00379     return DDS::RETCODE_ERROR;
00380   }
00381   const EndpointRegistry::Reader& reader = pos->second;
00382 
00383   for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
00384        pos != limit;
00385        ++pos) {
00386     const RepoId& writerid = *pos;
00387     const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00388 
00389 #ifdef __SUNPRO_CC
00390     WriterAssociation wa;
00391     wa.writerTransInfo = writer.trans_info;
00392     wa.writerId = writerid;
00393     wa.pubQos = writer.publisher_qos;
00394     wa.writerQos = writer.qos;
00395 #else
00396     const WriterAssociation wa =
00397       {writer.trans_info, writerid, writer.publisher_qos, writer.qos};
00398 #endif
00399     sub.subscription_->add_association(readerid, wa, false);
00400   }
00401 
00402   for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00403        pos != limit;
00404        ++pos) {
00405     const RepoId& writerid = *pos;
00406     const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00407     sub.subscription_->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
00408   }
00409 
00410   return DDS::RETCODE_OK;
00411 }
00412 
00413 DDS::ReturnCode_t
00414 StaticEndpointManager::remove_subscription_i(const RepoId& readerid)
00415 {
00416   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00417   if (ls_pos == local_subscriptions_.end()) {
00418     return DDS::RETCODE_ERROR;
00419   }
00420 
00421   const LocalSubscription& sub = ls_pos->second;
00422 
00423   EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00424   if (pos == registry_.reader_map.end()) {
00425     return DDS::RETCODE_ERROR;
00426   }
00427 
00428   const EndpointRegistry::Reader& reader = pos->second;
00429 
00430   WriterIdSeq ids;
00431   ids.length((CORBA::ULong)reader.reliable_writers.size());
00432   CORBA::ULong idx = 0;
00433   for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00434         pos != limit;
00435         ++pos, ++idx) {
00436     const RepoId& writerid = *pos;
00437     ids[idx] = writerid;
00438     sub.subscription_->unregister_for_writer(participant_id_, readerid, writerid);
00439   }
00440 
00441   return DDS::RETCODE_OK;
00442 }
00443 
00444 bool
00445 StaticEndpointManager::shutting_down() const
00446 {
00447   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
00448   // TODO
00449   return false;
00450 }
00451 
00452 void
00453 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
00454                                                            DiscoveredSubscriptionIter& /*iter*/,
00455                                                            const RepoId& /*reader*/)
00456 {
00457   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00458   // TODO
00459 }
00460 
00461 void
00462 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
00463                                                            DiscoveredPublicationIter& /*iter*/,
00464                                                            const RepoId& /*reader*/)
00465 {
00466   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00467   // TODO
00468 }
00469 
00470 bool
00471 StaticEndpointManager::defer_writer(const RepoId& /*writer*/,
00472                                     const RepoId& /*writer_participant*/)
00473 {
00474   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_writer TODO\n")));
00475   // TODO
00476   return false;
00477 }
00478 
00479 bool
00480 StaticEndpointManager::defer_reader(const RepoId& /*writer*/,
00481                                     const RepoId& /*writer_participant*/)
00482 {
00483   // TODO
00484   ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_reader TODO\n")));
00485   return false;
00486 }
00487 
00488 void
00489 StaticEndpointManager::reader_exists(const RepoId& readerid, const RepoId& writerid)
00490 {
00491   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00492   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00493   EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00494   if (lp_pos != local_publications_.end() &&
00495       reader_pos != registry_.reader_map.end()) {
00496     DataWriterCallbacks* dwr = lp_pos->second.publication_;
00497 #ifdef __SUNPRO_CC
00498     ReaderAssociation ra;
00499     ra.readerTransInfo = reader_pos->second.trans_info;
00500     ra.readerId = readerid;
00501     ra.subQos = reader_pos->second.subscriber_qos;
00502     ra.readerQos = reader_pos->second.qos;
00503     ra.filterClassName = "";
00504     ra.filterExpression = "";
00505     ra.exprParams = 0;
00506 #else
00507     const ReaderAssociation ra =
00508       {reader_pos->second.trans_info, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos, "", "", 0};
00509 
00510 #endif
00511     dwr->add_association(writerid, ra, true);
00512     dwr->association_complete(readerid);
00513   }
00514 }
00515 
00516 void
00517 StaticEndpointManager::reader_does_not_exist(const RepoId& readerid, const RepoId& writerid)
00518 {
00519   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00520   LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00521   EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00522   if (lp_pos != local_publications_.end() &&
00523       reader_pos != registry_.reader_map.end()) {
00524     DataWriterCallbacks* dwr = lp_pos->second.publication_;
00525     ReaderIdSeq ids;
00526     ids.length(1);
00527     ids[0] = readerid;
00528     dwr->remove_associations(ids, true);
00529   }
00530 }
00531 
00532 void
00533 StaticEndpointManager::writer_exists(const RepoId& writerid, const RepoId& readerid)
00534 {
00535   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00536   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00537   EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00538   if (ls_pos != local_subscriptions_.end() &&
00539       writer_pos != registry_.writer_map.end()) {
00540     DataReaderCallbacks* drr = ls_pos->second.subscription_;
00541 #ifdef __SUNPRO_CC
00542     WriterAssociation wa;
00543     wa.writerTransInfo = writer_pos->second.trans_info;
00544     wa.writerId = writerid;
00545     wa.pubQos = writer_pos->second.publisher_qos;
00546     wa.writerQos = writer_pos->second.qos;
00547 #else
00548     const WriterAssociation wa =
00549       {writer_pos->second.trans_info, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos};
00550 #endif
00551     drr->add_association(readerid, wa, false);
00552   }
00553 }
00554 
00555 void
00556 StaticEndpointManager::writer_does_not_exist(const RepoId& writerid, const RepoId& readerid)
00557 {
00558   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00559   LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00560   EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00561   if (ls_pos != local_subscriptions_.end() &&
00562       writer_pos != registry_.writer_map.end()) {
00563     DataReaderCallbacks* drr = ls_pos->second.subscription_;
00564     WriterIdSeq ids;
00565     ids.length(1);
00566     ids[0] = writerid;
00567     drr->remove_associations(ids, true);
00568   }
00569 }
00570 
00571 #ifndef DDS_HAS_MINIMUM_BIT
00572 DDS::PublicationBuiltinTopicDataDataReaderImpl*
00573 StaticEndpointManager::pub_bit()
00574 {
00575   DDS::Subscriber_var sub = participant_.bit_subscriber();
00576   if (!sub.in())
00577     return 0;
00578 
00579   DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
00580   return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00581 }
00582 
00583 DDS::SubscriptionBuiltinTopicDataDataReaderImpl*
00584 StaticEndpointManager::sub_bit()
00585 {
00586   DDS::Subscriber_var sub = participant_.bit_subscriber();
00587   if (!sub.in())
00588     return 0;
00589 
00590   DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
00591   return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00592 }
00593 #endif /* DDS_HAS_MINIMUM_BIT */
00594 
00595 StaticDiscovery::StaticDiscovery(const RepoKey& key)
00596   : PeerDiscovery<StaticParticipant>(key)
00597 {}
00598 
00599 namespace {
00600   unsigned char hextobyte(unsigned char c)
00601   {
00602     if (c >= '0' && c <= '9') {
00603       return c - '0';
00604     }
00605     if (c >= 'a' && c <= 'f') {
00606       return 10 + c - 'a';
00607     }
00608     if (c >= 'A' && c <= 'F') {
00609       return 10 + c - 'A';
00610     }
00611     return c;
00612   }
00613 
00614   unsigned char
00615   fromhex(const OPENDDS_STRING& x, size_t idx)
00616   {
00617     return (hextobyte(x[idx * 2]) << 4) | (hextobyte(x[idx * 2 + 1]));
00618   }
00619 }
00620 
00621 EntityId_t
00622 EndpointRegistry::build_id(const unsigned char* entity_key,
00623                            const unsigned char entity_kind)
00624 {
00625   EntityId_t retval;
00626   retval.entityKey[0] = entity_key[0];
00627   retval.entityKey[1] = entity_key[1];
00628   retval.entityKey[2] = entity_key[2];
00629   retval.entityKind = entity_kind;
00630   return retval;
00631 }
00632 
00633 RepoId
00634 EndpointRegistry::build_id(DDS::DomainId_t domain,
00635                            const unsigned char* participant_id,
00636                            const EntityId_t& entity_id)
00637 {
00638   RepoId id;
00639   id.guidPrefix[0] = VENDORID_OCI[0];
00640   id.guidPrefix[1] = VENDORID_OCI[1];
00641   // id.guidPrefix[2] = domain[0]
00642   // id.guidPrefix[3] = domain[1]
00643   // id.guidPrefix[4] = domain[2]
00644   // id.guidPrefix[5] = domain[3]
00645   DDS::DomainId_t netdom = ACE_HTONL(domain);
00646   ACE_OS::memcpy(&id.guidPrefix[2], &netdom, sizeof(DDS::DomainId_t));
00647   // id.guidPrefix[6] = participant[0]
00648   // id.guidPrefix[7] = participant[1]
00649   // id.guidPrefix[8] = participant[2]
00650   // id.guidPrefix[9] = participant[3]
00651   // id.guidPrefix[10] = participant[4]
00652   // id.guidPrefix[11] = participant[5]
00653   ACE_OS::memcpy(&id.guidPrefix[6], participant_id, 6);
00654   id.entityId = entity_id;
00655   return id;
00656 }
00657 
00658 AddDomainStatus
00659 StaticDiscovery::add_domain_participant(DDS::DomainId_t domain,
00660                                         const DDS::DomainParticipantQos& qos)
00661 {
00662   AddDomainStatus ads = {RepoId(), false /*federated*/};
00663 
00664   if (qos.user_data.value.length() != 6) {
00665     ACE_DEBUG((LM_ERROR,
00666                 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00667                 ACE_TEXT("No userdata to identify participant\n")));
00668     return ads;
00669   }
00670 
00671   RepoId id = EndpointRegistry::build_id(domain,
00672                                          qos.user_data.value.get_buffer(),
00673                                          ENTITYID_PARTICIPANT);
00674   if (!get_part(domain, id).is_nil()) {
00675     ACE_DEBUG((LM_ERROR,
00676                 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00677                 ACE_TEXT("Duplicate participant\n")));
00678     return ads;
00679   }
00680 
00681   const RcHandle<StaticParticipant> participant = new StaticParticipant(id, qos, registry);
00682 
00683   {
00684     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00685     participants_[domain][id] = participant;
00686   }
00687 
00688   ads.id = id;
00689   return ads;
00690 }
00691 
00692 namespace {
  // Names of the configuration-file sections handled by this file.
  // TOPIC_SECTION_NAME is opened by StaticDiscovery::parse_topics; the others
  // are presumably consumed by the matching parse_* functions -- TODO confirm.
  const ACE_TCHAR TOPIC_SECTION_NAME[] = ACE_TEXT("topic");
  const ACE_TCHAR DATAWRITERQOS_SECTION_NAME[] = ACE_TEXT("datawriterqos");
  const ACE_TCHAR DATAREADERQOS_SECTION_NAME[] = ACE_TEXT("datareaderqos");
  const ACE_TCHAR PUBLISHERQOS_SECTION_NAME[]  = ACE_TEXT("publisherqos");
  const ACE_TCHAR SUBSCRIBERQOS_SECTION_NAME[] = ACE_TEXT("subscriberqos");
  const ACE_TCHAR ENDPOINT_SECTION_NAME[] = ACE_TEXT("endpoint");
00699 
00700   void parse_second(CORBA::Long& x, const OPENDDS_STRING& value)
00701   {
00702     if (value == "DURATION_INFINITE_SEC") {
00703       x = DDS::DURATION_INFINITE_SEC;
00704     } else {
00705       x = atoi(value.c_str());
00706     }
00707   }
00708 
00709   void parse_nanosecond(CORBA::ULong& x, const OPENDDS_STRING& value)
00710   {
00711     if (value == "DURATION_INFINITE_NANOSEC") {
00712       x = DDS::DURATION_INFINITE_NSEC;
00713     } else {
00714       x = atoi(value.c_str());
00715     }
00716   }
00717 
00718   bool parse_bool(CORBA::Boolean& x, const OPENDDS_STRING& value)
00719   {
00720     if (value == "true") {
00721       x = true;
00722       return true;
00723     } else if (value == "false") {
00724       x = false;
00725       return true;
00726     }
00727     return false;
00728   }
00729 
00730   void parse_list(DDS::PartitionQosPolicy& x, const OPENDDS_STRING& value)
00731   {
00732     // Value can be a comma-separated list
00733     const char* start = value.c_str();
00734     char buffer[128];
00735     std::memset(buffer, 0, sizeof(buffer));
00736     while (const char* next_comma = std::strchr(start, ',')) {
00737       // Copy into temp buffer, won't have null
00738       std::strncpy(buffer, start, next_comma - start);
00739       // Append null
00740       buffer[next_comma - start] = '\0';
00741       // Add to QOS
00742       x.name.length(x.name.length() + 1);
00743       x.name[x.name.length() - 1] = static_cast<const char*>(buffer);
00744       // Advance pointer
00745       start = next_comma + 1;
00746     }
00747     // Append everything after last comma
00748     x.name.length(x.name.length() + 1);
00749     x.name[x.name.length() - 1] = start;
00750   }
00751 }
00752 
00753 int
00754 StaticDiscovery::load_configuration(ACE_Configuration_Heap& cf)
00755 {
00756   if (parse_topics(cf) ||
00757       parse_datawriterqos(cf) ||
00758       parse_datareaderqos(cf) ||
00759       parse_publisherqos(cf) ||
00760       parse_subscriberqos(cf) ||
00761       parse_endpoints(cf)) {
00762     return -1;
00763   }
00764 
00765   registry.match();
00766 
00767   return 0;
00768 }
00769 
00770 int
00771 StaticDiscovery::parse_topics(ACE_Configuration_Heap& cf)
00772 {
00773   const ACE_Configuration_Section_Key& root = cf.root_section();
00774   ACE_Configuration_Section_Key section;
00775 
00776   if (cf.open_section(root, TOPIC_SECTION_NAME, 0, section) != 0) {
00777     if (DCPS_debug_level > 0) {
00778       // This is not an error if the configuration file does not have
00779       // any topic (sub)section.
00780       ACE_DEBUG((LM_NOTICE,
00781                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00782                   ACE_TEXT("no [%s] sections.\n"),
00783                   TOPIC_SECTION_NAME));
00784     }
00785     return 0;
00786   }
00787 
00788   // Ensure there are no key/values in the [topic] section.
00789   // Every key/value must be in a [topic/*] sub-section.
00790   ValueMap vm;
00791   if (pullValues(cf, section, vm) > 0) {
00792     ACE_ERROR_RETURN((LM_ERROR,
00793                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00794                       ACE_TEXT("[topic] sections must have a subsection name\n")),
00795                       -1);
00796   }
00797   // Process the subsections of this section
00798   KeyList keys;
00799   if (processSections(cf, section, keys) != 0) {
00800     ACE_ERROR_RETURN((LM_ERROR,
00801                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00802                       ACE_TEXT("too many nesting layers in the [topic] section.\n")),
00803                       -1);
00804   }
00805 
00806   // Loop through the [topic/*] sections
00807   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00808     OPENDDS_STRING topic_name = it->first;
00809 
00810     if (DCPS_debug_level > 0) {
00811       ACE_DEBUG((LM_NOTICE,
00812                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00813                   ACE_TEXT("processing [topic/%C] section.\n"),
00814                   topic_name.c_str()));
00815     }
00816 
00817     ValueMap values;
00818     pullValues(cf, it->second, values);
00819 
00820     EndpointRegistry::Topic topic;
00821     bool name_Specified = false,
00822       type_name_Specified = false;
00823 
00824     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00825       OPENDDS_STRING name = it->first;
00826       OPENDDS_STRING value = it->second;
00827 
00828       if (name == "name") {
00829         topic.name = value;
00830         name_Specified = true;
00831       } else if (name == "type_name") {
00832         if (value.size() >= 128) {
00833           ACE_ERROR_RETURN((LM_ERROR,
00834                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00835                             ACE_TEXT("type_name (%C) must be less than 128 characters in [topic/%C] section.\n"),
00836                             value.c_str(), topic_name.c_str()),
00837                             -1);
00838         }
00839         topic.type_name = value;
00840         type_name_Specified = true;
00841       } else {
00842         // Typos are ignored to avoid parsing FACE-specific keys.
00843       }
00844     }
00845 
00846     if (!name_Specified) {
00847       topic.name = topic_name;
00848     }
00849 
00850     if (!type_name_Specified) {
00851       ACE_ERROR_RETURN((LM_ERROR,
00852                         ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00853                         ACE_TEXT("No type_name specified for [topic/%C] section.\n"),
00854                         topic_name.c_str()),
00855                        -1);
00856     }
00857 
00858     registry.topic_map[topic_name] = topic;
00859   }
00860 
00861   return 0;
00862 }
00863 
00864 int
00865 StaticDiscovery::parse_datawriterqos(ACE_Configuration_Heap& cf)
00866 {
00867   const ACE_Configuration_Section_Key& root = cf.root_section();
00868   ACE_Configuration_Section_Key section;
00869 
00870   if (cf.open_section(root, DATAWRITERQOS_SECTION_NAME, 0, section) != 0) {
00871     if (DCPS_debug_level > 0) {
00872       // This is not an error if the configuration file does not have
00873       // any datawriterqos (sub)section.
00874       ACE_DEBUG((LM_NOTICE,
00875                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00876                   ACE_TEXT("no [%s] sections.\n"),
00877                   DATAWRITERQOS_SECTION_NAME));
00878     }
00879     return 0;
00880   }
00881 
00882   // Ensure there are no key/values in the [datawriterqos] section.
00883   // Every key/value must be in a [datawriterqos/*] sub-section.
00884   ValueMap vm;
00885   if (pullValues(cf, section, vm) > 0) {
00886     ACE_ERROR_RETURN((LM_ERROR,
00887                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00888                       ACE_TEXT("[datawriterqos] sections must have a subsection name\n")),
00889                       -1);
00890   }
00891   // Process the subsections of this section
00892   KeyList keys;
00893   if (processSections(cf, section, keys) != 0) {
00894     ACE_ERROR_RETURN((LM_ERROR,
00895                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00896                       ACE_TEXT("too many nesting layers in the [datawriterqos] section.\n")),
00897                       -1);
00898   }
00899 
00900   // Loop through the [datawriterqos/*] sections
00901   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00902     OPENDDS_STRING datawriterqos_name = it->first;
00903 
00904     if (DCPS_debug_level > 0) {
00905       ACE_DEBUG((LM_NOTICE,
00906                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00907                   ACE_TEXT("processing [datawriterqos/%C] section.\n"),
00908                   datawriterqos_name.c_str()));
00909     }
00910 
00911     ValueMap values;
00912     pullValues(cf, it->second, values);
00913 
00914     DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
00915 
00916     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00917       OPENDDS_STRING name = it->first;
00918       OPENDDS_STRING value = it->second;
00919 
00920       if (name == "durability.kind") {
00921         if (value == "VOLATILE") {
00922           datawriterqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
00923         } else if (value == "TRANSIENT_LOCAL") {
00924           datawriterqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00925 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00926         } else if (value == "TRANSIENT") {
00927           datawriterqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
00928         } else if (value == "PERSISTENT") {
00929           datawriterqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
00930 #endif
00931         } else {
00932           ACE_ERROR_RETURN((LM_ERROR,
00933                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00934                             ACE_TEXT("Illegal value for durability.kind (%C) in [datawriterqos/%C] section.\n"),
00935                             value.c_str(), datawriterqos_name.c_str()),
00936                             -1);
00937         }
00938       } else if (name == "deadline.period.sec") {
00939         parse_second(datawriterqos.deadline.period.sec, value);
00940       } else if (name == "deadline.period.nanosec") {
00941         parse_nanosecond(datawriterqos.deadline.period.nanosec, value);
00942       } else if (name == "latency_budget.duration.sec") {
00943         parse_second(datawriterqos.latency_budget.duration.sec, value);
00944       } else if (name == "latency_budget.duration.nanosec") {
00945         parse_nanosecond(datawriterqos.latency_budget.duration.nanosec, value);
00946       } else if (name == "liveliness.kind") {
00947         if (value == "AUTOMATIC") {
00948           datawriterqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
00949         } else if (value == "MANUAL_BY_TOPIC") {
00950           datawriterqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
00951         } else if (value == "MANUAL_BY_PARTICIPANT") {
00952           datawriterqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
00953         } else {
00954           ACE_ERROR_RETURN((LM_ERROR,
00955                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00956                             ACE_TEXT("Illegal value for liveliness.kind (%C) in [datawriterqos/%C] section.\n"),
00957                             value.c_str(), datawriterqos_name.c_str()),
00958                             -1);
00959         }
00960       } else if (name == "liveliness.lease_duration.sec") {
00961         parse_second(datawriterqos.liveliness.lease_duration.sec, value);
00962       } else if (name == "liveliness.lease_duration.nanosec") {
00963         parse_nanosecond(datawriterqos.liveliness.lease_duration.nanosec, value);
00964       } else if (name == "reliability.kind") {
00965         if (value == "BEST_EFFORT") {
00966           datawriterqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
00967         } else if (value == "RELIABLE") {
00968           datawriterqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00969         } else {
00970           ACE_ERROR_RETURN((LM_ERROR,
00971                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00972                             ACE_TEXT("Illegal value for reliability.kind (%C) in [datawriterqos/%C] section.\n"),
00973                             value.c_str(), datawriterqos_name.c_str()),
00974                             -1);
00975         }
00976       } else if (name == "reliability.max_blocking_time.sec") {
00977         parse_second(datawriterqos.reliability.max_blocking_time.sec, value);
00978       } else if (name == "reliability.max_blocking_time.nanosec") {
00979         parse_nanosecond(datawriterqos.reliability.max_blocking_time.nanosec, value);
00980       } else if (name == "destination_order.kind") {
00981         if (value == "BY_RECEPTION_TIMESTAMP") {
00982           datawriterqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
00983         } else if (value == "BY_SOURCE_TIMESTAMP") {
00984           datawriterqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
00985         } else {
00986           ACE_ERROR_RETURN((LM_ERROR,
00987                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00988                             ACE_TEXT("Illegal value for destination_order.kind (%C) in [datawriterqos/%C] section.\n"),
00989                             value.c_str(), datawriterqos_name.c_str()),
00990                             -1);
00991         }
00992       } else if (name == "history.kind") {
00993         if (value == "KEEP_ALL") {
00994           datawriterqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
00995         } else if (value == "KEEP_LAST") {
00996           datawriterqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
00997         } else {
00998           ACE_ERROR_RETURN((LM_ERROR,
00999                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01000                             ACE_TEXT("Illegal value for history.kind (%C) in [datawriterqos/%C] section.\n"),
01001                             value.c_str(), datawriterqos_name.c_str()),
01002                             -1);
01003         }
01004       } else if (name == "history.depth") {
01005         datawriterqos.history.depth = atoi(value.c_str());
01006       } else if (name == "resource_limits.max_samples") {
01007         datawriterqos.resource_limits.max_samples = atoi(value.c_str());
01008       } else if (name == "resource_limits.max_instances") {
01009         datawriterqos.resource_limits.max_instances = atoi(value.c_str());
01010       } else if (name == "resource_limits.max_samples_per_instance") {
01011         datawriterqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01012       } else if (name == "transport_priority.value") {
01013         datawriterqos.transport_priority.value = atoi(value.c_str());
01014       } else if (name == "lifespan.duration.sec") {
01015         parse_second(datawriterqos.lifespan.duration.sec, value);
01016       } else if (name == "lifespan.duration.nanosec") {
01017         parse_nanosecond(datawriterqos.lifespan.duration.nanosec, value);
01018       } else if (name == "ownership.kind") {
01019         if (value == "SHARED") {
01020           datawriterqos.ownership.kind = DDS::SHARED_OWNERSHIP_QOS;
01021         } else if (value == "EXCLUSIVE") {
01022           datawriterqos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
01023         } else {
01024           ACE_ERROR_RETURN((LM_ERROR,
01025                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01026                             ACE_TEXT("Illegal value for ownership.kind (%C) in [datawriterqos/%C] section.\n"),
01027                             value.c_str(), datawriterqos_name.c_str()),
01028                             -1);
01029         }
01030       } else if (name == "ownership_strength.value") {
01031         datawriterqos.ownership_strength.value = atoi(value.c_str());
01032       } else {
01033         ACE_ERROR_RETURN((LM_ERROR,
01034                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01035                           ACE_TEXT("Unexpected entry (%C) in [datawriterqos/%C] section.\n"),
01036                           name.c_str(), datawriterqos_name.c_str()),
01037                           -1);
01038       }
01039     }
01040 
01041     registry.datawriterqos_map[datawriterqos_name] = datawriterqos;
01042   }
01043 
01044   return 0;
01045 }
01046 
01047 int
01048 StaticDiscovery::parse_datareaderqos(ACE_Configuration_Heap& cf)
01049 {
01050   const ACE_Configuration_Section_Key& root = cf.root_section();
01051   ACE_Configuration_Section_Key section;
01052 
01053   if (cf.open_section(root, DATAREADERQOS_SECTION_NAME, 0, section) != 0) {
01054     if (DCPS_debug_level > 0) {
01055       // This is not an error if the configuration file does not have
01056       // any datareaderqos (sub)section.
01057       ACE_DEBUG((LM_NOTICE,
01058                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01059                   ACE_TEXT("no [%s] sections.\n"),
01060                   DATAREADERQOS_SECTION_NAME));
01061     }
01062     return 0;
01063   }
01064 
01065   // Ensure there are no key/values in the [datareaderqos] section.
01066   // Every key/value must be in a [datareaderqos/*] sub-section.
01067   ValueMap vm;
01068   if (pullValues(cf, section, vm) > 0) {
01069     ACE_ERROR_RETURN((LM_ERROR,
01070                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01071                       ACE_TEXT("[datareaderqos] sections must have a subsection name\n")),
01072                       -1);
01073   }
01074   // Process the subsections of this section
01075   KeyList keys;
01076   if (processSections(cf, section, keys) != 0) {
01077     ACE_ERROR_RETURN((LM_ERROR,
01078                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01079                       ACE_TEXT("too many nesting layers in the [datareaderqos] section.\n")),
01080                       -1);
01081   }
01082 
01083   // Loop through the [datareaderqos/*] sections
01084   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01085     OPENDDS_STRING datareaderqos_name = it->first;
01086 
01087     if (DCPS_debug_level > 0) {
01088       ACE_DEBUG((LM_NOTICE,
01089                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01090                   ACE_TEXT("processing [datareaderqos/%C] section.\n"),
01091                   datareaderqos_name.c_str()));
01092     }
01093 
01094     ValueMap values;
01095     pullValues(cf, it->second, values);
01096 
01097     DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01098 
01099     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01100       OPENDDS_STRING name = it->first;
01101       OPENDDS_STRING value = it->second;
01102 
01103       if (name == "durability.kind") {
01104         if (value == "VOLATILE") {
01105           datareaderqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
01106         } else if (value == "TRANSIENT_LOCAL") {
01107           datareaderqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01108 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01109         } else if (value == "TRANSIENT") {
01110           datareaderqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
01111         } else if (value == "PERSISTENT") {
01112           datareaderqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
01113 #endif
01114         } else {
01115           ACE_ERROR_RETURN((LM_ERROR,
01116                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01117                             ACE_TEXT("Illegal value for durability.kind (%C) in [datareaderqos/%C] section.\n"),
01118                             value.c_str(), datareaderqos_name.c_str()),
01119                             -1);
01120         }
01121       } else if (name == "deadline.period.sec") {
01122         parse_second(datareaderqos.deadline.period.sec, value);
01123       } else if (name == "deadline.period.nanosec") {
01124         parse_nanosecond(datareaderqos.deadline.period.nanosec, value);
01125       } else if (name == "latency_budget.duration.sec") {
01126         parse_second(datareaderqos.latency_budget.duration.sec, value);
01127       } else if (name == "latency_budget.duration.nanosec") {
01128         parse_nanosecond(datareaderqos.latency_budget.duration.nanosec, value);
01129       } else if (name == "liveliness.kind") {
01130         if (value == "AUTOMATIC") {
01131           datareaderqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
01132         } else if (value == "MANUAL_BY_TOPIC") {
01133           datareaderqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
01134         } else if (value == "MANUAL_BY_PARTICIPANT") {
01135           datareaderqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
01136         } else {
01137           ACE_ERROR_RETURN((LM_ERROR,
01138                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01139                             ACE_TEXT("Illegal value for liveliness.kind (%C) in [datareaderqos/%C] section.\n"),
01140                             value.c_str(), datareaderqos_name.c_str()),
01141                             -1);
01142         }
01143       } else if (name == "liveliness.lease_duration.sec") {
01144         parse_second(datareaderqos.liveliness.lease_duration.sec, value);
01145       } else if (name == "liveliness.lease_duration.nanosec") {
01146         parse_nanosecond(datareaderqos.liveliness.lease_duration.nanosec, value);
01147       } else if (name == "reliability.kind") {
01148         if (value == "BEST_EFFORT") {
01149           datareaderqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
01150         } else if (value == "RELIABLE") {
01151           datareaderqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
01152         } else {
01153           ACE_ERROR_RETURN((LM_ERROR,
01154                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01155                             ACE_TEXT("Illegal value for reliability.kind (%C) in [datareaderqos/%C] section.\n"),
01156                             value.c_str(), datareaderqos_name.c_str()),
01157                             -1);
01158         }
01159       } else if (name == "reliability.max_blocking_time.sec") {
01160         parse_second(datareaderqos.reliability.max_blocking_time.sec, value);
01161       } else if (name == "reliability.max_blocking_time.nanosec") {
01162         parse_nanosecond(datareaderqos.reliability.max_blocking_time.nanosec, value);
01163       } else if (name == "destination_order.kind") {
01164         if (value == "BY_RECEPTION_TIMESTAMP") {
01165           datareaderqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
01166         } else if (value == "BY_SOURCE_TIMESTAMP") {
01167           datareaderqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
01168         } else {
01169           ACE_ERROR_RETURN((LM_ERROR,
01170                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01171                             ACE_TEXT("Illegal value for destination_order.kind (%C) in [datareaderqos/%C] section.\n"),
01172                             value.c_str(), datareaderqos_name.c_str()),
01173                             -1);
01174         }
01175       } else if (name == "history.kind") {
01176         if (value == "KEEP_ALL") {
01177           datareaderqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
01178         } else if (value == "KEEP_LAST") {
01179           datareaderqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
01180         } else {
01181           ACE_ERROR_RETURN((LM_ERROR,
01182                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01183                             ACE_TEXT("Illegal value for history.kind (%C) in [datareaderqos/%C] section.\n"),
01184                             value.c_str(), datareaderqos_name.c_str()),
01185                             -1);
01186         }
01187       } else if (name == "history.depth") {
01188         datareaderqos.history.depth = atoi(value.c_str());
01189       } else if (name == "resource_limits.max_samples") {
01190         datareaderqos.resource_limits.max_samples = atoi(value.c_str());
01191       } else if (name == "resource_limits.max_instances") {
01192         datareaderqos.resource_limits.max_instances = atoi(value.c_str());
01193       } else if (name == "resource_limits.max_samples_per_instance") {
01194         datareaderqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01195       } else if (name == "time_based_filter.minimum_separation.sec") {
01196         parse_second(datareaderqos.time_based_filter.minimum_separation.sec, value);
01197       } else if (name == "time_based_filter.minimum_separation.nanosec") {
01198         parse_nanosecond(datareaderqos.time_based_filter.minimum_separation.nanosec, value);
01199       } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.sec") {
01200         parse_second(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec, value);
01201       } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec") {
01202         parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec, value);
01203       } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.sec") {
01204         parse_second(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec, value);
01205       } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec") {
01206         parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec, value);
01207       } else {
01208         ACE_ERROR_RETURN((LM_ERROR,
01209                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01210                           ACE_TEXT("Unexpected entry (%C) in [datareaderqos/%C] section.\n"),
01211                           name.c_str(), datareaderqos_name.c_str()),
01212                           -1);
01213       }
01214     }
01215 
01216     registry.datareaderqos_map[datareaderqos_name] = datareaderqos;
01217   }
01218 
01219   return 0;
01220 }
01221 
01222 int
01223 StaticDiscovery::parse_publisherqos(ACE_Configuration_Heap& cf)
01224 {
01225   const ACE_Configuration_Section_Key& root = cf.root_section();
01226   ACE_Configuration_Section_Key section;
01227 
01228   if (cf.open_section(root, PUBLISHERQOS_SECTION_NAME, 0, section) != 0) {
01229     if (DCPS_debug_level > 0) {
01230       // This is not an error if the configuration file does not have
01231       // any publisherqos (sub)section.
01232       ACE_DEBUG((LM_NOTICE,
01233                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01234                   ACE_TEXT("no [%s] sections.\n"),
01235                   PUBLISHERQOS_SECTION_NAME));
01236     }
01237     return 0;
01238   }
01239 
01240   // Ensure there are no key/values in the [publisherqos] section.
01241   // Every key/value must be in a [publisherqos/*] sub-section.
01242   ValueMap vm;
01243   if (pullValues(cf, section, vm) > 0) {
01244     ACE_ERROR_RETURN((LM_ERROR,
01245                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01246                       ACE_TEXT("[publisherqos] sections must have a subsection name\n")),
01247                       -1);
01248   }
01249   // Process the subsections of this section
01250   KeyList keys;
01251   if (processSections(cf, section, keys) != 0) {
01252     ACE_ERROR_RETURN((LM_ERROR,
01253                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01254                       ACE_TEXT("too many nesting layers in the [publisherqos] section.\n")),
01255                       -1);
01256   }
01257 
01258   // Loop through the [publisherqos/*] sections
01259   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01260     OPENDDS_STRING publisherqos_name = it->first;
01261 
01262     if (DCPS_debug_level > 0) {
01263       ACE_DEBUG((LM_NOTICE,
01264                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01265                   ACE_TEXT("processing [publisherqos/%C] section.\n"),
01266                   publisherqos_name.c_str()));
01267     }
01268 
01269     ValueMap values;
01270     pullValues(cf, it->second, values);
01271 
01272     DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01273 
01274     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01275       OPENDDS_STRING name = it->first;
01276       OPENDDS_STRING value = it->second;
01277 
01278       if (name == "presentation.access_scope") {
01279         if (value == "INSTANCE") {
01280           publisherqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01281         } else if (value == "TOPIC") {
01282           publisherqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01283         } else if (value == "GROUP") {
01284           publisherqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01285         } else {
01286           ACE_ERROR_RETURN((LM_ERROR,
01287                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01288                             ACE_TEXT("Illegal value for presentation.access_scope (%C) in [publisherqos/%C] section.\n"),
01289                             value.c_str(), publisherqos_name.c_str()),
01290                             -1);
01291         }
01292       } else if (name == "presentation.coherent_access") {
01293         if (parse_bool(publisherqos.presentation.coherent_access, value)) {
01294         } else {
01295           ACE_ERROR_RETURN((LM_ERROR,
01296                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01297                             ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [publisherqos/%C] section.\n"),
01298                             value.c_str(), publisherqos_name.c_str()),
01299                             -1);
01300         }
01301       } else if (name == "presentation.ordered_access") {
01302         if (parse_bool(publisherqos.presentation.ordered_access, value)) {
01303         } else {
01304           ACE_ERROR_RETURN((LM_ERROR,
01305                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01306                             ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [publisherqos/%C] section.\n"),
01307                             value.c_str(), publisherqos_name.c_str()),
01308                             -1);
01309         }
01310       } else if (name == "partition.name") {
01311         parse_list(publisherqos.partition, value);
01312       } else {
01313         ACE_ERROR_RETURN((LM_ERROR,
01314                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01315                           ACE_TEXT("Unexpected entry (%C) in [publisherqos/%C] section.\n"),
01316                           name.c_str(), publisherqos_name.c_str()),
01317                           -1);
01318       }
01319     }
01320 
01321     registry.publisherqos_map[publisherqos_name] = publisherqos;
01322   }
01323 
01324   return 0;
01325 }
01326 
01327 int
01328 StaticDiscovery::parse_subscriberqos(ACE_Configuration_Heap& cf)
01329 {
01330   const ACE_Configuration_Section_Key& root = cf.root_section();
01331   ACE_Configuration_Section_Key section;
01332 
01333   if (cf.open_section(root, SUBSCRIBERQOS_SECTION_NAME, 0, section) != 0) {
01334     if (DCPS_debug_level > 0) {
01335       // This is not an error if the configuration file does not have
01336       // any subscriberqos (sub)section.
01337       ACE_DEBUG((LM_NOTICE,
01338                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01339                   ACE_TEXT("no [%s] sections.\n"),
01340                   SUBSCRIBERQOS_SECTION_NAME));
01341     }
01342     return 0;
01343   }
01344 
01345   // Ensure there are no key/values in the [subscriberqos] section.
01346   // Every key/value must be in a [subscriberqos/*] sub-section.
01347   ValueMap vm;
01348   if (pullValues(cf, section, vm) > 0) {
01349     ACE_ERROR_RETURN((LM_ERROR,
01350                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01351                       ACE_TEXT("[subscriberqos] sections must have a subsection name\n")),
01352                       -1);
01353   }
01354   // Process the subsections of this section
01355   KeyList keys;
01356   if (processSections(cf, section, keys) != 0) {
01357     ACE_ERROR_RETURN((LM_ERROR,
01358                       ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01359                       ACE_TEXT("too many nesting layers in the [subscriberqos] section.\n")),
01360                       -1);
01361   }
01362 
01363   // Loop through the [subscriberqos/*] sections
01364   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01365     OPENDDS_STRING subscriberqos_name = it->first;
01366 
01367     if (DCPS_debug_level > 0) {
01368       ACE_DEBUG((LM_NOTICE,
01369                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01370                   ACE_TEXT("processing [subscriberqos/%C] section.\n"),
01371                   subscriberqos_name.c_str()));
01372     }
01373 
01374     ValueMap values;
01375     pullValues(cf, it->second, values);
01376 
01377     DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01378 
01379     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01380       OPENDDS_STRING name = it->first;
01381       OPENDDS_STRING value = it->second;
01382 
01383       if (name == "presentation.access_scope") {
01384         if (value == "INSTANCE") {
01385           subscriberqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01386         } else if (value == "TOPIC") {
01387           subscriberqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01388         } else if (value == "GROUP") {
01389           subscriberqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01390         } else {
01391           ACE_ERROR_RETURN((LM_ERROR,
01392                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01393                             ACE_TEXT("Illegal value for presentation.access_scope (%C) in [subscriberqos/%C] section.\n"),
01394                             value.c_str(), subscriberqos_name.c_str()),
01395                             -1);
01396         }
01397       } else if (name == "presentation.coherent_access") {
01398         if (parse_bool(subscriberqos.presentation.coherent_access, value)) {
01399         } else {
01400           ACE_ERROR_RETURN((LM_ERROR,
01401                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01402                             ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [subscriberqos/%C] section.\n"),
01403                             value.c_str(), subscriberqos_name.c_str()),
01404                             -1);
01405         }
01406       } else if (name == "presentation.ordered_access") {
01407         if (parse_bool(subscriberqos.presentation.ordered_access, value)) {
01408         } else {
01409           ACE_ERROR_RETURN((LM_ERROR,
01410                             ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01411                             ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [subscriberqos/%C] section.\n"),
01412                             value.c_str(), subscriberqos_name.c_str()),
01413                             -1);
01414         }
01415       } else if (name == "partition.name") {
01416         parse_list(subscriberqos.partition, value);
01417       } else {
01418         ACE_ERROR_RETURN((LM_ERROR,
01419                           ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01420                           ACE_TEXT("Unexpected entry (%C) in [subscriberqos/%C] section.\n"),
01421                           name.c_str(), subscriberqos_name.c_str()),
01422                           -1);
01423       }
01424     }
01425 
01426    registry.subscriberqos_map[subscriberqos_name] = subscriberqos;
01427   }
01428 
01429   return 0;
01430 }
01431 
01432 int
01433 StaticDiscovery::parse_endpoints(ACE_Configuration_Heap& cf)
01434 {
01435   const ACE_Configuration_Section_Key& root = cf.root_section();
01436   ACE_Configuration_Section_Key section;
01437 
01438   if (cf.open_section(root, ENDPOINT_SECTION_NAME, 0, section) != 0) {
01439     if (DCPS_debug_level > 0) {
01440       // This is not an error if the configuration file does not have
01441       // any endpoint (sub)section.
01442       ACE_DEBUG((LM_NOTICE,
01443                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01444                   ACE_TEXT("no [%s] sections.\n"),
01445                   ENDPOINT_SECTION_NAME));
01446     }
01447     return 0;
01448   }
01449 
01450   // Ensure there are no key/values in the [endpoint] section.
01451   // Every key/value must be in a [endpoint/*] sub-section.
01452   ValueMap vm;
01453   if (pullValues(cf, section, vm) > 0) {
01454     ACE_ERROR_RETURN((LM_ERROR,
01455                       ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01456                       ACE_TEXT("[endpoint] sections must have a subsection name\n")),
01457                       -1);
01458   }
01459   // Process the subsections of this section
01460   KeyList keys;
01461   if (processSections(cf, section, keys) != 0) {
01462     ACE_ERROR_RETURN((LM_ERROR,
01463                       ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01464                       ACE_TEXT("too many nesting layers in the [endpoint] section.\n")),
01465                       -1);
01466   }
01467 
01468   // Loop through the [endpoint/*] sections
01469   for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01470     OPENDDS_STRING endpoint_name = it->first;
01471 
01472     if (DCPS_debug_level > 0) {
01473       ACE_DEBUG((LM_NOTICE,
01474                   ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01475                   ACE_TEXT("processing [endpoint/%C] section.\n"),
01476                   endpoint_name.c_str()));
01477     }
01478 
01479     ValueMap values;
01480     pullValues(cf, it->second, values);
01481     int domain = 0;
01482     unsigned char participant[6];
01483     unsigned char entity[3];
01484     enum Type {
01485       Reader,
01486       Writer
01487     };
01488     Type type = Reader; // avoid warning
01489     OPENDDS_STRING topic_name;
01490     DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
01491     DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01492     DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01493     DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01494     TransportLocatorSeq trans_info;
01495     OPENDDS_STRING config_name;
01496 
01497     bool domainSpecified = false,
01498       participantSpecified = false,
01499       entitySpecified = false,
01500       typeSpecified = false,
01501       topic_name_Specified = false,
01502       config_name_Specified = false;
01503 
01504     for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01505       OPENDDS_STRING name = it->first;
01506       OPENDDS_STRING value = it->second;
01507 
01508       if (name == "domain") {
01509         if (convertToInteger(value, domain)) {
01510           domainSpecified = true;
01511         } else {
01512           ACE_ERROR_RETURN((LM_ERROR,
01513                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01514                             ACE_TEXT("Illegal integer value for domain (%C) in [endpoint/%C] section.\n"),
01515                             value.c_str(), endpoint_name.c_str()),
01516                             -1);
01517         }
01518       } else if (name == "participant") {
01519 #ifdef __SUNPRO_CC
01520         int count = 0;
01521         std::count_if(value.begin(), value.end(), isxdigit, count);
01522         if (value.size() != 12 || count != 12) {
01523 #else
01524         if (value.size() != 12 ||
01525             std::count_if(value.begin(), value.end(), isxdigit) != 12) {
01526 #endif
01527           ACE_ERROR_RETURN((LM_ERROR,
01528                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01529                             ACE_TEXT("participant (%C) must be 12 hexadecimal digits in [endpoint/%C] section.\n"),
01530                             value.c_str(), endpoint_name.c_str()),
01531                             -1);
01532         }
01533 
01534         for (size_t idx = 0; idx != 6; ++idx) {
01535           participant[idx] = fromhex(value, idx);
01536         }
01537         participantSpecified = true;
01538       } else if (name == "entity") {
01539 #ifdef __SUNPRO_CC
01540         int count = 0;
01541         std::count_if(value.begin(), value.end(), isxdigit, count);
01542         if (value.size() != 6 || count != 6) {
01543 #else
01544         if (value.size() != 6 ||
01545             std::count_if(value.begin(), value.end(), isxdigit) != 6) {
01546 #endif
01547           ACE_ERROR_RETURN((LM_ERROR,
01548                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01549                             ACE_TEXT("entity (%C) must be 6 hexadecimal digits in [endpoint/%C] section.\n"),
01550                             value.c_str(), endpoint_name.c_str()),
01551                             -1);
01552         }
01553 
01554         for (size_t idx = 0; idx != 3; ++idx) {
01555           entity[idx] = fromhex(value, idx);
01556         }
01557         entitySpecified = true;
01558       } else if (name == "type") {
01559         if (value == "reader") {
01560           type = Reader;
01561           typeSpecified = true;
01562         } else if (value == "writer") {
01563           type = Writer;
01564           typeSpecified = true;
01565         } else {
01566           ACE_ERROR_RETURN((LM_ERROR,
01567                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01568                             ACE_TEXT("Illegal string value for type (%C) in [endpoint/%C] section.\n"),
01569                             value.c_str(), endpoint_name.c_str()),
01570                             -1);
01571         }
01572       } else if (name == "topic") {
01573         EndpointRegistry::TopicMapType::const_iterator pos = this->registry.topic_map.find(value);
01574         if (pos != this->registry.topic_map.end()) {
01575           topic_name = pos->second.name;
01576           topic_name_Specified = true;
01577         } else {
01578           ACE_ERROR_RETURN((LM_ERROR,
01579                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01580                             ACE_TEXT("Illegal topic reference (%C) in [endpoint/%C] section.\n"),
01581                             value.c_str(), endpoint_name.c_str()),
01582                             -1);
01583         }
01584       } else if (name == "datawriterqos") {
01585         EndpointRegistry::DataWriterQosMapType::const_iterator pos = this->registry.datawriterqos_map.find(value);
01586         if (pos != this->registry.datawriterqos_map.end()) {
01587           datawriterqos = pos->second;
01588         } else {
01589           ACE_ERROR_RETURN((LM_ERROR,
01590                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01591                             ACE_TEXT("Illegal datawriterqos reference (%C) in [endpoint/%C] section.\n"),
01592                             value.c_str(), endpoint_name.c_str()),
01593                             -1);
01594         }
01595       } else if (name == "publisherqos") {
01596         EndpointRegistry::PublisherQosMapType::const_iterator pos = this->registry.publisherqos_map.find(value);
01597         if (pos != this->registry.publisherqos_map.end()) {
01598           publisherqos = pos->second;
01599         } else {
01600           ACE_ERROR_RETURN((LM_ERROR,
01601                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01602                             ACE_TEXT("Illegal publisherqos reference (%C) in [endpoint/%C] section.\n"),
01603                             value.c_str(), endpoint_name.c_str()),
01604                             -1);
01605         }
01606       } else if (name == "datareaderqos") {
01607         EndpointRegistry::DataReaderQosMapType::const_iterator pos = this->registry.datareaderqos_map.find(value);
01608         if (pos != this->registry.datareaderqos_map.end()) {
01609           datareaderqos = pos->second;
01610         } else {
01611           ACE_ERROR_RETURN((LM_ERROR,
01612                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01613                             ACE_TEXT("Illegal datareaderqos reference (%C) in [endpoint/%C] section.\n"),
01614                             value.c_str(), endpoint_name.c_str()),
01615                             -1);
01616         }
01617       } else if (name == "subscriberqos") {
01618         EndpointRegistry::SubscriberQosMapType::const_iterator pos = this->registry.subscriberqos_map.find(value);
01619         if (pos != this->registry.subscriberqos_map.end()) {
01620           subscriberqos = pos->second;
01621         } else {
01622           ACE_ERROR_RETURN((LM_ERROR,
01623                             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01624                             ACE_TEXT("Illegal subscriberqos reference (%C) in [endpoint/%C] section.\n"),
01625                             value.c_str(), endpoint_name.c_str()),
01626                             -1);
01627         }
01628       } else if (name == "config") {
01629         config_name = value;
01630         config_name_Specified = true;
01631       } else {
01632         ACE_ERROR_RETURN((LM_ERROR,
01633                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01634                           ACE_TEXT("Unexpected entry (%C) in [endpoint/%C] section.\n"),
01635                           name.c_str(), endpoint_name.c_str()),
01636                           -1);
01637       }
01638     }
01639 
01640     if (!domainSpecified) {
01641       ACE_ERROR_RETURN((LM_ERROR,
01642                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01643                         ACE_TEXT("No domain specified for [endpoint/%C] section.\n"),
01644                         endpoint_name.c_str()),
01645                         -1);
01646     }
01647 
01648     if (!participantSpecified) {
01649       ACE_ERROR_RETURN((LM_ERROR,
01650                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01651                         ACE_TEXT("No participant specified for [endpoint/%C] section.\n"),
01652                         endpoint_name.c_str()),
01653                         -1);
01654     }
01655 
01656     if (!entitySpecified) {
01657       ACE_ERROR_RETURN((LM_ERROR,
01658                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01659                         ACE_TEXT("No entity specified for [endpoint/%C] section.\n"),
01660                         endpoint_name.c_str()),
01661                         -1);
01662     }
01663 
01664     if (!typeSpecified) {
01665       ACE_ERROR_RETURN((LM_ERROR,
01666                         ACE_TEXT("(%P|%t) ERROR:StaticDiscovery::parse_endpoints ")
01667                         ACE_TEXT("No type specified for [endpoint/%C] section.\n"),
01668                         endpoint_name.c_str()),
01669                         -1);
01670     }
01671 
01672     if (!topic_name_Specified) {
01673       ACE_ERROR_RETURN((LM_ERROR,
01674                         ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01675                         ACE_TEXT("No topic specified for [endpoint/%C] section.\n"),
01676                         endpoint_name.c_str()),
01677                         -1);
01678     }
01679 
01680     TransportConfig_rch config;
01681 
01682     if (config_name_Specified) {
01683       config = TheTransportRegistry->get_config(config_name);
01684       if (config.is_nil()) {
01685         ACE_ERROR_RETURN((LM_ERROR,
01686                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01687                           ACE_TEXT("Illegal config reference (%C) in [endpoint/%C] section.\n"),
01688                           config_name.c_str(), endpoint_name.c_str()),
01689                           -1);
01690       }
01691     }
01692 
01693     if (config.is_nil() && domainSpecified) {
01694       config = TheTransportRegistry->domain_default_config(domain);
01695     }
01696 
01697     if (config.is_nil()) {
01698       config = TheTransportRegistry->global_config();
01699     }
01700 
01701     config->populate_locators(trans_info);
01702     if (trans_info.length() == 0) {
01703         ACE_ERROR_RETURN((LM_ERROR,
01704                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01705                           ACE_TEXT("No locators for [endpoint/%C] section.\n"),
01706                           endpoint_name.c_str()),
01707                           -1);
01708     }
01709 
01710     EntityId_t entity_id = EndpointRegistry::build_id(entity,
01711       (type == Reader) ? ENTITYKIND_USER_READER_WITH_KEY : ENTITYKIND_USER_WRITER_WITH_KEY);
01712 
01713     RepoId id = EndpointRegistry::build_id(domain, participant, entity_id);
01714 
01715     if (DCPS_debug_level > 0) {
01716       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: StaticDiscovery::parse_endpoints adding entity with id %C\n"), LogGuid(id).c_str()));
01717     }
01718 
01719     switch (type) {
01720     case Reader:
01721       // Populate the userdata.
01722       datareaderqos.user_data.value.length(3);
01723       datareaderqos.user_data.value[0] = entity_id.entityKey[0];
01724       datareaderqos.user_data.value[1] = entity_id.entityKey[1];
01725       datareaderqos.user_data.value[2] = entity_id.entityKey[2];
01726 
01727       if (!registry.reader_map.insert(std::make_pair(id,
01728             EndpointRegistry::Reader(topic_name, datareaderqos, subscriberqos, config_name, trans_info))).second) {
01729         ACE_ERROR_RETURN((LM_ERROR,
01730                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01731                           ACE_TEXT("Section [endpoint/%C] ignored - duplicate reader.\n"),
01732                           endpoint_name.c_str()),
01733                           -1);
01734       }
01735       break;
01736     case Writer:
01737       // Populate the userdata.
01738       datawriterqos.user_data.value.length(3);
01739       datawriterqos.user_data.value[0] = entity_id.entityKey[0];
01740       datawriterqos.user_data.value[1] = entity_id.entityKey[1];
01741       datawriterqos.user_data.value[2] = entity_id.entityKey[2];
01742 
01743       if (!registry.writer_map.insert(std::make_pair(id,
01744             EndpointRegistry::Writer(topic_name, datawriterqos, publisherqos, config_name, trans_info))).second) {
01745         ACE_ERROR_RETURN((LM_ERROR,
01746                           ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01747                           ACE_TEXT("Section [endpoint/%C] ignored - duplicate writer.\n"),
01748                           endpoint_name.c_str()),
01749                           -1);
01750       }
01751       break;
01752     }
01753   }
01754 
01755   return 0;
01756 }
01757 
01758 void StaticDiscovery::pre_writer(DataWriterImpl* writer)
01759 {
01760   const DDS::Publisher_var pub = writer->get_publisher();
01761   const DDS::DomainParticipant_var part = pub->get_participant();
01762   const DDS::DomainId_t dom = part->get_domain_id();
01763 
01764   DDS::DomainParticipantQos partQos;
01765   part->get_qos(partQos);
01766   if (partQos.user_data.value.length() < 6)
01767     return;
01768   const unsigned char* const partId = partQos.user_data.value.get_buffer();
01769 
01770   DDS::DataWriterQos qos;
01771   writer->get_qos(qos);
01772   if (qos.user_data.value.length() < 3)
01773     return;
01774   const unsigned char* const dwId = qos.user_data.value.get_buffer();
01775 
01776   const EntityId_t entId =
01777     EndpointRegistry::build_id(dwId, ENTITYKIND_USER_WRITER_WITH_KEY);
01778   const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01779 
01780   const EndpointRegistry::WriterMapType::const_iterator iter =
01781     registry.writer_map.find(rid);
01782 
01783   if (iter != registry.writer_map.end() && !iter->second.trans_cfg.empty()) {
01784     TransportRegistry::instance()->bind_config(iter->second.trans_cfg, writer);
01785   }
01786 }
01787 
01788 void StaticDiscovery::pre_reader(DataReaderImpl* reader)
01789 {
01790   const DDS::Subscriber_var sub = reader->get_subscriber();
01791   const DDS::DomainParticipant_var part = sub->get_participant();
01792   const DDS::DomainId_t dom = part->get_domain_id();
01793 
01794   DDS::DomainParticipantQos partQos;
01795   part->get_qos(partQos);
01796   if (partQos.user_data.value.length() < 6)
01797     return;
01798   const unsigned char* const partId = partQos.user_data.value.get_buffer();
01799 
01800   DDS::DataReaderQos qos;
01801   reader->get_qos(qos);
01802   if (qos.user_data.value.length() < 3)
01803     return;
01804   const unsigned char* const drId = qos.user_data.value.get_buffer();
01805 
01806   const EntityId_t entId =
01807     EndpointRegistry::build_id(drId, ENTITYKIND_USER_READER_WITH_KEY);
01808   const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01809 
01810   const EndpointRegistry::ReaderMapType::const_iterator iter =
01811     registry.reader_map.find(rid);
01812 
01813   if (iter != registry.reader_map.end() && !iter->second.trans_cfg.empty()) {
01814     TransportRegistry::instance()->bind_config(iter->second.trans_cfg, reader);
01815   }
01816 }
01817 
01818 StaticDiscovery_rch StaticDiscovery::instance_(new StaticDiscovery(Discovery::DEFAULT_STATIC));
01819 
01820 }
01821 }
01822 

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7