Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_INTERNAL_DATA_READER_H
9 : #define OPENDDS_DCPS_INTERNAL_DATA_READER_H
10 :
11 : #include "dcps_export.h"
12 :
13 : #ifndef ACE_LACKS_PRAGMA_ONCE
14 : # pragma once
15 : #endif /* ACE_LACKS_PRAGMA_ONCE */
16 :
17 : #include "RcObject.h"
18 : #include "PoolAllocator.h"
19 : #include "InternalDataReaderListener.h"
20 : #include "Time_Helper.h"
21 : #include "TimeTypes.h"
22 :
23 : #include <dds/DdsDcpsCoreC.h>
24 : #include <dds/DdsDcpsInfrastructureC.h>
25 :
26 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
27 :
28 : namespace OpenDDS {
29 : namespace DCPS {
30 :
31 : class InternalEntity : public virtual RcObject {};
32 : typedef WeakRcHandle<InternalEntity> InternalEntity_wrch;
33 :
34 : typedef OPENDDS_VECTOR(DDS::SampleInfo) InternalSampleInfoSequence;
35 :
36 56 : inline DDS::SampleInfo make_sample_info(DDS::SampleStateKind sample_state,
37 : DDS::ViewStateKind view_state,
38 : DDS::InstanceStateKind instance_state,
39 : CORBA::Long disposed_generation_count,
40 : CORBA::Long no_writers_generation_count,
41 : CORBA::Long sample_rank,
42 : CORBA::Long generation_rank,
43 : CORBA::Long absolute_generation_rank,
44 : bool valid_data)
45 : {
46 : DDS::SampleInfo si;
47 56 : si.sample_state = sample_state;
48 56 : si.view_state = view_state;
49 56 : si.instance_state = instance_state;
50 56 : si.source_timestamp = make_time_t(0, 0); // TODO
51 56 : si.instance_handle = DDS::HANDLE_NIL; // TODO
52 56 : si.publication_handle = DDS::HANDLE_NIL; // TODO
53 56 : si.disposed_generation_count = disposed_generation_count;
54 56 : si.no_writers_generation_count = no_writers_generation_count;
55 56 : si.sample_rank = sample_rank;
56 56 : si.generation_rank = generation_rank;
57 56 : si.absolute_generation_rank = absolute_generation_rank;
58 56 : si.valid_data = valid_data;
59 56 : return si;
60 : }
61 :
62 : #ifndef OPENDDS_SAFETY_PROFILE
63 28 : inline bool operator==(const DDS::SampleInfo& x, const DDS::SampleInfo& y)
64 : {
65 56 : return x.sample_state == y.sample_state &&
66 28 : x.view_state == y.view_state &&
67 28 : x.instance_state == y.instance_state &&
68 28 : x.source_timestamp == y.source_timestamp &&
69 28 : x.instance_handle == y.instance_handle &&
70 28 : x.publication_handle == y.publication_handle &&
71 28 : x.disposed_generation_count == y.disposed_generation_count &&
72 28 : x.no_writers_generation_count == y.no_writers_generation_count &&
73 28 : x.sample_rank == y.sample_rank &&
74 28 : x.generation_rank == y.generation_rank &&
75 84 : x.absolute_generation_rank == y.absolute_generation_rank &&
76 56 : x.valid_data == y.valid_data;
77 : }
78 : #endif
79 :
80 : class SampleInfoWrapper {
81 : public:
82 56 : SampleInfoWrapper(const DDS::SampleInfo& sample_info)
83 56 : : si(sample_info)
84 56 : {}
85 :
86 28 : bool operator==(const SampleInfoWrapper& other) const
87 : {
88 28 : return si == other.si;
89 : }
90 :
91 : DDS::SampleInfo si;
92 : };
93 :
94 : template <typename T>
95 : class InternalDataReader : public InternalEntity {
96 : public:
97 : typedef OPENDDS_VECTOR(T) SampleSequence;
98 : typedef RcHandle<InternalDataReaderListener<T> > Listener_rch;
99 : typedef WeakRcHandle<InternalDataReaderListener<T> > Listener_wrch;
100 :
101 29 : explicit InternalDataReader(const DDS::DataReaderQos qos,
102 : Listener_rch listener = Listener_rch())
103 29 : : qos_(qos)
104 29 : , listener_(listener)
105 29 : {}
106 :
107 : /// @name InternalTopic and InternalWriter Interface
108 : /// @{
109 4 : bool durable() const { return qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS; }
110 :
111 5 : void remove_publication(InternalEntity_wrch publication_handle, bool autodispose_unregistered_instances)
112 : {
113 5 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
114 :
115 : // FUTURE: Index by publication_handle to avoid the loop.
116 5 : bool schedule = false;
117 8 : for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ++pos) {
118 3 : if (autodispose_unregistered_instances && pos->second.dispose(publication_handle, qos_)) {
119 2 : schedule = true;
120 : }
121 3 : if (pos->second.unregister_instance(publication_handle, qos_)) {
122 1 : schedule = true;
123 : }
124 : }
125 :
126 5 : if (schedule) {
127 3 : const Listener_rch listener = listener_.lock();
128 3 : if (listener) {
129 0 : listener->schedule(rchandle_from(this));
130 : // TODO: If the listener doesn't do anything, then clean up then possibly clean up the instance.
131 : }
132 3 : }
133 5 : }
134 :
135 29 : void write(InternalEntity_wrch publication_handle, const T& sample)
136 : {
137 29 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
138 :
139 29 : const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, Instance()));
140 29 : p.first->second.write(publication_handle, sample, qos_);
141 :
142 29 : const Listener_rch listener = listener_.lock();
143 29 : if (listener) {
144 1 : listener->schedule(rchandle_from(this));
145 : }
146 29 : }
147 :
148 5 : void dispose(InternalEntity_wrch publication_handle, const T& sample)
149 : {
150 5 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
151 :
152 5 : typename InstanceMap::iterator pos = instance_map_.find(sample);
153 5 : if (pos == instance_map_.end()) {
154 0 : return;
155 : }
156 :
157 5 : if (pos->second.dispose(publication_handle, qos_)) {
158 5 : const Listener_rch listener = listener_.lock();
159 5 : if (listener) {
160 0 : listener->schedule(rchandle_from(this));
161 : }
162 5 : }
163 5 : }
164 :
165 5 : void unregister_instance(InternalEntity_wrch publication_handle, const T& sample)
166 : {
167 5 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
168 :
169 5 : typename InstanceMap::iterator pos = instance_map_.find(sample);
170 5 : if (pos == instance_map_.end()) {
171 0 : return;
172 : }
173 :
174 5 : if (pos->second.unregister_instance(publication_handle, qos_)) {
175 4 : const Listener_rch listener = listener_.lock();
176 4 : if (listener) {
177 0 : listener->schedule(rchandle_from(this));
178 : }
179 4 : }
180 5 : }
181 : /// @}
182 :
183 : /// @name User Interface
184 : /// @{
185 1 : void set_listener(Listener_rch listener)
186 : {
187 1 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
188 1 : listener_ = listener;
189 1 : }
190 :
191 1 : Listener_rch get_listener() const
192 : {
193 1 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, Listener_rch());
194 1 : return listener_.lock();
195 1 : }
196 :
197 10 : void read(SampleSequence& samples,
198 : InternalSampleInfoSequence& infos,
199 : CORBA::Long max_samples,
200 : DDS::SampleStateMask sample_states,
201 : DDS::ViewStateMask view_states,
202 : DDS::InstanceStateMask instance_states)
203 : {
204 10 : samples.clear();
205 10 : infos.clear();
206 :
207 : // TODO: Index to avoid the loop.
208 10 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
209 20 : for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
210 10 : pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
211 10 : pos->second.purge_samples(qos_);
212 10 : if (pos->second.can_purge_instance(qos_)) {
213 0 : instance_map_.erase(pos++);
214 : } else {
215 10 : ++pos;
216 : }
217 : }
218 10 : }
219 :
220 23 : void take(SampleSequence& samples,
221 : InternalSampleInfoSequence& infos,
222 : CORBA::Long max_samples,
223 : DDS::SampleStateMask sample_states,
224 : DDS::ViewStateMask view_states,
225 : DDS::InstanceStateMask instance_states)
226 : {
227 23 : samples.clear();
228 23 : infos.clear();
229 :
230 : // TODO: Index to avoid the loop.
231 23 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
232 48 : for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
233 25 : pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
234 25 : pos->second.purge_samples(qos_);
235 25 : if (pos->second.can_purge_instance(qos_)) {
236 7 : instance_map_.erase(pos++);
237 : } else {
238 18 : ++pos;
239 : }
240 : }
241 23 : }
242 :
243 2 : void read_instance(SampleSequence& samples,
244 : InternalSampleInfoSequence& infos,
245 : CORBA::Long max_samples,
246 : const T& key,
247 : DDS::SampleStateMask sample_states,
248 : DDS::ViewStateMask view_states,
249 : DDS::InstanceStateMask instance_states)
250 : {
251 2 : samples.clear();
252 2 : infos.clear();
253 :
254 2 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
255 2 : typename InstanceMap::iterator pos = instance_map_.find(key);
256 2 : if (pos != instance_map_.end()) {
257 1 : pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
258 1 : pos->second.purge_samples(qos_);
259 1 : if (pos->second.can_purge_instance(qos_)) {
260 0 : instance_map_.erase(pos);
261 : }
262 : }
263 2 : }
264 :
265 2 : void take_instance(SampleSequence& samples,
266 : InternalSampleInfoSequence& infos,
267 : CORBA::Long max_samples,
268 : const T& key,
269 : DDS::SampleStateMask sample_states,
270 : DDS::ViewStateMask view_states,
271 : DDS::InstanceStateMask instance_states)
272 : {
273 2 : samples.clear();
274 2 : infos.clear();
275 :
276 2 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
277 2 : typename InstanceMap::iterator pos = instance_map_.find(key);
278 2 : if (pos != instance_map_.end()) {
279 1 : pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
280 1 : pos->second.purge_samples(qos_);
281 1 : if (pos->second.can_purge_instance(qos_)) {
282 0 : instance_map_.erase(pos);
283 : }
284 : }
285 2 : }
286 : /// @}
287 :
288 : private:
289 : const DDS::DataReaderQos qos_;
290 : // Often, the listener will have the reader as a member. Use a weak
291 : // pointer to prevent a cycle that prevents the listener from being
292 : // destroyed.
293 : Listener_wrch listener_;
294 :
295 : typedef OPENDDS_SET(InternalEntity_wrch) PublicationSet;
296 :
297 : class Instance {
298 : public:
299 :
300 29 : Instance()
301 29 : : view_state_(DDS::NEW_VIEW_STATE)
302 29 : , instance_state_(DDS::ALIVE_INSTANCE_STATE)
303 29 : , disposed_generation_count_(0)
304 29 : , no_writers_generation_count_(0)
305 29 : , informed_of_not_alive_(false)
306 : {
307 29 : disposed_expiration_date_.sec = 0;
308 29 : disposed_expiration_date_.nanosec = 0;
309 29 : no_writers_expiration_date_.sec = 0;
310 29 : no_writers_expiration_date_.nanosec = 0;
311 29 : }
312 :
313 : DDS::ViewStateKind view_state() const { return view_state_; }
314 :
315 : DDS::InstanceStateKind instance_state() const { return instance_state_; }
316 :
317 37 : void purge_samples(const DDS::DataReaderQos& qos)
318 : {
319 83 : if (instance_state_ == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE &&
320 37 : !is_infinite(qos.reader_data_lifecycle.autopurge_disposed_samples_delay) &&
321 37 : SystemTimePoint::now().to_dds_time() > disposed_expiration_date_) {
322 0 : not_read_samples_.clear();
323 0 : read_samples_.clear();
324 : }
325 37 : }
326 :
327 37 : bool can_purge_instance(const DDS::DataReaderQos& qos) const
328 : {
329 16 : if (instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
330 30 : not_read_samples_.empty() &&
331 67 : read_samples_.empty() &&
332 11 : publication_set_.empty()) {
333 7 : return true;
334 : }
335 :
336 63 : if (instance_state_ == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE &&
337 30 : !is_infinite(qos.reader_data_lifecycle.autopurge_nowriter_samples_delay) &&
338 30 : SystemTimePoint::now().to_dds_time() > no_writers_expiration_date_) {
339 0 : return true;
340 : }
341 :
342 30 : return false;
343 : }
344 :
345 11 : void read(const T& key,
346 : SampleSequence& samples,
347 : InternalSampleInfoSequence& infos,
348 : CORBA::Long max_samples,
349 : DDS::SampleStateMask sample_states,
350 : DDS::ViewStateMask view_states,
351 : DDS::InstanceStateMask instance_states)
352 : {
353 11 : if (!((view_states & view_state_) && (instance_states & instance_state_))) {
354 4 : return;
355 : }
356 :
357 7 : CORBA::Long sample_count = 0;
358 :
359 7 : if (sample_states & DDS::READ_SAMPLE_STATE) {
360 6 : for (typename SampleList::const_iterator pos = read_samples_.begin(), limit = read_samples_.end();
361 8 : pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
362 2 : samples.push_back(pos->sample);
363 2 : infos.push_back(make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
364 2 : ++sample_count;
365 : }
366 : }
367 :
368 7 : if (sample_states & DDS::NOT_READ_SAMPLE_STATE) {
369 6 : typename SampleList::iterator pos = not_read_samples_.begin();
370 6 : for (typename SampleList::iterator limit = not_read_samples_.end();
371 10 : pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
372 4 : samples.push_back(pos->sample);
373 4 : infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
374 4 : ++sample_count;
375 : }
376 6 : read_samples_.splice(read_samples_.end(), not_read_samples_, not_read_samples_.begin(), pos);
377 :
378 : // Generate a synthetic sample for not alive states.
379 6 : if (sample_count == 0 &&
380 0 : instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
381 0 : !informed_of_not_alive_) {
382 0 : samples.push_back(key);
383 0 : infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0, false));
384 0 : ++sample_count;
385 : }
386 : }
387 :
388 7 : compute_ranks(sample_count, infos);
389 :
390 7 : if (sample_count) {
391 6 : view_state_ = DDS::NOT_NEW_VIEW_STATE;
392 6 : informed_of_not_alive_ = true;
393 : }
394 : }
395 :
396 26 : void take(const T& key,
397 : SampleSequence& samples,
398 : InternalSampleInfoSequence& infos,
399 : CORBA::Long max_samples,
400 : DDS::SampleStateMask sample_states,
401 : DDS::ViewStateMask view_states,
402 : DDS::InstanceStateMask instance_states)
403 : {
404 26 : if (!((view_states & view_state_) && (instance_states & instance_state_))) {
405 4 : return;
406 : }
407 :
408 22 : CORBA::Long sample_count = 0;
409 :
410 22 : if (sample_states & DDS::READ_SAMPLE_STATE) {
411 21 : typename SampleList::iterator pos = read_samples_.begin();
412 21 : for (typename SampleList::iterator limit = read_samples_.end();
413 21 : pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
414 0 : samples.push_back(pos->sample);
415 0 : infos.push_back(make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
416 0 : ++sample_count;
417 : }
418 21 : read_samples_.erase(read_samples_.begin(), pos);
419 : }
420 :
421 22 : if (sample_states & DDS::NOT_READ_SAMPLE_STATE) {
422 21 : typename SampleList::iterator pos = not_read_samples_.begin();
423 21 : for (typename SampleList::iterator limit = not_read_samples_.end();
424 42 : pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
425 21 : samples.push_back(pos->sample);
426 21 : infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
427 21 : ++sample_count;
428 : }
429 21 : not_read_samples_.erase(not_read_samples_.begin(), pos);
430 :
431 : // Generate a synthetic sample for not alive states.
432 21 : if (sample_count == 0 &&
433 2 : instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
434 1 : !informed_of_not_alive_) {
435 1 : samples.push_back(key);
436 1 : infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0, false));
437 1 : ++sample_count;
438 : }
439 : }
440 :
441 22 : compute_ranks(sample_count, infos);
442 :
443 22 : if (sample_count) {
444 20 : view_state_ = DDS::NOT_NEW_VIEW_STATE;
445 20 : informed_of_not_alive_ = true;
446 : }
447 : }
448 :
449 29 : void write(InternalEntity_wrch publication_handle,
450 : const T& sample,
451 : const DDS::DataReaderQos& qos)
452 : {
453 29 : publication_set_.insert(publication_handle);
454 :
455 29 : if (view_state_ == DDS::NOT_NEW_VIEW_STATE && instance_state_ != DDS::ALIVE_INSTANCE_STATE) {
456 2 : view_state_ = DDS::NEW_VIEW_STATE;
457 : }
458 :
459 29 : switch (instance_state_) {
460 27 : case DDS::ALIVE_INSTANCE_STATE:
461 27 : break;
462 2 : case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
463 2 : ++disposed_generation_count_;
464 2 : break;
465 0 : case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
466 0 : ++no_writers_generation_count_;
467 0 : break;
468 : }
469 :
470 29 : instance_state_ = DDS::ALIVE_INSTANCE_STATE;
471 :
472 29 : if (qos.history.kind == DDS::KEEP_LAST_HISTORY_QOS) {
473 26 : while (read_samples_.size() + not_read_samples_.size() >= static_cast<size_t>(qos.history.depth)) {
474 2 : if (!read_samples_.empty()) {
475 1 : read_samples_.pop_front();
476 : } else {
477 1 : not_read_samples_.pop_front();
478 : }
479 : }
480 : }
481 :
482 29 : not_read_samples_.push_back(SampleHolder(sample, disposed_generation_count_, no_writers_generation_count_));
483 29 : }
484 :
485 7 : bool dispose(InternalEntity_wrch publication_handle,
486 : const DDS::DataReaderQos& qos)
487 : {
488 7 : publication_set_.insert(publication_handle);
489 :
490 7 : if (instance_state_ != DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
491 7 : instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
492 7 : disposed_expiration_date_ = SystemTimePoint::now().to_dds_time() + qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
493 7 : informed_of_not_alive_ = false;
494 7 : return true;
495 : }
496 :
497 0 : return false;
498 : }
499 :
500 8 : bool unregister_instance(InternalEntity_wrch publication_handle,
501 : const DDS::DataReaderQos& qos)
502 : {
503 8 : publication_set_.erase(publication_handle);
504 :
505 8 : if (publication_set_.empty() && instance_state_ == DDS::ALIVE_INSTANCE_STATE) {
506 5 : instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
507 5 : no_writers_expiration_date_ = SystemTimePoint::now().to_dds_time() + qos.reader_data_lifecycle.autopurge_nowriter_samples_delay;
508 5 : informed_of_not_alive_ = false;
509 5 : return true;
510 : }
511 :
512 3 : return false;
513 : }
514 :
515 : private:
516 : struct SampleHolder {
517 : T sample;
518 : CORBA::Long disposed_generation_count;
519 : CORBA::Long no_writers_generation_count;
520 :
521 29 : SampleHolder(const T& s,
522 : CORBA::Long dgc,
523 : CORBA::Long nwgc)
524 29 : : sample(s)
525 29 : , disposed_generation_count(dgc)
526 29 : , no_writers_generation_count(nwgc)
527 29 : {}
528 : };
529 :
530 : typedef OPENDDS_LIST(SampleHolder) SampleList;
531 : SampleList read_samples_;
532 : SampleList not_read_samples_;
533 :
534 : PublicationSet publication_set_;
535 :
536 : DDS::ViewStateKind view_state_;
537 : DDS::InstanceStateKind instance_state_;
538 : DDS::Time_t disposed_expiration_date_;
539 : DDS::Time_t no_writers_expiration_date_;
540 : CORBA::Long disposed_generation_count_;
541 : CORBA::Long no_writers_generation_count_;
542 : bool informed_of_not_alive_;
543 :
544 29 : void compute_ranks(CORBA::Long sample_count, InternalSampleInfoSequence& infos)
545 : {
546 29 : if (sample_count == 0) {
547 3 : return;
548 : }
549 :
550 26 : typename InternalSampleInfoSequence::reverse_iterator pos = infos.rbegin();
551 26 : const CORBA::Long mrsic = pos->disposed_generation_count + pos->no_writers_generation_count;
552 26 : const CORBA::Long mrs = disposed_generation_count_ + no_writers_generation_count_;
553 :
554 54 : for (CORBA::Long rank = 0; rank != sample_count; ++rank, ++pos) {
555 28 : pos->sample_rank = rank;
556 28 : pos->generation_rank = mrsic - (pos->disposed_generation_count + pos->no_writers_generation_count);
557 28 : pos->absolute_generation_rank = mrs - (pos->disposed_generation_count + pos->no_writers_generation_count);
558 : }
559 : }
560 : };
561 :
562 : typedef OPENDDS_MAP_T(T, Instance) InstanceMap;
563 : InstanceMap instance_map_;
564 :
565 : mutable ACE_Thread_Mutex mutex_;
566 : };
567 :
568 : } // namespace DCPS
569 : } // namespace OpenDDS
570 :
571 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
572 :
573 : #endif /* OPENDDS_DCPS_INTERNAL_DATA_READER_H */
|