Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_INTERNAL_TOPIC_H 9 : #define OPENDDS_DCPS_INTERNAL_TOPIC_H 10 : 11 : #include "dcps_export.h" 12 : 13 : #ifndef ACE_LACKS_PRAGMA_ONCE 14 : # pragma once 15 : #endif /* ACE_LACKS_PRAGMA_ONCE */ 16 : 17 : #include "PoolAllocator.h" 18 : #include "RcObject.h" 19 : #include "InternalDataWriter.h" 20 : #include "InternalDataReader.h" 21 : 22 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 23 : 24 : namespace OpenDDS { 25 : namespace DCPS { 26 : 27 : template <typename T> 28 : class InternalTopic : public virtual RcObject { 29 : public: 30 : typedef RcHandle<InternalDataWriter<T> > InternalDataWriter_rch; 31 : typedef WeakRcHandle<InternalDataWriter<T> > InternalDataWriter_wrch; 32 : typedef RcHandle<InternalDataReader<T> > InternalDataReader_rch; 33 : typedef WeakRcHandle<InternalDataReader<T> > InternalDataReader_wrch; 34 : 35 15 : void connect(InternalDataWriter_rch writer) 36 : { 37 15 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 38 15 : const std::pair<typename WriterSet::iterator, bool> p = writers_.insert(writer); 39 : 40 15 : if (p.second) { 41 20 : for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { 42 5 : InternalDataReader_rch reader = pos->lock(); 43 5 : if (reader) { 44 5 : writer->add_reader(reader); 45 : } 46 : } 47 : } 48 15 : } 49 : 50 6 : void connect(InternalDataReader_rch reader) 51 : { 52 6 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 53 6 : const std::pair<typename ReaderSet::iterator, bool> p = readers_.insert(reader); 54 : 55 6 : if (p.second) { 56 9 : for (typename WriterSet::const_iterator pos = writers_.begin(), limit = writers_.end(); pos != limit; ++pos) { 57 3 : InternalDataWriter_rch writer = pos->lock(); 58 3 : if (writer) { 59 3 : writer->add_reader(reader); 60 : } 61 : } 62 : } 63 6 : } 64 : 65 10 : void disconnect(InternalDataWriter_rch writer) 66 : { 67 10 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 68 : 69 10 : if (writers_.erase(writer)) { 70 11 : for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { 71 1 : InternalDataReader_rch reader = pos->lock(); 72 1 : if (reader) { 73 1 : writer->remove_reader(reader); 74 : } 75 : } 76 : } 77 10 : } 78 : 79 1 : void disconnect(InternalDataReader_rch reader) 80 : { 81 1 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 82 : 83 1 : if (readers_.erase(reader)) { 84 2 : for (typename WriterSet::const_iterator pos = writers_.begin(), limit = writers_.end(); pos != limit; ++pos) { 85 1 : InternalDataWriter_rch writer = pos->lock(); 86 1 : if (writer) { 87 1 : writer->remove_reader(reader); 88 : } 89 : } 90 : } 91 1 : } 92 : 93 : private: 94 : typedef OPENDDS_SET(InternalDataWriter_wrch) WriterSet; 95 : WriterSet writers_; 96 : 97 : typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet; 98 : ReaderSet readers_; 99 : 100 : mutable ACE_Thread_Mutex mutex_; 101 : }; 102 : 103 : } // namespace DCPS 104 : } // namespace OpenDDS 105 : 106 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 107 : 108 : #endif // OPENDDS_DCPS_INTERNAL_TOPIC_H