OpenDDS  Snapshot(2023/04/28-20:55)
InternalDataReader.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_INTERNAL_DATA_READER_H
9 #define OPENDDS_DCPS_INTERNAL_DATA_READER_H
10 
11 #include "dcps_export.h"
12 
13 #ifndef ACE_LACKS_PRAGMA_ONCE
14 # pragma once
15 #endif /* ACE_LACKS_PRAGMA_ONCE */
16 
17 #include "RcObject.h"
18 #include "PoolAllocator.h"
20 #include "Time_Helper.h"
21 #include "TimeTypes.h"
22 
23 #include <dds/DdsDcpsCoreC.h>
24 #include <dds/DdsDcpsInfrastructureC.h>
25 
27 
28 namespace OpenDDS {
29 namespace DCPS {
30 
31 class InternalEntity : public virtual RcObject {};
33 
34 typedef OPENDDS_VECTOR(DDS::SampleInfo) InternalSampleInfoSequence;
35 
37  DDS::ViewStateKind view_state,
38  DDS::InstanceStateKind instance_state,
39  CORBA::Long disposed_generation_count,
40  CORBA::Long no_writers_generation_count,
41  CORBA::Long sample_rank,
42  CORBA::Long generation_rank,
43  CORBA::Long absolute_generation_rank,
44  bool valid_data)
45 {
46  DDS::SampleInfo si;
47  si.sample_state = sample_state;
48  si.view_state = view_state;
49  si.instance_state = instance_state;
50  si.source_timestamp = make_time_t(0, 0); // TODO
51  si.instance_handle = DDS::HANDLE_NIL; // TODO
53  si.disposed_generation_count = disposed_generation_count;
54  si.no_writers_generation_count = no_writers_generation_count;
55  si.sample_rank = sample_rank;
56  si.generation_rank = generation_rank;
57  si.absolute_generation_rank = absolute_generation_rank;
58  si.valid_data = valid_data;
59  return si;
60 }
61 
62 #ifndef OPENDDS_SAFETY_PROFILE
63 inline bool operator==(const DDS::SampleInfo& x, const DDS::SampleInfo& y)
64 {
65  return x.sample_state == y.sample_state &&
66  x.view_state == y.view_state &&
73  x.sample_rank == y.sample_rank &&
76  x.valid_data == y.valid_data;
77 }
78 #endif
79 
81 public:
82  SampleInfoWrapper(const DDS::SampleInfo& sample_info)
83  : si(sample_info)
84  {}
85 
86  bool operator==(const SampleInfoWrapper& other) const
87  {
88  return si == other.si;
89  }
90 
92 };
93 
94 template <typename T>
96 public:
97  typedef OPENDDS_VECTOR(T) SampleSequence;
100 
102  Listener_rch listener = Listener_rch())
103  : qos_(qos)
104  , listener_(listener)
105  {}
106 
107  /// @name InternalTopic and InternalWriter Interface
108  /// @{
109  bool durable() const { return qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS; }
110 
111  void remove_publication(InternalEntity_wrch publication_handle, bool autodispose_unregistered_instances)
112  {
113  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
114 
115  // FUTURE: Index by publication_handle to avoid the loop.
116  bool schedule = false;
117  for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ++pos) {
118  if (autodispose_unregistered_instances && pos->second.dispose(publication_handle, qos_)) {
119  schedule = true;
120  }
121  if (pos->second.unregister_instance(publication_handle, qos_)) {
122  schedule = true;
123  }
124  }
125 
126  if (schedule) {
127  const Listener_rch listener = listener_.lock();
128  if (listener) {
129  listener->schedule(rchandle_from(this));
130  // TODO: If the listener doesn't do anything, then clean up then possibly clean up the instance.
131  }
132  }
133  }
134 
135  void write(InternalEntity_wrch publication_handle, const T& sample)
136  {
137  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
138 
139  const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, Instance()));
140  p.first->second.write(publication_handle, sample, qos_);
141 
142  const Listener_rch listener = listener_.lock();
143  if (listener) {
144  listener->schedule(rchandle_from(this));
145  }
146  }
147 
148  void dispose(InternalEntity_wrch publication_handle, const T& sample)
149  {
150  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
151 
152  typename InstanceMap::iterator pos = instance_map_.find(sample);
153  if (pos == instance_map_.end()) {
154  return;
155  }
156 
157  if (pos->second.dispose(publication_handle, qos_)) {
158  const Listener_rch listener = listener_.lock();
159  if (listener) {
160  listener->schedule(rchandle_from(this));
161  }
162  }
163  }
164 
165  void unregister_instance(InternalEntity_wrch publication_handle, const T& sample)
166  {
167  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
168 
169  typename InstanceMap::iterator pos = instance_map_.find(sample);
170  if (pos == instance_map_.end()) {
171  return;
172  }
173 
174  if (pos->second.unregister_instance(publication_handle, qos_)) {
175  const Listener_rch listener = listener_.lock();
176  if (listener) {
177  listener->schedule(rchandle_from(this));
178  }
179  }
180  }
181  /// @}
182 
183  /// @name User Interface
184  /// @{
185  void set_listener(Listener_rch listener)
186  {
187  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
188  listener_ = listener;
189  }
190 
191  Listener_rch get_listener() const
192  {
193  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, Listener_rch());
194  return listener_.lock();
195  }
196 
197  void read(SampleSequence& samples,
198  InternalSampleInfoSequence& infos,
203  {
204  samples.clear();
205  infos.clear();
206 
207  // TODO: Index to avoid the loop.
208  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
209  for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
210  pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
211  pos->second.purge_samples(qos_);
212  if (pos->second.can_purge_instance(qos_)) {
213  instance_map_.erase(pos++);
214  } else {
215  ++pos;
216  }
217  }
218  }
219 
220  void take(SampleSequence& samples,
221  InternalSampleInfoSequence& infos,
226  {
227  samples.clear();
228  infos.clear();
229 
230  // TODO: Index to avoid the loop.
231  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
232  for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
233  pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
234  pos->second.purge_samples(qos_);
235  if (pos->second.can_purge_instance(qos_)) {
236  instance_map_.erase(pos++);
237  } else {
238  ++pos;
239  }
240  }
241  }
242 
243  void read_instance(SampleSequence& samples,
244  InternalSampleInfoSequence& infos,
246  const T& key,
250  {
251  samples.clear();
252  infos.clear();
253 
254  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
255  typename InstanceMap::iterator pos = instance_map_.find(key);
256  if (pos != instance_map_.end()) {
257  pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
258  pos->second.purge_samples(qos_);
259  if (pos->second.can_purge_instance(qos_)) {
260  instance_map_.erase(pos);
261  }
262  }
263  }
264 
265  void take_instance(SampleSequence& samples,
266  InternalSampleInfoSequence& infos,
268  const T& key,
272  {
273  samples.clear();
274  infos.clear();
275 
276  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
277  typename InstanceMap::iterator pos = instance_map_.find(key);
278  if (pos != instance_map_.end()) {
279  pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
280  pos->second.purge_samples(qos_);
281  if (pos->second.can_purge_instance(qos_)) {
282  instance_map_.erase(pos);
283  }
284  }
285  }
286 /// @}
287 
288 private:
290  // Often, the listener will have the reader as a member. Use a weak
291  // pointer to prevent a cycle that prevents the listener from being
292  // destroyed.
293  Listener_wrch listener_;
294 
295  typedef OPENDDS_SET(InternalEntity_wrch) PublicationSet;
296 
297  class Instance {
298  public:
299 
301  : view_state_(DDS::NEW_VIEW_STATE)
302  , instance_state_(DDS::ALIVE_INSTANCE_STATE)
303  , disposed_generation_count_(0)
304  , no_writers_generation_count_(0)
305  , informed_of_not_alive_(false)
306  {
307  disposed_expiration_date_.sec = 0;
308  disposed_expiration_date_.nanosec = 0;
309  no_writers_expiration_date_.sec = 0;
310  no_writers_expiration_date_.nanosec = 0;
311  }
312 
313  DDS::ViewStateKind view_state() const { return view_state_; }
314 
315  DDS::InstanceStateKind instance_state() const { return instance_state_; }
316 
318  {
319  if (instance_state_ == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE &&
321  SystemTimePoint::now().to_dds_time() > disposed_expiration_date_) {
322  not_read_samples_.clear();
323  read_samples_.clear();
324  }
325  }
326 
327  bool can_purge_instance(const DDS::DataReaderQos& qos) const
328  {
329  if (instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
330  not_read_samples_.empty() &&
331  read_samples_.empty() &&
332  publication_set_.empty()) {
333  return true;
334  }
335 
336  if (instance_state_ == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE &&
338  SystemTimePoint::now().to_dds_time() > no_writers_expiration_date_) {
339  return true;
340  }
341 
342  return false;
343  }
344 
345  void read(const T& key,
346  SampleSequence& samples,
347  InternalSampleInfoSequence& infos,
352  {
353  if (!((view_states & view_state_) && (instance_states & instance_state_))) {
354  return;
355  }
356 
357  CORBA::Long sample_count = 0;
358 
359  if (sample_states & DDS::READ_SAMPLE_STATE) {
360  for (typename SampleList::const_iterator pos = read_samples_.begin(), limit = read_samples_.end();
361  pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
362  samples.push_back(pos->sample);
363  infos.push_back(make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
364  ++sample_count;
365  }
366  }
367 
368  if (sample_states & DDS::NOT_READ_SAMPLE_STATE) {
369  typename SampleList::iterator pos = not_read_samples_.begin();
370  for (typename SampleList::iterator limit = not_read_samples_.end();
371  pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
372  samples.push_back(pos->sample);
373  infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
374  ++sample_count;
375  }
376  read_samples_.splice(read_samples_.end(), not_read_samples_, not_read_samples_.begin(), pos);
377 
378  // Generate a synthetic sample for not alive states.
379  if (sample_count == 0 &&
380  instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
381  !informed_of_not_alive_) {
382  samples.push_back(key);
383  infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0, false));
384  ++sample_count;
385  }
386  }
387 
388  compute_ranks(sample_count, infos);
389 
390  if (sample_count) {
391  view_state_ = DDS::NOT_NEW_VIEW_STATE;
392  informed_of_not_alive_ = true;
393  }
394  }
395 
396  void take(const T& key,
397  SampleSequence& samples,
398  InternalSampleInfoSequence& infos,
403  {
404  if (!((view_states & view_state_) && (instance_states & instance_state_))) {
405  return;
406  }
407 
408  CORBA::Long sample_count = 0;
409 
410  if (sample_states & DDS::READ_SAMPLE_STATE) {
411  typename SampleList::iterator pos = read_samples_.begin();
412  for (typename SampleList::iterator limit = read_samples_.end();
413  pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
414  samples.push_back(pos->sample);
415  infos.push_back(make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
416  ++sample_count;
417  }
418  read_samples_.erase(read_samples_.begin(), pos);
419  }
420 
421  if (sample_states & DDS::NOT_READ_SAMPLE_STATE) {
422  typename SampleList::iterator pos = not_read_samples_.begin();
423  for (typename SampleList::iterator limit = not_read_samples_.end();
424  pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
425  samples.push_back(pos->sample);
426  infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
427  ++sample_count;
428  }
429  not_read_samples_.erase(not_read_samples_.begin(), pos);
430 
431  // Generate a synthetic sample for not alive states.
432  if (sample_count == 0 &&
433  instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
434  !informed_of_not_alive_) {
435  samples.push_back(key);
436  infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0, false));
437  ++sample_count;
438  }
439  }
440 
441  compute_ranks(sample_count, infos);
442 
443  if (sample_count) {
444  view_state_ = DDS::NOT_NEW_VIEW_STATE;
445  informed_of_not_alive_ = true;
446  }
447  }
448 
449  void write(InternalEntity_wrch publication_handle,
450  const T& sample,
451  const DDS::DataReaderQos& qos)
452  {
453  publication_set_.insert(publication_handle);
454 
455  if (view_state_ == DDS::NOT_NEW_VIEW_STATE && instance_state_ != DDS::ALIVE_INSTANCE_STATE) {
456  view_state_ = DDS::NEW_VIEW_STATE;
457  }
458 
459  switch (instance_state_) {
461  break;
463  ++disposed_generation_count_;
464  break;
466  ++no_writers_generation_count_;
467  break;
468  }
469 
470  instance_state_ = DDS::ALIVE_INSTANCE_STATE;
471 
473  while (read_samples_.size() + not_read_samples_.size() >= static_cast<size_t>(qos.history.depth)) {
474  if (!read_samples_.empty()) {
475  read_samples_.pop_front();
476  } else {
477  not_read_samples_.pop_front();
478  }
479  }
480  }
481 
482  not_read_samples_.push_back(SampleHolder(sample, disposed_generation_count_, no_writers_generation_count_));
483  }
484 
485  bool dispose(InternalEntity_wrch publication_handle,
486  const DDS::DataReaderQos& qos)
487  {
488  publication_set_.insert(publication_handle);
489 
490  if (instance_state_ != DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
491  instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
493  informed_of_not_alive_ = false;
494  return true;
495  }
496 
497  return false;
498  }
499 
500  bool unregister_instance(InternalEntity_wrch publication_handle,
501  const DDS::DataReaderQos& qos)
502  {
503  publication_set_.erase(publication_handle);
504 
505  if (publication_set_.empty() && instance_state_ == DDS::ALIVE_INSTANCE_STATE) {
508  informed_of_not_alive_ = false;
509  return true;
510  }
511 
512  return false;
513  }
514 
515  private:
516  struct SampleHolder {
520 
521  SampleHolder(const T& s,
522  CORBA::Long dgc,
523  CORBA::Long nwgc)
524  : sample(s)
525  , disposed_generation_count(dgc)
526  , no_writers_generation_count(nwgc)
527  {}
528  };
529 
530  typedef OPENDDS_LIST(SampleHolder) SampleList;
531  SampleList read_samples_;
532  SampleList not_read_samples_;
533 
534  PublicationSet publication_set_;
535 
543 
544  void compute_ranks(CORBA::Long sample_count, InternalSampleInfoSequence& infos)
545  {
546  if (sample_count == 0) {
547  return;
548  }
549 
550  typename InternalSampleInfoSequence::reverse_iterator pos = infos.rbegin();
551  const CORBA::Long mrsic = pos->disposed_generation_count + pos->no_writers_generation_count;
552  const CORBA::Long mrs = disposed_generation_count_ + no_writers_generation_count_;
553 
554  for (CORBA::Long rank = 0; rank != sample_count; ++rank, ++pos) {
555  pos->sample_rank = rank;
556  pos->generation_rank = mrsic - (pos->disposed_generation_count + pos->no_writers_generation_count);
557  pos->absolute_generation_rank = mrs - (pos->disposed_generation_count + pos->no_writers_generation_count);
558  }
559  }
560  };
561 
562  typedef OPENDDS_MAP_T(T, Instance) InstanceMap;
563  InstanceMap instance_map_;
564 
566 };
567 
568 } // namespace DCPS
569 } // namespace OpenDDS
570 
572 
573 #endif /* OPENDDS_DCPS_INTERNAL_DATA_READER_H */
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
void read(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_CDR::Long Long
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
void dispose(InternalEntity_wrch publication_handle, const T &sample)
InstanceStateKind instance_state
const InstanceHandle_t HANDLE_NIL
void write(InternalEntity_wrch publication_handle, const T &sample, const DDS::DataReaderQos &qos)
ViewStateKind view_state
void take(const T &key, SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
bool can_purge_instance(const DDS::DataReaderQos &qos) const
HistoryQosPolicyKind kind
sequence< octet > key
void read_instance(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, const T &key, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::SampleInfo make_sample_info(DDS::SampleStateKind sample_state, DDS::ViewStateKind view_state, DDS::InstanceStateKind instance_state, CORBA::Long disposed_generation_count, CORBA::Long no_writers_generation_count, CORBA::Long sample_rank, CORBA::Long generation_rank, CORBA::Long absolute_generation_rank, bool valid_data)
bool dispose(InternalEntity_wrch publication_handle, const DDS::DataReaderQos &qos)
void remove_publication(InternalEntity_wrch publication_handle, bool autodispose_unregistered_instances)
long absolute_generation_rank
unsigned long InstanceStateMask
InternalDataReader(const DDS::DataReaderQos qos, Listener_rch listener=Listener_rch())
RcHandle< InternalDataReaderListener< T > > Listener_rch
unsigned long SampleStateKind
void read(const T &key, SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool operator==(const DisjointSequence::OrderedRanges< T > &a, const DisjointSequence::OrderedRanges< T > &b)
SampleInfoWrapper(const DDS::SampleInfo &sample_info)
sequence< Publication > PublicationSet
void unregister_instance(InternalEntity_wrch publication_handle, const T &sample)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
WeakRcHandle< InternalDataReaderListener< T > > Listener_wrch
#define OPENDDS_MAP_T
const ViewStateKind NOT_NEW_VIEW_STATE
SampleHolder(const T &s, CORBA::Long dgc, CORBA::Long nwgc)
InstanceHandle_t publication_handle
WeakRcHandle< InternalEntity > InternalEntity_wrch
InstanceHandle_t instance_handle
bool unregister_instance(InternalEntity_wrch publication_handle, const DDS::DataReaderQos &qos)
bool operator==(const SampleInfoWrapper &other) const
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unsigned long InstanceStateKind
long no_writers_generation_count
void take(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
void purge_samples(const DDS::DataReaderQos &qos)
const ViewStateKind NEW_VIEW_STATE
SampleStateKind sample_state
void compute_ranks(CORBA::Long sample_count, InternalSampleInfoSequence &infos)
The End User API.
unsigned long SampleStateMask
ReaderDataLifecycleQosPolicy reader_data_lifecycle
const SampleStateKind READ_SAMPLE_STATE
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
HistoryQosPolicy history
long disposed_generation_count
ACE_INLINE OpenDDS_Dcps_Export DDS::Time_t make_time_t(int sec, unsigned long nanosec)
ACE_INLINE OpenDDS_Dcps_Export bool is_infinite(const DDS::Duration_t &value)
void take_instance(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, const T &key, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const SampleStateKind NOT_READ_SAMPLE_STATE
DDS::InstanceStateKind instance_state() const
const long LENGTH_UNLIMITED
unsigned long ViewStateKind
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
void set_listener(Listener_rch listener)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
unsigned long ViewStateMask
Time_t source_timestamp
typedef OPENDDS_LIST(SubsectionPair) KeyList
const InstanceStateKind ALIVE_INSTANCE_STATE
void write(InternalEntity_wrch publication_handle, const T &sample)
typedef OPENDDS_SET(NetworkAddress) AddrSet