00001 #include "DCPS/DdsDcps_pch.h"
00002
00003 #include "StaticDiscovery.h"
00004 #include "dds/DCPS/debug.h"
00005 #include "dds/DCPS/ConfigUtils.h"
00006 #include "dds/DCPS/DomainParticipantImpl.h"
00007 #include "dds/DCPS/Marked_Default_Qos.h"
00008 #include "dds/DCPS/SubscriberImpl.h"
00009 #include "dds/DCPS/BuiltInTopicUtils.h"
00010 #include "dds/DCPS/Registered_Data_Types.h"
00011 #include "dds/DCPS/Qos_Helper.h"
00012 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00013
00014 #include <ctype.h>
00015
00016 namespace OpenDDS {
00017 namespace DCPS {
00018
00019 void EndpointRegistry::match()
00020 {
00021 for (WriterMapType::iterator wp = writer_map.begin(), wp_limit = writer_map.end();
00022 wp != wp_limit;
00023 ++wp) {
00024 const RepoId& writerid = wp->first;
00025 Writer& writer = wp->second;
00026 for (ReaderMapType::iterator rp = reader_map.begin(), rp_limit = reader_map.end();
00027 rp != rp_limit;
00028 ++rp) {
00029 const RepoId& readerid = rp->first;
00030 Reader& reader = rp->second;
00031
00032 if (StaticDiscGuidDomainEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00033 !StaticDiscGuidPartEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
00034 reader.topic_name == writer.topic_name) {
00035
00036 IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00037 IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00038 const TransportLocatorSeq& writer_trans_info = writer.trans_info;
00039 const TransportLocatorSeq& reader_trans_info = reader.trans_info;
00040 const DDS::DataWriterQos& writer_qos = writer.qos;
00041 const DDS::DataReaderQos& reader_qos = reader.qos;
00042 const DDS::PublisherQos& publisher_qos = writer.publisher_qos;
00043 const DDS::SubscriberQos& subscriber_qos = reader.subscriber_qos;
00044
00045 if (compatibleQOS(&writerStatus, &readerStatus, writer_trans_info, reader_trans_info,
00046 &writer_qos, &reader_qos, &publisher_qos, &subscriber_qos)) {
00047 switch (reader.qos.reliability.kind) {
00048 case DDS::BEST_EFFORT_RELIABILITY_QOS:
00049 writer.best_effort_readers.insert(readerid);
00050 reader.best_effort_writers.insert(writerid);
00051 break;
00052 case DDS::RELIABLE_RELIABILITY_QOS:
00053 writer.reliable_readers.insert(readerid);
00054 reader.reliable_writers.insert(writerid);
00055 break;
00056 }
00057 }
00058 }
00059 }
00060 }
00061 }
00062
00063 StaticEndpointManager::StaticEndpointManager(const RepoId& participant_id,
00064 ACE_Thread_Mutex& lock,
00065 const EndpointRegistry& registry,
00066 StaticParticipant& participant)
00067 : EndpointManager<StaticDiscoveredParticipantData>(participant_id, lock)
00068 , registry_(registry)
00069 , participant_(participant)
00070 {
00071 pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00072 sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00073 }
00074
00075 void StaticEndpointManager::init_bit()
00076 {
00077
00078
00079 for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
00080 limit = registry_.writer_map.end();
00081 pos != limit;
00082 ++pos) {
00083 const RepoId& remoteid = pos->first;
00084 const EndpointRegistry::Writer& writer = pos->second;
00085
00086 if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00087 increment_key(pub_bit_key_);
00088 pub_key_to_id_[pub_bit_key_] = remoteid;
00089
00090
00091
00092 DDS::PublicationBuiltinTopicData data;
00093
00094 data.key = pub_bit_key_;
00095 OPENDDS_STRING topic_name = writer.topic_name;
00096 data.topic_name = topic_name.c_str();
00097 const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00098 data.type_name = topic.type_name.c_str();
00099 data.durability = writer.qos.durability;
00100 data.durability_service = writer.qos.durability_service;
00101 data.deadline = writer.qos.deadline;
00102 data.latency_budget = writer.qos.latency_budget;
00103 data.liveliness = writer.qos.liveliness;
00104 data.reliability = writer.qos.reliability;
00105 data.lifespan = writer.qos.lifespan;
00106 data.user_data = writer.qos.user_data;
00107 data.ownership = writer.qos.ownership;
00108 data.ownership_strength = writer.qos.ownership_strength;
00109 data.destination_order = writer.qos.destination_order;
00110 data.presentation = writer.publisher_qos.presentation;
00111 data.partition = writer.publisher_qos.partition;
00112
00113
00114 data.group_data = writer.publisher_qos.group_data;
00115 #ifndef DDS_HAS_MINIMUM_BIT
00116 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
00117 if (bit) {
00118 bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00119 }
00120 #endif
00121 }
00122 }
00123
00124 for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
00125 limit = registry_.reader_map.end();
00126 pos != limit;
00127 ++pos) {
00128 const RepoId& remoteid = pos->first;
00129 const EndpointRegistry::Reader& reader = pos->second;
00130
00131 if (!GuidPrefixEqual()(participant_id_.guidPrefix, remoteid.guidPrefix)) {
00132 increment_key(sub_bit_key_);
00133 sub_key_to_id_[sub_bit_key_] = remoteid;
00134
00135
00136
00137 DDS::SubscriptionBuiltinTopicData data;
00138
00139 data.key = sub_bit_key_;
00140 OPENDDS_STRING topic_name = reader.topic_name;
00141 data.topic_name = topic_name.c_str();
00142 const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
00143 data.type_name = topic.type_name.c_str();
00144 data.durability = reader.qos.durability;
00145 data.deadline = reader.qos.deadline;
00146 data.latency_budget = reader.qos.latency_budget;
00147 data.liveliness = reader.qos.liveliness;
00148 data.reliability = reader.qos.reliability;
00149 data.ownership = reader.qos.ownership;
00150 data.destination_order = reader.qos.destination_order;
00151 data.user_data = reader.qos.user_data;
00152 data.time_based_filter = reader.qos.time_based_filter;
00153 data.presentation = reader.subscriber_qos.presentation;
00154 data.partition = reader.subscriber_qos.partition;
00155
00156
00157 data.group_data = reader.subscriber_qos.group_data;
00158
00159 #ifndef DDS_HAS_MINIMUM_BIT
00160 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
00161 if (bit) {
00162 bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
00163 }
00164 #endif
00165 }
00166 }
00167 }
00168
00169 void StaticEndpointManager::assign_publication_key(RepoId& rid,
00170 const RepoId& ,
00171 const DDS::DataWriterQos& qos)
00172 {
00173 if (qos.user_data.value.length() != 3) {
00174 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
00175 return;
00176 }
00177
00178 rid.entityId.entityKey[0] = qos.user_data.value[0];
00179 rid.entityId.entityKey[1] = qos.user_data.value[1];
00180 rid.entityId.entityKey[2] = qos.user_data.value[2];
00181 rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY;
00182
00183 if (DCPS_debug_level > 8) {
00184 ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %s\n",
00185 LogGuid(rid).c_str()));
00186 }
00187
00188 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
00189 if (pos == registry_.writer_map.end()) {
00190 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %s\n"), LogGuid(rid).c_str()));
00191 return;
00192 }
00193
00194 DDS::DataWriterQos qos2(qos);
00195
00196 qos2.user_data = pos->second.qos.user_data;
00197
00198 DDS::DataWriterQos qos3(pos->second.qos);
00199
00200 if (qos2 != qos3) {
00201 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
00202 }
00203 }
00204
00205 void StaticEndpointManager::assign_subscription_key(RepoId& rid,
00206 const RepoId& ,
00207 const DDS::DataReaderQos& qos)
00208 {
00209 if (qos.user_data.value.length() != 3) {
00210 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
00211 return;
00212 }
00213
00214 rid.entityId.entityKey[0] = qos.user_data.value[0];
00215 rid.entityId.entityKey[1] = qos.user_data.value[1];
00216 rid.entityId.entityKey[2] = qos.user_data.value[2];
00217 rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY;
00218
00219 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
00220 if (pos == registry_.reader_map.end()) {
00221 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %s\n"), LogGuid(rid).c_str()));
00222 return;
00223 }
00224
00225 DDS::DataReaderQos qos2(qos);
00226
00227 qos2.user_data = pos->second.qos.user_data;
00228
00229 if (qos2 != pos->second.qos) {
00230 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
00231 }
00232 }
00233
00234 bool
00235 StaticEndpointManager::update_topic_qos(const RepoId& ,
00236 const DDS::TopicQos& ,
00237 OPENDDS_STRING& )
00238 {
00239 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
00240 ACE_TEXT("Not allowed\n")));
00241 return false;
00242 }
00243
00244 bool
00245 StaticEndpointManager::update_publication_qos(const RepoId& ,
00246 const DDS::DataWriterQos& ,
00247 const DDS::PublisherQos& )
00248 {
00249 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
00250 ACE_TEXT("Not allowed\n")));
00251 return false;
00252 }
00253
00254 bool
00255 StaticEndpointManager::update_subscription_qos(const RepoId& ,
00256 const DDS::DataReaderQos& ,
00257 const DDS::SubscriberQos& )
00258 {
00259 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00260 ACE_TEXT("Not allowed\n")));
00261 return false;
00262 }
00263
00264 bool
00265 StaticEndpointManager::update_subscription_params(const RepoId& ,
00266 const DDS::StringSeq& )
00267 {
00268 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
00269 ACE_TEXT("Not allowed\n")));
00270 return false;
00271 }
00272
/// Intentionally a no-op: both arguments are ignored and nothing is done.
void
StaticEndpointManager::association_complete(const RepoId& ,
                                            const RepoId& )
{

}
00279
00280 bool
00281 StaticEndpointManager::disassociate(const StaticDiscoveredParticipantData& )
00282 {
00283 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
00284
00285 return false;
00286 }
00287
00288 DDS::ReturnCode_t
00289 StaticEndpointManager::add_publication_i(const RepoId& writerid,
00290 LocalPublication& pub)
00291 {
00292
00293
00294
00295
00296
00297 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00298 if (pos == registry_.writer_map.end()) {
00299 return DDS::RETCODE_ERROR;
00300 }
00301 const EndpointRegistry::Writer& writer = pos->second;
00302
00303 for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
00304 pos != limit;
00305 ++pos) {
00306 const RepoId& readerid = *pos;
00307 const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00308
00309 #ifdef __SUNPRO_CC
00310 ReaderAssociation ra;
00311 ra.readerTransInfo = reader.trans_info;
00312 ra.readerId = readerid;
00313 ra.subQos = reader.subscriber_qos;
00314 ra.readerQos = reader.qos;
00315 ra.filterClassName = "";
00316 ra.filterExpression = "";
00317 ra.exprParams = 0;
00318 #else
00319 const ReaderAssociation ra =
00320 {reader.trans_info, readerid, reader.subscriber_qos, reader.qos, "", "", 0};
00321 #endif
00322 pub.publication_->add_association(writerid, ra, true);
00323 pub.publication_->association_complete(readerid);
00324 }
00325
00326 for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00327 pos != limit;
00328 ++pos) {
00329 const RepoId& readerid = *pos;
00330 const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
00331 pub.publication_->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
00332 }
00333
00334 return DDS::RETCODE_OK;
00335 }
00336
00337 DDS::ReturnCode_t
00338 StaticEndpointManager::remove_publication_i(const RepoId& writerid)
00339 {
00340 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00341 if (lp_pos == local_publications_.end()) {
00342 return DDS::RETCODE_ERROR;
00343 }
00344
00345 const LocalPublication& pub = lp_pos->second;
00346
00347 EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
00348 if (pos == registry_.writer_map.end()) {
00349 return DDS::RETCODE_ERROR;
00350 }
00351
00352 const EndpointRegistry::Writer& writer = pos->second;
00353
00354 ReaderIdSeq ids;
00355 ids.length((CORBA::ULong)writer.reliable_readers.size());
00356 CORBA::ULong idx = 0;
00357 for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
00358 pos != limit;
00359 ++pos, ++idx) {
00360 const RepoId& readerid = *pos;
00361 ids[idx] = readerid;
00362 pub.publication_->unregister_for_reader(participant_id_, writerid, readerid);
00363 }
00364
00365 return DDS::RETCODE_OK;
00366 }
00367
00368 DDS::ReturnCode_t
00369 StaticEndpointManager::add_subscription_i(const RepoId& readerid,
00370 LocalSubscription& sub)
00371 {
00372
00373
00374
00375
00376
00377 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00378 if (pos == registry_.reader_map.end()) {
00379 return DDS::RETCODE_ERROR;
00380 }
00381 const EndpointRegistry::Reader& reader = pos->second;
00382
00383 for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
00384 pos != limit;
00385 ++pos) {
00386 const RepoId& writerid = *pos;
00387 const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00388
00389 #ifdef __SUNPRO_CC
00390 WriterAssociation wa;
00391 wa.writerTransInfo = writer.trans_info;
00392 wa.writerId = writerid;
00393 wa.pubQos = writer.publisher_qos;
00394 wa.writerQos = writer.qos;
00395 #else
00396 const WriterAssociation wa =
00397 {writer.trans_info, writerid, writer.publisher_qos, writer.qos};
00398 #endif
00399 sub.subscription_->add_association(readerid, wa, false);
00400 }
00401
00402 for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00403 pos != limit;
00404 ++pos) {
00405 const RepoId& writerid = *pos;
00406 const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
00407 sub.subscription_->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
00408 }
00409
00410 return DDS::RETCODE_OK;
00411 }
00412
00413 DDS::ReturnCode_t
00414 StaticEndpointManager::remove_subscription_i(const RepoId& readerid)
00415 {
00416 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00417 if (ls_pos == local_subscriptions_.end()) {
00418 return DDS::RETCODE_ERROR;
00419 }
00420
00421 const LocalSubscription& sub = ls_pos->second;
00422
00423 EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
00424 if (pos == registry_.reader_map.end()) {
00425 return DDS::RETCODE_ERROR;
00426 }
00427
00428 const EndpointRegistry::Reader& reader = pos->second;
00429
00430 WriterIdSeq ids;
00431 ids.length((CORBA::ULong)reader.reliable_writers.size());
00432 CORBA::ULong idx = 0;
00433 for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
00434 pos != limit;
00435 ++pos, ++idx) {
00436 const RepoId& writerid = *pos;
00437 ids[idx] = writerid;
00438 sub.subscription_->unregister_for_writer(participant_id_, readerid, writerid);
00439 }
00440
00441 return DDS::RETCODE_OK;
00442 }
00443
00444 bool
00445 StaticEndpointManager::shutting_down() const
00446 {
00447 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
00448
00449 return false;
00450 }
00451
00452 void
00453 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& ,
00454 DiscoveredSubscriptionIter& ,
00455 const RepoId& )
00456 {
00457 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00458
00459 }
00460
00461 void
00462 StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& ,
00463 DiscoveredPublicationIter& ,
00464 const RepoId& )
00465 {
00466 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
00467
00468 }
00469
00470 bool
00471 StaticEndpointManager::defer_writer(const RepoId& ,
00472 const RepoId& )
00473 {
00474 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_writer TODO\n")));
00475
00476 return false;
00477 }
00478
00479 bool
00480 StaticEndpointManager::defer_reader(const RepoId& ,
00481 const RepoId& )
00482 {
00483
00484 ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::defer_reader TODO\n")));
00485 return false;
00486 }
00487
00488 void
00489 StaticEndpointManager::reader_exists(const RepoId& readerid, const RepoId& writerid)
00490 {
00491 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00492 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00493 EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00494 if (lp_pos != local_publications_.end() &&
00495 reader_pos != registry_.reader_map.end()) {
00496 DataWriterCallbacks* dwr = lp_pos->second.publication_;
00497 #ifdef __SUNPRO_CC
00498 ReaderAssociation ra;
00499 ra.readerTransInfo = reader_pos->second.trans_info;
00500 ra.readerId = readerid;
00501 ra.subQos = reader_pos->second.subscriber_qos;
00502 ra.readerQos = reader_pos->second.qos;
00503 ra.filterClassName = "";
00504 ra.filterExpression = "";
00505 ra.exprParams = 0;
00506 #else
00507 const ReaderAssociation ra =
00508 {reader_pos->second.trans_info, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos, "", "", 0};
00509
00510 #endif
00511 dwr->add_association(writerid, ra, true);
00512 dwr->association_complete(readerid);
00513 }
00514 }
00515
00516 void
00517 StaticEndpointManager::reader_does_not_exist(const RepoId& readerid, const RepoId& writerid)
00518 {
00519 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00520 LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
00521 EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
00522 if (lp_pos != local_publications_.end() &&
00523 reader_pos != registry_.reader_map.end()) {
00524 DataWriterCallbacks* dwr = lp_pos->second.publication_;
00525 ReaderIdSeq ids;
00526 ids.length(1);
00527 ids[0] = readerid;
00528 dwr->remove_associations(ids, true);
00529 }
00530 }
00531
00532 void
00533 StaticEndpointManager::writer_exists(const RepoId& writerid, const RepoId& readerid)
00534 {
00535 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00536 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00537 EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00538 if (ls_pos != local_subscriptions_.end() &&
00539 writer_pos != registry_.writer_map.end()) {
00540 DataReaderCallbacks* drr = ls_pos->second.subscription_;
00541 #ifdef __SUNPRO_CC
00542 WriterAssociation wa;
00543 wa.writerTransInfo = writer_pos->second.trans_info;
00544 wa.writerId = writerid;
00545 wa.pubQos = writer_pos->second.publisher_qos;
00546 wa.writerQos = writer_pos->second.qos;
00547 #else
00548 const WriterAssociation wa =
00549 {writer_pos->second.trans_info, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos};
00550 #endif
00551 drr->add_association(readerid, wa, false);
00552 }
00553 }
00554
00555 void
00556 StaticEndpointManager::writer_does_not_exist(const RepoId& writerid, const RepoId& readerid)
00557 {
00558 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00559 LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
00560 EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
00561 if (ls_pos != local_subscriptions_.end() &&
00562 writer_pos != registry_.writer_map.end()) {
00563 DataReaderCallbacks* drr = ls_pos->second.subscription_;
00564 WriterIdSeq ids;
00565 ids.length(1);
00566 ids[0] = writerid;
00567 drr->remove_associations(ids, true);
00568 }
00569 }
00570
00571 #ifndef DDS_HAS_MINIMUM_BIT
00572 DDS::PublicationBuiltinTopicDataDataReaderImpl*
00573 StaticEndpointManager::pub_bit()
00574 {
00575 DDS::Subscriber_var sub = participant_.bit_subscriber();
00576 if (!sub.in())
00577 return 0;
00578
00579 DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
00580 return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00581 }
00582
00583 DDS::SubscriptionBuiltinTopicDataDataReaderImpl*
00584 StaticEndpointManager::sub_bit()
00585 {
00586 DDS::Subscriber_var sub = participant_.bit_subscriber();
00587 if (!sub.in())
00588 return 0;
00589
00590 DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
00591 return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00592 }
00593 #endif
00594
/// Construct a static discovery instance; @a key is forwarded unchanged to
/// the PeerDiscovery<StaticParticipant> base class.
StaticDiscovery::StaticDiscovery(const RepoKey& key)
  : PeerDiscovery<StaticParticipant>(key)
{}
00598
00599 namespace {
00600 unsigned char hextobyte(unsigned char c)
00601 {
00602 if (c >= '0' && c <= '9') {
00603 return c - '0';
00604 }
00605 if (c >= 'a' && c <= 'f') {
00606 return 10 + c - 'a';
00607 }
00608 if (c >= 'A' && c <= 'F') {
00609 return 10 + c - 'A';
00610 }
00611 return c;
00612 }
00613
00614 unsigned char
00615 fromhex(const OPENDDS_STRING& x, size_t idx)
00616 {
00617 return (hextobyte(x[idx * 2]) << 4) | (hextobyte(x[idx * 2 + 1]));
00618 }
00619 }
00620
00621 EntityId_t
00622 EndpointRegistry::build_id(const unsigned char* entity_key,
00623 const unsigned char entity_kind)
00624 {
00625 EntityId_t retval;
00626 retval.entityKey[0] = entity_key[0];
00627 retval.entityKey[1] = entity_key[1];
00628 retval.entityKey[2] = entity_key[2];
00629 retval.entityKind = entity_kind;
00630 return retval;
00631 }
00632
00633 RepoId
00634 EndpointRegistry::build_id(DDS::DomainId_t domain,
00635 const unsigned char* participant_id,
00636 const EntityId_t& entity_id)
00637 {
00638 RepoId id;
00639 id.guidPrefix[0] = VENDORID_OCI[0];
00640 id.guidPrefix[1] = VENDORID_OCI[1];
00641
00642
00643
00644
00645 DDS::DomainId_t netdom = ACE_HTONL(domain);
00646 ACE_OS::memcpy(&id.guidPrefix[2], &netdom, sizeof(DDS::DomainId_t));
00647
00648
00649
00650
00651
00652
00653 ACE_OS::memcpy(&id.guidPrefix[6], participant_id, 6);
00654 id.entityId = entity_id;
00655 return id;
00656 }
00657
00658 AddDomainStatus
00659 StaticDiscovery::add_domain_participant(DDS::DomainId_t domain,
00660 const DDS::DomainParticipantQos& qos)
00661 {
00662 AddDomainStatus ads = {RepoId(), false };
00663
00664 if (qos.user_data.value.length() != 6) {
00665 ACE_DEBUG((LM_ERROR,
00666 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00667 ACE_TEXT("No userdata to identify participant\n")));
00668 return ads;
00669 }
00670
00671 RepoId id = EndpointRegistry::build_id(domain,
00672 qos.user_data.value.get_buffer(),
00673 ENTITYID_PARTICIPANT);
00674 if (!get_part(domain, id).is_nil()) {
00675 ACE_DEBUG((LM_ERROR,
00676 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
00677 ACE_TEXT("Duplicate participant\n")));
00678 return ads;
00679 }
00680
00681 const RcHandle<StaticParticipant> participant = new StaticParticipant(id, qos, registry);
00682
00683 {
00684 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00685 participants_[domain][id] = participant;
00686 }
00687
00688 ads.id = id;
00689 return ads;
00690 }
00691
00692 namespace {
  // Names of the top-level configuration sections opened by the
  // StaticDiscovery::parse_* functions.
  const ACE_TCHAR TOPIC_SECTION_NAME[] = ACE_TEXT("topic");
  const ACE_TCHAR DATAWRITERQOS_SECTION_NAME[] = ACE_TEXT("datawriterqos");
  const ACE_TCHAR DATAREADERQOS_SECTION_NAME[] = ACE_TEXT("datareaderqos");
  const ACE_TCHAR PUBLISHERQOS_SECTION_NAME[] = ACE_TEXT("publisherqos");
  const ACE_TCHAR SUBSCRIBERQOS_SECTION_NAME[] = ACE_TEXT("subscriberqos");
  const ACE_TCHAR ENDPOINT_SECTION_NAME[] = ACE_TEXT("endpoint");
00699
00700 void parse_second(CORBA::Long& x, const OPENDDS_STRING& value)
00701 {
00702 if (value == "DURATION_INFINITE_SEC") {
00703 x = DDS::DURATION_INFINITE_SEC;
00704 } else {
00705 x = atoi(value.c_str());
00706 }
00707 }
00708
00709 void parse_nanosecond(CORBA::ULong& x, const OPENDDS_STRING& value)
00710 {
00711 if (value == "DURATION_INFINITE_NANOSEC") {
00712 x = DDS::DURATION_INFINITE_NSEC;
00713 } else {
00714 x = atoi(value.c_str());
00715 }
00716 }
00717
00718 bool parse_bool(CORBA::Boolean& x, const OPENDDS_STRING& value)
00719 {
00720 if (value == "true") {
00721 x = true;
00722 return true;
00723 } else if (value == "false") {
00724 x = false;
00725 return true;
00726 }
00727 return false;
00728 }
00729
00730 void parse_list(DDS::PartitionQosPolicy& x, const OPENDDS_STRING& value)
00731 {
00732
00733 const char* start = value.c_str();
00734 char buffer[128];
00735 std::memset(buffer, 0, sizeof(buffer));
00736 while (const char* next_comma = std::strchr(start, ',')) {
00737
00738 std::strncpy(buffer, start, next_comma - start);
00739
00740 buffer[next_comma - start] = '\0';
00741
00742 x.name.length(x.name.length() + 1);
00743 x.name[x.name.length() - 1] = static_cast<const char*>(buffer);
00744
00745 start = next_comma + 1;
00746 }
00747
00748 x.name.length(x.name.length() + 1);
00749 x.name[x.name.length() - 1] = start;
00750 }
00751 }
00752
00753 int
00754 StaticDiscovery::load_configuration(ACE_Configuration_Heap& cf)
00755 {
00756 if (parse_topics(cf) ||
00757 parse_datawriterqos(cf) ||
00758 parse_datareaderqos(cf) ||
00759 parse_publisherqos(cf) ||
00760 parse_subscriberqos(cf) ||
00761 parse_endpoints(cf)) {
00762 return -1;
00763 }
00764
00765 registry.match();
00766
00767 return 0;
00768 }
00769
00770 int
00771 StaticDiscovery::parse_topics(ACE_Configuration_Heap& cf)
00772 {
00773 const ACE_Configuration_Section_Key& root = cf.root_section();
00774 ACE_Configuration_Section_Key section;
00775
00776 if (cf.open_section(root, TOPIC_SECTION_NAME, 0, section) != 0) {
00777 if (DCPS_debug_level > 0) {
00778
00779
00780 ACE_DEBUG((LM_NOTICE,
00781 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00782 ACE_TEXT("no [%s] sections.\n"),
00783 TOPIC_SECTION_NAME));
00784 }
00785 return 0;
00786 }
00787
00788
00789
00790 ValueMap vm;
00791 if (pullValues(cf, section, vm) > 0) {
00792 ACE_ERROR_RETURN((LM_ERROR,
00793 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00794 ACE_TEXT("[topic] sections must have a subsection name\n")),
00795 -1);
00796 }
00797
00798 KeyList keys;
00799 if (processSections(cf, section, keys) != 0) {
00800 ACE_ERROR_RETURN((LM_ERROR,
00801 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00802 ACE_TEXT("too many nesting layers in the [topic] section.\n")),
00803 -1);
00804 }
00805
00806
00807 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00808 OPENDDS_STRING topic_name = it->first;
00809
00810 if (DCPS_debug_level > 0) {
00811 ACE_DEBUG((LM_NOTICE,
00812 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
00813 ACE_TEXT("processing [topic/%C] section.\n"),
00814 topic_name.c_str()));
00815 }
00816
00817 ValueMap values;
00818 pullValues(cf, it->second, values);
00819
00820 EndpointRegistry::Topic topic;
00821 bool name_Specified = false,
00822 type_name_Specified = false;
00823
00824 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00825 OPENDDS_STRING name = it->first;
00826 OPENDDS_STRING value = it->second;
00827
00828 if (name == "name") {
00829 topic.name = value;
00830 name_Specified = true;
00831 } else if (name == "type_name") {
00832 if (value.size() >= 128) {
00833 ACE_ERROR_RETURN((LM_ERROR,
00834 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00835 ACE_TEXT("type_name (%C) must be less than 128 characters in [topic/%C] section.\n"),
00836 value.c_str(), topic_name.c_str()),
00837 -1);
00838 }
00839 topic.type_name = value;
00840 type_name_Specified = true;
00841 } else {
00842
00843 }
00844 }
00845
00846 if (!name_Specified) {
00847 topic.name = topic_name;
00848 }
00849
00850 if (!type_name_Specified) {
00851 ACE_ERROR_RETURN((LM_ERROR,
00852 ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
00853 ACE_TEXT("No type_name specified for [topic/%C] section.\n"),
00854 topic_name.c_str()),
00855 -1);
00856 }
00857
00858 registry.topic_map[topic_name] = topic;
00859 }
00860
00861 return 0;
00862 }
00863
00864 int
00865 StaticDiscovery::parse_datawriterqos(ACE_Configuration_Heap& cf)
00866 {
00867 const ACE_Configuration_Section_Key& root = cf.root_section();
00868 ACE_Configuration_Section_Key section;
00869
00870 if (cf.open_section(root, DATAWRITERQOS_SECTION_NAME, 0, section) != 0) {
00871 if (DCPS_debug_level > 0) {
00872
00873
00874 ACE_DEBUG((LM_NOTICE,
00875 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00876 ACE_TEXT("no [%s] sections.\n"),
00877 DATAWRITERQOS_SECTION_NAME));
00878 }
00879 return 0;
00880 }
00881
00882
00883
00884 ValueMap vm;
00885 if (pullValues(cf, section, vm) > 0) {
00886 ACE_ERROR_RETURN((LM_ERROR,
00887 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00888 ACE_TEXT("[datawriterqos] sections must have a subsection name\n")),
00889 -1);
00890 }
00891
00892 KeyList keys;
00893 if (processSections(cf, section, keys) != 0) {
00894 ACE_ERROR_RETURN((LM_ERROR,
00895 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00896 ACE_TEXT("too many nesting layers in the [datawriterqos] section.\n")),
00897 -1);
00898 }
00899
00900
00901 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00902 OPENDDS_STRING datawriterqos_name = it->first;
00903
00904 if (DCPS_debug_level > 0) {
00905 ACE_DEBUG((LM_NOTICE,
00906 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
00907 ACE_TEXT("processing [datawriterqos/%C] section.\n"),
00908 datawriterqos_name.c_str()));
00909 }
00910
00911 ValueMap values;
00912 pullValues(cf, it->second, values);
00913
00914 DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
00915
00916 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00917 OPENDDS_STRING name = it->first;
00918 OPENDDS_STRING value = it->second;
00919
00920 if (name == "durability.kind") {
00921 if (value == "VOLATILE") {
00922 datawriterqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
00923 } else if (value == "TRANSIENT_LOCAL") {
00924 datawriterqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
00925 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00926 } else if (value == "TRANSIENT") {
00927 datawriterqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
00928 } else if (value == "PERSISTENT") {
00929 datawriterqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
00930 #endif
00931 } else {
00932 ACE_ERROR_RETURN((LM_ERROR,
00933 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00934 ACE_TEXT("Illegal value for durability.kind (%C) in [datawriterqos/%C] section.\n"),
00935 value.c_str(), datawriterqos_name.c_str()),
00936 -1);
00937 }
00938 } else if (name == "deadline.period.sec") {
00939 parse_second(datawriterqos.deadline.period.sec, value);
00940 } else if (name == "deadline.period.nanosec") {
00941 parse_nanosecond(datawriterqos.deadline.period.nanosec, value);
00942 } else if (name == "latency_budget.duration.sec") {
00943 parse_second(datawriterqos.latency_budget.duration.sec, value);
00944 } else if (name == "latency_budget.duration.nanosec") {
00945 parse_nanosecond(datawriterqos.latency_budget.duration.nanosec, value);
00946 } else if (name == "liveliness.kind") {
00947 if (value == "AUTOMATIC") {
00948 datawriterqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
00949 } else if (value == "MANUAL_BY_TOPIC") {
00950 datawriterqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
00951 } else if (value == "MANUAL_BY_PARTICIPANT") {
00952 datawriterqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
00953 } else {
00954 ACE_ERROR_RETURN((LM_ERROR,
00955 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00956 ACE_TEXT("Illegal value for liveliness.kind (%C) in [datawriterqos/%C] section.\n"),
00957 value.c_str(), datawriterqos_name.c_str()),
00958 -1);
00959 }
00960 } else if (name == "liveliness.lease_duration.sec") {
00961 parse_second(datawriterqos.liveliness.lease_duration.sec, value);
00962 } else if (name == "liveliness.lease_duration.nanosec") {
00963 parse_nanosecond(datawriterqos.liveliness.lease_duration.nanosec, value);
00964 } else if (name == "reliability.kind") {
00965 if (value == "BEST_EFFORT") {
00966 datawriterqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
00967 } else if (value == "RELIABLE") {
00968 datawriterqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
00969 } else {
00970 ACE_ERROR_RETURN((LM_ERROR,
00971 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00972 ACE_TEXT("Illegal value for reliability.kind (%C) in [datawriterqos/%C] section.\n"),
00973 value.c_str(), datawriterqos_name.c_str()),
00974 -1);
00975 }
00976 } else if (name == "reliability.max_blocking_time.sec") {
00977 parse_second(datawriterqos.reliability.max_blocking_time.sec, value);
00978 } else if (name == "reliability.max_blocking_time.nanosec") {
00979 parse_nanosecond(datawriterqos.reliability.max_blocking_time.nanosec, value);
00980 } else if (name == "destination_order.kind") {
00981 if (value == "BY_RECEPTION_TIMESTAMP") {
00982 datawriterqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
00983 } else if (value == "BY_SOURCE_TIMESTAMP") {
00984 datawriterqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
00985 } else {
00986 ACE_ERROR_RETURN((LM_ERROR,
00987 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
00988 ACE_TEXT("Illegal value for destination_order.kind (%C) in [datawriterqos/%C] section.\n"),
00989 value.c_str(), datawriterqos_name.c_str()),
00990 -1);
00991 }
00992 } else if (name == "history.kind") {
00993 if (value == "KEEP_ALL") {
00994 datawriterqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
00995 } else if (value == "KEEP_LAST") {
00996 datawriterqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
00997 } else {
00998 ACE_ERROR_RETURN((LM_ERROR,
00999 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01000 ACE_TEXT("Illegal value for history.kind (%C) in [datawriterqos/%C] section.\n"),
01001 value.c_str(), datawriterqos_name.c_str()),
01002 -1);
01003 }
01004 } else if (name == "history.depth") {
01005 datawriterqos.history.depth = atoi(value.c_str());
01006 } else if (name == "resource_limits.max_samples") {
01007 datawriterqos.resource_limits.max_samples = atoi(value.c_str());
01008 } else if (name == "resource_limits.max_instances") {
01009 datawriterqos.resource_limits.max_instances = atoi(value.c_str());
01010 } else if (name == "resource_limits.max_samples_per_instance") {
01011 datawriterqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01012 } else if (name == "transport_priority.value") {
01013 datawriterqos.transport_priority.value = atoi(value.c_str());
01014 } else if (name == "lifespan.duration.sec") {
01015 parse_second(datawriterqos.lifespan.duration.sec, value);
01016 } else if (name == "lifespan.duration.nanosec") {
01017 parse_nanosecond(datawriterqos.lifespan.duration.nanosec, value);
01018 } else if (name == "ownership.kind") {
01019 if (value == "SHARED") {
01020 datawriterqos.ownership.kind = DDS::SHARED_OWNERSHIP_QOS;
01021 } else if (value == "EXCLUSIVE") {
01022 datawriterqos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
01023 } else {
01024 ACE_ERROR_RETURN((LM_ERROR,
01025 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01026 ACE_TEXT("Illegal value for ownership.kind (%C) in [datawriterqos/%C] section.\n"),
01027 value.c_str(), datawriterqos_name.c_str()),
01028 -1);
01029 }
01030 } else if (name == "ownership_strength.value") {
01031 datawriterqos.ownership_strength.value = atoi(value.c_str());
01032 } else {
01033 ACE_ERROR_RETURN((LM_ERROR,
01034 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
01035 ACE_TEXT("Unexpected entry (%C) in [datawriterqos/%C] section.\n"),
01036 name.c_str(), datawriterqos_name.c_str()),
01037 -1);
01038 }
01039 }
01040
01041 registry.datawriterqos_map[datawriterqos_name] = datawriterqos;
01042 }
01043
01044 return 0;
01045 }
01046
01047 int
01048 StaticDiscovery::parse_datareaderqos(ACE_Configuration_Heap& cf)
01049 {
01050 const ACE_Configuration_Section_Key& root = cf.root_section();
01051 ACE_Configuration_Section_Key section;
01052
01053 if (cf.open_section(root, DATAREADERQOS_SECTION_NAME, 0, section) != 0) {
01054 if (DCPS_debug_level > 0) {
01055
01056
01057 ACE_DEBUG((LM_NOTICE,
01058 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01059 ACE_TEXT("no [%s] sections.\n"),
01060 DATAREADERQOS_SECTION_NAME));
01061 }
01062 return 0;
01063 }
01064
01065
01066
01067 ValueMap vm;
01068 if (pullValues(cf, section, vm) > 0) {
01069 ACE_ERROR_RETURN((LM_ERROR,
01070 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01071 ACE_TEXT("[datareaderqos] sections must have a subsection name\n")),
01072 -1);
01073 }
01074
01075 KeyList keys;
01076 if (processSections(cf, section, keys) != 0) {
01077 ACE_ERROR_RETURN((LM_ERROR,
01078 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01079 ACE_TEXT("too many nesting layers in the [datareaderqos] section.\n")),
01080 -1);
01081 }
01082
01083
01084 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01085 OPENDDS_STRING datareaderqos_name = it->first;
01086
01087 if (DCPS_debug_level > 0) {
01088 ACE_DEBUG((LM_NOTICE,
01089 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
01090 ACE_TEXT("processing [datareaderqos/%C] section.\n"),
01091 datareaderqos_name.c_str()));
01092 }
01093
01094 ValueMap values;
01095 pullValues(cf, it->second, values);
01096
01097 DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01098
01099 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01100 OPENDDS_STRING name = it->first;
01101 OPENDDS_STRING value = it->second;
01102
01103 if (name == "durability.kind") {
01104 if (value == "VOLATILE") {
01105 datareaderqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
01106 } else if (value == "TRANSIENT_LOCAL") {
01107 datareaderqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01108 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01109 } else if (value == "TRANSIENT") {
01110 datareaderqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
01111 } else if (value == "PERSISTENT") {
01112 datareaderqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
01113 #endif
01114 } else {
01115 ACE_ERROR_RETURN((LM_ERROR,
01116 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01117 ACE_TEXT("Illegal value for durability.kind (%C) in [datareaderqos/%C] section.\n"),
01118 value.c_str(), datareaderqos_name.c_str()),
01119 -1);
01120 }
01121 } else if (name == "deadline.period.sec") {
01122 parse_second(datareaderqos.deadline.period.sec, value);
01123 } else if (name == "deadline.period.nanosec") {
01124 parse_nanosecond(datareaderqos.deadline.period.nanosec, value);
01125 } else if (name == "latency_budget.duration.sec") {
01126 parse_second(datareaderqos.latency_budget.duration.sec, value);
01127 } else if (name == "latency_budget.duration.nanosec") {
01128 parse_nanosecond(datareaderqos.latency_budget.duration.nanosec, value);
01129 } else if (name == "liveliness.kind") {
01130 if (value == "AUTOMATIC") {
01131 datareaderqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
01132 } else if (value == "MANUAL_BY_TOPIC") {
01133 datareaderqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
01134 } else if (value == "MANUAL_BY_PARTICIPANT") {
01135 datareaderqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
01136 } else {
01137 ACE_ERROR_RETURN((LM_ERROR,
01138 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01139 ACE_TEXT("Illegal value for liveliness.kind (%C) in [datareaderqos/%C] section.\n"),
01140 value.c_str(), datareaderqos_name.c_str()),
01141 -1);
01142 }
01143 } else if (name == "liveliness.lease_duration.sec") {
01144 parse_second(datareaderqos.liveliness.lease_duration.sec, value);
01145 } else if (name == "liveliness.lease_duration.nanosec") {
01146 parse_nanosecond(datareaderqos.liveliness.lease_duration.nanosec, value);
01147 } else if (name == "reliability.kind") {
01148 if (value == "BEST_EFFORT") {
01149 datareaderqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
01150 } else if (value == "RELIABLE") {
01151 datareaderqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
01152 } else {
01153 ACE_ERROR_RETURN((LM_ERROR,
01154 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01155 ACE_TEXT("Illegal value for reliability.kind (%C) in [datareaderqos/%C] section.\n"),
01156 value.c_str(), datareaderqos_name.c_str()),
01157 -1);
01158 }
01159 } else if (name == "reliability.max_blocking_time.sec") {
01160 parse_second(datareaderqos.reliability.max_blocking_time.sec, value);
01161 } else if (name == "reliability.max_blocking_time.nanosec") {
01162 parse_nanosecond(datareaderqos.reliability.max_blocking_time.nanosec, value);
01163 } else if (name == "destination_order.kind") {
01164 if (value == "BY_RECEPTION_TIMESTAMP") {
01165 datareaderqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
01166 } else if (value == "BY_SOURCE_TIMESTAMP") {
01167 datareaderqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
01168 } else {
01169 ACE_ERROR_RETURN((LM_ERROR,
01170 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01171 ACE_TEXT("Illegal value for destination_order.kind (%C) in [datareaderqos/%C] section.\n"),
01172 value.c_str(), datareaderqos_name.c_str()),
01173 -1);
01174 }
01175 } else if (name == "history.kind") {
01176 if (value == "KEEP_ALL") {
01177 datareaderqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
01178 } else if (value == "KEEP_LAST") {
01179 datareaderqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
01180 } else {
01181 ACE_ERROR_RETURN((LM_ERROR,
01182 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01183 ACE_TEXT("Illegal value for history.kind (%C) in [datareaderqos/%C] section.\n"),
01184 value.c_str(), datareaderqos_name.c_str()),
01185 -1);
01186 }
01187 } else if (name == "history.depth") {
01188 datareaderqos.history.depth = atoi(value.c_str());
01189 } else if (name == "resource_limits.max_samples") {
01190 datareaderqos.resource_limits.max_samples = atoi(value.c_str());
01191 } else if (name == "resource_limits.max_instances") {
01192 datareaderqos.resource_limits.max_instances = atoi(value.c_str());
01193 } else if (name == "resource_limits.max_samples_per_instance") {
01194 datareaderqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
01195 } else if (name == "time_based_filter.minimum_separation.sec") {
01196 parse_second(datareaderqos.time_based_filter.minimum_separation.sec, value);
01197 } else if (name == "time_based_filter.minimum_separation.nanosec") {
01198 parse_nanosecond(datareaderqos.time_based_filter.minimum_separation.nanosec, value);
01199 } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.sec") {
01200 parse_second(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec, value);
01201 } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec") {
01202 parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec, value);
01203 } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.sec") {
01204 parse_second(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec, value);
01205 } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec") {
01206 parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec, value);
01207 } else {
01208 ACE_ERROR_RETURN((LM_ERROR,
01209 ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
01210 ACE_TEXT("Unexpected entry (%C) in [datareaderqos/%C] section.\n"),
01211 name.c_str(), datareaderqos_name.c_str()),
01212 -1);
01213 }
01214 }
01215
01216 registry.datareaderqos_map[datareaderqos_name] = datareaderqos;
01217 }
01218
01219 return 0;
01220 }
01221
01222 int
01223 StaticDiscovery::parse_publisherqos(ACE_Configuration_Heap& cf)
01224 {
01225 const ACE_Configuration_Section_Key& root = cf.root_section();
01226 ACE_Configuration_Section_Key section;
01227
01228 if (cf.open_section(root, PUBLISHERQOS_SECTION_NAME, 0, section) != 0) {
01229 if (DCPS_debug_level > 0) {
01230
01231
01232 ACE_DEBUG((LM_NOTICE,
01233 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01234 ACE_TEXT("no [%s] sections.\n"),
01235 PUBLISHERQOS_SECTION_NAME));
01236 }
01237 return 0;
01238 }
01239
01240
01241
01242 ValueMap vm;
01243 if (pullValues(cf, section, vm) > 0) {
01244 ACE_ERROR_RETURN((LM_ERROR,
01245 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01246 ACE_TEXT("[publisherqos] sections must have a subsection name\n")),
01247 -1);
01248 }
01249
01250 KeyList keys;
01251 if (processSections(cf, section, keys) != 0) {
01252 ACE_ERROR_RETURN((LM_ERROR,
01253 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01254 ACE_TEXT("too many nesting layers in the [publisherqos] section.\n")),
01255 -1);
01256 }
01257
01258
01259 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01260 OPENDDS_STRING publisherqos_name = it->first;
01261
01262 if (DCPS_debug_level > 0) {
01263 ACE_DEBUG((LM_NOTICE,
01264 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
01265 ACE_TEXT("processing [publisherqos/%C] section.\n"),
01266 publisherqos_name.c_str()));
01267 }
01268
01269 ValueMap values;
01270 pullValues(cf, it->second, values);
01271
01272 DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01273
01274 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01275 OPENDDS_STRING name = it->first;
01276 OPENDDS_STRING value = it->second;
01277
01278 if (name == "presentation.access_scope") {
01279 if (value == "INSTANCE") {
01280 publisherqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01281 } else if (value == "TOPIC") {
01282 publisherqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01283 } else if (value == "GROUP") {
01284 publisherqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01285 } else {
01286 ACE_ERROR_RETURN((LM_ERROR,
01287 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01288 ACE_TEXT("Illegal value for presentation.access_scope (%C) in [publisherqos/%C] section.\n"),
01289 value.c_str(), publisherqos_name.c_str()),
01290 -1);
01291 }
01292 } else if (name == "presentation.coherent_access") {
01293 if (parse_bool(publisherqos.presentation.coherent_access, value)) {
01294 } else {
01295 ACE_ERROR_RETURN((LM_ERROR,
01296 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01297 ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [publisherqos/%C] section.\n"),
01298 value.c_str(), publisherqos_name.c_str()),
01299 -1);
01300 }
01301 } else if (name == "presentation.ordered_access") {
01302 if (parse_bool(publisherqos.presentation.ordered_access, value)) {
01303 } else {
01304 ACE_ERROR_RETURN((LM_ERROR,
01305 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01306 ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [publisherqos/%C] section.\n"),
01307 value.c_str(), publisherqos_name.c_str()),
01308 -1);
01309 }
01310 } else if (name == "partition.name") {
01311 parse_list(publisherqos.partition, value);
01312 } else {
01313 ACE_ERROR_RETURN((LM_ERROR,
01314 ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
01315 ACE_TEXT("Unexpected entry (%C) in [publisherqos/%C] section.\n"),
01316 name.c_str(), publisherqos_name.c_str()),
01317 -1);
01318 }
01319 }
01320
01321 registry.publisherqos_map[publisherqos_name] = publisherqos;
01322 }
01323
01324 return 0;
01325 }
01326
01327 int
01328 StaticDiscovery::parse_subscriberqos(ACE_Configuration_Heap& cf)
01329 {
01330 const ACE_Configuration_Section_Key& root = cf.root_section();
01331 ACE_Configuration_Section_Key section;
01332
01333 if (cf.open_section(root, SUBSCRIBERQOS_SECTION_NAME, 0, section) != 0) {
01334 if (DCPS_debug_level > 0) {
01335
01336
01337 ACE_DEBUG((LM_NOTICE,
01338 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01339 ACE_TEXT("no [%s] sections.\n"),
01340 SUBSCRIBERQOS_SECTION_NAME));
01341 }
01342 return 0;
01343 }
01344
01345
01346
01347 ValueMap vm;
01348 if (pullValues(cf, section, vm) > 0) {
01349 ACE_ERROR_RETURN((LM_ERROR,
01350 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01351 ACE_TEXT("[subscriberqos] sections must have a subsection name\n")),
01352 -1);
01353 }
01354
01355 KeyList keys;
01356 if (processSections(cf, section, keys) != 0) {
01357 ACE_ERROR_RETURN((LM_ERROR,
01358 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01359 ACE_TEXT("too many nesting layers in the [subscriberqos] section.\n")),
01360 -1);
01361 }
01362
01363
01364 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01365 OPENDDS_STRING subscriberqos_name = it->first;
01366
01367 if (DCPS_debug_level > 0) {
01368 ACE_DEBUG((LM_NOTICE,
01369 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
01370 ACE_TEXT("processing [subscriberqos/%C] section.\n"),
01371 subscriberqos_name.c_str()));
01372 }
01373
01374 ValueMap values;
01375 pullValues(cf, it->second, values);
01376
01377 DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01378
01379 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01380 OPENDDS_STRING name = it->first;
01381 OPENDDS_STRING value = it->second;
01382
01383 if (name == "presentation.access_scope") {
01384 if (value == "INSTANCE") {
01385 subscriberqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
01386 } else if (value == "TOPIC") {
01387 subscriberqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
01388 } else if (value == "GROUP") {
01389 subscriberqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
01390 } else {
01391 ACE_ERROR_RETURN((LM_ERROR,
01392 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01393 ACE_TEXT("Illegal value for presentation.access_scope (%C) in [subscriberqos/%C] section.\n"),
01394 value.c_str(), subscriberqos_name.c_str()),
01395 -1);
01396 }
01397 } else if (name == "presentation.coherent_access") {
01398 if (parse_bool(subscriberqos.presentation.coherent_access, value)) {
01399 } else {
01400 ACE_ERROR_RETURN((LM_ERROR,
01401 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01402 ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [subscriberqos/%C] section.\n"),
01403 value.c_str(), subscriberqos_name.c_str()),
01404 -1);
01405 }
01406 } else if (name == "presentation.ordered_access") {
01407 if (parse_bool(subscriberqos.presentation.ordered_access, value)) {
01408 } else {
01409 ACE_ERROR_RETURN((LM_ERROR,
01410 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01411 ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [subscriberqos/%C] section.\n"),
01412 value.c_str(), subscriberqos_name.c_str()),
01413 -1);
01414 }
01415 } else if (name == "partition.name") {
01416 parse_list(subscriberqos.partition, value);
01417 } else {
01418 ACE_ERROR_RETURN((LM_ERROR,
01419 ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
01420 ACE_TEXT("Unexpected entry (%C) in [subscriberqos/%C] section.\n"),
01421 name.c_str(), subscriberqos_name.c_str()),
01422 -1);
01423 }
01424 }
01425
01426 registry.subscriberqos_map[subscriberqos_name] = subscriberqos;
01427 }
01428
01429 return 0;
01430 }
01431
01432 int
01433 StaticDiscovery::parse_endpoints(ACE_Configuration_Heap& cf)
01434 {
01435 const ACE_Configuration_Section_Key& root = cf.root_section();
01436 ACE_Configuration_Section_Key section;
01437
01438 if (cf.open_section(root, ENDPOINT_SECTION_NAME, 0, section) != 0) {
01439 if (DCPS_debug_level > 0) {
01440
01441
01442 ACE_DEBUG((LM_NOTICE,
01443 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01444 ACE_TEXT("no [%s] sections.\n"),
01445 ENDPOINT_SECTION_NAME));
01446 }
01447 return 0;
01448 }
01449
01450
01451
01452 ValueMap vm;
01453 if (pullValues(cf, section, vm) > 0) {
01454 ACE_ERROR_RETURN((LM_ERROR,
01455 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01456 ACE_TEXT("[endpoint] sections must have a subsection name\n")),
01457 -1);
01458 }
01459
01460 KeyList keys;
01461 if (processSections(cf, section, keys) != 0) {
01462 ACE_ERROR_RETURN((LM_ERROR,
01463 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01464 ACE_TEXT("too many nesting layers in the [endpoint] section.\n")),
01465 -1);
01466 }
01467
01468
01469 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
01470 OPENDDS_STRING endpoint_name = it->first;
01471
01472 if (DCPS_debug_level > 0) {
01473 ACE_DEBUG((LM_NOTICE,
01474 ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
01475 ACE_TEXT("processing [endpoint/%C] section.\n"),
01476 endpoint_name.c_str()));
01477 }
01478
01479 ValueMap values;
01480 pullValues(cf, it->second, values);
01481 int domain = 0;
01482 unsigned char participant[6];
01483 unsigned char entity[3];
01484 enum Type {
01485 Reader,
01486 Writer
01487 };
01488 Type type = Reader;
01489 OPENDDS_STRING topic_name;
01490 DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
01491 DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
01492 DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
01493 DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
01494 TransportLocatorSeq trans_info;
01495 OPENDDS_STRING config_name;
01496
01497 bool domainSpecified = false,
01498 participantSpecified = false,
01499 entitySpecified = false,
01500 typeSpecified = false,
01501 topic_name_Specified = false,
01502 config_name_Specified = false;
01503
01504 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
01505 OPENDDS_STRING name = it->first;
01506 OPENDDS_STRING value = it->second;
01507
01508 if (name == "domain") {
01509 if (convertToInteger(value, domain)) {
01510 domainSpecified = true;
01511 } else {
01512 ACE_ERROR_RETURN((LM_ERROR,
01513 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01514 ACE_TEXT("Illegal integer value for domain (%C) in [endpoint/%C] section.\n"),
01515 value.c_str(), endpoint_name.c_str()),
01516 -1);
01517 }
01518 } else if (name == "participant") {
01519 #ifdef __SUNPRO_CC
01520 int count = 0;
01521 std::count_if(value.begin(), value.end(), isxdigit, count);
01522 if (value.size() != 12 || count != 12) {
01523 #else
01524 if (value.size() != 12 ||
01525 std::count_if(value.begin(), value.end(), isxdigit) != 12) {
01526 #endif
01527 ACE_ERROR_RETURN((LM_ERROR,
01528 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01529 ACE_TEXT("participant (%C) must be 12 hexadecimal digits in [endpoint/%C] section.\n"),
01530 value.c_str(), endpoint_name.c_str()),
01531 -1);
01532 }
01533
01534 for (size_t idx = 0; idx != 6; ++idx) {
01535 participant[idx] = fromhex(value, idx);
01536 }
01537 participantSpecified = true;
01538 } else if (name == "entity") {
01539 #ifdef __SUNPRO_CC
01540 int count = 0;
01541 std::count_if(value.begin(), value.end(), isxdigit, count);
01542 if (value.size() != 6 || count != 6) {
01543 #else
01544 if (value.size() != 6 ||
01545 std::count_if(value.begin(), value.end(), isxdigit) != 6) {
01546 #endif
01547 ACE_ERROR_RETURN((LM_ERROR,
01548 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01549 ACE_TEXT("entity (%C) must be 6 hexadecimal digits in [endpoint/%C] section.\n"),
01550 value.c_str(), endpoint_name.c_str()),
01551 -1);
01552 }
01553
01554 for (size_t idx = 0; idx != 3; ++idx) {
01555 entity[idx] = fromhex(value, idx);
01556 }
01557 entitySpecified = true;
01558 } else if (name == "type") {
01559 if (value == "reader") {
01560 type = Reader;
01561 typeSpecified = true;
01562 } else if (value == "writer") {
01563 type = Writer;
01564 typeSpecified = true;
01565 } else {
01566 ACE_ERROR_RETURN((LM_ERROR,
01567 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01568 ACE_TEXT("Illegal string value for type (%C) in [endpoint/%C] section.\n"),
01569 value.c_str(), endpoint_name.c_str()),
01570 -1);
01571 }
01572 } else if (name == "topic") {
01573 EndpointRegistry::TopicMapType::const_iterator pos = this->registry.topic_map.find(value);
01574 if (pos != this->registry.topic_map.end()) {
01575 topic_name = pos->second.name;
01576 topic_name_Specified = true;
01577 } else {
01578 ACE_ERROR_RETURN((LM_ERROR,
01579 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01580 ACE_TEXT("Illegal topic reference (%C) in [endpoint/%C] section.\n"),
01581 value.c_str(), endpoint_name.c_str()),
01582 -1);
01583 }
01584 } else if (name == "datawriterqos") {
01585 EndpointRegistry::DataWriterQosMapType::const_iterator pos = this->registry.datawriterqos_map.find(value);
01586 if (pos != this->registry.datawriterqos_map.end()) {
01587 datawriterqos = pos->second;
01588 } else {
01589 ACE_ERROR_RETURN((LM_ERROR,
01590 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01591 ACE_TEXT("Illegal datawriterqos reference (%C) in [endpoint/%C] section.\n"),
01592 value.c_str(), endpoint_name.c_str()),
01593 -1);
01594 }
01595 } else if (name == "publisherqos") {
01596 EndpointRegistry::PublisherQosMapType::const_iterator pos = this->registry.publisherqos_map.find(value);
01597 if (pos != this->registry.publisherqos_map.end()) {
01598 publisherqos = pos->second;
01599 } else {
01600 ACE_ERROR_RETURN((LM_ERROR,
01601 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01602 ACE_TEXT("Illegal publisherqos reference (%C) in [endpoint/%C] section.\n"),
01603 value.c_str(), endpoint_name.c_str()),
01604 -1);
01605 }
01606 } else if (name == "datareaderqos") {
01607 EndpointRegistry::DataReaderQosMapType::const_iterator pos = this->registry.datareaderqos_map.find(value);
01608 if (pos != this->registry.datareaderqos_map.end()) {
01609 datareaderqos = pos->second;
01610 } else {
01611 ACE_ERROR_RETURN((LM_ERROR,
01612 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01613 ACE_TEXT("Illegal datareaderqos reference (%C) in [endpoint/%C] section.\n"),
01614 value.c_str(), endpoint_name.c_str()),
01615 -1);
01616 }
01617 } else if (name == "subscriberqos") {
01618 EndpointRegistry::SubscriberQosMapType::const_iterator pos = this->registry.subscriberqos_map.find(value);
01619 if (pos != this->registry.subscriberqos_map.end()) {
01620 subscriberqos = pos->second;
01621 } else {
01622 ACE_ERROR_RETURN((LM_ERROR,
01623 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01624 ACE_TEXT("Illegal subscriberqos reference (%C) in [endpoint/%C] section.\n"),
01625 value.c_str(), endpoint_name.c_str()),
01626 -1);
01627 }
01628 } else if (name == "config") {
01629 config_name = value;
01630 config_name_Specified = true;
01631 } else {
01632 ACE_ERROR_RETURN((LM_ERROR,
01633 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01634 ACE_TEXT("Unexpected entry (%C) in [endpoint/%C] section.\n"),
01635 name.c_str(), endpoint_name.c_str()),
01636 -1);
01637 }
01638 }
01639
01640 if (!domainSpecified) {
01641 ACE_ERROR_RETURN((LM_ERROR,
01642 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01643 ACE_TEXT("No domain specified for [endpoint/%C] section.\n"),
01644 endpoint_name.c_str()),
01645 -1);
01646 }
01647
01648 if (!participantSpecified) {
01649 ACE_ERROR_RETURN((LM_ERROR,
01650 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01651 ACE_TEXT("No participant specified for [endpoint/%C] section.\n"),
01652 endpoint_name.c_str()),
01653 -1);
01654 }
01655
01656 if (!entitySpecified) {
01657 ACE_ERROR_RETURN((LM_ERROR,
01658 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01659 ACE_TEXT("No entity specified for [endpoint/%C] section.\n"),
01660 endpoint_name.c_str()),
01661 -1);
01662 }
01663
01664 if (!typeSpecified) {
01665 ACE_ERROR_RETURN((LM_ERROR,
01666 ACE_TEXT("(%P|%t) ERROR:StaticDiscovery::parse_endpoints ")
01667 ACE_TEXT("No type specified for [endpoint/%C] section.\n"),
01668 endpoint_name.c_str()),
01669 -1);
01670 }
01671
01672 if (!topic_name_Specified) {
01673 ACE_ERROR_RETURN((LM_ERROR,
01674 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01675 ACE_TEXT("No topic specified for [endpoint/%C] section.\n"),
01676 endpoint_name.c_str()),
01677 -1);
01678 }
01679
01680 TransportConfig_rch config;
01681
01682 if (config_name_Specified) {
01683 config = TheTransportRegistry->get_config(config_name);
01684 if (config.is_nil()) {
01685 ACE_ERROR_RETURN((LM_ERROR,
01686 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01687 ACE_TEXT("Illegal config reference (%C) in [endpoint/%C] section.\n"),
01688 config_name.c_str(), endpoint_name.c_str()),
01689 -1);
01690 }
01691 }
01692
01693 if (config.is_nil() && domainSpecified) {
01694 config = TheTransportRegistry->domain_default_config(domain);
01695 }
01696
01697 if (config.is_nil()) {
01698 config = TheTransportRegistry->global_config();
01699 }
01700
01701 config->populate_locators(trans_info);
01702 if (trans_info.length() == 0) {
01703 ACE_ERROR_RETURN((LM_ERROR,
01704 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01705 ACE_TEXT("No locators for [endpoint/%C] section.\n"),
01706 endpoint_name.c_str()),
01707 -1);
01708 }
01709
01710 EntityId_t entity_id = EndpointRegistry::build_id(entity,
01711 (type == Reader) ? ENTITYKIND_USER_READER_WITH_KEY : ENTITYKIND_USER_WRITER_WITH_KEY);
01712
01713 RepoId id = EndpointRegistry::build_id(domain, participant, entity_id);
01714
01715 if (DCPS_debug_level > 0) {
01716 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: StaticDiscovery::parse_endpoints adding entity with id %C\n"), LogGuid(id).c_str()));
01717 }
01718
01719 switch (type) {
01720 case Reader:
01721
01722 datareaderqos.user_data.value.length(3);
01723 datareaderqos.user_data.value[0] = entity_id.entityKey[0];
01724 datareaderqos.user_data.value[1] = entity_id.entityKey[1];
01725 datareaderqos.user_data.value[2] = entity_id.entityKey[2];
01726
01727 if (!registry.reader_map.insert(std::make_pair(id,
01728 EndpointRegistry::Reader(topic_name, datareaderqos, subscriberqos, config_name, trans_info))).second) {
01729 ACE_ERROR_RETURN((LM_ERROR,
01730 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01731 ACE_TEXT("Section [endpoint/%C] ignored - duplicate reader.\n"),
01732 endpoint_name.c_str()),
01733 -1);
01734 }
01735 break;
01736 case Writer:
01737
01738 datawriterqos.user_data.value.length(3);
01739 datawriterqos.user_data.value[0] = entity_id.entityKey[0];
01740 datawriterqos.user_data.value[1] = entity_id.entityKey[1];
01741 datawriterqos.user_data.value[2] = entity_id.entityKey[2];
01742
01743 if (!registry.writer_map.insert(std::make_pair(id,
01744 EndpointRegistry::Writer(topic_name, datawriterqos, publisherqos, config_name, trans_info))).second) {
01745 ACE_ERROR_RETURN((LM_ERROR,
01746 ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
01747 ACE_TEXT("Section [endpoint/%C] ignored - duplicate writer.\n"),
01748 endpoint_name.c_str()),
01749 -1);
01750 }
01751 break;
01752 }
01753 }
01754
01755 return 0;
01756 }
01757
01758 void StaticDiscovery::pre_writer(DataWriterImpl* writer)
01759 {
01760 const DDS::Publisher_var pub = writer->get_publisher();
01761 const DDS::DomainParticipant_var part = pub->get_participant();
01762 const DDS::DomainId_t dom = part->get_domain_id();
01763
01764 DDS::DomainParticipantQos partQos;
01765 part->get_qos(partQos);
01766 if (partQos.user_data.value.length() < 6)
01767 return;
01768 const unsigned char* const partId = partQos.user_data.value.get_buffer();
01769
01770 DDS::DataWriterQos qos;
01771 writer->get_qos(qos);
01772 if (qos.user_data.value.length() < 3)
01773 return;
01774 const unsigned char* const dwId = qos.user_data.value.get_buffer();
01775
01776 const EntityId_t entId =
01777 EndpointRegistry::build_id(dwId, ENTITYKIND_USER_WRITER_WITH_KEY);
01778 const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01779
01780 const EndpointRegistry::WriterMapType::const_iterator iter =
01781 registry.writer_map.find(rid);
01782
01783 if (iter != registry.writer_map.end() && !iter->second.trans_cfg.empty()) {
01784 TransportRegistry::instance()->bind_config(iter->second.trans_cfg, writer);
01785 }
01786 }
01787
01788 void StaticDiscovery::pre_reader(DataReaderImpl* reader)
01789 {
01790 const DDS::Subscriber_var sub = reader->get_subscriber();
01791 const DDS::DomainParticipant_var part = sub->get_participant();
01792 const DDS::DomainId_t dom = part->get_domain_id();
01793
01794 DDS::DomainParticipantQos partQos;
01795 part->get_qos(partQos);
01796 if (partQos.user_data.value.length() < 6)
01797 return;
01798 const unsigned char* const partId = partQos.user_data.value.get_buffer();
01799
01800 DDS::DataReaderQos qos;
01801 reader->get_qos(qos);
01802 if (qos.user_data.value.length() < 3)
01803 return;
01804 const unsigned char* const drId = qos.user_data.value.get_buffer();
01805
01806 const EntityId_t entId =
01807 EndpointRegistry::build_id(drId, ENTITYKIND_USER_READER_WITH_KEY);
01808 const RepoId rid = EndpointRegistry::build_id(dom, partId, entId);
01809
01810 const EndpointRegistry::ReaderMapType::const_iterator iter =
01811 registry.reader_map.find(rid);
01812
01813 if (iter != registry.reader_map.end() && !iter->second.trans_cfg.empty()) {
01814 TransportRegistry::instance()->bind_config(iter->second.trans_cfg, reader);
01815 }
01816 }
01817
// Out-of-class definition of the static member StaticDiscovery::instance_,
// initialized with a newly allocated StaticDiscovery that is constructed from
// Discovery::DEFAULT_STATIC.
// NOTE(review): StaticDiscovery_rch is presumably a reference-counted handle
// that takes ownership of the new object -- confirm against its declaration.
01818 StaticDiscovery_rch StaticDiscovery::instance_(new StaticDiscovery(Discovery::DEFAULT_STATIC));
01819
01820 }
01821 }
01822