Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : 9 : #ifndef OPENDDS_DCPS_WRITERINFO_H 10 : #define OPENDDS_DCPS_WRITERINFO_H 11 : 12 : #include "Atomic.h" 13 : #include "CoherentChangeControl.h" 14 : #include "ConditionVariable.h" 15 : #include "Definitions.h" 16 : #include "DisjointSequence.h" 17 : #include "PoolAllocator.h" 18 : #include "RcObject.h" 19 : #include "TimeTypes.h" 20 : #include "transport/framework/ReceivedDataSample.h" 21 : 22 : #include <dds/DdsDcpsInfoUtilsC.h> 23 : #include <dds/DdsDcpsCoreC.h> 24 : 25 : ACE_BEGIN_VERSIONED_NAMESPACE_DECL 26 : class ACE_Reactor; 27 : class ACE_Event_Handler; 28 : ACE_END_VERSIONED_NAMESPACE_DECL 29 : 30 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 31 : 32 : namespace OpenDDS { 33 : namespace DCPS { 34 : 35 : class WriterInfo; 36 : class EndHistoricSamplesMissedSweeper; 37 : 38 : class OpenDDS_Dcps_Export WriterInfoListener: public virtual RcObject 39 : { 40 : public: 41 : WriterInfoListener(); 42 : virtual ~WriterInfoListener(); 43 : 44 : GUID_t subscription_id_; 45 : 46 : /// The time interval for checking liveliness. 47 : /// TBD: Should this be initialized with 48 : /// DDS::DURATION_INFINITE_SEC and DDS::DURATION_INFINITE_NSEC 49 : /// instead of ACE_Time_Value::zero to be consistent with default 50 : /// duration qos ? Or should we simply use the ACE_Time_Value::zero 51 : /// to indicate the INFINITY duration ? 52 : TimeDuration liveliness_lease_duration_; 53 : 54 : /// tell instances when a DataWriter transitions to being alive 55 : /// The writer state is inout parameter, it has to be set ALIVE before 56 : /// handle_timeout is called since some subroutine use the state. 57 : virtual void writer_became_alive(WriterInfo& info, 58 : const MonotonicTimePoint& when); 59 : 60 : /// tell instances when a DataWriter transitions to DEAD 61 : /// The writer state is inout parameter, the state is set to DEAD 62 : /// when it returns. 63 : virtual void writer_became_dead(WriterInfo& info); 64 : 65 : /// tell instance when a DataWriter is removed. 66 : /// The liveliness status need update. 67 : virtual void writer_removed(WriterInfo& info); 68 : }; 69 : 70 : typedef RcHandle<WriterInfoListener> WriterInfoListener_rch; 71 : 72 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 73 : enum Coherent_State { 74 : NOT_COMPLETED_YET, 75 : COMPLETED, 76 : REJECTED 77 : }; 78 : #endif 79 : 80 : /// Keeps track of a DataWriter's liveliness for a DataReader. 81 : class OpenDDS_Dcps_Export WriterInfo : public virtual RcObject { 82 : friend class WriteInfoListner; 83 : 84 : public: 85 : enum WriterState { NOT_SET, ALIVE, DEAD }; 86 : enum TimerState { NO_TIMER = -1 }; 87 : 88 : WriterInfo(const WriterInfoListener_rch& reader, 89 : const GUID_t& writer_id, 90 : const DDS::DataWriterQos& writer_qos); 91 : 92 : /// check to see if this writer is alive (called by handle_timeout). 93 : /// @param now next monotonic time this DataWriter will become not active (not alive) 94 : /// if no sample or liveliness message is received. 95 : /// @returns montonic time when the Writer will become not active (if no activity) 96 : /// of MonotonicTimePoint::max_value if the writer is already or became not alive 97 : MonotonicTimePoint check_activity(const MonotonicTimePoint& now); 98 : 99 : /// called when a sample or other activity is received from this writer. 100 : void received_activity(const MonotonicTimePoint& when); 101 : 102 : /// returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0. 103 0 : WriterState state() const 104 : { 105 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 106 0 : return state_; 107 0 : }; 108 : 109 0 : void state(WriterState state) 110 : { 111 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 112 0 : state_ = state; 113 0 : }; 114 : 115 0 : DDS::InstanceHandle_t handle() const 116 : { 117 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 118 0 : return handle_; 119 0 : } 120 : 121 0 : void handle(DDS::InstanceHandle_t handle) 122 : { 123 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 124 0 : handle_ = handle; 125 0 : }; 126 : 127 0 : GUID_t writer_id() const 128 : { 129 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 130 0 : return writer_id_; 131 0 : } 132 : 133 0 : CORBA::Long writer_qos_ownership_strength() const 134 : { 135 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 136 0 : return writer_qos_.ownership_strength.value; 137 0 : } 138 : 139 0 : void writer_qos_ownership_strength(const CORBA::Long writer_qos_ownership_strength) 140 : { 141 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 142 0 : writer_qos_.ownership_strength.value = writer_qos_ownership_strength; 143 0 : } 144 : 145 : bool waiting_for_end_historic_samples() const 146 : { 147 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 148 : return waiting_for_end_historic_samples_; 149 : } 150 : 151 0 : void waiting_for_end_historic_samples(bool waiting_for_end_historic_samples) 152 : { 153 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 154 0 : waiting_for_end_historic_samples_ = waiting_for_end_historic_samples; 155 0 : } 156 : 157 : SequenceNumber last_historic_seq() const 158 : { 159 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 160 : return last_historic_seq_; 161 : } 162 : 163 : void last_historic_seq(const SequenceNumber& last_historic_seq) 164 : { 165 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 166 : last_historic_seq_ = last_historic_seq; 167 : } 168 : 169 : const char* get_state_str() const; 170 : 171 : /// update liveliness when remove_association is called. 172 : void removed(); 173 : 174 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 175 : Coherent_State coherent_change_received(); 176 : void reset_coherent_info(); 177 : void set_group_info(const CoherentChangeControl& info); 178 : void add_coherent_samples(const SequenceNumber& seq); 179 : void coherent_change(bool group_coherent, const GUID_t& publisher_id); 180 : 181 0 : bool group_coherent() const { 182 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 183 0 : return group_coherent_; 184 0 : } 185 : 186 0 : GUID_t publisher_id() const { 187 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 188 0 : return publisher_id_; 189 0 : } 190 : 191 : #endif 192 : 193 : void clear_owner_evaluated(); 194 : void set_owner_evaluated(DDS::InstanceHandle_t instance, bool flag); 195 : bool is_owner_evaluated(DDS::InstanceHandle_t instance); 196 : 197 : void schedule_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper, const ACE_Time_Value& ten_seconds); 198 : void cancel_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper); 199 : bool check_end_historic_samples(EndHistoricSamplesMissedSweeper* sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& to_deliver); 200 : bool check_historic(const SequenceNumber& seq, const ReceivedDataSample& sample, SequenceNumber& last_historic_seq); 201 : void finished_delivering_historic(); 202 : 203 : private: 204 : 205 : mutable ACE_Thread_Mutex mutex_; 206 : 207 : /// Timestamp of last write/dispose/assert_liveliness from this DataWriter 208 : MonotonicTimePoint last_liveliness_activity_time_; 209 : 210 : // Non-negative if this a durable writer which has a timer scheduled 211 : long historic_samples_timer_; 212 : 213 : /// Temporary holding place for samples received before 214 : /// the END_HISTORIC_SAMPLES control message. 215 : OPENDDS_MAP(SequenceNumber, ReceivedDataSample) historic_samples_; 216 : 217 : /// After receiving END_HISTORIC_SAMPLES, check for duplicates 218 : SequenceNumber last_historic_seq_; 219 : 220 : bool waiting_for_end_historic_samples_; 221 : 222 : bool delivering_historic_samples_; 223 : ConditionVariable<ACE_Thread_Mutex> delivering_historic_samples_cv_; 224 : 225 : /// State of the writer. 226 : WriterState state_; 227 : 228 : /// The DataReader owning this WriterInfo 229 : WeakRcHandle<WriterInfoListener> reader_; 230 : 231 : /// DCPSInfoRepo ID of the DataWriter 232 : GUID_t writer_id_; 233 : 234 : /// Writer qos 235 : DDS::DataWriterQos writer_qos_; 236 : 237 : /// The publication entity instance handle. 238 : DDS::InstanceHandle_t handle_; 239 : 240 : /// Number of received coherent changes in active change set. 241 : Atomic<ACE_UINT32> coherent_samples_; 242 : 243 : /// Is this writer evaluated for owner ? 244 : typedef OPENDDS_MAP(DDS::InstanceHandle_t, bool) OwnerEvaluateFlags; 245 : OwnerEvaluateFlags owner_evaluated_; 246 : 247 : /// Data to support GROUP access scope. 248 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 249 : bool group_coherent_; 250 : GUID_t publisher_id_; 251 : DisjointSequence coherent_sample_sequence_; 252 : WriterCoherentSample writer_coherent_samples_; 253 : GroupCoherentSamples group_coherent_samples_; 254 : #endif 255 : 256 : }; 257 : 258 : inline 259 : void 260 0 : WriterInfo::received_activity(const MonotonicTimePoint& when) 261 : { 262 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 263 : 264 0 : last_liveliness_activity_time_ = when; 265 : 266 0 : if (state_ != ALIVE) { // NOT_SET || DEAD 267 0 : RcHandle<WriterInfoListener> reader = reader_.lock(); 268 0 : guard.release(); 269 0 : if (reader) { 270 0 : reader->writer_became_alive(*this, when); 271 : } 272 0 : } 273 0 : } 274 : 275 : typedef RcHandle<WriterInfo> WriterInfo_rch; 276 : 277 : } // namespace DCPS 278 : } // namespace 279 : 280 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 281 : 282 : #endif /* end of include guard: OPENDDS_DCPS_WRITERINFO_H */