Line data Source code
1 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
2 :
3 : #include "StaticDiscovery.h"
4 :
5 : #include "debug.h"
6 : #include "ConfigUtils.h"
7 : #include "DomainParticipantImpl.h"
8 : #include "Marked_Default_Qos.h"
9 : #include "SubscriberImpl.h"
10 : #include "BuiltInTopicUtils.h"
11 : #include "Registered_Data_Types.h"
12 : #include "Qos_Helper.h"
13 : #include "DataWriterImpl.h"
14 : #include "DcpsUpcalls.h"
15 : #include "transport/framework/TransportRegistry.h"
16 : #include "XTypes/TypeAssignability.h"
17 :
18 : #include <ctype.h>
19 :
20 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
21 :
22 : namespace OpenDDS {
23 : namespace DCPS {
24 :
namespace {
  // Number of bytes / hexadecimal digits (two digits per byte) in the
  // participant portion of an identifier.
  const size_t BYTES_IN_PARTICIPANT = 6;
  const size_t HEX_DIGITS_IN_PARTICIPANT = BYTES_IN_PARTICIPANT * 2;

  // Number of bytes / hexadecimal digits in an entity key. The
  // assign_*_key() functions require user_data of exactly this many bytes.
  const size_t BYTES_IN_ENTITY = 3;
  const size_t HEX_DIGITS_IN_ENTITY = BYTES_IN_ENTITY * 2;

  // NOTE(review): presumably the longest accepted type name; it is not
  // referenced in the visible part of this file -- confirm against the parser.
  const size_t TYPE_NAME_MAX = 128;
}
32 :
33 0 : void EndpointRegistry::match()
34 : {
35 0 : for (WriterMapType::iterator wp = writer_map.begin(), wp_limit = writer_map.end();
36 0 : wp != wp_limit;
37 0 : ++wp) {
38 0 : const GUID_t& writerid = wp->first;
39 0 : Writer& writer = wp->second;
40 0 : for (ReaderMapType::iterator rp = reader_map.begin(), rp_limit = reader_map.end();
41 0 : rp != rp_limit;
42 0 : ++rp) {
43 0 : const GUID_t& readerid = rp->first;
44 0 : Reader& reader = rp->second;
45 :
46 0 : if (StaticDiscGuidDomainEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
47 0 : !StaticDiscGuidPartEqual()(readerid.guidPrefix, writerid.guidPrefix) &&
48 0 : reader.topic_name == writer.topic_name) {
49 : // Different participants, same topic.
50 0 : IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
51 0 : IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
52 0 : const TransportLocatorSeq& writer_trans_info = writer.trans_info;
53 0 : const TransportLocatorSeq& reader_trans_info = reader.trans_info;
54 0 : const DDS::DataWriterQos& writer_qos = writer.qos;
55 0 : const DDS::DataReaderQos& reader_qos = reader.qos;
56 0 : const DDS::PublisherQos& publisher_qos = writer.publisher_qos;
57 0 : const DDS::SubscriberQos& subscriber_qos = reader.subscriber_qos;
58 :
59 0 : if (compatibleQOS(&writerStatus, &readerStatus, writer_trans_info, reader_trans_info,
60 : &writer_qos, &reader_qos, &publisher_qos, &subscriber_qos)) {
61 0 : switch (reader.qos.reliability.kind) {
62 0 : case DDS::BEST_EFFORT_RELIABILITY_QOS:
63 0 : writer.best_effort_readers.insert(readerid);
64 0 : reader.best_effort_writers.insert(writerid);
65 0 : break;
66 0 : case DDS::RELIABLE_RELIABILITY_QOS:
67 0 : writer.reliable_readers.insert(readerid);
68 0 : reader.reliable_writers.insert(writerid);
69 0 : break;
70 : }
71 : }
72 0 : }
73 : }
74 : }
75 0 : }
76 :
77 0 : StaticEndpointManager::StaticEndpointManager(const GUID_t& participant_id,
78 : ACE_Thread_Mutex& lock,
79 : const EndpointRegistry& registry,
80 0 : StaticParticipant& participant)
81 0 : : lock_(lock)
82 0 : , participant_id_(participant_id)
83 0 : , topic_counter_(0)
84 0 : , registry_(registry)
85 : #ifndef DDS_HAS_MINIMUM_BIT
86 0 : , participant_(participant)
87 : #endif
88 0 : , max_type_lookup_service_reply_period_(0)
89 0 : , type_lookup_service_sequence_number_(0)
90 : {
91 : #ifdef DDS_HAS_MINIMUM_BIT
92 : ACE_UNUSED_ARG(participant);
93 : #endif
94 0 : type_lookup_init(TheServiceParticipant->interceptor());
95 0 : }
96 :
97 0 : StaticEndpointManager::~StaticEndpointManager()
98 : {
99 0 : type_lookup_fini();
100 0 : }
101 :
102 0 : void StaticEndpointManager::init_bit()
103 : {
104 : // Discover all remote publications and subscriptions.
105 :
106 0 : for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
107 0 : limit = registry_.writer_map.end();
108 0 : pos != limit;
109 0 : ++pos) {
110 0 : const GUID_t& remoteid = pos->first;
111 0 : const EndpointRegistry::Writer& writer = pos->second;
112 :
113 0 : if (!equal_guid_prefixes(participant_id_, remoteid)) {
114 0 : const DDS::BuiltinTopicKey_t key = guid_to_bit_key(remoteid);
115 :
116 : // pos represents a remote.
117 : // Populate data.
118 0 : DDS::PublicationBuiltinTopicData data = DDS::PublicationBuiltinTopicData();
119 :
120 0 : data.key = key;
121 0 : OPENDDS_STRING topic_name = writer.topic_name;
122 0 : data.topic_name = topic_name.c_str();
123 0 : const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
124 0 : data.type_name = topic.type_name.c_str();
125 0 : data.durability = writer.qos.durability;
126 0 : data.durability_service = writer.qos.durability_service;
127 0 : data.deadline = writer.qos.deadline;
128 0 : data.latency_budget = writer.qos.latency_budget;
129 0 : data.liveliness = writer.qos.liveliness;
130 0 : data.reliability = writer.qos.reliability;
131 0 : data.lifespan = writer.qos.lifespan;
132 0 : data.user_data = writer.qos.user_data;
133 0 : data.ownership = writer.qos.ownership;
134 0 : data.ownership_strength = writer.qos.ownership_strength;
135 0 : data.destination_order = writer.qos.destination_order;
136 0 : data.presentation = writer.publisher_qos.presentation;
137 0 : data.partition = writer.publisher_qos.partition;
138 : // If the TopicQos becomes available, this can be populated.
139 : //data.topic_data = topic_details.qos_.topic_data;
140 0 : data.group_data = writer.publisher_qos.group_data;
141 0 : data.representation = writer.qos.representation;
142 :
143 : #ifndef DDS_HAS_MINIMUM_BIT
144 0 : OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
145 0 : if (bit) { // bit may be null if the DomainParticipant is shutting down
146 0 : bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
147 : }
148 : #endif /* DDS_HAS_MINIMUM_BIT */
149 0 : }
150 : }
151 :
152 0 : for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
153 0 : limit = registry_.reader_map.end();
154 0 : pos != limit;
155 0 : ++pos) {
156 0 : const GUID_t& remoteid = pos->first;
157 0 : const EndpointRegistry::Reader& reader = pos->second;
158 :
159 0 : if (!equal_guid_prefixes(participant_id_, remoteid)) {
160 0 : const DDS::BuiltinTopicKey_t key = guid_to_bit_key(remoteid);
161 :
162 : // pos represents a remote.
163 : // Populate data.
164 0 : DDS::SubscriptionBuiltinTopicData data = DDS::SubscriptionBuiltinTopicData();
165 :
166 0 : data.key = key;
167 0 : OPENDDS_STRING topic_name = reader.topic_name;
168 0 : data.topic_name = topic_name.c_str();
169 0 : const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
170 0 : data.type_name = topic.type_name.c_str();
171 0 : data.durability = reader.qos.durability;
172 0 : data.deadline = reader.qos.deadline;
173 0 : data.latency_budget = reader.qos.latency_budget;
174 0 : data.liveliness = reader.qos.liveliness;
175 0 : data.reliability = reader.qos.reliability;
176 0 : data.ownership = reader.qos.ownership;
177 0 : data.destination_order = reader.qos.destination_order;
178 0 : data.user_data = reader.qos.user_data;
179 0 : data.time_based_filter = reader.qos.time_based_filter;
180 0 : data.presentation = reader.subscriber_qos.presentation;
181 0 : data.partition = reader.subscriber_qos.partition;
182 : // // If the TopicQos becomes available, this can be populated.
183 : //data.topic_data = topic_details.qos_.topic_data;
184 0 : data.group_data = reader.subscriber_qos.group_data;
185 0 : data.representation = reader.qos.representation;
186 :
187 : #ifndef DDS_HAS_MINIMUM_BIT
188 0 : OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
189 0 : if (bit) { // bit may be null if the DomainParticipant is shutting down
190 0 : bit->store_synthetic_data(data, DDS::NEW_VIEW_STATE);
191 : }
192 : #endif /* DDS_HAS_MINIMUM_BIT */
193 0 : }
194 : }
195 0 : }
196 :
197 0 : void StaticEndpointManager::assign_publication_key(GUID_t& rid,
198 : const GUID_t& /*topicId*/,
199 : const DDS::DataWriterQos& qos)
200 : {
201 0 : if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
202 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
203 0 : return;
204 : }
205 :
206 0 : rid.entityId.entityKey[0] = qos.user_data.value[0];
207 0 : rid.entityId.entityKey[1] = qos.user_data.value[1];
208 0 : rid.entityId.entityKey[2] = qos.user_data.value[2];
209 0 : rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY;
210 :
211 0 : if (DCPS_debug_level > 8) {
212 0 : ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %C\n",
213 : LogGuid(rid).c_str()));
214 : }
215 :
216 0 : EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
217 0 : if (pos == registry_.writer_map.end()) {
218 0 : ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %C\n"), LogGuid(rid).c_str()));
219 0 : return;
220 : }
221 :
222 0 : DDS::DataWriterQos qos2(qos);
223 : // Qos in registry will not have the user data so overwrite.
224 0 : qos2.user_data = pos->second.qos.user_data;
225 :
226 0 : DDS::DataWriterQos qos3(pos->second.qos);
227 :
228 0 : if (qos2 != qos3) {
229 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
230 : }
231 0 : }
232 :
233 0 : void StaticEndpointManager::assign_subscription_key(GUID_t& rid,
234 : const GUID_t& /*topicId*/,
235 : const DDS::DataReaderQos& qos)
236 : {
237 0 : if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
238 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
239 0 : return;
240 : }
241 :
242 0 : rid.entityId.entityKey[0] = qos.user_data.value[0];
243 0 : rid.entityId.entityKey[1] = qos.user_data.value[1];
244 0 : rid.entityId.entityKey[2] = qos.user_data.value[2];
245 0 : rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY;
246 :
247 0 : EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
248 0 : if (pos == registry_.reader_map.end()) {
249 0 : ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %C\n"), LogGuid(rid).c_str()));
250 0 : return;
251 : }
252 :
253 0 : DDS::DataReaderQos qos2(qos);
254 : // Qos in registry will not have the user data so overwrite.
255 0 : qos2.user_data = pos->second.qos.user_data;
256 :
257 0 : DDS::DataReaderQos qos3(pos->second.qos);
258 :
259 0 : if (qos2 != qos3) {
260 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
261 : }
262 0 : }
263 :
264 : bool
265 0 : StaticEndpointManager::update_topic_qos(const GUID_t& /*topicId*/,
266 : const DDS::TopicQos& /*qos*/)
267 : {
268 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
269 : ACE_TEXT("Not allowed\n")));
270 0 : return false;
271 : }
272 :
273 : bool
274 0 : StaticEndpointManager::update_publication_qos(const GUID_t& /*publicationId*/,
275 : const DDS::DataWriterQos& /*qos*/,
276 : const DDS::PublisherQos& /*publisherQos*/)
277 : {
278 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
279 : ACE_TEXT("Not allowed\n")));
280 0 : return false;
281 : }
282 :
283 : bool
284 0 : StaticEndpointManager::update_subscription_qos(const GUID_t& /*subscriptionId*/,
285 : const DDS::DataReaderQos& /*qos*/,
286 : const DDS::SubscriberQos& /*subscriberQos*/)
287 : {
288 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
289 : ACE_TEXT("Not allowed\n")));
290 0 : return false;
291 : }
292 :
293 : bool
294 0 : StaticEndpointManager::update_subscription_params(const GUID_t& /*subId*/,
295 : const DDS::StringSeq& /*params*/)
296 : {
297 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
298 : ACE_TEXT("Not allowed\n")));
299 0 : return false;
300 : }
301 :
302 : bool
303 0 : StaticEndpointManager::disassociate()
304 : {
305 0 : ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
306 : // TODO
307 0 : return false;
308 : }
309 :
310 : DDS::ReturnCode_t
311 0 : StaticEndpointManager::add_publication_i(const GUID_t& writerid,
312 : LocalPublication& pub)
313 : {
314 : /*
315 : Find all matching remote readers.
316 : If the reader is best effort, then associate immediately.
317 : If the reader is reliable (we are reliable by implication), register with the transport to receive notification that the remote reader is up.
318 : */
319 0 : EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
320 0 : if (pos == registry_.writer_map.end()) {
321 0 : return DDS::RETCODE_ERROR;
322 : }
323 0 : const EndpointRegistry::Writer& writer = pos->second;
324 :
325 0 : for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
326 0 : pos != limit;
327 0 : ++pos) {
328 0 : const GUID_t& readerid = *pos;
329 0 : const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
330 :
331 0 : const ReaderAssociation ra =
332 0 : {reader.trans_info, TransportLocator(), 0, readerid, reader.subscriber_qos, reader.qos, "", "", 0, 0, {0, 0}};
333 0 : DataWriterCallbacks_rch pl = pub.publication_.lock();
334 0 : if (pl) {
335 0 : pl->add_association(writerid, ra, true);
336 : }
337 0 : }
338 :
339 0 : for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
340 0 : pos != limit;
341 0 : ++pos) {
342 0 : const GUID_t& readerid = *pos;
343 0 : const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
344 0 : DataWriterCallbacks_rch pl = pub.publication_.lock();
345 0 : if (pl) {
346 0 : pl->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
347 : }
348 0 : }
349 :
350 0 : return DDS::RETCODE_OK;
351 : }
352 :
353 : DDS::ReturnCode_t
354 0 : StaticEndpointManager::remove_publication_i(const GUID_t& writerid, LocalPublication& pub)
355 : {
356 0 : EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
357 0 : if (pos == registry_.writer_map.end()) {
358 0 : return DDS::RETCODE_ERROR;
359 : }
360 :
361 0 : const EndpointRegistry::Writer& writer = pos->second;
362 :
363 0 : ReaderIdSeq ids;
364 0 : ids.length((CORBA::ULong)writer.reliable_readers.size());
365 0 : CORBA::ULong idx = 0;
366 0 : for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
367 0 : pos != limit;
368 0 : ++pos, ++idx) {
369 0 : const GUID_t& readerid = *pos;
370 0 : ids[idx] = readerid;
371 0 : DataWriterCallbacks_rch pl = pub.publication_.lock();
372 0 : if (pl) {
373 0 : pl->unregister_for_reader(participant_id_, writerid, readerid);
374 : }
375 0 : }
376 :
377 0 : return DDS::RETCODE_OK;
378 0 : }
379 :
380 : DDS::ReturnCode_t
381 0 : StaticEndpointManager::add_subscription_i(const GUID_t& readerid,
382 : LocalSubscription& sub)
383 : {
384 : /*
385 : Find all matching remote writers.
386 : If we (the reader) is best effort, then associate immediately.
387 : If we (the reader) are reliable, then register with the transport to receive notification that the remote writer is up.
388 : */
389 0 : EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
390 0 : if (pos == registry_.reader_map.end()) {
391 0 : return DDS::RETCODE_ERROR;
392 : }
393 0 : const EndpointRegistry::Reader& reader = pos->second;
394 :
395 0 : for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
396 0 : pos != limit;
397 0 : ++pos) {
398 0 : const GUID_t& writerid = *pos;
399 0 : const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
400 :
401 0 : DDS::OctetSeq type_info;
402 0 : const WriterAssociation wa = {
403 0 : writer.trans_info, TransportLocator(), 0, writerid, writer.publisher_qos, writer.qos, type_info, {0, 0}
404 0 : };
405 0 : DataReaderCallbacks_rch sl = sub.subscription_.lock();
406 0 : if (sl) {
407 0 : sl->add_association(readerid, wa, false);
408 : }
409 0 : }
410 :
411 0 : for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
412 0 : pos != limit;
413 0 : ++pos) {
414 0 : const GUID_t& writerid = *pos;
415 0 : const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
416 0 : DataReaderCallbacks_rch sl = sub.subscription_.lock();
417 0 : if (sl) {
418 0 : sl->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
419 : }
420 0 : }
421 :
422 0 : return DDS::RETCODE_OK;
423 : }
424 :
425 0 : DDS::ReturnCode_t StaticEndpointManager::remove_subscription_i(
426 : const GUID_t& readerid, LocalSubscription& sub)
427 : {
428 0 : EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
429 0 : if (pos == registry_.reader_map.end()) {
430 0 : return DDS::RETCODE_ERROR;
431 : }
432 :
433 0 : const EndpointRegistry::Reader& reader = pos->second;
434 :
435 0 : WriterIdSeq ids;
436 0 : ids.length((CORBA::ULong)reader.reliable_writers.size());
437 0 : CORBA::ULong idx = 0;
438 0 : for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
439 0 : pos != limit;
440 0 : ++pos, ++idx) {
441 0 : const GUID_t& writerid = *pos;
442 0 : ids[idx] = writerid;
443 0 : DataReaderCallbacks_rch sl = sub.subscription_.lock();
444 0 : if (sl) {
445 0 : sl->unregister_for_writer(participant_id_, readerid, writerid);
446 : }
447 0 : }
448 :
449 0 : return DDS::RETCODE_OK;
450 0 : }
451 :
452 : bool
453 0 : StaticEndpointManager::is_expectant_opendds(const GUID_t& /*endpoint*/) const
454 : {
455 : // We can't propagate associated writers via SEDP announcments if we're
456 : // using static discovery, so nobody ought to be "expecting" them
457 0 : return false;
458 : }
459 :
460 : bool
461 0 : StaticEndpointManager::shutting_down() const
462 : {
463 0 : ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
464 : // TODO
465 0 : return false;
466 : }
467 :
468 : void
469 0 : StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
470 : DiscoveredSubscriptionIter& /*iter*/,
471 : const GUID_t& /*reader*/)
472 : {
473 0 : ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
474 : // TODO
475 0 : }
476 :
477 : void
478 0 : StaticEndpointManager::populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
479 : DiscoveredPublicationIter& /*iter*/,
480 : const GUID_t& /*reader*/)
481 : {
482 0 : ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
483 : // TODO
484 0 : }
485 :
486 : void
487 0 : StaticEndpointManager::reader_exists(const GUID_t& readerid, const GUID_t& writerid)
488 : {
489 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
490 0 : LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
491 0 : EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
492 0 : if (lp_pos != local_publications_.end() &&
493 0 : reader_pos != registry_.reader_map.end()) {
494 0 : DataWriterCallbacks_rch dwr = lp_pos->second.publication_.lock();
495 0 : if (dwr) {
496 0 : const ReaderAssociation ra =
497 0 : {reader_pos->second.trans_info, TransportLocator(), 0, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos,
498 0 : "", "", DDS::StringSeq(), DDS::OctetSeq(), {0, 0}};
499 0 : dwr->add_association(writerid, ra, true);
500 0 : }
501 0 : }
502 0 : }
503 :
504 : void
505 0 : StaticEndpointManager::reader_does_not_exist(const GUID_t& readerid, const GUID_t& writerid)
506 : {
507 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
508 0 : LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
509 0 : EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
510 0 : if (lp_pos != local_publications_.end() &&
511 0 : reader_pos != registry_.reader_map.end()) {
512 0 : DataWriterCallbacks_rch dwr = lp_pos->second.publication_.lock();
513 0 : if (dwr) {
514 0 : ReaderIdSeq ids;
515 0 : ids.length(1);
516 0 : ids[0] = readerid;
517 0 : dwr->remove_associations(ids, true);
518 0 : }
519 0 : }
520 0 : }
521 :
522 : void
523 0 : StaticEndpointManager::writer_exists(const GUID_t& writerid, const GUID_t& readerid)
524 : {
525 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
526 0 : LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
527 0 : EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
528 0 : if (ls_pos != local_subscriptions_.end() &&
529 0 : writer_pos != registry_.writer_map.end()) {
530 0 : DataReaderCallbacks_rch drr = ls_pos->second.subscription_.lock();
531 0 : if (drr) {
532 0 : const WriterAssociation wa =
533 0 : {writer_pos->second.trans_info, TransportLocator(), 0, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos, DDS::OctetSeq(), {0,0}};
534 0 : drr->add_association(readerid, wa, false);
535 0 : }
536 0 : }
537 0 : }
538 :
539 : void
540 0 : StaticEndpointManager::writer_does_not_exist(const GUID_t& writerid, const GUID_t& readerid)
541 : {
542 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
543 0 : LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
544 0 : EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
545 0 : if (ls_pos != local_subscriptions_.end() &&
546 0 : writer_pos != registry_.writer_map.end()) {
547 0 : DataReaderCallbacks_rch drr = ls_pos->second.subscription_.lock();
548 0 : if (drr) {
549 0 : WriterIdSeq ids;
550 0 : ids.length(1);
551 0 : ids[0] = writerid;
552 0 : drr->remove_associations(ids, true);
553 0 : }
554 0 : }
555 0 : }
556 :
557 0 : void StaticEndpointManager::cleanup_type_lookup_data(const GuidPrefix_t& /*guid_prefix*/,
558 : const XTypes::TypeIdentifier& /*ti*/,
559 : bool /*secure*/)
560 : {
561 : // Do nothing.
562 0 : }
563 :
564 : #ifndef DDS_HAS_MINIMUM_BIT
565 : OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*
566 0 : StaticEndpointManager::pub_bit()
567 : {
568 0 : DDS::Subscriber_var sub = participant_.bit_subscriber();
569 0 : if (!sub.in())
570 0 : return 0;
571 :
572 0 : DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
573 0 : return dynamic_cast<OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
574 0 : }
575 :
576 : OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*
577 0 : StaticEndpointManager::sub_bit()
578 : {
579 0 : DDS::Subscriber_var sub = participant_.bit_subscriber();
580 0 : if (!sub.in())
581 0 : return 0;
582 :
583 0 : DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
584 0 : return dynamic_cast<OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
585 0 : }
586 : #endif /* DDS_HAS_MINIMUM_BIT */
587 :
588 0 : void StaticEndpointManager::type_lookup_init(ReactorInterceptor_rch reactor_interceptor)
589 : {
590 0 : if (!type_lookup_reply_deadline_processor_) {
591 : type_lookup_reply_deadline_processor_ =
592 0 : DCPS::make_rch<StaticEndpointManagerSporadic>(TheServiceParticipant->time_source(), reactor_interceptor,
593 0 : rchandle_from(this), &StaticEndpointManager::remove_expired_endpoints);
594 : }
595 0 : }
596 :
597 0 : void StaticEndpointManager::type_lookup_fini()
598 : {
599 0 : if (type_lookup_reply_deadline_processor_) {
600 0 : type_lookup_reply_deadline_processor_->cancel();
601 0 : type_lookup_reply_deadline_processor_.reset();
602 : }
603 0 : }
604 :
605 0 : void StaticEndpointManager::type_lookup_service(
606 : const XTypes::TypeLookupService_rch type_lookup_service)
607 : {
608 0 : type_lookup_service_ = type_lookup_service;
609 0 : }
610 :
611 0 : void StaticEndpointManager::purge_dead_topic(const String& topic_name)
612 : {
613 0 : TopicDetailsMap::iterator top_it = topics_.find(topic_name);
614 0 : topic_names_.erase(top_it->second.topic_id());
615 0 : topics_.erase(top_it);
616 0 : }
617 :
618 0 : void StaticEndpointManager::ignore(const GUID_t& to_ignore)
619 : {
620 : // Locked prior to call from Spdp.
621 0 : ignored_guids_.insert(to_ignore);
622 : {
623 0 : const DiscoveredPublicationIter iter = discovered_publications_.find(to_ignore);
624 0 : if (iter != discovered_publications_.end()) {
625 : // clean up tracking info
626 0 : const String topic_name = iter->second.get_topic_name();
627 0 : TopicDetails& td = topics_[topic_name];
628 0 : td.remove_discovered_publication(to_ignore);
629 0 : remove_from_bit(iter->second);
630 0 : discovered_publications_.erase(iter);
631 : // break associations
632 0 : match_endpoints(to_ignore, td, true /*remove*/);
633 0 : if (td.is_dead()) {
634 0 : purge_dead_topic(topic_name);
635 : }
636 0 : return;
637 0 : }
638 : }
639 : {
640 : const DiscoveredSubscriptionIter iter =
641 0 : discovered_subscriptions_.find(to_ignore);
642 0 : if (iter != discovered_subscriptions_.end()) {
643 : // clean up tracking info
644 0 : const String topic_name = iter->second.get_topic_name();
645 0 : TopicDetails& td = topics_[topic_name];
646 0 : td.remove_discovered_publication(to_ignore);
647 0 : remove_from_bit(iter->second);
648 0 : discovered_subscriptions_.erase(iter);
649 : // break associations
650 0 : match_endpoints(to_ignore, td, true /*remove*/);
651 0 : if (td.is_dead()) {
652 0 : purge_dead_topic(topic_name);
653 : }
654 0 : return;
655 0 : }
656 : }
657 : {
658 : const OPENDDS_MAP_CMP(GUID_t, OPENDDS_STRING, GUID_tKeyLessThan)::iterator
659 0 : iter = topic_names_.find(to_ignore);
660 0 : if (iter != topic_names_.end()) {
661 0 : ignored_topics_.insert(iter->second);
662 : // Remove all publications and subscriptions on this topic
663 0 : TopicDetails& td = topics_[iter->second];
664 : {
665 0 : const RepoIdSet ids = td.discovered_publications();
666 0 : for (RepoIdSet::const_iterator ep = ids.begin(); ep!= ids.end(); ++ep) {
667 0 : match_endpoints(*ep, td, true /*remove*/);
668 0 : td.remove_discovered_publication(*ep);
669 : // TODO: Do we need to remove from discovered_subscriptions?
670 0 : if (shutting_down()) { return; }
671 : }
672 0 : }
673 : {
674 0 : const RepoIdSet ids = td.discovered_subscriptions();
675 0 : for (RepoIdSet::const_iterator ep = ids.begin(); ep!= ids.end(); ++ep) {
676 0 : match_endpoints(*ep, td, true /*remove*/);
677 0 : td.remove_discovered_subscription(*ep);
678 : // TODO: Do we need to remove from discovered_publications?
679 0 : if (shutting_down()) { return; }
680 : }
681 0 : }
682 0 : if (td.is_dead()) {
683 0 : purge_dead_topic(iter->second);
684 : }
685 : }
686 : }
687 : }
688 :
689 0 : bool StaticEndpointManager::ignoring(const GUID_t& guid) const
690 : {
691 0 : return ignored_guids_.count(guid);
692 : }
693 0 : bool StaticEndpointManager::ignoring(const char* topic_name) const
694 : {
695 0 : return ignored_topics_.count(topic_name);
696 : }
697 :
698 0 : TopicStatus StaticEndpointManager::assert_topic(
699 : GUID_t& topicId, const char* topicName,
700 : const char* dataTypeName, const DDS::TopicQos& qos,
701 : bool hasDcpsKey, TopicCallbacks* topic_callbacks)
702 : {
703 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, INTERNAL_ERROR);
704 0 : TopicDetailsMap::iterator iter = topics_.find(topicName);
705 0 : if (iter != topics_.end()) {
706 0 : if (iter->second.local_is_set() && iter->second.local_data_type_name() != dataTypeName) {
707 0 : return CONFLICTING_TYPENAME;
708 : }
709 0 : topicId = iter->second.topic_id();
710 0 : iter->second.set_local(dataTypeName, qos, hasDcpsKey, topic_callbacks);
711 0 : return FOUND;
712 : }
713 :
714 0 : TopicDetails& td = topics_[topicName];
715 0 : topicId = make_topic_guid();
716 0 : td.init(topicName, topicId);
717 0 : topic_names_[topicId] = topicName;
718 0 : td.set_local(dataTypeName, qos, hasDcpsKey, topic_callbacks);
719 :
720 0 : return CREATED;
721 0 : }
722 :
723 0 : TopicStatus StaticEndpointManager::find_topic(
724 : const char* topicName,
725 : CORBA::String_out dataTypeName,
726 : DDS::TopicQos_out qos,
727 : GUID_t& topicId)
728 : {
729 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, INTERNAL_ERROR);
730 0 : TopicDetailsMap::const_iterator iter = topics_.find(topicName);
731 0 : if (iter == topics_.end()) {
732 0 : return NOT_FOUND;
733 : }
734 :
735 0 : const TopicDetails& td = iter->second;
736 :
737 0 : dataTypeName = td.local_data_type_name().c_str();
738 0 : qos = new DDS::TopicQos(td.local_qos());
739 0 : topicId = td.topic_id();
740 0 : return FOUND;
741 0 : }
742 :
743 0 : TopicStatus StaticEndpointManager::remove_topic(const GUID_t& topicId)
744 : {
745 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, INTERNAL_ERROR);
746 0 : TopicNameMap::iterator name_iter = topic_names_.find(topicId);
747 0 : if (name_iter == topic_names_.end()) {
748 0 : return NOT_FOUND;
749 : }
750 0 : const String& name = name_iter->second;
751 0 : TopicDetails& td = topics_[name];
752 0 : td.unset_local();
753 0 : if (td.is_dead()) {
754 0 : purge_dead_topic(name);
755 : }
756 :
757 0 : return REMOVED;
758 0 : }
759 :
760 0 : GUID_t StaticEndpointManager::add_publication(
761 : const GUID_t& topicId,
762 : DataWriterCallbacks_rch publication,
763 : const DDS::DataWriterQos& qos,
764 : const TransportLocatorSeq& transInfo,
765 : const DDS::PublisherQos& publisherQos,
766 : const XTypes::TypeInformation& type_info)
767 : {
768 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, GUID_t());
769 :
770 0 : GUID_t rid = participant_id_;
771 0 : assign_publication_key(rid, topicId, qos);
772 0 : LocalPublication& pb = local_publications_[rid];
773 0 : pb.topic_id_ = topicId;
774 0 : pb.publication_ = publication;
775 0 : pb.qos_ = qos;
776 0 : pb.trans_info_ = transInfo;
777 0 : pb.publisher_qos_ = publisherQos;
778 0 : pb.type_info_ = type_info;
779 0 : const OPENDDS_STRING& topic_name = topic_names_[topicId];
780 :
781 0 : TopicDetails& td = topics_[topic_name];
782 0 : td.add_local_publication(rid);
783 :
784 0 : if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
785 0 : return GUID_t();
786 : }
787 :
788 0 : if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
789 0 : return GUID_t();
790 : }
791 :
792 0 : if (DCPS_debug_level > 3) {
793 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::add_publication - ")
794 : ACE_TEXT("calling match_endpoints\n")));
795 : }
796 0 : match_endpoints(rid, td);
797 :
798 0 : return rid;
799 0 : }
800 :
801 0 : void StaticEndpointManager::remove_publication(const GUID_t& publicationId)
802 : {
803 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
804 0 : LocalPublicationIter iter = local_publications_.find(publicationId);
805 0 : if (iter != local_publications_.end()) {
806 0 : if (DDS::RETCODE_OK == remove_publication_i(publicationId, iter->second)) {
807 0 : OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
808 0 : local_publications_.erase(publicationId);
809 0 : TopicDetailsMap::iterator top_it = topics_.find(topic_name);
810 0 : if (top_it != topics_.end()) {
811 0 : match_endpoints(publicationId, top_it->second, true /*remove*/);
812 0 : top_it->second.remove_local_publication(publicationId);
813 : // Local, no need to check for dead topic.
814 : }
815 0 : } else {
816 0 : ACE_ERROR((LM_ERROR,
817 : ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::remove_publication - ")
818 : ACE_TEXT("Failed to publish dispose msg\n")));
819 : }
820 : }
821 0 : }
822 :
823 0 : void StaticEndpointManager::update_publication_locators(
824 : const GUID_t& publicationId, const TransportLocatorSeq& transInfo)
825 : {
826 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
827 0 : LocalPublicationIter iter = local_publications_.find(publicationId);
828 0 : if (iter != local_publications_.end()) {
829 0 : if (DCPS_debug_level > 3) {
830 0 : ACE_DEBUG((LM_INFO,
831 : ACE_TEXT("(%P|%t) StaticEndpointManager::update_publication_locators - updating locators for %C\n"),
832 : LogGuid(publicationId).c_str()));
833 : }
834 0 : iter->second.trans_info_ = transInfo;
835 0 : write_publication_data(publicationId, iter->second);
836 : }
837 0 : }
838 :
839 0 : GUID_t StaticEndpointManager::add_subscription(
840 : const GUID_t& topicId,
841 : DataReaderCallbacks_rch subscription,
842 : const DDS::DataReaderQos& qos,
843 : const TransportLocatorSeq& transInfo,
844 : const DDS::SubscriberQos& subscriberQos,
845 : const char* filterClassName,
846 : const char* filterExpr,
847 : const DDS::StringSeq& params,
848 : const XTypes::TypeInformation& type_info)
849 : {
850 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, GUID_t());
851 :
852 0 : GUID_t rid = participant_id_;
853 0 : assign_subscription_key(rid, topicId, qos);
854 0 : LocalSubscription& sb = local_subscriptions_[rid];
855 0 : sb.topic_id_ = topicId;
856 0 : sb.subscription_ = subscription;
857 0 : sb.qos_ = qos;
858 0 : sb.trans_info_ = transInfo;
859 0 : sb.subscriber_qos_ = subscriberQos;
860 0 : sb.filterProperties.filterClassName = filterClassName;
861 0 : sb.filterProperties.filterExpression = filterExpr;
862 0 : sb.filterProperties.expressionParameters = params;
863 0 : sb.type_info_ = type_info;
864 0 : const OPENDDS_STRING& topic_name = topic_names_[topicId];
865 :
866 0 : TopicDetails& td = topics_[topic_name];
867 0 : td.add_local_subscription(rid);
868 :
869 0 : if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
870 0 : return GUID_t();
871 : }
872 :
873 0 : if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
874 0 : return GUID_t();
875 : }
876 :
877 0 : if (DCPS_debug_level > 3) {
878 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::add_subscription - ")
879 : ACE_TEXT("calling match_endpoints\n")));
880 : }
881 0 : match_endpoints(rid, td);
882 :
883 0 : return rid;
884 0 : }
885 :
886 0 : void StaticEndpointManager::remove_subscription(const GUID_t& subscriptionId)
887 : {
888 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
889 0 : LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
890 0 : if (iter != local_subscriptions_.end()) {
891 0 : if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId, iter->second)) {
892 0 : OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
893 0 : local_subscriptions_.erase(subscriptionId);
894 0 : TopicDetailsMap::iterator top_it = topics_.find(topic_name);
895 0 : if (top_it != topics_.end()) {
896 0 : match_endpoints(subscriptionId, top_it->second, true /*remove*/);
897 0 : top_it->second.remove_local_subscription(subscriptionId);
898 : // Local, no need to check for dead topic.
899 : }
900 0 : } else {
901 0 : ACE_ERROR((LM_ERROR,
902 : ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::remove_subscription - ")
903 : ACE_TEXT("Failed to publish dispose msg\n")));
904 : }
905 : }
906 0 : }
907 :
908 0 : void StaticEndpointManager::update_subscription_locators(
909 : const GUID_t& subscriptionId,
910 : const TransportLocatorSeq& transInfo)
911 : {
912 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
913 0 : LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
914 0 : if (iter != local_subscriptions_.end()) {
915 0 : if (DCPS_debug_level > 3) {
916 0 : ACE_DEBUG((LM_INFO,
917 : ACE_TEXT("(%P|%t) StaticEndpointManager::update_subscription_locators updating locators for %C\n"),
918 : LogGuid(subscriptionId).c_str()));
919 : }
920 0 : iter->second.trans_info_ = transInfo;
921 0 : write_subscription_data(subscriptionId, iter->second);
922 : }
923 0 : }
924 :
925 : // TODO: This is perhaps too generic since the context probably has the details this function computes.
926 0 : void StaticEndpointManager::match_endpoints(
927 : GUID_t repoId, const TopicDetails& td, bool remove)
928 : {
929 0 : if (DCPS_debug_level >= 4) {
930 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match_endpoints %C%C\n",
931 : remove ? "remove " : "", LogGuid(repoId).c_str()));
932 : }
933 :
934 0 : const bool reader = GuidConverter(repoId).isReader();
935 : // Copy the endpoint set - lock can be released in match()
936 0 : RepoIdSet local_endpoints;
937 0 : RepoIdSet discovered_endpoints;
938 0 : if (reader) {
939 0 : local_endpoints = td.local_publications();
940 0 : discovered_endpoints = td.discovered_publications();
941 : } else {
942 0 : local_endpoints = td.local_subscriptions();
943 0 : discovered_endpoints = td.discovered_subscriptions();
944 : }
945 :
946 0 : const bool is_remote = !equal_guid_prefixes(repoId, participant_id_);
947 0 : if (is_remote && local_endpoints.empty()) {
948 : // Nothing to match.
949 0 : return;
950 : }
951 :
952 0 : for (RepoIdSet::const_iterator iter = local_endpoints.begin();
953 0 : iter != local_endpoints.end(); ++iter) {
954 : // check to make sure it's a Reader/Writer or Writer/Reader match
955 0 : if (GuidConverter(*iter).isReader() != reader) {
956 0 : if (remove) {
957 0 : remove_assoc(*iter, repoId);
958 : } else {
959 0 : match(reader ? *iter : repoId, reader ? repoId : *iter);
960 : }
961 : }
962 : }
963 :
964 : // Remote/remote matches are a waste of time
965 0 : if (is_remote) {
966 0 : return;
967 : }
968 :
969 0 : for (RepoIdSet::const_iterator iter = discovered_endpoints.begin();
970 0 : iter != discovered_endpoints.end(); ++iter) {
971 : // check to make sure it's a Reader/Writer or Writer/Reader match
972 0 : if (GuidConverter(*iter).isReader() != reader) {
973 0 : if (remove) {
974 0 : remove_assoc(*iter, repoId);
975 : } else {
976 0 : match(reader ? *iter : repoId, reader ? repoId : *iter);
977 : }
978 : }
979 : }
980 0 : }
981 :
982 0 : void StaticEndpointManager::remove_assoc(const GUID_t& remove_from, const GUID_t& removing)
983 : {
984 0 : if (GuidConverter(remove_from).isReader()) {
985 0 : const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
986 0 : if (lsi != local_subscriptions_.end()) {
987 0 : lsi->second.matched_endpoints_.erase(removing);
988 0 : const DiscoveredPublicationIter dpi = discovered_publications_.find(removing);
989 0 : if (dpi != discovered_publications_.end()) {
990 0 : dpi->second.matched_endpoints_.erase(remove_from);
991 : }
992 0 : WriterIdSeq writer_seq(1);
993 0 : writer_seq.length(1);
994 0 : writer_seq[0] = removing;
995 0 : const size_t count = lsi->second.remote_expectant_opendds_associations_.erase(removing);
996 0 : DataReaderCallbacks_rch drr = lsi->second.subscription_.lock();
997 0 : if (drr) {
998 0 : drr->remove_associations(writer_seq, false /*notify_lost*/);
999 : }
1000 0 : remove_assoc_i(remove_from, lsi->second, removing);
1001 : // Update writer
1002 0 : if (count) {
1003 0 : write_subscription_data(remove_from, lsi->second);
1004 : }
1005 0 : }
1006 :
1007 : } else {
1008 0 : const LocalPublicationIter lpi = local_publications_.find(remove_from);
1009 0 : if (lpi != local_publications_.end()) {
1010 0 : lpi->second.matched_endpoints_.erase(removing);
1011 0 : const DiscoveredSubscriptionIter dsi = discovered_subscriptions_.find(removing);
1012 0 : if (dsi != discovered_subscriptions_.end()) {
1013 0 : dsi->second.matched_endpoints_.erase(remove_from);
1014 : }
1015 0 : ReaderIdSeq reader_seq(1);
1016 0 : reader_seq.length(1);
1017 0 : reader_seq[0] = removing;
1018 0 : lpi->second.remote_expectant_opendds_associations_.erase(removing);
1019 0 : DataWriterCallbacks_rch dwr = lpi->second.publication_.lock();
1020 0 : if (dwr) {
1021 0 : dwr->remove_associations(reader_seq, false /*notify_lost*/);
1022 : }
1023 0 : remove_assoc_i(remove_from, lpi->second, removing);
1024 0 : }
1025 : }
1026 0 : }
1027 :
1028 0 : void StaticEndpointManager::match(const GUID_t& writer, const GUID_t& reader)
1029 : {
1030 0 : if (DCPS_debug_level >= 4) {
1031 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match: w: %C r: %C\n",
1032 : LogGuid(writer).c_str(), LogGuid(reader).c_str()));
1033 : }
1034 :
1035 0 : match_continue(writer, reader);
1036 0 : }
1037 :
1038 0 : void StaticEndpointManager::remove_expired_endpoints(
1039 : const MonotonicTimePoint& /*now*/)
1040 : {
1041 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
1042 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
1043 :
1044 : // Clean up internal data used by getTypeDependencies
1045 0 : for (OrigSeqNumberMap::iterator it = orig_seq_numbers_.begin(); it != orig_seq_numbers_.end();) {
1046 0 : if (now - it->second.time_started >= max_type_lookup_service_reply_period_) {
1047 0 : if (DCPS_debug_level >= 4) {
1048 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::remove_expired_endpoints: "
1049 : "clean up type lookup data for %C\n",
1050 : LogGuid(it->second.participant).c_str()));
1051 : }
1052 0 : cleanup_type_lookup_data(it->second.participant, it->second.type_id, it->second.secure);
1053 0 : orig_seq_numbers_.erase(it++);
1054 : } else {
1055 0 : ++it;
1056 : }
1057 : }
1058 0 : }
1059 :
// Try to associate one data writer with one data reader. Either endpoint may
// be local (found in local_publications_ / local_subscriptions_) or
// discovered (found in discovered_publications_ / discovered_subscriptions_).
//
// Outline:
//   0. Return immediately for a discovered/discovered pair.
//   1./2. Gather QoS, transport locators, transport context, type information
//      and discovery time for the writer and the reader. Return when either
//      GUID is in neither the local nor the discovered map.
//   3. Check type consistency; on failure the topic's inconsistent counter is
//      incremented and the function returns.
//   4. Check transport/QoS compatibility with compatibleQOS():
//      - compatible: record the pair in matched_endpoints_ and invoke
//        add_association() on the local endpoint(s);
//      - not compatible but previously matched: undo the association;
//      - otherwise: report incompatible QoS to the local endpoint(s).
//
// Callbacks into the local DataWriter/DataReader are made while lock_ is
// released through an ACE_Reverse_Lock guard, so this function assumes the
// caller holds lock_ on entry (TODO confirm against callers).
1060 0 : void StaticEndpointManager::match_continue(const GUID_t& writer, const GUID_t& reader)
1061 : {
1062 0 : if (DCPS_debug_level >= 4) {
1063 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match_continue: w: %C r: %C\n",
1064 : LogGuid(writer).c_str(), LogGuid(reader).c_str()));
1065 : }
1066 :
1067 : // 0. For discovered endpoints, we'll have the QoS info in the form of the
1068 : // publication or subscription BIT data which doesn't use the same structures
1069 : // for QoS. In those cases we can copy the individual QoS policies to temp
1070 : // QoS structs:
1071 0 : DDS::DataWriterQos tempDwQos;
1072 0 : DDS::PublisherQos tempPubQos;
1073 0 : DDS::DataReaderQos tempDrQos;
1074 0 : DDS::SubscriberQos tempSubQos;
1075 0 : ContentFilterProperty_t tempCfp;
1076 :
1077 0 : DiscoveredPublicationIter dpi = discovered_publications_.find(writer);
1078 0 : DiscoveredSubscriptionIter dsi = discovered_subscriptions_.find(reader);
1079 0 : if (dpi != discovered_publications_.end() && dsi != discovered_subscriptions_.end()) {
1080 : // This is a discovered/discovered match, nothing for us to do
1081 0 : return;
1082 : }
1083 :
1084 : // 1. Collect details about the writer, which may be local or discovered
1085 0 : const DDS::DataWriterQos* dwQos = 0;
1086 0 : const DDS::PublisherQos* pubQos = 0;
1087 0 : TransportLocatorSeq* wTls = 0;
1088 0 : ACE_CDR::ULong wTransportContext = 0;
1089 0 : XTypes::TypeInformation* writer_type_info = 0;
1090 0 : OPENDDS_STRING topic_name;
1091 : MonotonicTime_t writer_participant_discovered_at;
1092 :
1093 0 : const LocalPublicationIter lpi = local_publications_.find(writer);
1094 0 : bool writer_local = false, already_matched = false;
1095 0 : if (lpi != local_publications_.end()) {
// Local writer: point directly at the stored QoS and locators.
1096 0 : writer_local = true;
1097 0 : dwQos = &lpi->second.qos_;
1098 0 : pubQos = &lpi->second.publisher_qos_;
1099 0 : wTls = &lpi->second.trans_info_;
1100 0 : wTransportContext = lpi->second.transport_context_;
1101 0 : already_matched = lpi->second.matched_endpoints_.count(reader);
1102 0 : writer_type_info = &lpi->second.type_info_;
1103 0 : topic_name = topic_names_[lpi->second.topic_id_];
1104 0 : writer_participant_discovered_at = lpi->second.participant_discovered_at_;
1105 0 : } else if (dpi != discovered_publications_.end()) {
// Discovered writer: rebuild DataWriterQos/PublisherQos from the builtin
// topic data. Policies the BIT data does not supply here (history,
// resource_limits, transport_priority, writer_data_lifecycle,
// entity_factory) are taken from the service participant's initial values.
1106 0 : wTls = &dpi->second.writer_data_.writerProxy.allLocators;
1107 0 : wTransportContext = dpi->second.transport_context_;
1108 0 : writer_type_info = &dpi->second.type_info_;
1109 0 : topic_name = dpi->second.get_topic_name();
1110 0 : writer_participant_discovered_at = dpi->second.participant_discovered_at_;
1111 :
1112 : const DDS::PublicationBuiltinTopicData& bit =
1113 0 : dpi->second.writer_data_.ddsPublicationData;
1114 0 : tempDwQos.durability = bit.durability;
1115 0 : tempDwQos.durability_service = bit.durability_service;
1116 0 : tempDwQos.deadline = bit.deadline;
1117 0 : tempDwQos.latency_budget = bit.latency_budget;
1118 0 : tempDwQos.liveliness = bit.liveliness;
1119 0 : tempDwQos.reliability = bit.reliability;
1120 0 : tempDwQos.destination_order = bit.destination_order;
1121 0 : tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
1122 0 : tempDwQos.resource_limits =
1123 0 : TheServiceParticipant->initial_ResourceLimitsQosPolicy();
1124 0 : tempDwQos.transport_priority =
1125 0 : TheServiceParticipant->initial_TransportPriorityQosPolicy();
1126 0 : tempDwQos.lifespan = bit.lifespan;
1127 0 : tempDwQos.user_data = bit.user_data;
1128 0 : tempDwQos.ownership = bit.ownership;
1129 0 : tempDwQos.ownership_strength = bit.ownership_strength;
1130 0 : tempDwQos.writer_data_lifecycle =
1131 0 : TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
1132 0 : tempDwQos.representation = bit.representation;
1133 0 : dwQos = &tempDwQos;
1134 :
1135 0 : tempPubQos.presentation = bit.presentation;
1136 0 : tempPubQos.partition = bit.partition;
1137 0 : tempPubQos.group_data = bit.group_data;
1138 0 : tempPubQos.entity_factory =
1139 0 : TheServiceParticipant->initial_EntityFactoryQosPolicy();
1140 0 : pubQos = &tempPubQos;
1141 :
1142 0 : populate_transport_locator_sequence(wTls, dpi, writer);
1143 : } else {
1144 0 : return; // Possible and ok, since lock is released
1145 : }
1146 :
1147 : // 2. Collect details about the reader, which may be local or discovered
1148 0 : const DDS::DataReaderQos* drQos = 0;
1149 0 : const DDS::SubscriberQos* subQos = 0;
1150 0 : TransportLocatorSeq* rTls = 0;
1151 0 : ACE_CDR::ULong rTransportContext = 0;
1152 0 : const ContentFilterProperty_t* cfProp = 0;
1153 0 : XTypes::TypeInformation* reader_type_info = 0;
1154 : MonotonicTime_t reader_participant_discovered_at;
1155 :
1156 0 : const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
1157 0 : bool reader_local = false;
1158 0 : if (lsi != local_subscriptions_.end()) {
1159 0 : reader_local = true;
1160 0 : drQos = &lsi->second.qos_;
1161 0 : subQos = &lsi->second.subscriber_qos_;
1162 0 : rTls = &lsi->second.trans_info_;
1163 0 : rTransportContext = lsi->second.transport_context_;
1164 0 : reader_type_info = &lsi->second.type_info_;
// Only the filter expression and its parameters are copied into tempCfp;
// NOTE(review): filterClassName is not copied for a local reader - confirm
// that this is intended.
1165 0 : if (lsi->second.filterProperties.filterExpression[0] != 0) {
1166 0 : tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
1167 0 : tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
1168 : }
1169 0 : cfProp = &tempCfp;
1170 0 : if (!already_matched) {
1171 0 : already_matched = lsi->second.matched_endpoints_.count(writer);
1172 : }
1173 0 : reader_participant_discovered_at = lsi->second.participant_discovered_at_;
1174 0 : } else if (dsi != discovered_subscriptions_.end()) {
// Discovered reader: rebuild DataReaderQos/SubscriberQos from the builtin
// topic data, using the service participant's initial values for history,
// resource_limits, reader_data_lifecycle and entity_factory.
1175 0 : rTls = &dsi->second.reader_data_.readerProxy.allLocators;
1176 :
1177 0 : populate_transport_locator_sequence(rTls, dsi, reader);
1178 0 : rTransportContext = dsi->second.transport_context_;
1179 :
1180 : const DDS::SubscriptionBuiltinTopicData& bit =
1181 0 : dsi->second.reader_data_.ddsSubscriptionData;
1182 0 : tempDrQos.durability = bit.durability;
1183 0 : tempDrQos.deadline = bit.deadline;
1184 0 : tempDrQos.latency_budget = bit.latency_budget;
1185 0 : tempDrQos.liveliness = bit.liveliness;
1186 0 : tempDrQos.reliability = bit.reliability;
1187 0 : tempDrQos.destination_order = bit.destination_order;
1188 0 : tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
1189 0 : tempDrQos.resource_limits =
1190 0 : TheServiceParticipant->initial_ResourceLimitsQosPolicy();
1191 0 : tempDrQos.user_data = bit.user_data;
1192 0 : tempDrQos.ownership = bit.ownership;
1193 0 : tempDrQos.time_based_filter = bit.time_based_filter;
1194 0 : tempDrQos.reader_data_lifecycle =
1195 0 : TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
1196 0 : tempDrQos.representation = bit.representation;
1197 0 : tempDrQos.type_consistency = bit.type_consistency;
1198 0 : drQos = &tempDrQos;
1199 :
1200 0 : tempSubQos.presentation = bit.presentation;
1201 0 : tempSubQos.partition = bit.partition;
1202 0 : tempSubQos.group_data = bit.group_data;
1203 0 : tempSubQos.entity_factory =
1204 0 : TheServiceParticipant->initial_EntityFactoryQosPolicy();
1205 0 : subQos = &tempSubQos;
1206 :
1207 0 : cfProp = &dsi->second.reader_data_.contentFilterProperty;
1208 0 : reader_type_info = &dsi->second.type_info_;
1209 0 : reader_participant_discovered_at = dsi->second.participant_discovered_at_;
1210 : } else {
1211 0 : return; // Possible and ok, since lock is released
1212 : }
1213 :
1214 : // 3. Perform type consistency check (XTypes 1.3, Section 7.6.3.4.2)
1215 0 : bool consistent = false;
1216 :
1217 0 : TopicDetailsMap::iterator td_iter = topics_.find(topic_name);
1218 0 : if (td_iter == topics_.end()) {
1219 0 : ACE_ERROR((LM_ERROR,
1220 : ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ERROR ")
1221 : ACE_TEXT("Didn't find topic for consistency check\n")));
1222 0 : return;
1223 : } else {
1224 0 : const XTypes::TypeIdentifier& writer_type_id = writer_type_info->minimal.typeid_with_size.type_id;
1225 0 : const XTypes::TypeIdentifier& reader_type_id = reader_type_info->minimal.typeid_with_size.type_id;
1226 0 : if (writer_type_id.kind() != XTypes::TK_NONE && reader_type_id.kind() != XTypes::TK_NONE) {
// Diagnostic only: the check below can log a warning but never changes
// 'consistent'. tempDwQos.representation is filled in only for a
// discovered writer, so for a local writer its length is whatever a
// default-constructed DataWriterQos holds.
1227 0 : if (!writer_local || !reader_local) {
1228 : Encoding::Kind encoding_kind;
1229 0 : if (tempDwQos.representation.value.length() > 0 &&
1230 0 : repr_to_encoding_kind(tempDwQos.representation.value[0], encoding_kind) &&
1231 0 : encoding_kind == Encoding::KIND_XCDR1) {
1232 0 : const XTypes::TypeFlag extensibility_mask = XTypes::IS_APPENDABLE;
1233 0 : if (type_lookup_service_->extensibility(extensibility_mask, writer_type_id)) {
1234 0 : if (DCPS_debug_level) {
1235 0 : ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
1236 : ACE_TEXT("StaticEndpointManager::match_continue: ")
1237 : ACE_TEXT("Encountered unsupported combination of XCDR1 encoding and appendable extensibility\n")));
1238 : }
1239 : }
1240 : }
1241 : }
1242 :
// The reader's TypeConsistencyEnforcement QoS drives the assignability check.
1243 : XTypes::TypeConsistencyAttributes type_consistency;
1244 0 : type_consistency.ignore_sequence_bounds = drQos->type_consistency.ignore_sequence_bounds;
1245 0 : type_consistency.ignore_string_bounds = drQos->type_consistency.ignore_string_bounds;
1246 0 : type_consistency.ignore_member_names = drQos->type_consistency.ignore_member_names;
1247 0 : type_consistency.prevent_type_widening = drQos->type_consistency.prevent_type_widening;
1248 0 : XTypes::TypeAssignability ta(type_lookup_service_, type_consistency);
1249 :
1250 0 : if (drQos->type_consistency.kind == DDS::ALLOW_TYPE_COERCION) {
1251 0 : consistent = ta.assignable(reader_type_id, writer_type_id);
1252 : } else {
1253 : // The two types must be equivalent for DISALLOW_TYPE_COERCION
1254 0 : consistent = reader_type_id == writer_type_id;
1255 : }
1256 0 : } else {
1257 0 : if (drQos->type_consistency.force_type_validation) {
1258 : // Cannot do type validation since not both TypeObjects are available
1259 0 : consistent = false;
1260 : } else {
1261 : // Fall back to matching type names
1262 0 : OPENDDS_STRING writer_type_name;
1263 0 : OPENDDS_STRING reader_type_name;
1264 0 : if (writer_local) {
1265 0 : writer_type_name = td_iter->second.local_data_type_name();
1266 : } else {
1267 0 : writer_type_name = dpi->second.get_type_name();
1268 : }
1269 0 : if (reader_local) {
1270 0 : reader_type_name = td_iter->second.local_data_type_name();
1271 : } else {
1272 0 : reader_type_name = dsi->second.get_type_name();
1273 : }
1274 0 : consistent = writer_type_name == reader_type_name;
1275 0 : }
1276 : }
1277 :
1278 0 : if (!consistent) {
1279 0 : td_iter->second.increment_inconsistent();
1280 0 : if (DCPS::DCPS_debug_level) {
1281 0 : ACE_DEBUG((LM_WARNING,
1282 : ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - WARNING ")
1283 : ACE_TEXT("Data types of topic %C does not match (inconsistent)\n"),
1284 : topic_name.c_str()));
1285 : }
1286 0 : return;
1287 : }
1288 : }
1289 :
1290 : // Need to release lock, below, for callbacks into DCPS which could
1291 : // call into Spdp/Sedp. Note that this doesn't unlock, it just constructs
1292 : // an ACE object which will be used below for unlocking.
1293 0 : ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
1294 :
1295 : // 4. Check transport and QoS compatibility
1296 :
1297 : // Copy entries from local publication and local subscription maps
1298 : // prior to releasing lock
1299 0 : DataWriterCallbacks_wrch dwr;
1300 0 : DataReaderCallbacks_wrch drr;
1301 0 : if (writer_local) {
1302 0 : dwr = lpi->second.publication_;
1303 0 : OPENDDS_ASSERT(lpi->second.publication_);
1304 0 : OPENDDS_ASSERT(dwr);
1305 : }
1306 0 : if (reader_local) {
1307 0 : drr = lsi->second.subscription_;
1308 0 : OPENDDS_ASSERT(lsi->second.subscription_);
1309 0 : OPENDDS_ASSERT(drr);
1310 : }
1311 :
1312 0 : IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
1313 0 : IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
1314 :
1315 0 : if (compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
1316 : dwQos, drQos, pubQos, subQos)) {
1317 :
1318 0 : bool call_writer = false, call_reader = false;
1319 :
// insert().second is true only when the peer was not already in the set,
// so call_writer/call_reader mark associations that are new on that side.
1320 0 : if (writer_local) {
1321 0 : call_writer = lpi->second.matched_endpoints_.insert(reader).second;
1322 0 : dwr = lpi->second.publication_;
1323 0 : if (!reader_local) {
1324 0 : dsi->second.matched_endpoints_.insert(writer);
1325 : }
1326 : }
1327 0 : if (reader_local) {
1328 0 : call_reader = lsi->second.matched_endpoints_.insert(writer).second;
1329 0 : drr = lsi->second.subscription_;
1330 0 : if (!writer_local) {
1331 0 : dpi->second.matched_endpoints_.insert(reader);
1332 : }
1333 : }
1334 :
1335 0 : if (writer_local && !reader_local) {
1336 0 : add_assoc_i(writer, lpi->second, reader, dsi->second);
1337 : }
1338 0 : if (reader_local && !writer_local) {
1339 0 : add_assoc_i(reader, lsi->second, writer, dpi->second);
1340 : }
1341 :
1342 0 : if (!call_writer && !call_reader) {
1343 0 : return; // nothing more to do
1344 : }
1345 :
1346 : // Copy reader and writer association data prior to releasing lock
1347 0 : DDS::OctetSeq octet_seq_type_info_reader;
1348 0 : XTypes::serialize_type_info(*reader_type_info, octet_seq_type_info_reader);
1349 : const ReaderAssociation ra = {
1350 : *rTls, TransportLocator(), rTransportContext, reader, *subQos, *drQos,
1351 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1352 0 : cfProp->filterClassName, cfProp->filterExpression,
1353 : #else
1354 : "", "",
1355 : #endif
1356 0 : cfProp->expressionParameters,
1357 : octet_seq_type_info_reader,
1358 : reader_participant_discovered_at
1359 0 : };
1360 :
1361 0 : DDS::OctetSeq octet_seq_type_info_writer;
1362 0 : XTypes::serialize_type_info(*writer_type_info, octet_seq_type_info_writer);
1363 : const WriterAssociation wa = {
1364 : *wTls, TransportLocator(), wTransportContext, writer, *pubQos, *dwQos,
1365 : octet_seq_type_info_writer,
1366 : writer_participant_discovered_at
1367 0 : };
1368 :
// From here to the end of this branch lock_ is released; only the copies
// made above (ra, wa, dwr, drr) are used.
1369 0 : ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
1370 : static const bool writer_active = true;
1371 :
1372 0 : if (call_writer) {
1373 0 : if (DCPS_debug_level > 3) {
1374 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ")
1375 : ACE_TEXT("adding writer %C association for reader %C\n"), LogGuid(writer).c_str(),
1376 : LogGuid(reader).c_str()));
1377 : }
1378 0 : DataWriterCallbacks_rch dwr_lock = dwr.lock();
1379 0 : if (dwr_lock) {
1380 0 : if (call_reader) {
// Both sides are local and new: the reader-side upcall is delegated to a
// DcpsUpcalls object that is activated before the writer's
// add_association() runs (presumably on a separate thread - confirm in
// DcpsUpcalls.h). If the reader handle can no longer be locked, neither
// side is called.
1381 0 : DataReaderCallbacks_rch drr_lock = drr.lock();
1382 0 : if (drr_lock) {
1383 0 : DcpsUpcalls thr(drr_lock, reader, wa, !writer_active, dwr_lock);
1384 0 : thr.activate();
1385 0 : dwr_lock->add_association(writer, ra, writer_active);
1386 0 : thr.writer_done();
1387 0 : }
1388 0 : } else {
1389 0 : dwr_lock->add_association(writer, ra, writer_active);
1390 : }
1391 : }
1392 0 : } else if (call_reader) {
1393 0 : if (DCPS_debug_level > 3) {
1394 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ")
1395 : ACE_TEXT("adding reader %C association for writer %C\n"),
1396 : LogGuid(reader).c_str(), LogGuid(writer).c_str()));
1397 : }
1398 0 : DataReaderCallbacks_rch drr_lock = drr.lock();
1399 0 : if (drr_lock) {
1400 0 : drr_lock->add_association(reader, wa, !writer_active);
1401 : }
1402 0 : }
1403 :
1404 0 : } else if (already_matched) { // break an existing association
// Bookkeeping is updated while lock_ is still held; the
// remove_associations() callbacks run after the reverse guard releases it.
1405 0 : if (writer_local) {
1406 0 : lpi->second.matched_endpoints_.erase(reader);
1407 0 : lpi->second.remote_expectant_opendds_associations_.erase(reader);
1408 0 : if (dsi != discovered_subscriptions_.end()) {
1409 0 : dsi->second.matched_endpoints_.erase(writer);
1410 : }
1411 : }
1412 0 : if (reader_local) {
1413 0 : lsi->second.matched_endpoints_.erase(writer);
1414 0 : lsi->second.remote_expectant_opendds_associations_.erase(writer);
1415 0 : if (dpi != discovered_publications_.end()) {
1416 0 : dpi->second.matched_endpoints_.erase(reader);
1417 : }
1418 : }
1419 0 : if (writer_local && !reader_local) {
1420 0 : remove_assoc_i(writer, lpi->second, reader);
1421 : }
1422 0 : if (reader_local && !writer_local) {
1423 0 : remove_assoc_i(reader, lsi->second, writer);
1424 : }
1425 0 : ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
1426 0 : if (writer_local) {
1427 0 : ReaderIdSeq reader_seq(1);
1428 0 : reader_seq.length(1);
1429 0 : reader_seq[0] = reader;
1430 0 : DataWriterCallbacks_rch dwr_lock = dwr.lock();
1431 0 : if (dwr_lock) {
1432 0 : dwr_lock->remove_associations(reader_seq, false /*notify_lost*/);
1433 : }
1434 0 : }
1435 0 : if (reader_local) {
1436 0 : WriterIdSeq writer_seq(1);
1437 0 : writer_seq.length(1);
1438 0 : writer_seq[0] = writer;
1439 0 : DataReaderCallbacks_rch drr_lock = drr.lock();
1440 0 : if (drr_lock) {
1441 0 : drr_lock->remove_associations(writer_seq, false /*notify_lost*/);
1442 : }
1443 0 : }
1444 0 : } else { // something was incompatible
// NOTE(review): the debug messages in this branch are tagged
// "StaticEndpointManager::match" although they are emitted from
// match_continue.
1445 0 : ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
1446 0 : if (writer_local && writerStatus.count_since_last_send) {
1447 0 : if (DCPS_debug_level > 3) {
1448 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match - ")
1449 : ACE_TEXT("writer incompatible\n")));
1450 : }
1451 0 : DataWriterCallbacks_rch dwr_lock = dwr.lock();
1452 0 : if (dwr_lock) {
1453 0 : dwr_lock->update_incompatible_qos(writerStatus);
1454 : }
1455 0 : }
1456 0 : if (reader_local && readerStatus.count_since_last_send) {
1457 0 : if (DCPS_debug_level > 3) {
1458 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match - ")
1459 : ACE_TEXT("reader incompatible\n")));
1460 : }
1461 0 : DataReaderCallbacks_rch drr_lock = drr.lock();
1462 0 : if (drr_lock) {
1463 0 : drr_lock->update_incompatible_qos(readerStatus);
1464 : }
1465 0 : }
1466 0 : }
1467 0 : }
1468 :
1469 0 : GUID_t StaticEndpointManager::make_topic_guid()
1470 : {
1471 : EntityId_t entity_id;
1472 0 : assign(entity_id.entityKey, topic_counter_);
1473 0 : ++topic_counter_;
1474 0 : entity_id.entityKind = ENTITYKIND_OPENDDS_TOPIC;
1475 :
1476 0 : if (topic_counter_ == 0x1000000) {
1477 0 : ACE_ERROR((LM_ERROR,
1478 : ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::make_topic_guid: ")
1479 : ACE_TEXT("Exceeded Maximum number of topic entity keys!")
1480 : ACE_TEXT("Next key will be a duplicate!\n")));
1481 0 : topic_counter_ = 0;
1482 : }
1483 :
1484 0 : return make_id(participant_id_, entity_id);
1485 : }
1486 :
1487 0 : bool StaticEndpointManager::has_dcps_key(const GUID_t& topicId) const
1488 : {
1489 : typedef OPENDDS_MAP_CMP(GUID_t, OPENDDS_STRING, GUID_tKeyLessThan) TNMap;
1490 0 : TNMap::const_iterator tn = topic_names_.find(topicId);
1491 0 : if (tn == topic_names_.end()) return false;
1492 :
1493 0 : TopicDetailsMap::const_iterator td = topics_.find(tn->second);
1494 0 : if (td == topics_.end()) return false;
1495 :
1496 0 : return td->second.has_dcps_key();
1497 : }
1498 :
1499 1 : StaticDiscovery::StaticDiscovery(const RepoKey& key)
1500 1 : : Discovery(key)
1501 1 : {}
1502 :
namespace {
  // Convert one hexadecimal digit character ('0'-'9', 'a'-'f', 'A'-'F') to
  // its numeric value 0-15. Any other character is returned unchanged.
  unsigned char hextobyte(unsigned char c)
  {
    unsigned char value = c; // pass-through for non-hex input

    if (c >= '0' && c <= '9') {
      value = static_cast<unsigned char>(c - '0');
    } else if (c >= 'a' && c <= 'f') {
      value = static_cast<unsigned char>(10 + c - 'a');
    } else if (c >= 'A' && c <= 'F') {
      value = static_cast<unsigned char>(10 + c - 'A');
    }

    return value;
  }

