Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_INTERNAL_DATA_WRITER_H 9 : #define OPENDDS_DCPS_INTERNAL_DATA_WRITER_H 10 : 11 : #include "dcps_export.h" 12 : 13 : #ifndef ACE_LACKS_PRAGMA_ONCE 14 : # pragma once 15 : #endif /* ACE_LACKS_PRAGMA_ONCE */ 16 : 17 : #include "RcObject.h" 18 : #include "InternalDataReader.h" 19 : 20 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 21 : 22 : namespace OpenDDS { 23 : namespace DCPS { 24 : 25 : /* 26 : The InternalDataWriter supports the following QoS: 27 : 28 : DurabilityQosPolicy durability; => VOLATILE, TRANSIENT_LOCAL 29 : DurabilityServiceQosPolicy durability_service; => None 30 : DeadlineQosPolicy deadline; => None 31 : LatencyBudgetQosPolicy latency_budget => None 32 : LivelinessQosPolicy liveliness => None 33 : ReliabilityQosPolicy reliability => RELIABLE 34 : DestinationOrderQosPolicy destination_order; => None 35 : HistoryQosPolicy history; => KEEP_LAST_HISTORY, KEEP_ALL_HISTORY 36 : ResourceLimitsQosPolicy resource_limits; => None 37 : TransportPriorityQosPolicy transport_priority; => None 38 : LifespanQosPolicy lifespan; => None 39 : UserDataQosPolicy user_data; => None 40 : OwnershipQosPolicy ownership; => None 41 : OwnershipStrengthQosPolicy ownership_strength; => None 42 : WriterDataLifecycleQosPolicy writer_data_lifecycle; => Yes 43 : */ 44 : 45 : template <typename T> 46 : class InternalDataWriter : public InternalEntity { 47 : public: 48 : typedef RcHandle<InternalDataReader<T> > InternalDataReader_rch; 49 : typedef WeakRcHandle<InternalDataReader<T> > InternalDataReader_wrch; 50 : 51 23 : explicit InternalDataWriter(const DDS::DataWriterQos& qos) 52 23 : : qos_(qos) 53 23 : {} 54 : 55 : /// @name InternalTopic Interface 56 : /// @{ 57 16 : void add_reader(InternalDataReader_rch reader) 58 : { 59 16 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 60 16 : readers_.insert(reader); 61 : 62 16 : if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) { 63 2 : for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); 64 6 : pos != limit; ++pos) { 65 4 : pos->second.add_reader(reader, static_rchandle_cast<InternalEntity>(rchandle_from(this))); 66 : } 67 : } 68 16 : } 69 : 70 3 : void remove_reader(InternalDataReader_rch reader) 71 : { 72 3 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 73 3 : if (readers_.erase(reader)) { 74 3 : reader->remove_publication(static_rchandle_cast<InternalEntity>(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances); 75 : } 76 3 : } 77 : 78 10 : bool has_reader(InternalDataReader_rch reader) 79 : { 80 10 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, false); 81 10 : return readers_.count(reader); 82 10 : } 83 : 84 : InternalEntity_wrch publication_handle() 85 : { 86 : return static_rchandle_cast<InternalEntity>(rchandle_from(this)); 87 : } 88 : /// @} 89 : 90 : /// @name User Interface 91 : /// @{ 92 65 : void write(const T& sample) 93 : { 94 65 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 95 : 96 65 : if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) { 97 60 : const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, SampleHolder())); 98 60 : p.first->second.write(sample, qos_); 99 : } 100 : 101 70 : for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { 102 5 : InternalDataReader_rch reader = pos->lock(); 103 5 : if (reader) { 104 5 : reader->write(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample); 105 : } 106 : } 107 65 : } 108 : 109 1 : void dispose(const T& sample) 110 : { 111 1 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 112 : 113 1 : if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) { 114 0 : typename InstanceMap::iterator pos = instance_map_.find(sample); 115 0 : if (pos != instance_map_.end()) { 116 0 : pos->second.dispose(); 117 : } 118 : } 119 : 120 2 : for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { 121 1 : InternalDataReader_rch reader = pos->lock(); 122 1 : if (reader) { 123 1 : reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample); 124 : } 125 : } 126 1 : } 127 : 128 2 : void unregister_instance(const T& sample) 129 : { 130 2 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 131 : 132 2 : if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) { 133 0 : instance_map_.erase(sample); 134 : } 135 : 136 4 : for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) { 137 2 : InternalDataReader_rch reader = pos->lock(); 138 2 : if (reader) { 139 2 : if (qos_.writer_data_lifecycle.autodispose_unregistered_instances) { 140 1 : reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample); 141 : } 142 2 : reader->unregister_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample); 143 : } 144 : } 145 2 : } 146 : /// @} 147 : 148 : private: 149 : const DDS::DataWriterQos qos_; 150 : 151 : typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet; 152 : ReaderSet readers_; 153 : 154 : class SampleHolder { 155 : public: 156 : bool empty() const { return samples_.empty(); } 157 : 158 4 : void add_reader(InternalDataReader_rch reader, RcHandle<InternalEntity> writer) 159 : { 160 9 : for (typename SampleList::const_iterator pos = samples_.begin(), limit = samples_.end(); pos != limit; ++pos) { 161 5 : reader->write(writer, *pos); 162 : } 163 4 : } 164 : 165 60 : void write(const T& sample, 166 : const DDS::DataWriterQos& qos) 167 : { 168 60 : samples_.push_back(sample); 169 60 : if (qos.history.kind == DDS::KEEP_LAST_HISTORY_QOS) { 170 61 : while (samples_.size() > static_cast<std::size_t>(qos.history.depth)) { 171 1 : samples_.pop_front(); 172 : } 173 : } 174 60 : } 175 : 176 0 : void dispose() 177 : { 178 0 : samples_.clear(); 179 0 : } 180 : 181 : private: 182 : typedef OPENDDS_LIST(T) SampleList; 183 : SampleList samples_; 184 : }; 185 : 186 : typedef OPENDDS_MAP_T(T, SampleHolder) InstanceMap; 187 : InstanceMap instance_map_; 188 : 189 : ACE_Thread_Mutex mutex_; 190 : }; 191 : 192 : } // namespace DCPS 193 : } // namespace OpenDDS 194 : 195 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 196 : 197 : #endif /* OPENDDS_DCPS_INTERNAL_DATA_WRITER_H */