OpenDDS  Snapshot(2023/04/28-20:55)
InternalTopic.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_INTERNAL_TOPIC_H
9 #define OPENDDS_DCPS_INTERNAL_TOPIC_H
10 
11 #include "dcps_export.h"
12 
13 #ifndef ACE_LACKS_PRAGMA_ONCE
14 # pragma once
15 #endif /* ACE_LACKS_PRAGMA_ONCE */
16 
17 #include "PoolAllocator.h"
18 #include "RcObject.h"
19 #include "InternalDataWriter.h"
20 #include "InternalDataReader.h"
21 
23 
24 namespace OpenDDS {
25 namespace DCPS {
26 
27 template <typename T>
28 class InternalTopic : public virtual RcObject {
29 public:
34 
35  void connect(InternalDataWriter_rch writer)
36  {
38  const std::pair<typename WriterSet::iterator, bool> p = writers_.insert(writer);
39 
40  if (p.second) {
41  for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
42  InternalDataReader_rch reader = pos->lock();
43  if (reader) {
44  writer->add_reader(reader);
45  }
46  }
47  }
48  }
49 
50  void connect(InternalDataReader_rch reader)
51  {
53  const std::pair<typename ReaderSet::iterator, bool> p = readers_.insert(reader);
54 
55  if (p.second) {
56  for (typename WriterSet::const_iterator pos = writers_.begin(), limit = writers_.end(); pos != limit; ++pos) {
57  InternalDataWriter_rch writer = pos->lock();
58  if (writer) {
59  writer->add_reader(reader);
60  }
61  }
62  }
63  }
64 
65  void disconnect(InternalDataWriter_rch writer)
66  {
68 
69  if (writers_.erase(writer)) {
70  for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
71  InternalDataReader_rch reader = pos->lock();
72  if (reader) {
73  writer->remove_reader(reader);
74  }
75  }
76  }
77  }
78 
79  void disconnect(InternalDataReader_rch reader)
80  {
82 
83  if (readers_.erase(reader)) {
84  for (typename WriterSet::const_iterator pos = writers_.begin(), limit = writers_.end(); pos != limit; ++pos) {
85  InternalDataWriter_rch writer = pos->lock();
86  if (writer) {
87  writer->remove_reader(reader);
88  }
89  }
90  }
91  }
92 
93 private:
94  typedef OPENDDS_SET(InternalDataWriter_wrch) WriterSet;
95  WriterSet writers_;
96 
97  typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet;
98  ReaderSet readers_;
99 
101 };
102 
103 } // namespace DCPS
104 } // namespace OpenDDS
105 
107 
108 #endif // OPENDDS_DCPS_INTERNAL_TOPIC_H
RcHandle< InternalDataWriter< T > > InternalDataWriter_rch
Definition: InternalTopic.h:30
void connect(InternalDataWriter_rch writer)
Definition: InternalTopic.h:35
#define ACE_GUARD(MUTEX, OBJ, LOCK)
WeakRcHandle< InternalDataReader< T > > InternalDataReader_wrch
Definition: InternalTopic.h:33
RcHandle< InternalDataReader< T > > InternalDataReader_rch
Definition: InternalTopic.h:32
WeakRcHandle< InternalDataWriter< T > > InternalDataWriter_wrch
Definition: InternalTopic.h:31
typedef OPENDDS_SET(InternalDataWriter_wrch) WriterSet
void connect(InternalDataReader_rch reader)
Definition: InternalTopic.h:50
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void disconnect(InternalDataWriter_rch writer)
Definition: InternalTopic.h:65
void disconnect(InternalDataReader_rch reader)
Definition: InternalTopic.h:79
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28