  // Decode the idx-th byte of a hex string: characters 2*idx and 2*idx + 1
  // supply the high and low nibble respectively. No bounds checking is done.
  unsigned char
  fromhex(const OPENDDS_STRING& x, size_t idx)
  {
    const size_t first = idx * 2;
    const unsigned char high = hextobyte(x[first]);
    const unsigned char low = hextobyte(x[first + 1]);
    return static_cast<unsigned char>((high << 4) | low);
  }
}
1524 :
1525 : EntityId_t
1526 0 : EndpointRegistry::build_id(const unsigned char* entity_key,
1527 : const unsigned char entity_kind)
1528 : {
1529 : EntityId_t retval;
1530 0 : retval.entityKey[0] = entity_key[0];
1531 0 : retval.entityKey[1] = entity_key[1];
1532 0 : retval.entityKey[2] = entity_key[2];
1533 0 : retval.entityKind = entity_kind;
1534 0 : return retval;
1535 : }
1536 :
1537 : GUID_t
1538 0 : EndpointRegistry::build_id(DDS::DomainId_t domain,
1539 : const unsigned char* participant_id,
1540 : const EntityId_t& entity_id)
1541 : {
1542 : GUID_t id;
1543 0 : id.guidPrefix[0] = VENDORID_OCI[0];
1544 0 : id.guidPrefix[1] = VENDORID_OCI[1];
1545 : // id.guidPrefix[2] = domain[0]
1546 : // id.guidPrefix[3] = domain[1]
1547 : // id.guidPrefix[4] = domain[2]
1548 : // id.guidPrefix[5] = domain[3]
1549 0 : DDS::DomainId_t netdom = ACE_HTONL(domain);
1550 0 : ACE_OS::memcpy(&id.guidPrefix[2], &netdom, sizeof(DDS::DomainId_t));
1551 : // id.guidPrefix[6] = participant[0]
1552 : // id.guidPrefix[7] = participant[1]
1553 : // id.guidPrefix[8] = participant[2]
1554 : // id.guidPrefix[9] = participant[3]
1555 : // id.guidPrefix[10] = participant[4]
1556 : // id.guidPrefix[11] = participant[5]
1557 0 : ACE_OS::memcpy(&id.guidPrefix[6], participant_id, 6);
1558 0 : id.entityId = entity_id;
1559 0 : return id;
1560 : }
1561 :
1562 : OpenDDS::DCPS::GUID_t
1563 0 : StaticDiscovery::generate_participant_guid()
1564 : {
1565 0 : return GUID_UNKNOWN;
1566 : }
1567 :
1568 : AddDomainStatus
1569 0 : StaticDiscovery::add_domain_participant(DDS::DomainId_t domain,
1570 : const DDS::DomainParticipantQos& qos,
1571 : XTypes::TypeLookupService_rch tls)
1572 : {
1573 0 : AddDomainStatus ads = {GUID_t(), false /*federated*/};
1574 :
1575 0 : if (qos.user_data.value.length() != BYTES_IN_PARTICIPANT) {
1576 0 : ACE_ERROR((LM_ERROR,
1577 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
1578 : ACE_TEXT("No userdata to identify participant\n")));
1579 0 : return ads;
1580 : }
1581 :
1582 0 : GUID_t id = EndpointRegistry::build_id(domain,
1583 : qos.user_data.value.get_buffer(),
1584 : ENTITYID_PARTICIPANT);
1585 0 : if (!get_part(domain, id).is_nil()) {
1586 0 : ACE_ERROR((LM_ERROR,
1587 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant ")
1588 : ACE_TEXT("Duplicate participant\n")));
1589 0 : return ads;
1590 : }
1591 :
1592 0 : const RcHandle<StaticParticipant> participant (make_rch<StaticParticipant>(ref(id), qos, registry));
1593 :
1594 : {
1595 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
1596 0 : participants_[domain][id] = participant;
1597 0 : }
1598 :
1599 0 : participant->type_lookup_service(tls);
1600 :
1601 0 : ads.id = id;
1602 0 : return ads;
1603 0 : }
1604 :
#if defined(OPENDDS_SECURITY)
// Secure participants are rejected: every argument is ignored, an error is
// logged and a status holding GUID_UNKNOWN (federated == false) is returned.
AddDomainStatus
StaticDiscovery::add_domain_participant_secure(
  DDS::DomainId_t /*domain*/,
  const DDS::DomainParticipantQos& /*qos*/,
  XTypes::TypeLookupService_rch /*tls*/,
  const OpenDDS::DCPS::GUID_t& /*guid*/,
  DDS::Security::IdentityHandle /*id*/,
  DDS::Security::PermissionsHandle /*perm*/,
  DDS::Security::ParticipantCryptoHandle /*part_crypto*/)
{
  ACE_ERROR((LM_ERROR,
             ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::add_domain_participant_secure ")
             ACE_TEXT("Security not supported for static discovery.\n")));

  const DCPS::AddDomainStatus unsupported = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/};
  return unsupported;
}
#endif
1623 :
1624 : namespace {
1625 : const ACE_TCHAR TOPIC_SECTION_NAME[] = ACE_TEXT("topic");
1626 : const ACE_TCHAR DATAWRITERQOS_SECTION_NAME[] = ACE_TEXT("datawriterqos");
1627 : const ACE_TCHAR DATAREADERQOS_SECTION_NAME[] = ACE_TEXT("datareaderqos");
1628 : const ACE_TCHAR PUBLISHERQOS_SECTION_NAME[] = ACE_TEXT("publisherqos");
1629 : const ACE_TCHAR SUBSCRIBERQOS_SECTION_NAME[] = ACE_TEXT("subscriberqos");
1630 : const ACE_TCHAR ENDPOINT_SECTION_NAME[] = ACE_TEXT("endpoint");
1631 :
1632 0 : void parse_second(CORBA::Long& x, const OPENDDS_STRING& value)
1633 : {
1634 0 : if (value == "DURATION_INFINITE_SEC") {
1635 0 : x = DDS::DURATION_INFINITE_SEC;
1636 : } else {
1637 0 : x = atoi(value.c_str());
1638 : }
1639 0 : }
1640 :
1641 0 : void parse_nanosecond(CORBA::ULong& x, const OPENDDS_STRING& value)
1642 : {
1643 0 : if (value == "DURATION_INFINITE_NANOSEC") {
1644 0 : x = DDS::DURATION_INFINITE_NSEC;
1645 : } else {
1646 0 : x = atoi(value.c_str());
1647 : }
1648 0 : }
1649 :
1650 0 : bool parse_bool(CORBA::Boolean& x, const OPENDDS_STRING& value)
1651 : {
1652 0 : if (value == "true") {
1653 0 : x = true;
1654 0 : return true;
1655 0 : } else if (value == "false") {
1656 0 : x = false;
1657 0 : return true;
1658 : }
1659 0 : return false;
1660 : }
1661 :
1662 0 : void parse_list(DDS::PartitionQosPolicy& x, const OPENDDS_STRING& value)
1663 : {
1664 : // Value can be a comma-separated list
1665 0 : const char* start = value.c_str();
1666 0 : while (const char* next_comma = std::strchr(start, ',')) {
1667 0 : const size_t size = next_comma - start;
1668 0 : const OPENDDS_STRING temp(start, size);
1669 : // Add to QOS
1670 0 : x.name.length(x.name.length() + 1);
1671 0 : x.name[x.name.length() - 1] = temp.c_str();
1672 : // Advance pointer
1673 0 : start = next_comma + 1;
1674 0 : }
1675 : // Append everything after last comma
1676 0 : x.name.length(x.name.length() + 1);
1677 0 : x.name[x.name.length() - 1] = start;
1678 0 : }
1679 : }
1680 :
1681 : int
1682 0 : StaticDiscovery::load_configuration(ACE_Configuration_Heap& cf)
1683 : {
1684 0 : if (parse_topics(cf) ||
1685 0 : parse_datawriterqos(cf) ||
1686 0 : parse_datareaderqos(cf) ||
1687 0 : parse_publisherqos(cf) ||
1688 0 : parse_subscriberqos(cf) ||
1689 0 : parse_endpoints(cf)) {
1690 0 : return -1;
1691 : }
1692 :
1693 0 : registry.match();
1694 :
1695 0 : return 0;
1696 : }
1697 :
1698 : int
1699 0 : StaticDiscovery::parse_topics(ACE_Configuration_Heap& cf)
1700 : {
1701 0 : const ACE_Configuration_Section_Key& root = cf.root_section();
1702 0 : ACE_Configuration_Section_Key section;
1703 :
1704 0 : if (cf.open_section(root, TOPIC_SECTION_NAME, false, section) != 0) {
1705 0 : if (DCPS_debug_level > 0) {
1706 : // This is not an error if the configuration file does not have
1707 : // any topic (sub)section.
1708 0 : ACE_DEBUG((LM_NOTICE,
1709 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
1710 : ACE_TEXT("no [%s] sections.\n"),
1711 : TOPIC_SECTION_NAME));
1712 : }
1713 0 : return 0;
1714 : }
1715 :
1716 : // Ensure there are no key/values in the [topic] section.
1717 : // Every key/value must be in a [topic/*] sub-section.
1718 0 : ValueMap vm;
1719 0 : if (pullValues(cf, section, vm) > 0) {
1720 0 : ACE_ERROR_RETURN((LM_ERROR,
1721 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1722 : ACE_TEXT("[topic] sections must have a subsection name\n")),
1723 : -1);
1724 : }
1725 : // Process the subsections of this section
1726 0 : KeyList keys;
1727 0 : if (processSections(cf, section, keys) != 0) {
1728 0 : ACE_ERROR_RETURN((LM_ERROR,
1729 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1730 : ACE_TEXT("too many nesting layers in the [topic] section.\n")),
1731 : -1);
1732 : }
1733 :
1734 : // Loop through the [topic/*] sections
1735 0 : for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
1736 0 : OPENDDS_STRING topic_name = it->first;
1737 :
1738 0 : if (DCPS_debug_level > 0) {
1739 0 : ACE_DEBUG((LM_NOTICE,
1740 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_topics ")
1741 : ACE_TEXT("processing [topic/%C] section.\n"),
1742 : topic_name.c_str()));
1743 : }
1744 :
1745 0 : ValueMap values;
1746 0 : pullValues(cf, it->second, values);
1747 :
1748 0 : EndpointRegistry::Topic topic;
1749 0 : bool name_specified = false,
1750 0 : type_name_specified = false;
1751 :
1752 0 : for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
1753 0 : OPENDDS_STRING name = it->first;
1754 0 : OPENDDS_STRING value = it->second;
1755 :
1756 0 : if (name == "name") {
1757 0 : topic.name = value;
1758 0 : name_specified = true;
1759 0 : } else if (name == "type_name") {
1760 0 : if (value.size() >= TYPE_NAME_MAX) {
1761 0 : ACE_ERROR_RETURN((LM_ERROR,
1762 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1763 : ACE_TEXT("type_name (%C) must be less than 128 characters in [topic/%C] section.\n"),
1764 : value.c_str(), topic_name.c_str()),
1765 : -1);
1766 : }
1767 0 : topic.type_name = value;
1768 0 : type_name_specified = true;
1769 : } else {
1770 : // Typos are ignored to avoid parsing FACE-specific keys.
1771 : }
1772 0 : }
1773 :
1774 0 : if (!name_specified) {
1775 0 : topic.name = topic_name;
1776 : }
1777 :
1778 0 : if (!type_name_specified) {
1779 0 : ACE_ERROR_RETURN((LM_ERROR,
1780 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_topics ")
1781 : ACE_TEXT("No type_name specified for [topic/%C] section.\n"),
1782 : topic_name.c_str()),
1783 : -1);
1784 : }
1785 :
1786 0 : registry.topic_map[topic_name] = topic;
1787 0 : }
1788 :
1789 0 : return 0;
1790 0 : }
1791 :
1792 : int
1793 0 : StaticDiscovery::parse_datawriterqos(ACE_Configuration_Heap& cf)
1794 : {
1795 0 : const ACE_Configuration_Section_Key& root = cf.root_section();
1796 0 : ACE_Configuration_Section_Key section;
1797 :
1798 0 : if (cf.open_section(root, DATAWRITERQOS_SECTION_NAME, false, section) != 0) {
1799 0 : if (DCPS_debug_level > 0) {
1800 : // This is not an error if the configuration file does not have
1801 : // any datawriterqos (sub)section.
1802 0 : ACE_DEBUG((LM_NOTICE,
1803 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
1804 : ACE_TEXT("no [%s] sections.\n"),
1805 : DATAWRITERQOS_SECTION_NAME));
1806 : }
1807 0 : return 0;
1808 : }
1809 :
1810 : // Ensure there are no key/values in the [datawriterqos] section.
1811 : // Every key/value must be in a [datawriterqos/*] sub-section.
1812 0 : ValueMap vm;
1813 0 : if (pullValues(cf, section, vm) > 0) {
1814 0 : ACE_ERROR_RETURN((LM_ERROR,
1815 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1816 : ACE_TEXT("[datawriterqos] sections must have a subsection name\n")),
1817 : -1);
1818 : }
1819 : // Process the subsections of this section
1820 0 : KeyList keys;
1821 0 : if (processSections(cf, section, keys) != 0) {
1822 0 : ACE_ERROR_RETURN((LM_ERROR,
1823 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1824 : ACE_TEXT("too many nesting layers in the [datawriterqos] section.\n")),
1825 : -1);
1826 : }
1827 :
1828 : // Loop through the [datawriterqos/*] sections
1829 0 : for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
1830 0 : OPENDDS_STRING datawriterqos_name = it->first;
1831 :
1832 0 : if (DCPS_debug_level > 0) {
1833 0 : ACE_DEBUG((LM_NOTICE,
1834 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datawriterqos ")
1835 : ACE_TEXT("processing [datawriterqos/%C] section.\n"),
1836 : datawriterqos_name.c_str()));
1837 : }
1838 :
1839 0 : ValueMap values;
1840 0 : pullValues(cf, it->second, values);
1841 :
1842 0 : DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
1843 :
1844 0 : for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
1845 0 : OPENDDS_STRING name = it->first;
1846 0 : OPENDDS_STRING value = it->second;
1847 :
1848 0 : if (name == "durability.kind") {
1849 0 : if (value == "VOLATILE") {
1850 0 : datawriterqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
1851 0 : } else if (value == "TRANSIENT_LOCAL") {
1852 0 : datawriterqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
1853 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1854 0 : } else if (value == "TRANSIENT") {
1855 0 : datawriterqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
1856 0 : } else if (value == "PERSISTENT") {
1857 0 : datawriterqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
1858 : #endif
1859 : } else {
1860 0 : ACE_ERROR_RETURN((LM_ERROR,
1861 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1862 : ACE_TEXT("Illegal value for durability.kind (%C) in [datawriterqos/%C] section.\n"),
1863 : value.c_str(), datawriterqos_name.c_str()),
1864 : -1);
1865 : }
1866 0 : } else if (name == "deadline.period.sec") {
1867 0 : parse_second(datawriterqos.deadline.period.sec, value);
1868 0 : } else if (name == "deadline.period.nanosec") {
1869 0 : parse_nanosecond(datawriterqos.deadline.period.nanosec, value);
1870 0 : } else if (name == "latency_budget.duration.sec") {
1871 0 : parse_second(datawriterqos.latency_budget.duration.sec, value);
1872 0 : } else if (name == "latency_budget.duration.nanosec") {
1873 0 : parse_nanosecond(datawriterqos.latency_budget.duration.nanosec, value);
1874 0 : } else if (name == "liveliness.kind") {
1875 0 : if (value == "AUTOMATIC") {
1876 0 : datawriterqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
1877 0 : } else if (value == "MANUAL_BY_TOPIC") {
1878 0 : datawriterqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
1879 0 : } else if (value == "MANUAL_BY_PARTICIPANT") {
1880 0 : datawriterqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
1881 : } else {
1882 0 : ACE_ERROR_RETURN((LM_ERROR,
1883 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1884 : ACE_TEXT("Illegal value for liveliness.kind (%C) in [datawriterqos/%C] section.\n"),
1885 : value.c_str(), datawriterqos_name.c_str()),
1886 : -1);
1887 : }
1888 0 : } else if (name == "liveliness.lease_duration.sec") {
1889 0 : parse_second(datawriterqos.liveliness.lease_duration.sec, value);
1890 0 : } else if (name == "liveliness.lease_duration.nanosec") {
1891 0 : parse_nanosecond(datawriterqos.liveliness.lease_duration.nanosec, value);
1892 0 : } else if (name == "reliability.kind") {
1893 0 : if (value == "BEST_EFFORT") {
1894 0 : datawriterqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
1895 0 : } else if (value == "RELIABLE") {
1896 0 : datawriterqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
1897 : } else {
1898 0 : ACE_ERROR_RETURN((LM_ERROR,
1899 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1900 : ACE_TEXT("Illegal value for reliability.kind (%C) in [datawriterqos/%C] section.\n"),
1901 : value.c_str(), datawriterqos_name.c_str()),
1902 : -1);
1903 : }
1904 0 : } else if (name == "reliability.max_blocking_time.sec") {
1905 0 : parse_second(datawriterqos.reliability.max_blocking_time.sec, value);
1906 0 : } else if (name == "reliability.max_blocking_time.nanosec") {
1907 0 : parse_nanosecond(datawriterqos.reliability.max_blocking_time.nanosec, value);
1908 0 : } else if (name == "destination_order.kind") {
1909 0 : if (value == "BY_RECEPTION_TIMESTAMP") {
1910 0 : datawriterqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
1911 0 : } else if (value == "BY_SOURCE_TIMESTAMP") {
1912 0 : datawriterqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
1913 : } else {
1914 0 : ACE_ERROR_RETURN((LM_ERROR,
1915 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1916 : ACE_TEXT("Illegal value for destination_order.kind (%C) in [datawriterqos/%C] section.\n"),
1917 : value.c_str(), datawriterqos_name.c_str()),
1918 : -1);
1919 : }
1920 0 : } else if (name == "history.kind") {
1921 0 : if (value == "KEEP_ALL") {
1922 0 : datawriterqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
1923 0 : } else if (value == "KEEP_LAST") {
1924 0 : datawriterqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
1925 : } else {
1926 0 : ACE_ERROR_RETURN((LM_ERROR,
1927 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1928 : ACE_TEXT("Illegal value for history.kind (%C) in [datawriterqos/%C] section.\n"),
1929 : value.c_str(), datawriterqos_name.c_str()),
1930 : -1);
1931 : }
1932 0 : } else if (name == "history.depth") {
1933 0 : datawriterqos.history.depth = atoi(value.c_str());
1934 0 : } else if (name == "resource_limits.max_samples") {
1935 0 : datawriterqos.resource_limits.max_samples = atoi(value.c_str());
1936 0 : } else if (name == "resource_limits.max_instances") {
1937 0 : datawriterqos.resource_limits.max_instances = atoi(value.c_str());
1938 0 : } else if (name == "resource_limits.max_samples_per_instance") {
1939 0 : datawriterqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
1940 0 : } else if (name == "transport_priority.value") {
1941 0 : datawriterqos.transport_priority.value = atoi(value.c_str());
1942 0 : } else if (name == "lifespan.duration.sec") {
1943 0 : parse_second(datawriterqos.lifespan.duration.sec, value);
1944 0 : } else if (name == "lifespan.duration.nanosec") {
1945 0 : parse_nanosecond(datawriterqos.lifespan.duration.nanosec, value);
1946 0 : } else if (name == "ownership.kind") {
1947 0 : if (value == "SHARED") {
1948 0 : datawriterqos.ownership.kind = DDS::SHARED_OWNERSHIP_QOS;
1949 0 : } else if (value == "EXCLUSIVE") {
1950 0 : datawriterqos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
1951 : } else {
1952 0 : ACE_ERROR_RETURN((LM_ERROR,
1953 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1954 : ACE_TEXT("Illegal value for ownership.kind (%C) in [datawriterqos/%C] section.\n"),
1955 : value.c_str(), datawriterqos_name.c_str()),
1956 : -1);
1957 : }
1958 0 : } else if (name == "ownership_strength.value") {
1959 0 : datawriterqos.ownership_strength.value = atoi(value.c_str());
1960 : } else {
1961 0 : ACE_ERROR_RETURN((LM_ERROR,
1962 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datawriterqos ")
1963 : ACE_TEXT("Unexpected entry (%C) in [datawriterqos/%C] section.\n"),
1964 : name.c_str(), datawriterqos_name.c_str()),
1965 : -1);
1966 : }
1967 0 : }
1968 :
1969 0 : registry.datawriterqos_map[datawriterqos_name] = datawriterqos;
1970 0 : }
1971 :
1972 0 : return 0;
1973 0 : }
1974 :
1975 : int
1976 0 : StaticDiscovery::parse_datareaderqos(ACE_Configuration_Heap& cf)
1977 : {
1978 0 : const ACE_Configuration_Section_Key& root = cf.root_section();
1979 0 : ACE_Configuration_Section_Key section;
1980 :
1981 0 : if (cf.open_section(root, DATAREADERQOS_SECTION_NAME, false, section) != 0) {
1982 0 : if (DCPS_debug_level > 0) {
1983 : // This is not an error if the configuration file does not have
1984 : // any datareaderqos (sub)section.
1985 0 : ACE_DEBUG((LM_NOTICE,
1986 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
1987 : ACE_TEXT("no [%s] sections.\n"),
1988 : DATAREADERQOS_SECTION_NAME));
1989 : }
1990 0 : return 0;
1991 : }
1992 :
1993 : // Ensure there are no key/values in the [datareaderqos] section.
1994 : // Every key/value must be in a [datareaderqos/*] sub-section.
1995 0 : ValueMap vm;
1996 0 : if (pullValues(cf, section, vm) > 0) {
1997 0 : ACE_ERROR_RETURN((LM_ERROR,
1998 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
1999 : ACE_TEXT("[datareaderqos] sections must have a subsection name\n")),
2000 : -1);
2001 : }
2002 : // Process the subsections of this section
2003 0 : KeyList keys;
2004 0 : if (processSections(cf, section, keys) != 0) {
2005 0 : ACE_ERROR_RETURN((LM_ERROR,
2006 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2007 : ACE_TEXT("too many nesting layers in the [datareaderqos] section.\n")),
2008 : -1);
2009 : }
2010 :
2011 : // Loop through the [datareaderqos/*] sections
2012 0 : for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2013 0 : OPENDDS_STRING datareaderqos_name = it->first;
2014 :
2015 0 : if (DCPS_debug_level > 0) {
2016 0 : ACE_DEBUG((LM_NOTICE,
2017 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_datareaderqos ")
2018 : ACE_TEXT("processing [datareaderqos/%C] section.\n"),
2019 : datareaderqos_name.c_str()));
2020 : }
2021 :
2022 0 : ValueMap values;
2023 0 : pullValues(cf, it->second, values);
2024 :
2025 0 : DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
2026 :
2027 0 : for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2028 0 : OPENDDS_STRING name = it->first;
2029 0 : OPENDDS_STRING value = it->second;
2030 :
2031 0 : if (name == "durability.kind") {
2032 0 : if (value == "VOLATILE") {
2033 0 : datareaderqos.durability.kind = DDS::VOLATILE_DURABILITY_QOS;
2034 0 : } else if (value == "TRANSIENT_LOCAL") {
2035 0 : datareaderqos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
2036 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2037 0 : } else if (value == "TRANSIENT") {
2038 0 : datareaderqos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
2039 0 : } else if (value == "PERSISTENT") {
2040 0 : datareaderqos.durability.kind = DDS::PERSISTENT_DURABILITY_QOS;
2041 : #endif
2042 : } else {
2043 0 : ACE_ERROR_RETURN((LM_ERROR,
2044 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2045 : ACE_TEXT("Illegal value for durability.kind (%C) in [datareaderqos/%C] section.\n"),
2046 : value.c_str(), datareaderqos_name.c_str()),
2047 : -1);
2048 : }
2049 0 : } else if (name == "deadline.period.sec") {
2050 0 : parse_second(datareaderqos.deadline.period.sec, value);
2051 0 : } else if (name == "deadline.period.nanosec") {
2052 0 : parse_nanosecond(datareaderqos.deadline.period.nanosec, value);
2053 0 : } else if (name == "latency_budget.duration.sec") {
2054 0 : parse_second(datareaderqos.latency_budget.duration.sec, value);
2055 0 : } else if (name == "latency_budget.duration.nanosec") {
2056 0 : parse_nanosecond(datareaderqos.latency_budget.duration.nanosec, value);
2057 0 : } else if (name == "liveliness.kind") {
2058 0 : if (value == "AUTOMATIC") {
2059 0 : datareaderqos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
2060 0 : } else if (value == "MANUAL_BY_TOPIC") {
2061 0 : datareaderqos.liveliness.kind = DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS;
2062 0 : } else if (value == "MANUAL_BY_PARTICIPANT") {
2063 0 : datareaderqos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
2064 : } else {
2065 0 : ACE_ERROR_RETURN((LM_ERROR,
2066 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2067 : ACE_TEXT("Illegal value for liveliness.kind (%C) in [datareaderqos/%C] section.\n"),
2068 : value.c_str(), datareaderqos_name.c_str()),
2069 : -1);
2070 : }
2071 0 : } else if (name == "liveliness.lease_duration.sec") {
2072 0 : parse_second(datareaderqos.liveliness.lease_duration.sec, value);
2073 0 : } else if (name == "liveliness.lease_duration.nanosec") {
2074 0 : parse_nanosecond(datareaderqos.liveliness.lease_duration.nanosec, value);
2075 0 : } else if (name == "reliability.kind") {
2076 0 : if (value == "BEST_EFFORT") {
2077 0 : datareaderqos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
2078 0 : } else if (value == "RELIABLE") {
2079 0 : datareaderqos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
2080 : } else {
2081 0 : ACE_ERROR_RETURN((LM_ERROR,
2082 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2083 : ACE_TEXT("Illegal value for reliability.kind (%C) in [datareaderqos/%C] section.\n"),
2084 : value.c_str(), datareaderqos_name.c_str()),
2085 : -1);
2086 : }
2087 0 : } else if (name == "reliability.max_blocking_time.sec") {
2088 0 : parse_second(datareaderqos.reliability.max_blocking_time.sec, value);
2089 0 : } else if (name == "reliability.max_blocking_time.nanosec") {
2090 0 : parse_nanosecond(datareaderqos.reliability.max_blocking_time.nanosec, value);
2091 0 : } else if (name == "destination_order.kind") {
2092 0 : if (value == "BY_RECEPTION_TIMESTAMP") {
2093 0 : datareaderqos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;
2094 0 : } else if (value == "BY_SOURCE_TIMESTAMP") {
2095 0 : datareaderqos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
2096 : } else {
2097 0 : ACE_ERROR_RETURN((LM_ERROR,
2098 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2099 : ACE_TEXT("Illegal value for destination_order.kind (%C) in [datareaderqos/%C] section.\n"),
2100 : value.c_str(), datareaderqos_name.c_str()),
2101 : -1);
2102 : }
2103 0 : } else if (name == "history.kind") {
2104 0 : if (value == "KEEP_ALL") {
2105 0 : datareaderqos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
2106 0 : } else if (value == "KEEP_LAST") {
2107 0 : datareaderqos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
2108 : } else {
2109 0 : ACE_ERROR_RETURN((LM_ERROR,
2110 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2111 : ACE_TEXT("Illegal value for history.kind (%C) in [datareaderqos/%C] section.\n"),
2112 : value.c_str(), datareaderqos_name.c_str()),
2113 : -1);
2114 : }
2115 0 : } else if (name == "history.depth") {
2116 0 : datareaderqos.history.depth = atoi(value.c_str());
2117 0 : } else if (name == "resource_limits.max_samples") {
2118 0 : datareaderqos.resource_limits.max_samples = atoi(value.c_str());
2119 0 : } else if (name == "resource_limits.max_instances") {
2120 0 : datareaderqos.resource_limits.max_instances = atoi(value.c_str());
2121 0 : } else if (name == "resource_limits.max_samples_per_instance") {
2122 0 : datareaderqos.resource_limits.max_samples_per_instance = atoi(value.c_str());
2123 0 : } else if (name == "time_based_filter.minimum_separation.sec") {
2124 0 : parse_second(datareaderqos.time_based_filter.minimum_separation.sec, value);
2125 0 : } else if (name == "time_based_filter.minimum_separation.nanosec") {
2126 0 : parse_nanosecond(datareaderqos.time_based_filter.minimum_separation.nanosec, value);
2127 0 : } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.sec") {
2128 0 : parse_second(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec, value);
2129 0 : } else if (name == "reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec") {
2130 0 : parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_nowriter_samples_delay.nanosec, value);
2131 0 : } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.sec") {
2132 0 : parse_second(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec, value);
2133 0 : } else if (name == "reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec") {
2134 0 : parse_nanosecond(datareaderqos.reader_data_lifecycle.autopurge_disposed_samples_delay.nanosec, value);
2135 : } else {
2136 0 : ACE_ERROR_RETURN((LM_ERROR,
2137 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_datareaderqos ")
2138 : ACE_TEXT("Unexpected entry (%C) in [datareaderqos/%C] section.\n"),
2139 : name.c_str(), datareaderqos_name.c_str()),
2140 : -1);
2141 : }
2142 0 : }
2143 :
2144 0 : registry.datareaderqos_map[datareaderqos_name] = datareaderqos;
2145 0 : }
2146 :
2147 0 : return 0;
2148 0 : }
2149 :
2150 : int
2151 0 : StaticDiscovery::parse_publisherqos(ACE_Configuration_Heap& cf)
2152 : {
2153 0 : const ACE_Configuration_Section_Key& root = cf.root_section();
2154 0 : ACE_Configuration_Section_Key section;
2155 :
2156 0 : if (cf.open_section(root, PUBLISHERQOS_SECTION_NAME, false, section) != 0) {
2157 0 : if (DCPS_debug_level > 0) {
2158 : // This is not an error if the configuration file does not have
2159 : // any publisherqos (sub)section.
2160 0 : ACE_DEBUG((LM_NOTICE,
2161 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
2162 : ACE_TEXT("no [%s] sections.\n"),
2163 : PUBLISHERQOS_SECTION_NAME));
2164 : }
2165 0 : return 0;
2166 : }
2167 :
2168 : // Ensure there are no key/values in the [publisherqos] section.
2169 : // Every key/value must be in a [publisherqos/*] sub-section.
2170 0 : ValueMap vm;
2171 0 : if (pullValues(cf, section, vm) > 0) {
2172 0 : ACE_ERROR_RETURN((LM_ERROR,
2173 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2174 : ACE_TEXT("[publisherqos] sections must have a subsection name\n")),
2175 : -1);
2176 : }
2177 : // Process the subsections of this section
2178 0 : KeyList keys;
2179 0 : if (processSections(cf, section, keys) != 0) {
2180 0 : ACE_ERROR_RETURN((LM_ERROR,
2181 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2182 : ACE_TEXT("too many nesting layers in the [publisherqos] section.\n")),
2183 : -1);
2184 : }
2185 :
2186 : // Loop through the [publisherqos/*] sections
2187 0 : for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2188 0 : OPENDDS_STRING publisherqos_name = it->first;
2189 :
2190 0 : if (DCPS_debug_level > 0) {
2191 0 : ACE_DEBUG((LM_NOTICE,
2192 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_publisherqos ")
2193 : ACE_TEXT("processing [publisherqos/%C] section.\n"),
2194 : publisherqos_name.c_str()));
2195 : }
2196 :
2197 0 : ValueMap values;
2198 0 : pullValues(cf, it->second, values);
2199 :
2200 0 : DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
2201 :
2202 0 : for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2203 0 : OPENDDS_STRING name = it->first;
2204 0 : OPENDDS_STRING value = it->second;
2205 :
2206 0 : if (name == "presentation.access_scope") {
2207 0 : if (value == "INSTANCE") {
2208 0 : publisherqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
2209 0 : } else if (value == "TOPIC") {
2210 0 : publisherqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
2211 0 : } else if (value == "GROUP") {
2212 0 : publisherqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
2213 : } else {
2214 0 : ACE_ERROR_RETURN((LM_ERROR,
2215 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2216 : ACE_TEXT("Illegal value for presentation.access_scope (%C) in [publisherqos/%C] section.\n"),
2217 : value.c_str(), publisherqos_name.c_str()),
2218 : -1);
2219 : }
2220 0 : } else if (name == "presentation.coherent_access") {
2221 0 : if (parse_bool(publisherqos.presentation.coherent_access, value)) {
2222 : } else {
2223 0 : ACE_ERROR_RETURN((LM_ERROR,
2224 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2225 : ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [publisherqos/%C] section.\n"),
2226 : value.c_str(), publisherqos_name.c_str()),
2227 : -1);
2228 : }
2229 0 : } else if (name == "presentation.ordered_access") {
2230 0 : if (parse_bool(publisherqos.presentation.ordered_access, value)) {
2231 : } else {
2232 0 : ACE_ERROR_RETURN((LM_ERROR,
2233 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2234 : ACE_TEXT("Illegal value for presentation.ordered_access (%C)")
2235 : ACE_TEXT("in [publisherqos/%C] section.\n"),
2236 : value.c_str(), publisherqos_name.c_str()),
2237 : -1);
2238 : }
2239 0 : } else if (name == "partition.name") {
2240 : try {
2241 0 : parse_list(publisherqos.partition, value);
2242 : }
2243 0 : catch (const CORBA::Exception& ex) {
2244 0 : ACE_ERROR_RETURN((LM_ERROR,
2245 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2246 : ACE_TEXT("Exception caught while parsing partition.name (%C) ")
2247 : ACE_TEXT("in [publisherqos/%C] section: %C.\n"),
2248 : value.c_str(), publisherqos_name.c_str(), ex._info().c_str()),
2249 : -1);
2250 0 : }
2251 : } else {
2252 0 : ACE_ERROR_RETURN((LM_ERROR,
2253 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_publisherqos ")
2254 : ACE_TEXT("Unexpected entry (%C) in [publisherqos/%C] section.\n"),
2255 : name.c_str(), publisherqos_name.c_str()),
2256 : -1);
2257 : }
2258 0 : }
2259 :
2260 0 : registry.publisherqos_map[publisherqos_name] = publisherqos;
2261 0 : }
2262 :
2263 0 : return 0;
2264 0 : }
2265 :
2266 : int
2267 0 : StaticDiscovery::parse_subscriberqos(ACE_Configuration_Heap& cf)
2268 : {
2269 0 : const ACE_Configuration_Section_Key& root = cf.root_section();
2270 0 : ACE_Configuration_Section_Key section;
2271 :
2272 0 : if (cf.open_section(root, SUBSCRIBERQOS_SECTION_NAME, false, section) != 0) {
2273 0 : if (DCPS_debug_level > 0) {
2274 : // This is not an error if the configuration file does not have
2275 : // any subscriberqos (sub)section.
2276 0 : ACE_DEBUG((LM_NOTICE,
2277 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
2278 : ACE_TEXT("no [%s] sections.\n"),
2279 : SUBSCRIBERQOS_SECTION_NAME));
2280 : }
2281 0 : return 0;
2282 : }
2283 :
2284 : // Ensure there are no key/values in the [subscriberqos] section.
2285 : // Every key/value must be in a [subscriberqos/*] sub-section.
2286 0 : ValueMap vm;
2287 0 : if (pullValues(cf, section, vm) > 0) {
2288 0 : ACE_ERROR_RETURN((LM_ERROR,
2289 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2290 : ACE_TEXT("[subscriberqos] sections must have a subsection name\n")),
2291 : -1);
2292 : }
2293 : // Process the subsections of this section
2294 0 : KeyList keys;
2295 0 : if (processSections(cf, section, keys) != 0) {
2296 0 : ACE_ERROR_RETURN((LM_ERROR,
2297 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2298 : ACE_TEXT("too many nesting layers in the [subscriberqos] section.\n")),
2299 : -1);
2300 : }
2301 :
2302 : // Loop through the [subscriberqos/*] sections
2303 0 : for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2304 0 : OPENDDS_STRING subscriberqos_name = it->first;
2305 :
2306 0 : if (DCPS_debug_level > 0) {
2307 0 : ACE_DEBUG((LM_NOTICE,
2308 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_subscriberqos ")
2309 : ACE_TEXT("processing [subscriberqos/%C] section.\n"),
2310 : subscriberqos_name.c_str()));
2311 : }
2312 :
2313 0 : ValueMap values;
2314 0 : pullValues(cf, it->second, values);
2315 :
2316 0 : DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
2317 :
2318 0 : for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2319 0 : OPENDDS_STRING name = it->first;
2320 0 : OPENDDS_STRING value = it->second;
2321 :
2322 0 : if (name == "presentation.access_scope") {
2323 0 : if (value == "INSTANCE") {
2324 0 : subscriberqos.presentation.access_scope = DDS::INSTANCE_PRESENTATION_QOS;
2325 0 : } else if (value == "TOPIC") {
2326 0 : subscriberqos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
2327 0 : } else if (value == "GROUP") {
2328 0 : subscriberqos.presentation.access_scope = DDS::GROUP_PRESENTATION_QOS;
2329 : } else {
2330 0 : ACE_ERROR_RETURN((LM_ERROR,
2331 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2332 : ACE_TEXT("Illegal value for presentation.access_scope (%C) in [subscriberqos/%C] section.\n"),
2333 : value.c_str(), subscriberqos_name.c_str()),
2334 : -1);
2335 : }
2336 0 : } else if (name == "presentation.coherent_access") {
2337 0 : if (parse_bool(subscriberqos.presentation.coherent_access, value)) {
2338 : } else {
2339 0 : ACE_ERROR_RETURN((LM_ERROR,
2340 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2341 : ACE_TEXT("Illegal value for presentation.coherent_access (%C) in [subscriberqos/%C] section.\n"),
2342 : value.c_str(), subscriberqos_name.c_str()),
2343 : -1);
2344 : }
2345 0 : } else if (name == "presentation.ordered_access") {
2346 0 : if (parse_bool(subscriberqos.presentation.ordered_access, value)) {
2347 : } else {
2348 0 : ACE_ERROR_RETURN((LM_ERROR,
2349 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2350 : ACE_TEXT("Illegal value for presentation.ordered_access (%C) in [subscriberqos/%C] section.\n"),
2351 : value.c_str(), subscriberqos_name.c_str()),
2352 : -1);
2353 : }
2354 0 : } else if (name == "partition.name") {
2355 : try {
2356 0 : parse_list(subscriberqos.partition, value);
2357 : }
2358 0 : catch (const CORBA::Exception& ex) {
2359 0 : ACE_ERROR_RETURN((LM_ERROR,
2360 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2361 : ACE_TEXT("Exception caught while parsing partition.name (%C) ")
2362 : ACE_TEXT("in [subscriberqos/%C] section: %C.\n"),
2363 : value.c_str(), subscriberqos_name.c_str(), ex._info().c_str()),
2364 : -1);
2365 0 : }
2366 : } else {
2367 0 : ACE_ERROR_RETURN((LM_ERROR,
2368 : ACE_TEXT("(%P|%t) StaticDiscovery::parse_subscriberqos ")
2369 : ACE_TEXT("Unexpected entry (%C) in [subscriberqos/%C] section.\n"),
2370 : name.c_str(), subscriberqos_name.c_str()),
2371 : -1);
2372 : }
2373 0 : }
2374 :
2375 0 : registry.subscriberqos_map[subscriberqos_name] = subscriberqos;
2376 0 : }
2377 :
2378 0 : return 0;
2379 0 : }
2380 :
2381 : int
2382 0 : StaticDiscovery::parse_endpoints(ACE_Configuration_Heap& cf)
2383 : {
2384 0 : const ACE_Configuration_Section_Key& root = cf.root_section();
2385 0 : ACE_Configuration_Section_Key section;
2386 :
2387 0 : if (cf.open_section(root, ENDPOINT_SECTION_NAME, false, section) != 0) {
2388 0 : if (DCPS_debug_level > 0) {
2389 : // This is not an error if the configuration file does not have
2390 : // any endpoint (sub)section.
2391 0 : ACE_DEBUG((LM_NOTICE,
2392 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
2393 : ACE_TEXT("no [%s] sections.\n"),
2394 : ENDPOINT_SECTION_NAME));
2395 : }
2396 0 : return 0;
2397 : }
2398 :
2399 : // Ensure there are no key/values in the [endpoint] section.
2400 : // Every key/value must be in a [endpoint/*] sub-section.
2401 0 : ValueMap vm;
2402 0 : if (pullValues(cf, section, vm) > 0) {
2403 0 : ACE_ERROR_RETURN((LM_ERROR,
2404 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2405 : ACE_TEXT("[endpoint] sections must have a subsection name\n")),
2406 : -1);
2407 : }
2408 : // Process the subsections of this section
2409 0 : KeyList keys;
2410 0 : if (processSections(cf, section, keys) != 0) {
2411 0 : ACE_ERROR_RETURN((LM_ERROR,
2412 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2413 : ACE_TEXT("too many nesting layers in the [endpoint] section.\n")),
2414 : -1);
2415 : }
2416 :
2417 : // Loop through the [endpoint/*] sections
2418 0 : for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2419 0 : OPENDDS_STRING endpoint_name = it->first;
2420 :
2421 0 : if (DCPS_debug_level > 0) {
2422 0 : ACE_DEBUG((LM_NOTICE,
2423 : ACE_TEXT("(%P|%t) NOTICE: StaticDiscovery::parse_endpoints ")
2424 : ACE_TEXT("processing [endpoint/%C] section.\n"),
2425 : endpoint_name.c_str()));
2426 : }
2427 :
2428 0 : ValueMap values;
2429 0 : pullValues(cf, it->second, values);
2430 0 : int domain = 0;
2431 0 : unsigned char participant[6] = { 0 };
2432 0 : unsigned char entity[3] = { 0 };
2433 : enum Type {
2434 : Reader,
2435 : Writer
2436 : };
2437 0 : Type type = Reader; // avoid warning
2438 0 : OPENDDS_STRING topic_name;
2439 0 : DDS::DataWriterQos datawriterqos(TheServiceParticipant->initial_DataWriterQos());
2440 0 : DDS::DataReaderQos datareaderqos(TheServiceParticipant->initial_DataReaderQos());
2441 0 : DDS::PublisherQos publisherqos(TheServiceParticipant->initial_PublisherQos());
2442 0 : DDS::SubscriberQos subscriberqos(TheServiceParticipant->initial_SubscriberQos());
2443 0 : TransportLocatorSeq trans_info;
2444 0 : OPENDDS_STRING config_name;
2445 :
2446 0 : bool domain_specified = false,
2447 0 : participant_specified = false,
2448 0 : entity_specified = false,
2449 0 : type_specified = false,
2450 0 : topic_name_specified = false,
2451 0 : config_name_specified = false;
2452 :
2453 0 : for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2454 0 : OPENDDS_STRING name = it->first;
2455 0 : OPENDDS_STRING value = it->second;
2456 :
2457 0 : if (name == "domain") {
2458 0 : if (convertToInteger(value, domain)) {
2459 0 : domain_specified = true;
2460 : } else {
2461 0 : ACE_ERROR_RETURN((LM_ERROR,
2462 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2463 : ACE_TEXT("Illegal integer value for domain (%C) in [endpoint/%C] section.\n"),
2464 : value.c_str(), endpoint_name.c_str()),
2465 : -1);
2466 : }
2467 0 : } else if (name == "participant") {
2468 0 : const OPENDDS_STRING::difference_type count = std::count_if(value.begin(), value.end(), isxdigit);
2469 0 : if (value.size() != HEX_DIGITS_IN_PARTICIPANT || static_cast<size_t>(count) != HEX_DIGITS_IN_PARTICIPANT) {
2470 0 : ACE_ERROR_RETURN((LM_ERROR,
2471 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2472 : ACE_TEXT("participant (%C) must be 12 hexadecimal digits in [endpoint/%C] section.\n"),
2473 : value.c_str(), endpoint_name.c_str()),
2474 : -1);
2475 : }
2476 :
2477 0 : for (size_t idx = 0; idx != BYTES_IN_PARTICIPANT; ++idx) {
2478 0 : participant[idx] = fromhex(value, idx);
2479 : }
2480 0 : participant_specified = true;
2481 0 : } else if (name == "entity") {
2482 0 : const OPENDDS_STRING::difference_type count = std::count_if(value.begin(), value.end(), isxdigit);
2483 0 : if (value.size() != HEX_DIGITS_IN_ENTITY || static_cast<size_t>(count) != HEX_DIGITS_IN_ENTITY) {
2484 0 : ACE_ERROR_RETURN((LM_ERROR,
2485 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2486 : ACE_TEXT("entity (%C) must be 6 hexadecimal digits in [endpoint/%C] section.\n"),
2487 : value.c_str(), endpoint_name.c_str()),
2488 : -1);
2489 : }
2490 :
2491 0 : for (size_t idx = 0; idx != BYTES_IN_ENTITY; ++idx) {
2492 0 : entity[idx] = fromhex(value, idx);
2493 : }
2494 0 : entity_specified = true;
2495 0 : } else if (name == "type") {
2496 0 : if (value == "reader") {
2497 0 : type = Reader;
2498 0 : type_specified = true;
2499 0 : } else if (value == "writer") {
2500 0 : type = Writer;
2501 0 : type_specified = true;
2502 : } else {
2503 0 : ACE_ERROR_RETURN((LM_ERROR,
2504 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2505 : ACE_TEXT("Illegal string value for type (%C) in [endpoint/%C] section.\n"),
2506 : value.c_str(), endpoint_name.c_str()),
2507 : -1);
2508 : }
2509 0 : } else if (name == "topic") {
2510 0 : EndpointRegistry::TopicMapType::const_iterator pos = this->registry.topic_map.find(value);
2511 0 : if (pos != this->registry.topic_map.end()) {
2512 0 : topic_name = pos->second.name;
2513 0 : topic_name_specified = true;
2514 : } else {
2515 0 : ACE_ERROR_RETURN((LM_ERROR,
2516 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2517 : ACE_TEXT("Illegal topic reference (%C) in [endpoint/%C] section.\n"),
2518 : value.c_str(), endpoint_name.c_str()),
2519 : -1);
2520 : }
2521 0 : } else if (name == "datawriterqos") {
2522 0 : EndpointRegistry::DataWriterQosMapType::const_iterator pos = this->registry.datawriterqos_map.find(value);
2523 0 : if (pos != this->registry.datawriterqos_map.end()) {
2524 0 : datawriterqos = pos->second;
2525 : } else {
2526 0 : ACE_ERROR_RETURN((LM_ERROR,
2527 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2528 : ACE_TEXT("Illegal datawriterqos reference (%C) in [endpoint/%C] section.\n"),
2529 : value.c_str(), endpoint_name.c_str()),
2530 : -1);
2531 : }
2532 0 : } else if (name == "publisherqos") {
2533 0 : EndpointRegistry::PublisherQosMapType::const_iterator pos = this->registry.publisherqos_map.find(value);
2534 0 : if (pos != this->registry.publisherqos_map.end()) {
2535 0 : publisherqos = pos->second;
2536 : } else {
2537 0 : ACE_ERROR_RETURN((LM_ERROR,
2538 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2539 : ACE_TEXT("Illegal publisherqos reference (%C) in [endpoint/%C] section.\n"),
2540 : value.c_str(), endpoint_name.c_str()),
2541 : -1);
2542 : }
2543 0 : } else if (name == "datareaderqos") {
2544 0 : EndpointRegistry::DataReaderQosMapType::const_iterator pos = this->registry.datareaderqos_map.find(value);
2545 0 : if (pos != this->registry.datareaderqos_map.end()) {
2546 0 : datareaderqos = pos->second;
2547 : } else {
2548 0 : ACE_ERROR_RETURN((LM_ERROR,
2549 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2550 : ACE_TEXT("Illegal datareaderqos reference (%C) in [endpoint/%C] section.\n"),
2551 : value.c_str(), endpoint_name.c_str()),
2552 : -1);
2553 : }
2554 0 : } else if (name == "subscriberqos") {
2555 0 : EndpointRegistry::SubscriberQosMapType::const_iterator pos = this->registry.subscriberqos_map.find(value);
2556 0 : if (pos != this->registry.subscriberqos_map.end()) {
2557 0 : subscriberqos = pos->second;
2558 : } else {
2559 0 : ACE_ERROR_RETURN((LM_ERROR,
2560 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2561 : ACE_TEXT("Illegal subscriberqos reference (%C) in [endpoint/%C] section.\n"),
2562 : value.c_str(), endpoint_name.c_str()),
2563 : -1);
2564 : }
2565 0 : } else if (name == "config") {
2566 0 : config_name = value;
2567 0 : config_name_specified = true;
2568 : } else {
2569 0 : ACE_ERROR_RETURN((LM_ERROR,
2570 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2571 : ACE_TEXT("Unexpected entry (%C) in [endpoint/%C] section.\n"),
2572 : name.c_str(), endpoint_name.c_str()),
2573 : -1);
2574 : }
2575 0 : }
2576 :
2577 0 : if (!domain_specified) {
2578 0 : ACE_ERROR_RETURN((LM_ERROR,
2579 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2580 : ACE_TEXT("No domain specified for [endpoint/%C] section.\n"),
2581 : endpoint_name.c_str()),
2582 : -1);
2583 : }
2584 :
2585 0 : if (!participant_specified) {
2586 0 : ACE_ERROR_RETURN((LM_ERROR,
2587 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2588 : ACE_TEXT("No participant specified for [endpoint/%C] section.\n"),
2589 : endpoint_name.c_str()),
2590 : -1);
2591 : }
2592 :
2593 0 : if (!entity_specified) {
2594 0 : ACE_ERROR_RETURN((LM_ERROR,
2595 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2596 : ACE_TEXT("No entity specified for [endpoint/%C] section.\n"),
2597 : endpoint_name.c_str()),
2598 : -1);
2599 : }
2600 :
2601 0 : if (!type_specified) {
2602 0 : ACE_ERROR_RETURN((LM_ERROR,
2603 : ACE_TEXT("(%P|%t) ERROR:StaticDiscovery::parse_endpoints ")
2604 : ACE_TEXT("No type specified for [endpoint/%C] section.\n"),
2605 : endpoint_name.c_str()),
2606 : -1);
2607 : }
2608 :
2609 0 : if (!topic_name_specified) {
2610 0 : ACE_ERROR_RETURN((LM_ERROR,
2611 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2612 : ACE_TEXT("No topic specified for [endpoint/%C] section.\n"),
2613 : endpoint_name.c_str()),
2614 : -1);
2615 : }
2616 :
2617 0 : TransportConfig_rch config;
2618 :
2619 0 : if (config_name_specified) {
2620 0 : config = TheTransportRegistry->get_config(config_name);
2621 0 : if (config.is_nil()) {
2622 0 : ACE_ERROR_RETURN((LM_ERROR,
2623 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2624 : ACE_TEXT("Illegal config reference (%C) in [endpoint/%C] section.\n"),
2625 : config_name.c_str(), endpoint_name.c_str()),
2626 : -1);
2627 : }
2628 : }
2629 :
2630 0 : if (config.is_nil() && domain_specified) {
2631 0 : config = TheTransportRegistry->domain_default_config(domain);
2632 : }
2633 :
2634 0 : if (config.is_nil()) {
2635 0 : config = TheTransportRegistry->global_config();
2636 : }
2637 :
2638 : try {
2639 0 : config->populate_locators(trans_info);
2640 : }
2641 0 : catch (const CORBA::Exception& ex) {
2642 0 : ACE_ERROR_RETURN((LM_ERROR,
2643 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2644 : ACE_TEXT("Exception caught while populating locators for [endpoint/%C] section. %C\n"),
2645 : endpoint_name.c_str(), ex._info().c_str()),
2646 : -1);
2647 0 : }
2648 0 : if (trans_info.length() == 0) {
2649 0 : ACE_ERROR_RETURN((LM_ERROR,
2650 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2651 : ACE_TEXT("No locators for [endpoint/%C] section.\n"),
2652 : endpoint_name.c_str()),
2653 : -1);
2654 : }
2655 :
2656 0 : EntityId_t entity_id = EndpointRegistry::build_id(entity,
2657 : (type == Reader) ? ENTITYKIND_USER_READER_WITH_KEY : ENTITYKIND_USER_WRITER_WITH_KEY);
2658 :
2659 0 : GUID_t id = EndpointRegistry::build_id(domain, participant, entity_id);
2660 :
2661 0 : if (DCPS_debug_level > 0) {
2662 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: StaticDiscovery::parse_endpoints adding entity with id %C\n"), LogGuid(id).c_str()));
2663 : }
2664 :
2665 0 : switch (type) {
2666 0 : case Reader:
2667 : // Populate the userdata.
2668 0 : datareaderqos.user_data.value.length(3);
2669 0 : datareaderqos.user_data.value[0] = entity_id.entityKey[0];
2670 0 : datareaderqos.user_data.value[1] = entity_id.entityKey[1];
2671 0 : datareaderqos.user_data.value[2] = entity_id.entityKey[2];
2672 0 : set_reader_effective_data_rep_qos(datareaderqos.representation.value);
2673 0 : if (!registry.reader_map.insert(std::make_pair(id,
2674 0 : EndpointRegistry::Reader(topic_name, datareaderqos, subscriberqos, config_name, trans_info))).second) {
2675 0 : ACE_ERROR_RETURN((LM_ERROR,
2676 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2677 : ACE_TEXT("Section [endpoint/%C] ignored - duplicate reader.\n"),
2678 : endpoint_name.c_str()),
2679 : -1);
2680 : }
2681 0 : break;
2682 0 : case Writer:
2683 : // Populate the userdata.
2684 0 : datawriterqos.user_data.value.length(3);
2685 0 : datawriterqos.user_data.value[0] = entity_id.entityKey[0];
2686 0 : datawriterqos.user_data.value[1] = entity_id.entityKey[1];
2687 0 : datawriterqos.user_data.value[2] = entity_id.entityKey[2];
2688 0 : bool encapsulated_only = false;
2689 0 : for (CORBA::ULong i = 0; i < trans_info.length(); ++i) {
2690 0 : if (0 == std::strcmp(trans_info[i].transport_type, "rtps_udp")) {
2691 0 : encapsulated_only = true;
2692 0 : break;
2693 : }
2694 : }
2695 0 : set_writer_effective_data_rep_qos(datawriterqos.representation.value, encapsulated_only);
2696 :
2697 0 : if (!registry.writer_map.insert(std::make_pair(id,
2698 0 : EndpointRegistry::Writer(topic_name, datawriterqos, publisherqos, config_name, trans_info))).second) {
2699 0 : ACE_ERROR_RETURN((LM_ERROR,
2700 : ACE_TEXT("(%P|%t) ERROR: StaticDiscovery::parse_endpoints ")
2701 : ACE_TEXT("Section [endpoint/%C] ignored - duplicate writer.\n"),
2702 : endpoint_name.c_str()),
2703 : -1);
2704 : }
2705 0 : break;
2706 : }
2707 0 : }
2708 :
2709 0 : return 0;
2710 0 : }
2711 :
2712 0 : void StaticDiscovery::pre_writer(DataWriterImpl* writer)
2713 : {
2714 0 : const DDS::Publisher_var pub = writer->get_publisher();
2715 0 : const DDS::DomainParticipant_var part = pub->get_participant();
2716 0 : const DDS::DomainId_t dom = part->get_domain_id();
2717 :
2718 0 : DDS::DomainParticipantQos partQos;
2719 0 : part->get_qos(partQos);
2720 0 : if (partQos.user_data.value.length() < 6)
2721 0 : return;
2722 0 : const unsigned char* const partId = partQos.user_data.value.get_buffer();
2723 :
2724 0 : DDS::DataWriterQos qos;
2725 0 : writer->get_qos(qos);
2726 0 : if (qos.user_data.value.length() < 3)
2727 0 : return;
2728 0 : const unsigned char* const dwId = qos.user_data.value.get_buffer();
2729 :
2730 : const EntityId_t entId =
2731 0 : EndpointRegistry::build_id(dwId, ENTITYKIND_USER_WRITER_WITH_KEY);
2732 0 : const GUID_t rid = EndpointRegistry::build_id(dom, partId, entId);
2733 :
2734 : const EndpointRegistry::WriterMapType::const_iterator iter =
2735 0 : registry.writer_map.find(rid);
2736 :
2737 0 : if (iter != registry.writer_map.end() && !iter->second.trans_cfg.empty()) {
2738 0 : TransportRegistry::instance()->bind_config(iter->second.trans_cfg, writer);
2739 : }
2740 0 : }
2741 :
2742 0 : void StaticDiscovery::pre_reader(DataReaderImpl* reader)
2743 : {
2744 0 : const DDS::Subscriber_var sub = reader->get_subscriber();
2745 0 : const DDS::DomainParticipant_var part = sub->get_participant();
2746 0 : const DDS::DomainId_t dom = part->get_domain_id();
2747 :
2748 0 : DDS::DomainParticipantQos partQos;
2749 0 : part->get_qos(partQos);
2750 0 : if (partQos.user_data.value.length() < 6)
2751 0 : return;
2752 0 : const unsigned char* const partId = partQos.user_data.value.get_buffer();
2753 :
2754 0 : DDS::DataReaderQos qos;
2755 0 : reader->get_qos(qos);
2756 0 : if (qos.user_data.value.length() < 3)
2757 0 : return;
2758 0 : const unsigned char* const drId = qos.user_data.value.get_buffer();
2759 :
2760 : const EntityId_t entId =
2761 0 : EndpointRegistry::build_id(drId, ENTITYKIND_USER_READER_WITH_KEY);
2762 0 : const GUID_t rid = EndpointRegistry::build_id(dom, partId, entId);
2763 :
2764 : const EndpointRegistry::ReaderMapType::const_iterator iter =
2765 0 : registry.reader_map.find(rid);
2766 :
2767 0 : if (iter != registry.reader_map.end() && !iter->second.trans_cfg.empty()) {
2768 0 : TransportRegistry::instance()->bind_config(iter->second.trans_cfg, reader);
2769 : }
2770 0 : }
2771 :
2772 : StaticDiscovery_rch StaticDiscovery::instance_(make_rch<StaticDiscovery>(Discovery::DEFAULT_STATIC));
2773 :
2774 0 : RcHandle<BitSubscriber> StaticDiscovery::init_bit(DomainParticipantImpl* participant)
2775 : {
2776 0 : DDS::Subscriber_var bit_subscriber;
2777 : #ifndef DDS_HAS_MINIMUM_BIT
2778 0 : if (!TheServiceParticipant->get_BIT()) {
2779 0 : get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
2780 0 : return RcHandle<BitSubscriber>();
2781 : }
2782 :
2783 0 : if (create_bit_topics(participant) != DDS::RETCODE_OK) {
2784 0 : return RcHandle<BitSubscriber>();
2785 : }
2786 :
2787 : bit_subscriber =
2788 0 : participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
2789 : DDS::SubscriberListener::_nil(),
2790 0 : DEFAULT_STATUS_MASK);
2791 0 : SubscriberImpl* sub = dynamic_cast<SubscriberImpl*>(bit_subscriber.in());
2792 0 : if (sub == 0) {
2793 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
2794 : ACE_TEXT(" - Could not cast Subscriber to SubscriberImpl\n")));
2795 0 : return RcHandle<BitSubscriber>();
2796 : }
2797 :
2798 0 : DDS::DataReaderQos dr_qos;
2799 0 : sub->get_default_datareader_qos(dr_qos);
2800 0 : dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
2801 :
2802 : dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay =
2803 0 : TheServiceParticipant->bit_autopurge_nowriter_samples_delay();
2804 : dr_qos.reader_data_lifecycle.autopurge_disposed_samples_delay =
2805 0 : TheServiceParticipant->bit_autopurge_disposed_samples_delay();
2806 :
2807 : DDS::TopicDescription_var bit_part_topic =
2808 0 : participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
2809 0 : create_bit_dr(bit_part_topic, BUILT_IN_PARTICIPANT_TOPIC_TYPE,
2810 : sub, dr_qos);
2811 :
2812 : DDS::TopicDescription_var bit_topic_topic =
2813 0 : participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
2814 0 : create_bit_dr(bit_topic_topic, BUILT_IN_TOPIC_TOPIC_TYPE,
2815 : sub, dr_qos);
2816 :
2817 : DDS::TopicDescription_var bit_pub_topic =
2818 0 : participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
2819 0 : create_bit_dr(bit_pub_topic, BUILT_IN_PUBLICATION_TOPIC_TYPE,
2820 : sub, dr_qos);
2821 :
2822 : DDS::TopicDescription_var bit_sub_topic =
2823 0 : participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
2824 0 : create_bit_dr(bit_sub_topic, BUILT_IN_SUBSCRIPTION_TOPIC_TYPE,
2825 : sub, dr_qos);
2826 :
2827 : DDS::TopicDescription_var bit_part_loc_topic =
2828 0 : participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_LOCATION_TOPIC);
2829 0 : create_bit_dr(bit_part_loc_topic, BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE,
2830 : sub, dr_qos);
2831 :
2832 : DDS::TopicDescription_var bit_connection_record_topic =
2833 0 : participant->lookup_topicdescription(BUILT_IN_CONNECTION_RECORD_TOPIC);
2834 0 : create_bit_dr(bit_connection_record_topic, BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE,
2835 : sub, dr_qos);
2836 :
2837 : DDS::TopicDescription_var bit_internal_thread_topic =
2838 0 : participant->lookup_topicdescription(BUILT_IN_INTERNAL_THREAD_TOPIC);
2839 0 : create_bit_dr(bit_internal_thread_topic, BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE,
2840 : sub, dr_qos);
2841 :
2842 0 : const DDS::ReturnCode_t ret = bit_subscriber->enable();
2843 0 : if (ret != DDS::RETCODE_OK) {
2844 0 : if (DCPS_debug_level) {
2845 0 : ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
2846 : ACE_TEXT(" - Error %d enabling subscriber\n"), ret));
2847 : }
2848 0 : return RcHandle<BitSubscriber>();
2849 : }
2850 : #endif /* DDS_HAS_MINIMUM_BIT */
2851 :
2852 0 : get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
2853 :
2854 0 : return make_rch<BitSubscriber>(bit_subscriber);
2855 0 : }
2856 :
2857 0 : void StaticDiscovery::fini_bit(DCPS::DomainParticipantImpl* participant)
2858 : {
2859 0 : get_part(participant->get_domain_id(), participant->get_id())->fini_bit();
2860 0 : }
2861 :
2862 0 : bool StaticDiscovery::attach_participant(
2863 : DDS::DomainId_t /*domainId*/, const GUID_t& /*participantId*/)
2864 : {
2865 0 : return false; // This is just for DCPSInfoRepo?
2866 : }
2867 :
2868 0 : bool StaticDiscovery::remove_domain_participant(
2869 : DDS::DomainId_t domain_id, const GUID_t& participantId)
2870 : {
2871 : // Use reference counting to ensure participant
2872 : // does not get deleted until lock as been released.
2873 0 : ParticipantHandle participant;
2874 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
2875 0 : DomainParticipantMap::iterator domain = participants_.find(domain_id);
2876 0 : if (domain == participants_.end()) {
2877 0 : return false;
2878 : }
2879 0 : ParticipantMap::iterator part = domain->second.find(participantId);
2880 0 : if (part == domain->second.end()) {
2881 0 : return false;
2882 : }
2883 0 : participant = part->second;
2884 0 : domain->second.erase(part);
2885 0 : if (domain->second.empty()) {
2886 0 : participants_.erase(domain);
2887 : }
2888 :
2889 0 : participant->shutdown();
2890 0 : return true;
2891 0 : }
2892 :
2893 0 : bool StaticDiscovery::ignore_domain_participant(
2894 : DDS::DomainId_t domain, const GUID_t& myParticipantId, const GUID_t& ignoreId)
2895 : {
2896 0 : get_part(domain, myParticipantId)->ignore_domain_participant(ignoreId);
2897 0 : return true;
2898 : }
2899 :
2900 0 : bool StaticDiscovery::update_domain_participant_qos(
2901 : DDS::DomainId_t domain, const GUID_t& participant, const DDS::DomainParticipantQos& qos)
2902 : {
2903 0 : return get_part(domain, participant)->update_domain_participant_qos(qos);
2904 : }
2905 :
2906 0 : DCPS::TopicStatus StaticDiscovery::assert_topic(
2907 : GUID_t& topicId,
2908 : DDS::DomainId_t domainId,
2909 : const GUID_t& participantId,
2910 : const char* topicName,
2911 : const char* dataTypeName,
2912 : const DDS::TopicQos& qos,
2913 : bool hasDcpsKey,
2914 : DCPS::TopicCallbacks* topic_callbacks)
2915 : {
2916 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
2917 : // Verified its safe to hold lock during call to assert_topic
2918 0 : return participants_[domainId][participantId]->assert_topic(topicId, topicName,
2919 : dataTypeName, qos,
2920 0 : hasDcpsKey, topic_callbacks);
2921 0 : }
2922 :
2923 0 : DCPS::TopicStatus StaticDiscovery::find_topic(
2924 : DDS::DomainId_t domainId,
2925 : const GUID_t& participantId,
2926 : const char* topicName,
2927 : CORBA::String_out dataTypeName,
2928 : DDS::TopicQos_out qos,
2929 : GUID_t& topicId)
2930 : {
2931 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
2932 0 : return participants_[domainId][participantId]->find_topic(topicName, dataTypeName, qos, topicId);
2933 0 : }
2934 :
2935 0 : DCPS::TopicStatus StaticDiscovery::remove_topic(
2936 : DDS::DomainId_t domainId,
2937 : const GUID_t& participantId,
2938 : const GUID_t& topicId)
2939 : {
2940 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
2941 : // Safe to hold lock while calling remove topic
2942 0 : return participants_[domainId][participantId]->remove_topic(topicId);
2943 0 : }
2944 :
2945 0 : bool StaticDiscovery::ignore_topic(DDS::DomainId_t domainId, const GUID_t& myParticipantId,
2946 : const GUID_t& ignoreId)
2947 : {
2948 0 : get_part(domainId, myParticipantId)->ignore_topic(ignoreId);
2949 0 : return true;
2950 : }
2951 :
2952 0 : bool StaticDiscovery::update_topic_qos(const GUID_t& topicId, DDS::DomainId_t domainId,
2953 : const GUID_t& participantId, const DDS::TopicQos& qos)
2954 : {
2955 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
2956 : // Safe to hold lock while calling update_topic_qos
2957 0 : return participants_[domainId][participantId]->update_topic_qos(topicId, qos);
2958 0 : }
2959 :
2960 0 : GUID_t StaticDiscovery::add_publication(
2961 : DDS::DomainId_t domainId,
2962 : const GUID_t& participantId,
2963 : const GUID_t& topicId,
2964 : DCPS::DataWriterCallbacks_rch publication,
2965 : const DDS::DataWriterQos& qos,
2966 : const DCPS::TransportLocatorSeq& transInfo,
2967 : const DDS::PublisherQos& publisherQos,
2968 : const XTypes::TypeInformation& type_info)
2969 : {
2970 0 : return get_part(domainId, participantId)->add_publication(
2971 0 : topicId, publication, qos, transInfo, publisherQos, type_info);
2972 : }
2973 :
2974 0 : bool StaticDiscovery::remove_publication(
2975 : DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& publicationId)
2976 : {
2977 0 : get_part(domainId, participantId)->remove_publication(publicationId);
2978 0 : return true;
2979 : }
2980 :
2981 0 : bool StaticDiscovery::ignore_publication(
2982 : DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& ignoreId)
2983 : {
2984 0 : get_part(domainId, participantId)->ignore_publication(ignoreId);
2985 0 : return true;
2986 : }
2987 :
2988 0 : bool StaticDiscovery::update_publication_qos(
2989 : DDS::DomainId_t domainId,
2990 : const GUID_t& partId,
2991 : const GUID_t& dwId,
2992 : const DDS::DataWriterQos& qos,
2993 : const DDS::PublisherQos& publisherQos)
2994 : {
2995 0 : return get_part(domainId, partId)->update_publication_qos(dwId, qos,
2996 0 : publisherQos);
2997 : }
2998 :
2999 0 : void StaticDiscovery::update_publication_locators(
3000 : DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& dwId,
3001 : const DCPS::TransportLocatorSeq& transInfo)
3002 : {
3003 0 : get_part(domainId, partId)->update_publication_locators(dwId, transInfo);
3004 0 : }
3005 :
3006 0 : GUID_t StaticDiscovery::add_subscription(
3007 : DDS::DomainId_t domainId,
3008 : const GUID_t& participantId,
3009 : const GUID_t& topicId,
3010 : DCPS::DataReaderCallbacks_rch subscription,
3011 : const DDS::DataReaderQos& qos,
3012 : const DCPS::TransportLocatorSeq& transInfo,
3013 : const DDS::SubscriberQos& subscriberQos,
3014 : const char* filterClassName,
3015 : const char* filterExpr,
3016 : const DDS::StringSeq& params,
3017 : const XTypes::TypeInformation& type_info)
3018 : {
3019 0 : return get_part(domainId, participantId)->add_subscription(
3020 : topicId, subscription, qos, transInfo, subscriberQos, filterClassName,
3021 0 : filterExpr, params, type_info);
3022 : }
3023 :
3024 0 : bool StaticDiscovery::remove_subscription(
3025 : DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& subscriptionId)
3026 : {
3027 0 : get_part(domainId, participantId)->remove_subscription(subscriptionId);
3028 0 : return true;
3029 : }
3030 :
3031 0 : bool StaticDiscovery::ignore_subscription(
3032 : DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& ignoreId)
3033 : {
3034 0 : get_part(domainId, participantId)->ignore_subscription(ignoreId);
3035 0 : return true;
3036 : }
3037 :
3038 0 : bool StaticDiscovery::update_subscription_qos(
3039 : DDS::DomainId_t domainId,
3040 : const GUID_t& partId,
3041 : const GUID_t& drId,
3042 : const DDS::DataReaderQos& qos,
3043 : const DDS::SubscriberQos& subQos)
3044 : {
3045 0 : return get_part(domainId, partId)->update_subscription_qos(drId, qos, subQos);
3046 : }
3047 :
3048 0 : bool StaticDiscovery::update_subscription_params(
3049 : DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& subId, const DDS::StringSeq& params)
3050 : {
3051 0 : return get_part(domainId, partId)->update_subscription_params(subId, params);
3052 : }
3053 :
3054 0 : void StaticDiscovery::update_subscription_locators(
3055 : DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& subId,
3056 : const DCPS::TransportLocatorSeq& transInfo)
3057 : {
3058 0 : get_part(domainId, partId)->update_subscription_locators(subId, transInfo);
3059 0 : }
3060 :
3061 0 : StaticDiscovery::ParticipantHandle StaticDiscovery::get_part(
3062 : const DDS::DomainId_t domain_id, const GUID_t& part_id) const
3063 : {
3064 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ParticipantHandle());
3065 0 : DomainParticipantMap::const_iterator domain = participants_.find(domain_id);
3066 0 : if (domain == participants_.end()) {
3067 0 : return ParticipantHandle();
3068 : }
3069 0 : ParticipantMap::const_iterator part = domain->second.find(part_id);
3070 0 : if (part == domain->second.end()) {
3071 0 : return ParticipantHandle();
3072 : }
3073 0 : return part->second;
3074 0 : }
3075 :
3076 0 : void StaticDiscovery::create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
3077 : SubscriberImpl* sub, const DDS::DataReaderQos& qos)
3078 : {
3079 : TopicDescriptionImpl* bit_topic_i =
3080 0 : dynamic_cast<TopicDescriptionImpl*>(topic);
3081 0 : if (bit_topic_i == 0) {
3082 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::create_bit_dr")
3083 : ACE_TEXT(" - Could not cast TopicDescription to TopicDescriptionImpl\n")));
3084 0 : return;
3085 : }
3086 :
3087 0 : DDS::DomainParticipant_var participant = sub->get_participant();
3088 : DomainParticipantImpl* participant_i =
3089 0 : dynamic_cast<DomainParticipantImpl*>(participant.in());
3090 0 : if (participant_i == 0) {
3091 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::create_bit_dr")
3092 : ACE_TEXT(" - Could not cast DomainParticipant to DomainParticipantImpl\n")));
3093 0 : return;
3094 : }
3095 :
3096 : TypeSupport_var type_support =
3097 0 : Registered_Data_Types->lookup(participant, type);
3098 :
3099 0 : DDS::DataReader_var dr = type_support->create_datareader();
3100 0 : DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(dr.in());
3101 0 : if (dri == 0) {
3102 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::create_bit_dr")
3103 : ACE_TEXT(" - Could not cast DataReader to DataReaderImpl\n")));
3104 0 : return;
3105 : }
3106 :
3107 0 : dri->init(bit_topic_i, qos, 0 /*listener*/, 0 /*mask*/, participant_i, sub);
3108 0 : dri->disable_transport();
3109 0 : dri->enable();
3110 0 : }
3111 :
3112 0 : void StaticParticipant::remove_discovered_participant(DiscoveredParticipantIter& iter)
3113 : {
3114 0 : if (iter == participants_.end()) {
3115 0 : return;
3116 : }
3117 0 : GUID_t part_id = iter->first;
3118 0 : bool removed = endpoint_manager().disassociate();
3119 0 : iter = participants_.find(part_id); // refresh iter after disassociate, which can unlock
3120 0 : if (iter == participants_.end()) {
3121 0 : return;
3122 : }
3123 0 : if (removed) {
3124 : #ifndef DDS_HAS_MINIMUM_BIT
3125 0 : ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
3126 0 : ParticipantLocationBuiltinTopicDataDataReaderImpl* loc_bit = part_loc_bit();
3127 : // bit may be null if the DomainParticipant is shutting down
3128 0 : if ((bit && iter->second.bit_ih_ != DDS::HANDLE_NIL) ||
3129 0 : (loc_bit && iter->second.location_ih_ != DDS::HANDLE_NIL)) {
3130 : {
3131 0 : const DDS::InstanceHandle_t bit_ih = iter->second.bit_ih_;
3132 0 : const DDS::InstanceHandle_t location_ih = iter->second.location_ih_;
3133 :
3134 0 : ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
3135 0 : ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
3136 0 : if (bit && bit_ih != DDS::HANDLE_NIL) {
3137 0 : bit->set_instance_state(bit_ih,
3138 : DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
3139 : }
3140 0 : if (loc_bit && location_ih != DDS::HANDLE_NIL) {
3141 0 : loc_bit->set_instance_state(location_ih,
3142 : DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
3143 : }
3144 0 : }
3145 0 : iter = participants_.find(part_id);
3146 0 : if (iter == participants_.end()) {
3147 0 : return;
3148 : }
3149 : }
3150 : #endif /* DDS_HAS_MINIMUM_BIT */
3151 0 : if (DCPS_debug_level > 3) {
3152 0 : ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) LocalParticipant::remove_discovered_participant")
3153 : ACE_TEXT(" - erasing %C (%B)\n"), LogGuid(iter->first).c_str(), participants_.size()));
3154 : }
3155 :
3156 0 : remove_discovered_participant_i(iter);
3157 :
3158 0 : participants_.erase(iter);
3159 : }
3160 : }
3161 :
3162 : } // namespace DCPS
3163 : } // namespace OpenDDS
3164 :
3165 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|