00001 #include "DCPS/DdsDcps_pch.h"
00002
00003 #include "StaticDiscovery.h"
00004 #include "dds/DCPS/debug.h"
00005 #include "dds/DCPS/ConfigUtils.h"
00006 #include "dds/DCPS/DomainParticipantImpl.h"
00007 #include "dds/DCPS/Marked_Default_Qos.h"
00008 #include "dds/DCPS/SubscriberImpl.h"
00009 #include "dds/DCPS/BuiltInTopicUtils.h"
00010 #include "dds/DCPS/Registered_Data_Types.h"
00011 #include "dds/DCPS/Qos_Helper.h"
00012 #include "dds/DCPS/DataWriterImpl.h"
00013 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00014
00015 #include <ctype.h>
00016
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 namespace OpenDDS {
00020 namespace DCPS {
00021
namespace {
  // Byte widths of the GUID pieces used by static discovery.
  const size_t BYTES_IN_VENDOR = 2;
  const size_t BYTES_IN_DOMAIN = 4;
  const size_t BYTES_IN_PARTICIPANT = 6;
  const size_t BYTES_IN_ENTITY = 3;

  // Number of hex digits needed to spell each piece (two digits per byte).
  const size_t HEX_DIGITS_IN_VENDOR = BYTES_IN_VENDOR * 2;
  const size_t HEX_DIGITS_IN_DOMAIN = BYTES_IN_DOMAIN * 2;
  const size_t HEX_DIGITS_IN_PARTICIPANT = BYTES_IN_PARTICIPANT * 2;
  const size_t HEX_DIGITS_IN_ENTITY = BYTES_IN_ENTITY * 2;

  // Exclusive upper bound on the length of a topic's type_name.
  const size_t TYPE_NAME_MAX = 128;
}
00033
00034 void EndpointRegistry::match()
00035 {
00036 for (WriterMapType::iterator wp = writer_map.begin(), wp_limit = writer_map.end();
00037 wp != wp_limit;
00038 ++wp) {
00039 const RepoId& writerid = wp->first;
00040 Writer& writer = wp->second;
00041 for (ReaderMapType::iterator rp = reader_map.begin(), rp_limit = reader_map.end();
00042 rp != rp_limit;
00043 ++rp) {
00044 const RepoId& readerid = rp->first;
00045 Reader& reader = rp->second;
00046
00047 if (StaticDiscGuidDomainEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00048 !StaticDiscGuidPartEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00049 reader.topic_name == writer.topic_name) {
00050
00051 IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00052 IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00053 const TransportLocatorSeq& writer_trans_info = writer.trans_info;
00054 const TransportLocatorSeq& reader_trans_info = reader.trans_info;
00055 const DDS::DataWriterQos& writer_qos = writer.qos;
00056 const DDS::DataReaderQos& reader_qos = reader.qos;
00057 const DDS::PublisherQos& publisher_qos = writer.publisher_qos;
00058 const DDS::SubscriberQos& subscriber_qos = reader.subscriber_qos;
00059
00060 if (compatibleQOS(&writerStatus, &readerStatus, writer_trans_info, reader_trans_info,
00061 &writer_qos, &reader_qos, &publisher_qos, &subscriber_qos)) {
00062 switch (reader.qos.reliability.kind) {
00063 case DDS::BEST_EFFORT_RELIABILITY_QOS:
00064 writer.best_effort_readers.insert(readerid);
00065 reader.best_effort_writers.insert(writerid);
00066 break;
00067 case DDS::RELIABLE_RELIABILITY_QOS:
00068 writer.reliable_readers.insert(readerid);
00069 reader.reliable_writers.insert(writerid);
00070 break;
00071 }
00072 }
00073 }
00074 }
00075 }
00076 }
00077
00078 StaticEndpointManager::StaticEndpointManager(const RepoId& participant_id,
00079 ACE_Thread_Mutex& lock,
00080 const EndpointRegistry& registry,
00081 StaticParticipant& participant)
00082 : EndpointManager<StaticDiscoveredParticipantData>(participant_id, lock)
00083 , registry_(registry)
00084 , participant_(participant)
00085 {
00086 pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00087 sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00088 }
00089
00090 void StaticEndpointManager::init_bit()
00091 {
00092
00093
00094 for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
00095 limit = registry_.writer_map.end();
00096 pos != limit;
00097 ++pos) {
00098 const RepoId& remoteid = pos->first;
00099 const EndpointRegistry::Writer& writer = pos->second;
00100
00101 if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00102 increment_key(pub_bit_key_);
00103 pub_key_to_id_[pub_bit_key_] = remoteid;
00104
00105
00106
00107 DDS::PublicationBuiltinTopicData data;
00108
00109 data.key = pub_bit_key_;
00110 OPENDDS_STRING topic_name = writer.topic_name;
00111 data.topic_name = topic_name.c_str();
00112 const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00113 data.type_name = topic.type_name.c_str();
00114 data.durability = writer.qos.durability;
00115 data.durability_service = writer.qos.durability_service;
00116 data.deadline = writer.qos.deadline;
00117 data.latency_budget = writer.qos.latency_budget;
00118 data.liveliness = writer.qos.liveliness;
00119 data.reliability = writer.qos.reliability;
00120 data.lifespan = writer.qos.lifespan;
00121 data.user_data = writer.qos.user_data;
00122 data.ownership = writer.qos.ownership;
00123 data.ownership_strength = writer.qos.ownership_strength;
00124 data.destination_order = writer.qos.destination_order;
00125 data.presentation = writer.publisher_qos.presentation;
00126 data.partition = writer.publisher_qos.partition;
00127
00128
00129 data.group_data = writer.publisher_qos.group_data;
00130 #ifndef DDS_HAS_MINIMUM_BIT
00131 OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
00132 if (bit) {
00133 bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00134 }
00135 #endif
00136 }
00137 }
00138
00139 for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
00140 limit = registry_.reader_map.end();
00141 pos != limit;
00142 ++pos) {
00143 const RepoId& remoteid = pos->first;
00144 const EndpointRegistry::Reader& reader = pos->second;
00145
00146 if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00147 increment_key(sub_bit_key_);
00148 sub_key_to_id_[sub_bit_key_] = remoteid;
00149
00150
00151
00152 DDS::SubscriptionBuiltinTopicData data;
00153
00154 data.key = sub_bit_key_;
00155 OPENDDS_STRING topic_name = reader.topic_name;
00156 data.topic_name = topic_name.c_str();
00157 const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00158 data.type_name = topic.type_name.c_str();
00159 data.durability = reader.qos.durability;
00160 data.deadline = reader.qos.deadline;
00161 data.latency_budget = reader.qos.latency_budget;
00162 data.liveliness = reader.qos.liveliness;
00163 data.reliability = reader.qos.reliability;
00164 data.ownership = reader.qos.ownership;
00165 data.destination_order = reader.qos.destination_order;
00166 data.user_data = reader.qos.user_data;
00167 data.time_based_filter = reader.qos.time_based_filter;
00168 data.presentation = reader.subscriber_qos.presentation;
00169 data.partition = reader.subscriber_qos.partition;
00170
00171
00172 data.group_data = reader.subscriber_qos.group_data;
00173
00174 #ifndef DDS_HAS_MINIMUM_BIT
00175 OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
00176 if (bit) {
00177 bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00178 }
00179 #endif
00180 }
00181 }
00182 }
00183
00184 void StaticEndpointManager::assign_publication_key(RepoId& rid,
00185 const RepoId& ,
00186 const DDS::DataWriterQos& qos)
00187 {
00188 if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
00189 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
00190 return;
00191 }
00192
00193 rid.entityId.entityKey[0] = qos.user_data.value[0];
00194 rid.entityId.entityKey[1] = qos.user_data.value[1];
00195 rid.entityId.entityKey[2] = qos.user_data.value[2];
00196 rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY;
00197
00198 if (DCPS_debug_level > 8) {
00199 ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %C\n",
00200 LogGuid(rid).c_str()));
00201 }
00202
00203 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
00204 if (pos == registry_.writer_map.end()) {
00205 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %C\n"), LogGuid(rid).c_str()));
00206 return;
00207 }
00208
00209 DDS::DataWriterQos qos2(qos);
00210
00211 qos2.user_data = pos->second.qos.user_data;
00212
00213 DDS::DataWriterQos qos3(pos->second.qos);
00214
00215 if (qos2 != qos3) {
00216 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
00217 }
00218 }
00219
00220 void StaticEndpointManager::assign_subscription_key(RepoId& rid,
00221 const RepoId& ,
00222 const DDS::DataReaderQos& qos)
00223 {
00224 if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
00225 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
00226 return;
00227 }
00228
00229 rid.entityId.entityKey[0] = qos.user_data.value[0];
00230 rid.entityId.entityKey[1] = qos.user_data.value[1];
00231 rid.entityId.entityKey[2] = qos.user_data.value[2];
00232 rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY;
00233
00234 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
00235 if (pos == registry_.reader_map.end()) {
00236 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %C\n"), LogGuid(rid).c_str()));
00237 return;
00238 }
00239
00240 DDS::DataReaderQos qos2(qos);
00241
00242 qos2.user_data = pos->second.qos.user_data;
00243
00244 if (qos2 != pos->second.qos) {
00245 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
00246 }
00247 }
00248
00249 bool
00250 StaticEndpointManager::update_topic_qos(const RepoId& ,
00251 const DDS::TopicQos& ,
00252 OPENDDS_STRING& )
00253 {
00254 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
00255 ACE_TEXT("Not allowed\n")));
00256 return false;
00257 }
00258
00259 bool
00260 StaticEndpointManager::update_publication_qos(const RepoId& ,
00261 const DDS::DataWriterQos& ,
00262 const DDS::PublisherQos& )
00263 {
00264 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
00265 ACE_TEXT("Not allowed\n")));
00266 return false;
00267 }
00268
00269 bool
00270 StaticEndpointManager::update_subscription_qos(const RepoId& ,
00271 const DDS::DataReaderQos& ,
00272 const DDS::SubscriberQos& )
00273 {
00274 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00275 ACE_TEXT("Not allowed\n")));
00276 return false;
00277 }
00278
00279 bool
00280 StaticEndpointManager::update_subscription_params(const RepoId& ,
00281 const DDS::StringSeq& )
00282 {
00283 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00284 ACE_TEXT("Not allowed\n")));
00285 return false;
00286 }
00287
00288 void
00289 StaticEndpointManager::association_complete(const RepoId& ,
00290 const RepoId& )
00291 {
00292
00293 }
00294
00295 bool
00296 StaticEndpointManager::disassociate(const StaticDiscoveredParticipantData& )
00297 {
00298 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
00299
00300 return false;
00301 }
00302
00303 DDS::ReturnCode_t
00304 StaticEndpointManager::add_publication_i(const RepoId& writerid,
00305 LocalPublication& pub)
00306 {
00307
00308
00309
00310
00311
00312 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00313 if (pos == registry_.writer_map.end()) {
00314 return DDS::RETCODE_ERROR;
00315 }
00316 const EndpointRegistry::Writer& writer = pos->second;
00317
00318 for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
00319 pos != limit;
00320 ++pos) {
00321 const RepoId& readerid = *pos;
00322 const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00323
00324 #ifdef __SUNPRO_CC
00325 ReaderAssociation ra;
00326 ra.readerTransInfo = reader.trans_info;
00327 ra.readerId = readerid;
00328 ra.subQos = reader.subscriber_qos;
00329 ra.readerQos = reader.qos;
00330 ra.filterClassName = "";
00331 ra.filterExpression = "";
00332 ra.exprParams = 0;
00333 #else
00334 const ReaderAssociation ra =
00335 {reader.trans_info, readerid, reader.subscriber_qos, reader.qos, "", "", 0};
00336 #endif
00337 pub.publication_->add_association(writerid, ra, true);
00338 pub.publication_->association_complete(readerid);
00339 }
00340
00341 for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00342 pos != limit;
00343 ++pos) {
00344 const RepoId& readerid = *pos;
00345 const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00346 pub.publication_->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
00347 }
00348
00349 return DDS::RETCODE_OK;
00350 }
00351
00352 DDS::ReturnCode_t
00353 StaticEndpointManager::remove_publication_i(const RepoId& writerid)
00354 {
00355 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00356 if (lp_pos == local_publications_.end()) {
00357 return DDS::RETCODE_ERROR;
00358 }
00359
00360 const LocalPublication& pub = lp_pos->second;
00361
00362 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00363 if (pos == registry_.writer_map.end()) {
00364 return DDS::RETCODE_ERROR;
00365 }
00366
00367 const EndpointRegistry::Writer& writer = pos->second;
00368
00369 ReaderIdSeq ids;
00370 ids.length((CORBA::ULong)writer.reliable_readers.size());
00371 CORBA::ULong idx = 0;
00372 for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00373 pos != limit;
00374 ++pos, ++idx) {
00375 const RepoId& readerid = *pos;
00376 ids[idx] = readerid;
00377 pub.publication_->unregister_for_reader(participant_id_, writerid, readerid);
00378 }
00379
00380 return DDS::RETCODE_OK;
00381 }
00382
00383 DDS::ReturnCode_t
00384 StaticEndpointManager::add_subscription_i(const RepoId& readerid,
00385 LocalSubscription& sub)
00386 {
00387
00388
00389
00390
00391
00392 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00393 if (pos == registry_.reader_map.end()) {
00394 return DDS::RETCODE_ERROR;
00395 }
00396 const EndpointRegistry::Reader& reader = pos->second;
00397
00398 for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
00399 pos != limit;
00400 ++pos) {
00401 const RepoId& writerid = *pos;
00402 const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00403
00404 #ifdef __SUNPRO_CC
00405 WriterAssociation wa;
00406 wa.writerTransInfo = writer.trans_info;
00407 wa.writerId = writerid;
00408 wa.pubQos = writer.publisher_qos;
00409 wa.writerQos = writer.qos;
00410 #else
00411 const WriterAssociation wa =
00412 {writer.trans_info, writerid, writer.publisher_qos, writer.qos};
00413 #endif
00414 sub.subscription_->add_association(readerid, wa, false);
00415 }
00416
00417 for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00418 pos != limit;
00419 ++pos) {
00420 const RepoId& writerid = *pos;
00421 const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00422 sub.subscription_->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
00423 }
00424
00425 return DDS::RETCODE_OK;
00426 }
00427
00428 DDS::ReturnCode_t
00429 StaticEndpointManager::remove_subscription_i(const RepoId& readerid)
00430 {
00431 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00432 if (ls_pos == local_subscriptions_.end()) {
00433 return DDS::RETCODE_ERROR;
00434 }
00435
00436 const LocalSubscription& sub = ls_pos->second;
00437
00438 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00439 if (pos == registry_.reader_map.end()) {
00440 return DDS::RETCODE_ERROR;
00441 }
00442
00443 const EndpointRegistry::Reader& reader = pos->second;
00444
00445 WriterIdSeq ids;
00446 ids.length((CORBA::ULong)reader.reliable_writers.size());
00447 CORBA::ULong idx = 0;
00448 for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00449 pos != limit;
00450 ++pos, ++idx) {
00451 const RepoId& writerid = *pos;
00452 ids[idx] = writerid;
00453 sub.subscription_->unregister_for_writer(participant_id_, readerid, writerid);
00454 }
00455
00456 return DDS::RETCODE_OK;
00457 }
00458
00459 bool
00460 StaticEndpointManager::shutting_down() const
00461 {
00462 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
00463
00464 return false;
00465 }
00466
00467 void
00468 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& ,
00469 DiscoveredSubscriptionIter& ,
00470 const RepoId& )
00471 {
00472 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00473
00474 }
00475
00476 void
00477 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& ,
00478 DiscoveredPublicationIter& ,
00479 const RepoId& )
00480 {
00481 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00482
00483 }
00484
00485 bool
00486 StaticEndpointManager::defer_writer(const RepoId& ,
00487 const RepoId& )
00488 {
00489 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_writer TODO\n")));
00490
00491 return false;
00492 }
00493
00494 bool
00495 StaticEndpointManager::defer_reader(const RepoId& ,
00496 const RepoId& )
00497 {
00498
00499 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_reader TODO\n")));
00500 return false;
00501 }
00502
00503 void
00504 StaticEndpointManager::reader_exists(const RepoId& readerid, const RepoId& writerid)
00505 {
00506 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00507 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00508 EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00509 if (lp_pos != local_publications_.end() &&
00510 reader_pos != registry_.reader_map.end()) {
00511 DataWriterCallbacks* dwr = lp_pos->second.publication_;
00512 #ifdef __SUNPRO_CC
00513 ReaderAssociation ra;
00514 ra.readerTransInfo = reader_pos->second.trans_info;
00515 ra.readerId = readerid;
00516 ra.subQos = reader_pos->second.subscriber_qos;
00517 ra.readerQos = reader_pos->second.qos;
00518 ra.filterClassName = "";
00519 ra.filterExpression = "";
00520 ra.exprParams = 0;
00521 #else
00522 const ReaderAssociation ra =
00523 {reader_pos->second.trans_info, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos, "", "", 0};
00524
00525 #endif
00526 dwr->add_association(writerid, ra, true);
00527 dwr->association_complete(readerid);
00528 }
00529 }
00530
00531 void
00532 StaticEndpointManager::reader_does_not_exist(const RepoId& readerid, const RepoId& writerid)
00533 {
00534 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00535 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00536 EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00537 if (lp_pos != local_publications_.end() &&
00538 reader_pos != registry_.reader_map.end()) {
00539 DataWriterCallbacks* dwr = lp_pos->second.publication_;
00540 ReaderIdSeq ids;
00541 ids.length(1);
00542 ids[0] = readerid;
00543 dwr->remove_associations(ids, true);
00544 }
00545 }
00546
00547 void
00548 StaticEndpointManager::writer_exists(const RepoId& writerid, const RepoId& readerid)
00549 {
00550 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00551 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00552 EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00553 if (ls_pos != local_subscriptions_.end() &&
00554 writer_pos != registry_.writer_map.end()) {
00555 DataReaderCallbacks* drr = ls_pos->second.subscription_;
00556 #ifdef __SUNPRO_CC
00557 WriterAssociation wa;
00558 wa.writerTransInfo = writer_pos->second.trans_info;
00559 wa.writerId = writerid;
00560 wa.pubQos = writer_pos->second.publisher_qos;
00561 wa.writerQos = writer_pos->second.qos;
00562 #else
00563 const WriterAssociation wa =
00564 {writer_pos->second.trans_info, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos};
00565 #endif
00566 drr->add_association(readerid, wa, false);
00567 }
00568 }
00569
00570 void
00571 StaticEndpointManager::writer_does_not_exist(const RepoId& writerid, const RepoId& readerid)
00572 {
00573 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00574 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00575 EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00576 if (ls_pos != local_subscriptions_.end() &&
00577 writer_pos != registry_.writer_map.end()) {
00578 DataReaderCallbacks* drr = ls_pos->second.subscription_;
00579 WriterIdSeq ids;
00580 ids.length(1);
00581 ids[0] = writerid;
00582 drr->remove_associations(ids, true);
00583 }
00584 }
00585
00586 #ifndef DDS_HAS_MINIMUM_BIT
00587 OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*
00588 StaticEndpointManager::pub_bit()
00589 {
00590 DDS::Subscriber_var sub = participant_.bit_subscriber();
00591 if (!sub.in())
00592 return 0;
00593
00594 DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
00595 return dynamic_cast<OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00596 }
00597
00598 OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*
00599 StaticEndpointManager::sub_bit()
00600 {
00601 DDS::Subscriber_var sub = participant_.bit_subscriber();
00602 if (!sub.in())
00603 return 0;
00604
00605 DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
00606 return dynamic_cast<OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00607 }
00608 #endif
00609
00610 StaticDiscovery::StaticDiscovery(const RepoKey& key)
00611 : PeerDiscovery<StaticParticipant>(key)
00612 {}
00613
00614 namespace {
// Converts one hexadecimal digit character ('0'-'9', 'a'-'f', 'A'-'F') to its
// numeric value 0-15.  Any other character is returned unchanged.
unsigned char hextobyte(unsigned char c)
{
  const bool is_decimal = ('0' <= c) && (c <= '9');
  const bool is_lower_hex = ('a' <= c) && (c <= 'f');
  const bool is_upper_hex = ('A' <= c) && (c <= 'F');

  return is_decimal ? static_cast<unsigned char>(c - '0')
       : is_lower_hex ? static_cast<unsigned char>(c - 'a' + 10)
       : is_upper_hex ? static_cast<unsigned char>(c - 'A' + 10)
       : c;
}
00628
00629 unsigned char
00630 fromhex(const OPENDDS_STRING& x, size_t idx)
00631 {
00632 return (hextobyte(x[idx * 2]) << 4) | (hextobyte(x[idx * 2 + 1]));
00633 }
00634 }
00635
00636 EntityId_t
00637 EndpointRegistry::build_id(const unsigned char* entity_key,
00638 const unsigned char entity_kind)
00639 {
00640 EntityId_t retval;
00641 retval.entityKey[0] = entity_key[0];
00642 retval.entityKey[1] = entity_key[1];
00643 retval.entityKey[2] = entity_key[2];
00644 retval.entityKind = entity_kind;
00645 return retval;
00646 }
00647
00648 RepoId
00649 EndpointRegistry::build_id(DDS::DomainId_t domain,
00650 const unsigned char* participant_id,
00651 const EntityId_t& entity_id)
00652 {
00653 RepoId id;
00654 id.guidPrefix[0] = VENDORID_OCI[0];
00655 id.guidPrefix[1] = VENDORID_OCI[1];
00656
00657
00658
00659
00660 DDS::DomainId_t netdom = ACE_HTONL(domain);
00661 ACE_OS::memcpy(&id.guidPrefix[2], &netdom, sizeof(DDS::DomainId_t));
00662
00663
00664
00665
00666
00667
00668 ACE_OS::memcpy(&id.guidPrefix[6], participant_id, 6);
00669 id.entityId = entity_id;
00670 return id;
00671 }
00672
00673 OpenDDS::DCPS::RepoId
00674 StaticDiscovery::generate_participant_guid()
00675 {
00676 return GUID_UNKNOWN;
00677 }
00678
00679 AddDomainStatus
00680 StaticDiscovery::add_domain_participant(DDS::DomainId_t domain,
00681 const DDS::DomainParticipantQos& qos)
00682 {
00683 AddDomainStatus ads = {RepoId(), false };
00684
00685 if (qos.user_data.value.length() != BYTES_IN_PARTICIPANT) {
00686 ACE_ERROR((LM_ERROR,
00687 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00688 ACE_TEXT("No userdata to identify participant\n")));
00689 return ads;
00690 }
00691
00692 RepoId id = EndpointRegistry::build_id(domain,
00693 qos.user_data.value.get_buffer(),
00694 ENTITYID_PARTICIPANT);
00695 if (!get_part(domain, id).is_nil()) {
00696 ACE_ERROR((LM_ERROR,
00697 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00698 ACE_TEXT("Duplicate participant\n")));
00699 return ads;
00700 }
00701
00702 const RcHandle<StaticParticipant> participant (make_rch<StaticParticipant>(ref(id), qos, registry));
00703
00704 {
00705 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00706 participants_[domain][id] = participant;
00707 }
00708
00709 ads.id = id;
00710 return ads;
00711 }
00712
00713 #if defined(OPENDDS_SECURITY)
00714 AddDomainStatus
00715 StaticDiscovery::add_domain_participant_secure(
00716 DDS::DomainId_t ,
00717 const DDS::DomainParticipantQos& ,
00718 const OpenDDS::DCPS::RepoId& ,
00719 DDS::Security::IdentityHandle ,
00720 DDS::Security::PermissionsHandle ,
00721 DDS::Security::ParticipantCryptoHandle )
00722 {
00723 const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false };
00724 ACE_ERROR((LM_ERROR,
00725 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant_secure ")
00726 ACE_TEXT("Security not supported for static discovery.\n")));
00727 return ads;
00728 }
00729 #endif
00730
00731 namespace {
00732 const ACE_TCHAR TOPIC_SECTION_NAME[] = ACE_TEXT("topic");
00733 const ACE_TCHAR DATAWRITERQOS_SECTION_NAME[] = ACE_TEXT("datawriterqos");
00734 const ACE_TCHAR DATAREADERQOS_SECTION_NAME[] = ACE_TEXT("datareaderqos");
00735 const ACE_TCHAR PUBLISHERQOS_SECTION_NAME[] = ACE_TEXT("publisherqos");
00736 const ACE_TCHAR SUBSCRIBERQOS_SECTION_NAME[] = ACE_TEXT("subscriberqos");
00737 const ACE_TCHAR ENDPOINT_SECTION_NAME[] = ACE_TEXT("endpoint");
00738
00739 void parse_second(CORBA::Long& x, const OPENDDS_STRING& value)
00740 {
00741 if (value == "DURATION_INFINITE_SEC") {
00742 x = DDS::DURATION_INFINITE_SEC;
00743 } else {
00744 x = atoi(value.c_str());
00745 }
00746 }
00747
00748 void parse_nanosecond(CORBA::ULong& x, const OPENDDS_STRING& value)
00749 {
00750 if (value == "DURATION_INFINITE_NANOSEC") {
00751 x = DDS::DURATION_INFINITE_NSEC;
00752 } else {
00753 x = atoi(value.c_str());
00754 }
00755 }
00756
00757 bool parse_bool(CORBA::Boolean& x, const OPENDDS_STRING& value)
00758 {
00759 if (value == "true") {
00760 x = true;
00761 return true;
00762 } else if (value == "false") {
00763 x = false;
00764 return true;
00765 }
00766 return false;
00767 }
00768
00769 void parse_list(DDS::PartitionQosPolicy& x, const OPENDDS_STRING& value)
00770 {
00771
00772 const char* start = value.c_str();
00773 char buffer[128];
00774 std::memset(buffer, 0, sizeof(buffer));
00775 while (const char* next_comma = std::strchr(start, ',')) {
00776
00777 std::strncpy(buffer, start, next_comma - start);
00778
00779 buffer[next_comma - start] = '\0';
00780
00781 x.name.length(x.name.length() + 1);
00782 x.name[x.name.length() - 1] = static_cast<const char*>(buffer);
00783
00784 start = next_comma + 1;
00785 }
00786
00787 x.name.length(x.name.length() + 1);
00788 x.name[x.name.length() - 1] = start;
00789 }
00790 }
00791
00792 int
00793 StaticDiscovery::load_configuration(ACE_Configuration_Heap& cf)
00794 {
00795 if (parse_topics(cf) ||
00796 parse_datawriterqos(cf) ||
00797 parse_datareaderqos(cf) ||
00798 parse_publisherqos(cf) ||
00799 parse_subscriberqos(cf) ||
00800 parse_endpoints(cf)) {
00801 return -1;
00802 }
00803
00804 registry.match();
00805
00806 return 0;
00807 }
00808
00809 int
00810 StaticDiscovery::parse_topics(ACE_Configuration_Heap& cf)
00811 {
00812 const ACE_Configuration_Section_Key& root = cf.root_section();
00813 ACE_Configuration_Section_Key section;
00814
00815 if (cf.open_section(root, TOPIC_SECTION_NAME, 0, section) != 0) {
00816 if (DCPS_debug_level > 0) {
00817
00818
00819 ACE_DEBUG((LM_NOTICE,
00820 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00821 ACE_TEXT("no [%s] sections.\n"),
00822 TOPIC_SECTION_NAME));
00823 }
00824 return 0;
00825 }
00826
00827
00828
00829 ValueMap vm;
00830 if (pullValues(cf, section, vm) > 0) {
00831 ACE_ERROR_RETURN((LM_ERROR,
00832 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00833 ACE_TEXT("[topic] sections must have a subsection name\n")),
00834 -1);
00835 }
00836
00837 KeyList keys;
00838 if (processSections(cf, section, keys) != 0) {
00839 ACE_ERROR_RETURN((LM_ERROR,
00840 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00841 ACE_TEXT("too many nesting layers in the [topic] section.\n")),
00842 -1);
00843 }
00844
00845
00846 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00847 OPENDDS_STRING topic_name = it->first;
00848
00849 if (DCPS_debug_level > 0) {
00850 ACE_DEBUG((LM_NOTICE,
00851 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00852 ACE_TEXT("processing [topic/%C] section.\n"),
00853 topic_name.c_str()));
00854 }
00855
00856 ValueMap values;
00857 pullValues(cf, it->second, values);
00858
00859 EndpointRegistry::Topic topic;
00860 bool name_specified = false,
00861 type_name_specified = false;
00862
00863 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00864 OPENDDS_STRING name = it->first;
00865 OPENDDS_STRING value = it->second;
00866
00867 if (name == "name") {
00868 topic.name = value;
00869 name_specified = true;
00870 } else if (name == "type_name") {
00871 if (value.size() >= TYPE_NAME_MAX) {
00872 ACE_ERROR_RETURN((LM_ERROR,
00873 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00874 ACE_TEXT("type_name (%C) must be less than 128 characters in [topic/%C] section.\n"),
00875 value.c_str(), topic_name.c_str()),
00876 -1);
00877 }
00878 topic.type_name = value;
00879 type_name_specified = true;
00880 } else {
00881
00882 }
00883 }
00884
00885 if (!name_specified) {
00886 topic.name = topic_name;
00887 }
00888
00889 if (!type_name_specified) {
00890 ACE_ERROR_RETURN((LM_ERROR,
00891 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00892 ACE_TEXT("No type_name specified for [topic/%C] section.\n"),
00893 topic_name.c_str()),
00894 -1);
00895 }
00896
00897 registry.topic_map[topic_name] = topic;
00898 }
00899
00900 return 0;
00901 }
00902
00903 int
00904 StaticDiscovery::parse_datawriterqos(ACE_Configuration_Heap& cf)
00905 {
00906 const ACE_Configuration_Section_Key& root = cf.root_section();
00907 ACE_Configuration_Section_Key section;
00908
00909 if (cf.open_section(root, DATAWRITERQOS_SECTION_NAME, 0, section) != 0) {
00910 if (DCPS_debug_level > 0) {
00911
00912
00913 ACE_DEBUG((LM_NOTICE,
00914 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00915 ACE_TEXT("no [%s] sections.\n"),
00916 DATAWRITERQOS_SECTION_NAME));
00917 }
00918 return 0;
00919 }
00920
00921
00922
00923 ValueMap vm;
00924 if (pullValues(cf, section, vm) > 0) {
00925 ACE_ERROR_RETURN((LM_ERROR,
00926 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00927 ACE_TEXT("[datawriterqos] sections must have a subsection name\n")),
00928 -1);
00929 }
00930
00931 KeyList keys;
00932 if (processSections(cf, section, keys) != 0) {
00933 ACE_ERROR_RETURN((LM_ERROR,
00934 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00935 ACE_TEXT("too many nesting layers in the [datawriterqos] section.\n")),
00936 -1);
00937 }
00938
00939
00940 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00941 OPENDDS_STRING datawriterqos_name = it->first;
00942
00943 if (DCPS_debug_level > 0) {
00944 ACE_DEBUG((LM_NOTICE,
00945 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00946 ACE_TEXT("processing [datawriterqos/%C] section.\n"),
00947 datawriterqos_name.c_str()));
00948 }
00949
00950 ValueMap values;
00951 pullValues(cf, it->second, values);
00952
00953 DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
00954
00955 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00956 OPENDDS_STRING name = it->first;
00957 OPENDDS_STRING value = it->second;
00958
00959 if (name == "durability.kind") {
00960 if (value == "VOLATILE") {
00961 datawriterqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
00962 } else if (value == "TRANSIENT_LOCAL") {
00963 datawriterqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00964 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00965 } else if (value == "TRANSIENT") {
00966 datawriterqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
00967 } else if (value == "PERSISTENT") {
00968 datawriterqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
00969 #endif
00970 } else {
00971 ACE_ERROR_RETURN((LM_ERROR,
00972 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00973 ACE_TEXT("Illegal value for durability.kind (%C) in [datawriterqos/%C] section.\n"),
00974 value.c_str(), datawriterqos_name.c_str()),
00975 -1);
00976 }
00977 } else if (name == "deadline.period.sec") {
00978 parse_second(datawriterqos.deadline.period.sec, value);
00979 } else if (name == "deadline.period.nanosec") {
00980 parse_nanosecond(datawriterqos.deadline.period.nanosec, value);
00981 } else if (name == "latency_budget.duration.sec") {
00982 parse_second(datawriterqos.latency_budget.duration.sec, value);
00983 } else if (name == "latency_budget.duration.nanosec") {
00984 parse_nanosecond(datawriterqos.latency_budget.duration.nanosec, value);
00985 } else if (name == "liveliness.kind") {
00986 if (value == "AUTOMATIC") {
00987 datawriterqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
00988 } else if (value == "MANUAL_BY_TOPIC") {
00989 datawriterqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
00990 } else if (value == "MANUAL_BY_PARTICIPANT") {
00991 datawriterqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
00992 } else {
00993 ACE_ERROR_RETURN((LM_ERROR,
00994 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00995 ACE_TEXT("Illegal value for liveliness.kind (%C) in [datawriterqos/%C] section.\n"),
00996 value.c_str(), datawriterqos_name.c_str()),
00997 -1);
00998 }
00999 } else if (name == "liveliness.lease_duration.sec") {
01000 parse_second(datawriterqos.liveliness.lease_duration.sec, value);
01001 } else if (name == "liveliness.lease_duration.nanosec") {
01002 parse_nanosecond(datawriterqos.liveliness.lease_duration.nanosec, value);
01003 } else if (name == "reliability.kind") {
01004 if (value == "BEST_EFFORT") {
01005 datawriterqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
01006 } else if (value == "RELIABLE") {
01007 datawriterqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
01008 } else {
01009 ACE_ERROR_RETURN((LM_ERROR,
01010 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01011 ACE_TEXT("Illegal value for reliability.kind (%C) in [datawriterqos/%C] section.\n"),
01012 value.c_str(), datawriterqos_name.c_str()),
01013 -1);
01014 }
01015 } else if (name == "reliability.max_blocking_time.sec") {
01016 parse_second(datawriterqos.reliability.max_blocking_time.sec, value);
01017 } else if (name == "reliability.max_blocking_time.nanosec") {
01018 parse_nanosecond(datawriterqos.reliability.max_blocking_time.nanosec, value);
01019 } else if (name == "destination_order.kind") {
01020 if (value == "BY_RECEPTION_TIMESTAMP") {
01021 datawriterqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
01022 } else if (value == "BY_SOURCE_TIMESTAMP") {
01023 datawriterqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
01024 } else {
01025 ACE_ERROR_RETURN((LM_ERROR,
01026 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01027 ACE_TEXT("Illegal value for destination_order.kind (%C) in [datawriterqos/%C] section.\n"),
01028 value.c_str(), datawriterqos_name.c_str()),
01029 -1);
01030 }
01031 } else if (name == "history.kind") {
01032 if (value == "KEEP_ALL") {
01033 datawriterqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
01034 } else if (value == "KEEP_LAST") {
01035 datawriterqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
01036 } else {
01037 ACE_ERROR_RETURN((LM_ERROR,
01038 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01039 ACE_TEXT("Illegal value for history.kind (%C) in [datawriterqos/%C] section.\n"),
01040 value.c_str(), datawriterqos_name.c_str()),
01041 -1);
01042 }
01043 } else if (name == "history.depth") {
01044 datawriterqos.history.depth = atoi(value.c_str());
01045 } else if (name == "resource_limits.max_samples") {
01046 datawriterqos.resource_limits.max_samples = atoi(value.c_str());
01047 } else if (name == "resource_limits.max_instances") {
01048 datawriterqos.resource_limits.max_instances = atoi(value.c_str());
01049 } else if (name == "resource_limits.max_samples_per_instance") {
01050 datawriterqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01051 } else if (name == "transport_priority.value") {
01052 datawriterqos.transport_priority.value = atoi(value.c_str());
01053 } else if (name == "lifespan.duration.sec") {
01054 parse_second(datawriterqos.lifespan.duration.sec, value);
01055 } else if (name == "lifespan.duration.nanosec") {
01056 parse_nanosecond(datawriterqos.lifespan.duration.nanosec, value);
01057 } else if (name == "ownership.kind") {
01058 if (value == "SHARED") {
01059 datawriterqos.ownership.kind = DDS::SHARED_OWNERSHIP_QOS;
01060 } else if (value == "EXCLUSIVE") {
01061 datawriterqos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
01062 } else {
01063 ACE_ERROR_RETURN((LM_ERROR,
01064 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01065 ACE_TEXT("Illegal value for ownership.kind (%C) in [datawriterqos/%C] section.\n"),
01066 value.c_str(), datawriterqos_name.c_str()),
01067 -1);
01068 }
01069 } else if (name == "ownership_strength.value") {
01070 datawriterqos.ownership_strength.value = atoi(value.c_str());
01071 } else {
01072 ACE_ERROR_RETURN((LM_ERROR,
01073 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01074 ACE_TEXT("Unexpected entry (%C) in [datawriterqos/%C] section.\n"),
01075 name.c_str(), datawriterqos_name.c_str()),
01076 -1);
01077 }
01078 }
01079
01080 registry.datawriterqos_map[datawriterqos_name] = datawriterqos;
01081 }
01082
01083 return 0;
01084 }
01085
01086 int
01087 StaticDiscovery::parse_datareaderqos(ACE_Configuration_Heap& cf)
01088 {
01089 const ACE_Configuration_Section_Key& root = cf.root_section();
01090 ACE_Configuration_Section_Key section;
01091
01092 if (cf.open_section(root, DATAREADERQOS_SECTION_NAME, 0, section) != 0) {
01093 if (DCPS_debug_level > 0) {
01094
01095
01096 ACE_DEBUG((LM_NOTICE,
01097 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01098 ACE_TEXT("no [%s] sections.\n"),
01099 DATAREADERQOS_SECTION_NAME));
01100 }
01101 return 0;
01102 }
01103
01104
01105
01106 ValueMap vm;
01107 if (pullValues(cf, section, vm) > 0) {
01108 ACE_ERROR_RETURN((LM_ERROR,
01109 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01110 ACE_TEXT("[datareaderqos] sections must have a subsection name\n")),
01111 -1);
01112 }
01113
01114 KeyList keys;
01115 if (processSections(cf, section, keys) != 0) {
01116 ACE_ERROR_RETURN((LM_ERROR,
01117 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01118 ACE_TEXT("too many nesting layers in the [datareaderqos] section.\n")),
01119 -1);
01120 }
01121
01122
01123 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01124 OPENDDS_STRING datareaderqos_name = it->first;
01125
01126 if (DCPS_debug_level > 0) {
01127 ACE_DEBUG((LM_NOTICE,
01128 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01129 ACE_TEXT("processing [datareaderqos/%C] section.\n"),
01130 datareaderqos_name.c_str()));
01131 }
01132
01133 ValueMap values;
01134 pullValues(cf, it->second, values);
01135
01136 DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01137
01138 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01139 OPENDDS_STRING name = it->first;
01140 OPENDDS_STRING value = it->second;
01141
01142 if (name == "durability.kind") {
01143 if (value == "VOLATILE") {
01144 datareaderqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
01145 } else if (value == "TRANSIENT_LOCAL") {
01146 datareaderqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01147 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01148 } else if (value == "TRANSIENT") {
01149 datareaderqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
01150 } else if (value == "PERSISTENT") {
01151 datareaderqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
01152 #endif
01153 } else {
01154 ACE_ERROR_RETURN((LM_ERROR,
01155 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01156 ACE_TEXT("Illegal value for durability.kind (%C) in [datareaderqos/%C] section.\n"),
01157 value.c_str(), datareaderqos_name.c_str()),
01158 -1);
01159 }
01160 } else if (name == "deadline.period.sec") {
01161 parse_second(datareaderqos.deadline.period.sec, value);
01162 } else if (name == "deadline.period.nanosec") {
01163 parse_nanosecond(datareaderqos.deadline.period.nanosec, value);
01164 } else if (name == "latency_budget.duration.sec") {
01165 parse_second(datareaderqos.latency_budget.duration.sec, value);
01166 } else if (name == "latency_budget.duration.nanosec") {
01167 parse_nanosecond(datareaderqos.latency_budget.duration.nanosec, value);
01168 } else if (name == "liveliness.kind") {
01169 if (value == "AUTOMATIC") {
01170 datareaderqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
01171 } else if (value == "MANUAL_BY_TOPIC") {
01172 datareaderqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
01173 } else if (value == "MANUAL_BY_PARTICIPANT") {
01174 datareaderqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
01175 } else {
01176 ACE_ERROR_RETURN((LM_ERROR,
01177 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01178 ACE_TEXT("Illegal value for liveliness.kind (%C) in [datareaderqos/%C] section.\n"),
01179 value.c_str(), datareaderqos_name.c_str()),
01180 -1);
01181 }
01182 } else if (name == "liveliness.lease_duration.sec") {
01183 parse_second(datareaderqos.liveliness.lease_duration.sec, value);
01184 } else if (name == "liveliness.lease_duration.nanosec") {
01185 parse_nanosecond(datareaderqos.liveliness.lease_duration.nanosec, value);
01186 } else if (name == "reliability.kind") {
01187 if (value == "BEST_EFFORT") {
01188 datareaderqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
01189 } else if (value == "RELIABLE") {
01190 datareaderqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
01191 } else {
01192 ACE_ERROR_RETURN((LM_ERROR,
01193 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01194 ACE_TEXT("Illegal value for reliability.kind (%C) in [datareaderqos/%C] section.\n"),
01195 value.c_str(), datareaderqos_name.c_str()),
01196 -1);
01197 }
01198 } else if (name == "reliability.max_blocking_time.sec") {
01199 parse_second(datareaderqos.reliability.max_blocking_time.sec, value);
01200 } else if (name == "reliability.max_blocking_time.nanosec") {
01201 parse_nanosecond(datareaderqos.reliability.max_blocking_time.nanosec, value);
01202 } else if (name == "destination_order.kind") {
01203 if (value == "BY_RECEPTION_TIMESTAMP") {
01204 datareaderqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
01205 } else if (value == "BY_SOURCE_TIMESTAMP") {
01206 datareaderqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
01207 } else {
01208 ACE_ERROR_RETURN((LM_ERROR,
01209 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01210 ACE_TEXT("Illegal value for destination_order.kind (%C) in [datareaderqos/%C] section.\n"),
01211 value.c_str(), datareaderqos_name.c_str()),
01212 -1);
01213 }
01214 } else if (name == "history.kind") {
01215 if (value == "KEEP_ALL") {
01216 datareaderqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
01217 } else if (value == "KEEP_LAST") {
01218 datareaderqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
01219 } else {
01220 ACE_ERROR_RETURN((LM_ERROR,
01221 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01222 ACE_TEXT("Illegal value for history.kind (%C) in [datareaderqos/%C] section.\n"),
01223 value.c_str(), datareaderqos_name.c_str()),
01224 -1);
01225 }
01226 } else if (name == "history.depth") {
01227 datareaderqos.history.depth = atoi(value.c_str());
01228 } else if (name == "resource_limits.max_samples") {
01229 datareaderqos.resource_limits.max_samples = atoi(value.c_str());
01230 } else if (name == "resource_limits.max_instances") {
01231 datareaderqos.resource_limits.max_instances = atoi(value.c_str());
01232 } else if (name == "resource_limits.max_samples_per_instance") {
01233 datareaderqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01234 } else if (name == "time_based_filter.minimum_separation.sec") {
01235 parse_second(datareaderqos.time_based_filter.minimum_separation.sec, value);
01236 } else if (name == "time_based_filter.minimum_separation.nanosec") {
01237 parse_nanosecond(datareaderqos.time_based_filter.minimum_separation.nanosec, value);
01238 } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.sec") {
01239 parse_second(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec, value);
01240 } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec") {
01241 parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec, value);
01242 } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.sec") {
01243 parse_second(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec, value);
01244 } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec") {
01245 parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec, value);
01246 } else {
01247 ACE_ERROR_RETURN((LM_ERROR,
01248 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01249 ACE_TEXT("Unexpected entry (%C) in [datareaderqos/%C] section.\n"),
01250 name.c_str(), datareaderqos_name.c_str()),
01251 -1);
01252 }
01253 }
01254
01255 registry.datareaderqos_map[datareaderqos_name] = datareaderqos;
01256 }
01257
01258 return 0;
01259 }
01260
01261 int
01262 StaticDiscovery::parse_publisherqos(ACE_Configuration_Heap& cf)
01263 {
01264 const ACE_Configuration_Section_Key& root = cf.root_section();
01265 ACE_Configuration_Section_Key section;
01266
01267 if (cf.open_section(root, PUBLISHERQOS_SECTION_NAME, 0, section) != 0) {
01268 if (DCPS_debug_level > 0) {
01269
01270
01271 ACE_DEBUG((LM_NOTICE,
01272 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01273 ACE_TEXT("no [%s] sections.\n"),
01274 PUBLISHERQOS_SECTION_NAME));
01275 }
01276 return 0;
01277 }
01278
01279
01280
01281 ValueMap vm;
01282 if (pullValues(cf, section, vm) > 0) {
01283 ACE_ERROR_RETURN((LM_ERROR,
01284 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01285 ACE_TEXT("[publisherqos] sections must have a subsection name\n")),
01286 -1);
01287 }
01288
01289 KeyList keys;
01290 if (processSections(cf, section, keys) != 0) {
01291 ACE_ERROR_RETURN((LM_ERROR,
01292 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01293 ACE_TEXT("too many nesting layers in the [publisherqos] section.\n")),
01294 -1);
01295 }
01296
01297
01298 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01299 OPENDDS_STRING publisherqos_name = it->first;
01300
01301 if (DCPS_debug_level > 0) {
01302 ACE_DEBUG((LM_NOTICE,
01303 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01304 ACE_TEXT("processing [publisherqos/%C] section.\n"),
01305 publisherqos_name.c_str()));
01306 }
01307
01308 ValueMap values;
01309 pullValues(cf, it->second, values);
01310
01311 DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01312
01313 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01314 OPENDDS_STRING name = it->first;
01315 OPENDDS_STRING value = it->second;
01316
01317 if (name == "presentation.access_scope") {
01318 if (value == "INSTANCE") {
01319 publisherqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01320 } else if (value == "TOPIC") {
01321 publisherqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01322 } else if (value == "GROUP") {
01323 publisherqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01324 } else {
01325 ACE_ERROR_RETURN((LM_ERROR,
01326 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01327 ACE_TEXT("Illegal value for presentation.access_scope (%C) in [publisherqos/%C] section.\n"),
01328 value.c_str(), publisherqos_name.c_str()),
01329 -1);
01330 }
01331 } else if (name == "presentation.coherent_access") {
01332 if (parse_bool(publisherqos.presentation.coherent_access, value)) {
01333 } else {
01334 ACE_ERROR_RETURN((LM_ERROR,
01335 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01336 ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [publisherqos/%C] section.\n"),
01337 value.c_str(), publisherqos_name.c_str()),
01338 -1);
01339 }
01340 } else if (name == "presentation.ordered_access") {
01341 if (parse_bool(publisherqos.presentation.ordered_access, value)) {
01342 } else {
01343 ACE_ERROR_RETURN((LM_ERROR,
01344 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01345 ACE_TEXT("Illegal value for presentation.ordered_access (%C)")
01346 ACE_TEXT("in [publisherqos/%C] section.\n"),
01347 value.c_str(), publisherqos_name.c_str()),
01348 -1);
01349 }
01350 } else if (name == "partition.name") {
01351 try {
01352 parse_list(publisherqos.partition, value);
01353 }
01354 catch (const CORBA::Exception& ex) {
01355 ACE_ERROR_RETURN((LM_ERROR,
01356 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01357 ACE_TEXT("Exception caught while parsing partition.name (%C) ")
01358 ACE_TEXT("in [publisherqos/%C] section: %C.\n"),
01359 value.c_str(), publisherqos_name.c_str(), ex._info().c_str()),
01360 -1);
01361 }
01362 } else {
01363 ACE_ERROR_RETURN((LM_ERROR,
01364 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01365 ACE_TEXT("Unexpected entry (%C) in [publisherqos/%C] section.\n"),
01366 name.c_str(), publisherqos_name.c_str()),
01367 -1);
01368 }
01369 }
01370
01371 registry.publisherqos_map[publisherqos_name] = publisherqos;
01372 }
01373
01374 return 0;
01375 }
01376
01377 int
01378 StaticDiscovery::parse_subscriberqos(ACE_Configuration_Heap& cf)
01379 {
01380 const ACE_Configuration_Section_Key& root = cf.root_section();
01381 ACE_Configuration_Section_Key section;
01382
01383 if (cf.open_section(root, SUBSCRIBERQOS_SECTION_NAME, 0, section) != 0) {
01384 if (DCPS_debug_level > 0) {
01385
01386
01387 ACE_DEBUG((LM_NOTICE,
01388 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01389 ACE_TEXT("no [%s] sections.\n"),
01390 SUBSCRIBERQOS_SECTION_NAME));
01391 }
01392 return 0;
01393 }
01394
01395
01396
01397 ValueMap vm;
01398 if (pullValues(cf, section, vm) > 0) {
01399 ACE_ERROR_RETURN((LM_ERROR,
01400 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01401 ACE_TEXT("[subscriberqos] sections must have a subsection name\n")),
01402 -1);
01403 }
01404
01405 KeyList keys;
01406 if (processSections(cf, section, keys) != 0) {
01407 ACE_ERROR_RETURN((LM_ERROR,
01408 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01409 ACE_TEXT("too many nesting layers in the [subscriberqos] section.\n")),
01410 -1);
01411 }
01412
01413
01414 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01415 OPENDDS_STRING subscriberqos_name = it->first;
01416
01417 if (DCPS_debug_level > 0) {
01418 ACE_DEBUG((LM_NOTICE,
01419 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01420 ACE_TEXT("processing [subscriberqos/%C] section.\n"),
01421 subscriberqos_name.c_str()));
01422 }
01423
01424 ValueMap values;
01425 pullValues(cf, it->second, values);
01426
01427 DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01428
01429 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01430 OPENDDS_STRING name = it->first;
01431 OPENDDS_STRING value = it->second;
01432
01433 if (name == "presentation.access_scope") {
01434 if (value == "INSTANCE") {
01435 subscriberqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01436 } else if (value == "TOPIC") {
01437 subscriberqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01438 } else if (value == "GROUP") {
01439 subscriberqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01440 } else {
01441 ACE_ERROR_RETURN((LM_ERROR,
01442 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01443 ACE_TEXT("Illegal value for presentation.access_scope (%C) in [subscriberqos/%C] section.\n"),
01444 value.c_str(), subscriberqos_name.c_str()),
01445 -1);
01446 }
01447 } else if (name == "presentation.coherent_access") {
01448 if (parse_bool(subscriberqos.presentation.coherent_access, value)) {
01449 } else {
01450 ACE_ERROR_RETURN((LM_ERROR,
01451 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01452 ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [subscriberqos/%C] section.\n"),
01453 value.c_str(), subscriberqos_name.c_str()),
01454 -1);
01455 }
01456 } else if (name == "presentation.ordered_access") {
01457 if (parse_bool(subscriberqos.presentation.ordered_access, value)) {
01458 } else {
01459 ACE_ERROR_RETURN((LM_ERROR,
01460 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01461 ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [subscriberqos/%C] section.\n"),
01462 value.c_str(), subscriberqos_name.c_str()),
01463 -1);
01464 }
01465 } else if (name == "partition.name") {
01466 try {
01467 parse_list(subscriberqos.partition, value);
01468 }
01469 catch (const CORBA::Exception& ex) {
01470 ACE_ERROR_RETURN((LM_ERROR,
01471 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01472 ACE_TEXT("Exception caught while parsing partition.name (%C) ")
01473 ACE_TEXT("in [subscriberqos/%C] section: %C.\n"),
01474 value.c_str(), subscriberqos_name.c_str(), ex._info().c_str()),
01475 -1);
01476 }
01477 } else {
01478 ACE_ERROR_RETURN((LM_ERROR,
01479 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01480 ACE_TEXT("Unexpected entry (%C) in [subscriberqos/%C] section.\n"),
01481 name.c_str(), subscriberqos_name.c_str()),
01482 -1);
01483 }
01484 }
01485
01486 registry.subscriberqos_map[subscriberqos_name] = subscriberqos;
01487 }
01488
01489 return 0;
01490 }
01491
01492 int
01493 StaticDiscovery::parse_endpoints(ACE_Configuration_Heap& cf)
01494 {
01495 const ACE_Configuration_Section_Key& root = cf.root_section();
01496 ACE_Configuration_Section_Key section;
01497
01498 if (cf.open_section(root, ENDPOINT_SECTION_NAME, 0, section) != 0) {
01499 if (DCPS_debug_level > 0) {
01500
01501
01502 ACE_DEBUG((LM_NOTICE,
01503 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01504 ACE_TEXT("no [%s] sections.\n"),
01505 ENDPOINT_SECTION_NAME));
01506 }
01507 return 0;
01508 }
01509
01510
01511
01512 ValueMap vm;
01513 if (pullValues(cf, section, vm) > 0) {
01514 ACE_ERROR_RETURN((LM_ERROR,
01515 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01516 ACE_TEXT("[endpoint] sections must have a subsection name\n")),
01517 -1);
01518 }
01519
01520 KeyList keys;
01521 if (processSections(cf, section, keys) != 0) {
01522 ACE_ERROR_RETURN((LM_ERROR,
01523 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01524 ACE_TEXT("too many nesting layers in the [endpoint] section.\n")),
01525 -1);
01526 }
01527
01528
01529 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01530 OPENDDS_STRING endpoint_name = it->first;
01531
01532 if (DCPS_debug_level > 0) {
01533 ACE_DEBUG((LM_NOTICE,
01534 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01535 ACE_TEXT("processing [endpoint/%C] section.\n"),
01536 endpoint_name.c_str()));
01537 }
01538
01539 ValueMap values;
01540 pullValues(cf, it->second, values);
01541 int domain = 0;
01542 unsigned char participant[6];
01543 unsigned char entity[3];
01544 enum Type {
01545 Reader,
01546 Writer
01547 };
01548 Type type = Reader;
01549 OPENDDS_STRING topic_name;
01550 DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
01551 DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01552 DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01553 DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01554 TransportLocatorSeq trans_info;
01555 OPENDDS_STRING config_name;
01556
01557 bool domain_specified = false,
01558 participant_specified = false,
01559 entity_specified = false,
01560 type_specified = false,
01561 topic_name_specified = false,
01562 config_name_specified = false;
01563
01564 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01565 OPENDDS_STRING name = it->first;
01566 OPENDDS_STRING value = it->second;
01567
01568 if (name == "domain") {
01569 if (convertToInteger(value, domain)) {
01570 domain_specified = true;
01571 } else {
01572 ACE_ERROR_RETURN((LM_ERROR,
01573 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01574 ACE_TEXT("Illegal integer value for domain (%C) in [endpoint/%C] section.\n"),
01575 value.c_str(), endpoint_name.c_str()),
01576 -1);
01577 }
01578 } else if (name == "participant") {
01579 #ifdef __SUNPRO_CC
01580 int count = 0; std::count_if(value.begin(), value.end(), isxdigit, count);
01581 #else
01582 int count = std::count_if(value.begin(), value.end(), isxdigit);
01583 #endif
01584 if (value.size() != HEX_DIGITS_IN_PARTICIPANT || static_cast<size_t>(count) != HEX_DIGITS_IN_PARTICIPANT) {
01585 ACE_ERROR_RETURN((LM_ERROR,
01586 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01587 ACE_TEXT("participant (%C) must be 12 hexadecimal digits in [endpoint/%C] section.\n"),
01588 value.c_str(), endpoint_name.c_str()),
01589 -1);
01590 }
01591
01592 for (size_t idx = 0; idx != BYTES_IN_PARTICIPANT; ++idx) {
01593 participant[idx] = fromhex(value, idx);
01594 }
01595 participant_specified = true;
01596 } else if (name == "entity") {
01597 #ifdef __SUNPRO_CC
01598 int count = 0; std::count_if(value.begin(), value.end(), isxdigit, count);
01599 #else
01600 int count = std::count_if(value.begin(), value.end(), isxdigit);
01601 #endif
01602 if (value.size() != HEX_DIGITS_IN_ENTITY || static_cast<size_t>(count) != HEX_DIGITS_IN_ENTITY) {
01603 ACE_ERROR_RETURN((LM_ERROR,
01604 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01605 ACE_TEXT("entity (%C) must be 6 hexadecimal digits in [endpoint/%C] section.\n"),
01606 value.c_str(), endpoint_name.c_str()),
01607 -1);
01608 }
01609
01610 for (size_t idx = 0; idx != BYTES_IN_ENTITY; ++idx) {
01611 entity[idx] = fromhex(value, idx);
01612 }
01613 entity_specified = true;
01614 } else if (name == "type") {
01615 if (value == "reader") {
01616 type = Reader;
01617 type_specified = true;
01618 } else if (value == "writer") {
01619 type = Writer;
01620 type_specified = true;
01621 } else {
01622 ACE_ERROR_RETURN((LM_ERROR,
01623 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01624 ACE_TEXT("Illegal string value for type (%C) in [endpoint/%C] section.\n"),
01625 value.c_str(), endpoint_name.c_str()),
01626 -1);
01627 }
01628 } else if (name == "topic") {
01629 EndpointRegistry::TopicMapType::const_iterator pos = this->registry.topic_map.find(value);
01630 if (pos != this->registry.topic_map.end()) {
01631 topic_name = pos->second.name;
01632 topic_name_specified = true;
01633 } else {
01634 ACE_ERROR_RETURN((LM_ERROR,
01635 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01636 ACE_TEXT("Illegal topic reference (%C) in [endpoint/%C] section.\n"),
01637 value.c_str(), endpoint_name.c_str()),
01638 -1);
01639 }
01640 } else if (name == "datawriterqos") {
01641 EndpointRegistry::DataWriterQosMapType::const_iterator pos = this->registry.datawriterqos_map.find(value);
01642 if (pos != this->registry.datawriterqos_map.end()) {
01643 datawriterqos = pos->second;
01644 } else {
01645 ACE_ERROR_RETURN((LM_ERROR,
01646 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01647 ACE_TEXT("Illegal datawriterqos reference (%C) in [endpoint/%C] section.\n"),
01648 value.c_str(), endpoint_name.c_str()),
01649 -1);
01650 }
01651 } else if (name == "publisherqos") {
01652 EndpointRegistry::PublisherQosMapType::const_iterator pos = this->registry.publisherqos_map.find(value);
01653 if (pos != this->registry.publisherqos_map.end()) {
01654 publisherqos = pos->second;
01655 } else {
01656 ACE_ERROR_RETURN((LM_ERROR,
01657 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01658 ACE_TEXT("Illegal publisherqos reference (%C) in [endpoint/%C] section.\n"),
01659 value.c_str(), endpoint_name.c_str()),
01660 -1);
01661 }
01662 } else if (name == "datareaderqos") {
01663 EndpointRegistry::DataReaderQosMapType::const_iterator pos = this->registry.datareaderqos_map.find(value);
01664 if (pos != this->registry.datareaderqos_map.end()) {
01665 datareaderqos = pos->second;
01666 } else {
01667 ACE_ERROR_RETURN((LM_ERROR,
01668 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01669 ACE_TEXT("Illegal datareaderqos reference (%C) in [endpoint/%C] section.\n"),
01670 value.c_str(), endpoint_name.c_str()),
01671 -1);
01672 }
01673 } else if (name == "subscriberqos") {
01674 EndpointRegistry::SubscriberQosMapType::const_iterator pos = this->registry.subscriberqos_map.find(value);
01675 if (pos != this->registry.subscriberqos_map.end()) {
01676 subscriberqos = pos->second;
01677 } else {
01678 ACE_ERROR_RETURN((LM_ERROR,
01679 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01680 ACE_TEXT("Illegal subscriberqos reference (%C) in [endpoint/%C] section.\n"),
01681 value.c_str(), endpoint_name.c_str()),
01682 -1);
01683 }
01684 } else if (name == "config") {
01685 config_name = value;
01686 config_name_specified = true;
01687 } else {
01688 ACE_ERROR_RETURN((LM_ERROR,
01689 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01690 ACE_TEXT("Unexpected entry (%C) in [endpoint/%C] section.\n"),
01691 name.c_str(), endpoint_name.c_str()),
01692 -1);
01693 }
01694 }
01695
01696 if (!domain_specified) {
01697 ACE_ERROR_RETURN((LM_ERROR,
01698 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01699 ACE_TEXT("No domain specified for [endpoint/%C] section.\n"),
01700 endpoint_name.c_str()),
01701 -1);
01702 }
01703
01704 if (!participant_specified) {
01705 ACE_ERROR_RETURN((LM_ERROR,
01706 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01707 ACE_TEXT("No participant specified for [endpoint/%C] section.\n"),
01708 endpoint_name.c_str()),
01709 -1);
01710 }
01711
01712 if (!entity_specified) {
01713 ACE_ERROR_RETURN((LM_ERROR,
01714 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01715 ACE_TEXT("No entity specified for [endpoint/%C] section.\n"),
01716 endpoint_name.c_str()),
01717 -1);
01718 }
01719
01720 if (!type_specified) {
01721 ACE_ERROR_RETURN((LM_ERROR,
01722 ACE_TEXT("(%P|%t) ERROR:StaticDiscovery::parse_endpoints ")
01723 ACE_TEXT("No type specified for [endpoint/%C] section.\n"),
01724 endpoint_name.c_str()),
01725 -1);
01726 }
01727
01728 if (!topic_name_specified) {
01729 ACE_ERROR_RETURN((LM_ERROR,
01730 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01731 ACE_TEXT("No topic specified for [endpoint/%C] section.\n"),
01732 endpoint_name.c_str()),
01733 -1);
01734 }
01735
01736 TransportConfig_rch config;
01737
01738 if (config_name_specified) {
01739 config = TheTransportRegistry->get_config(config_name);
01740 if (config.is_nil()) {
01741 ACE_ERROR_RETURN((LM_ERROR,
01742 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01743 ACE_TEXT("Illegal config reference (%C) in [endpoint/%C] section.\n"),
01744 config_name.c_str(), endpoint_name.c_str()),
01745 -1);
01746 }
01747 }
01748
01749 if (config.is_nil() && domain_specified) {
01750 config = TheTransportRegistry->domain_default_config(domain);
01751 }
01752
01753 if (config.is_nil()) {
01754 config = TheTransportRegistry->global_config();
01755 }
01756
01757 try {
01758 config->populate_locators(trans_info);
01759 }
01760 catch (const CORBA::Exception& ex) {
01761 ACE_ERROR_RETURN((LM_ERROR,
01762 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01763 ACE_TEXT("Exception caught while populating locators for [endpoint/%C] section. %C\n"),
01764 endpoint_name.c_str(), ex._info().c_str()),
01765 -1);
01766 }
01767 if (trans_info.length() == 0) {
01768 ACE_ERROR_RETURN((LM_ERROR,
01769 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01770 ACE_TEXT("No locators for [endpoint/%C] section.\n"),
01771 endpoint_name.c_str()),
01772 -1);
01773 }
01774
01775 EntityId_t entity_id = EndpointRegistry::build_id(entity,
01776 (type == Reader) ? ENTITYKIND_USER_READER_WITH_KEY : ENTITYKIND_USER_WRITER_WITH_KEY);
01777
01778 RepoId id = EndpointRegistry::build_id(domain, participant, entity_id);
01779
01780 if (DCPS_debug_level > 0) {
01781 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: StaticDiscovery::parse_endpoints adding entity with id %C\n"), LogGuid(id).c_str()));
01782 }
01783
01784 switch (type) {
01785 case Reader:
01786
01787 datareaderqos.user_data.value.length(3);
01788 datareaderqos.user_data.value[0] = entity_id.entityKey[0];
01789 datareaderqos.user_data.value[1] = entity_id.entityKey[1];
01790 datareaderqos.user_data.value[2] = entity_id.entityKey[2];
01791
01792 if (!registry.reader_map.insert(std::make_pair(id,
01793 EndpointRegistry::Reader(topic_name, datareaderqos, subscriberqos, config_name, trans_info))).second) {
01794 ACE_ERROR_RETURN((LM_ERROR,
01795 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01796 ACE_TEXT("Section [endpoint/%C] ignored - duplicate reader.\n"),
01797 endpoint_name.c_str()),
01798 -1);
01799 }
01800 break;
01801 case Writer:
01802
01803 datawriterqos.user_data.value.length(3);
01804 datawriterqos.user_data.value[0] = entity_id.entityKey[0];
01805 datawriterqos.user_data.value[1] = entity_id.entityKey[1];
01806 datawriterqos.user_data.value[2] = entity_id.entityKey[2];
01807
01808 if (!registry.writer_map.insert(std::make_pair(id,
01809 EndpointRegistry::Writer(topic_name, datawriterqos, publisherqos, config_name, trans_info))).second) {
01810 ACE_ERROR_RETURN((LM_ERROR,
01811 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01812 ACE_TEXT("Section [endpoint/%C] ignored - duplicate writer.\n"),
01813 endpoint_name.c_str()),
01814 -1);
01815 }
01816 break;
01817 }
01818 }
01819
01820 return 0;
01821 }
01822
01823 void StaticDiscovery::pre_writer(DataWriterImpl* writer)
01824 {
01825 const DDS::Publisher_var pub = writer->get_publisher();
01826 const DDS::DomainParticipant_var part = pub->get_participant();
01827 const DDS::DomainId_t dom = part->get_domain_id();
01828
01829 DDS::DomainParticipantQos partQos;
01830 part->get_qos(partQos);
01831 if (partQos.user_data.value.length() < 6)
01832 return;
01833 const unsigned char* const partId = partQos.user_data.value.get_buffer();
01834
01835 DDS::DataWriterQos qos;
01836 writer->get_qos(qos);
01837 if (qos.user_data.value.length() < 3)
01838 return;
01839 const unsigned char* const dwId = qos.user_data.value.get_buffer();
01840
01841 const EntityId_t entId =
01842 EndpointRegistry::build_id(dwId, ENTITYKIND_USER_WRITER_WITH_KEY);
01843 const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01844
01845 const EndpointRegistry::WriterMapType::const_iterator iter =
01846 registry.writer_map.find(rid);
01847
01848 if (iter != registry.writer_map.end() && !iter->second.trans_cfg.empty()) {
01849 TransportRegistry::instance()->bind_config(iter->second.trans_cfg, writer);
01850 }
01851 }
01852
01853 void StaticDiscovery::pre_reader(DataReaderImpl* reader)
01854 {
01855 const DDS::Subscriber_var sub = reader->get_subscriber();
01856 const DDS::DomainParticipant_var part = sub->get_participant();
01857 const DDS::DomainId_t dom = part->get_domain_id();
01858
01859 DDS::DomainParticipantQos partQos;
01860 part->get_qos(partQos);
01861 if (partQos.user_data.value.length() < 6)
01862 return;
01863 const unsigned char* const partId = partQos.user_data.value.get_buffer();
01864
01865 DDS::DataReaderQos qos;
01866 reader->get_qos(qos);
01867 if (qos.user_data.value.length() < 3)
01868 return;
01869 const unsigned char* const drId = qos.user_data.value.get_buffer();
01870
01871 const EntityId_t entId =
01872 EndpointRegistry::build_id(drId, ENTITYKIND_USER_READER_WITH_KEY);
01873 const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01874
01875 const EndpointRegistry::ReaderMapType::const_iterator iter =
01876 registry.reader_map.find(rid);
01877
01878 if (iter != registry.reader_map.end() && !iter->second.trans_cfg.empty()) {
01879 TransportRegistry::instance()->bind_config(iter->second.trans_cfg, reader);
01880 }
01881 }
01882
01883 StaticDiscovery_rch StaticDiscovery::instance_(make_rch<StaticDiscovery>(Discovery::DEFAULT_STATIC));
01884
01885 }
01886 }
01887
01888 OPENDDS_END_VERSIONED_NAMESPACE_DECL