Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 8 : #include "dcps_export.h" 9 : 10 : #include "DataReaderImpl.h" 11 : #include "GuidConverter.h" 12 : #include "Service_Participant.h" 13 : #include "Time_Helper.h" 14 : #include "WriterInfo.h" 15 : 16 : #include "ace/OS_NS_sys_time.h" 17 : 18 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 19 : 20 : namespace OpenDDS { 21 : namespace DCPS { 22 : 23 0 : WriterInfoListener::WriterInfoListener() 24 0 : : subscription_id_(GUID_UNKNOWN) 25 : { 26 0 : } 27 : 28 0 : WriterInfoListener::~WriterInfoListener() 29 : { 30 0 : } 31 : 32 : /// tell instances when a DataWriter transitions to being alive 33 : /// The writer state is inout parameter, it has to be set ALIVE before 34 : /// handle_timeout is called since some subroutine use the state. 35 : void 36 0 : WriterInfoListener::writer_became_alive(WriterInfo&, 37 : const MonotonicTimePoint&) 38 : { 39 0 : } 40 : 41 : /// tell instances when a DataWriter transitions to DEAD 42 : /// The writer state is inout parameter, the state is set to DEAD 43 : /// when it returns. 44 : void 45 0 : WriterInfoListener::writer_became_dead(WriterInfo&) 46 : { 47 0 : } 48 : 49 : /// tell instance when a DataWriter is removed. 50 : /// The liveliness status need update. 51 : void 52 0 : WriterInfoListener::writer_removed(WriterInfo&) 53 : { 54 0 : } 55 : 56 0 : WriterInfo::WriterInfo(const WriterInfoListener_rch& reader, 57 : const GUID_t& writer_id, 58 0 : const ::DDS::DataWriterQos& writer_qos) 59 0 : : last_liveliness_activity_time_(MonotonicTimePoint::now()) 60 0 : , historic_samples_timer_(NO_TIMER) 61 0 : , last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()) 62 0 : , waiting_for_end_historic_samples_(false) 63 0 : , delivering_historic_samples_(false) 64 0 : , delivering_historic_samples_cv_(mutex_) 65 0 : , state_(NOT_SET) 66 0 : , reader_(reader) 67 0 : , writer_id_(writer_id) 68 0 : , writer_qos_(writer_qos) 69 0 : , handle_(DDS::HANDLE_NIL) 70 : { 71 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 72 0 : reset_coherent_info(); 73 : #endif 74 : 75 0 : if (DCPS_debug_level >= 5) { 76 0 : ACE_DEBUG((LM_DEBUG, 77 : ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ") 78 : ACE_TEXT("writer %C added to reader %C.\n"), 79 : LogGuid(writer_id).c_str(), 80 : LogGuid(reader->subscription_id_).c_str())); 81 : } 82 0 : } 83 : 84 0 : const char* WriterInfo::get_state_str() const 85 : { 86 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 87 0 : switch (state_) { 88 0 : case NOT_SET: 89 0 : return "NOT_SET"; 90 0 : case ALIVE: 91 0 : return "ALIVE"; 92 0 : case DEAD: 93 0 : return "DEAD"; 94 0 : default: 95 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: WriterInfo::get_state_str: ") 96 : ACE_TEXT("%d is either invalid or not recognized.\n"), 97 : state_)); 98 0 : return "Invalid state"; 99 : } 100 0 : } 101 : 102 : void 103 0 : WriterInfo::schedule_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper, const ACE_Time_Value& ten_seconds) 104 : { 105 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 106 0 : const void* arg = reinterpret_cast<const void*>(this); 107 0 : historic_samples_timer_ = sweeper->reactor()->schedule_timer(sweeper, arg, ten_seconds); 108 0 : } 109 : 110 : void 111 0 : WriterInfo::cancel_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper) 112 : { 113 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 114 0 : if (historic_samples_timer_ != WriterInfo::NO_TIMER) { 115 0 : sweeper->reactor()->cancel_timer(historic_samples_timer_); 116 0 : historic_samples_timer_ = WriterInfo::NO_TIMER; 117 : } 118 0 : } 119 : 120 : bool 121 0 : WriterInfo::check_end_historic_samples(EndHistoricSamplesMissedSweeper* sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& to_deliver) 122 : { 123 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 124 0 : while (delivering_historic_samples_) { 125 0 : delivering_historic_samples_cv_.wait(TheServiceParticipant->get_thread_status_manager()); 126 : } 127 0 : if (waiting_for_end_historic_samples_) { 128 0 : bool result = false; 129 0 : RcHandle<WriterInfo> info = rchandle_from(this); 130 0 : if (!historic_samples_.empty()) { 131 0 : last_historic_seq_ = historic_samples_.rbegin()->first; 132 0 : delivering_historic_samples_ = true; 133 0 : to_deliver.swap(historic_samples_); 134 0 : result = true; 135 : } 136 0 : guard.release(); 137 0 : sweeper->cancel_timer(info); 138 0 : return result; 139 0 : } 140 0 : return false; 141 0 : } 142 : 143 : bool 144 0 : WriterInfo::check_historic(const SequenceNumber& seq, const ReceivedDataSample& sample, SequenceNumber& last_historic_seq) 145 : { 146 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 147 0 : last_historic_seq = last_historic_seq_; 148 0 : if (waiting_for_end_historic_samples_) { 149 0 : historic_samples_.insert(std::make_pair(seq, sample)); 150 0 : return true; 151 : } 152 0 : return false; 153 0 : } 154 : 155 : void 156 0 : WriterInfo::finished_delivering_historic() 157 : { 158 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 159 0 : delivering_historic_samples_ = false; 160 0 : delivering_historic_samples_cv_.notify_all(); 161 0 : } 162 : 163 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 164 : void 165 0 : WriterInfo::add_coherent_samples(const SequenceNumber& seq) 166 : { 167 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 168 0 : if (coherent_samples_ == 0) { 169 0 : static const SequenceNumber defaultSN; 170 0 : const SequenceRange resetRange(defaultSN, seq); 171 0 : coherent_sample_sequence_.reset(); 172 0 : coherent_sample_sequence_.insert(resetRange); 173 : } 174 : else { 175 0 : coherent_sample_sequence_.insert(seq); 176 : } 177 0 : } 178 : 179 : void 180 0 : WriterInfo::coherent_change(bool group_coherent, const GUID_t& publisher_id) 181 : { 182 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 183 0 : group_coherent_ = group_coherent; 184 0 : publisher_id_ = publisher_id; 185 0 : ++coherent_samples_; 186 0 : } 187 : #endif 188 : 189 : void 190 0 : WriterInfo::clear_owner_evaluated() 191 : { 192 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 193 0 : owner_evaluated_.clear(); 194 0 : } 195 : 196 : void 197 0 : WriterInfo::set_owner_evaluated(::DDS::InstanceHandle_t instance, bool flag) 198 : { 199 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 200 0 : if (flag || 201 0 : (!flag && owner_evaluated_.find(instance) != owner_evaluated_.end())) { 202 0 : owner_evaluated_[instance] = flag; 203 : } 204 0 : } 205 : 206 : bool 207 0 : WriterInfo::is_owner_evaluated(::DDS::InstanceHandle_t instance) 208 : { 209 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 210 0 : OwnerEvaluateFlags::iterator iter = owner_evaluated_.find(instance); 211 0 : if (iter == owner_evaluated_.end()) { 212 0 : owner_evaluated_.insert(OwnerEvaluateFlags::value_type(instance, false)); 213 0 : return false; 214 : } 215 : else 216 0 : return iter->second; 217 0 : } 218 : 219 : MonotonicTimePoint 220 0 : WriterInfo::check_activity(const MonotonicTimePoint& now) 221 : { 222 0 : MonotonicTimePoint expires_at(MonotonicTimePoint::max_value); 223 : 224 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 225 : 226 0 : WriterInfoListener_rch reader = reader_.lock(); 227 : 228 : // We only need check the liveliness with the non-zero liveliness_lease_duration_. 229 0 : if (state_ == ALIVE && reader && !reader->liveliness_lease_duration_.is_zero()) { 230 0 : expires_at = last_liveliness_activity_time_ + reader->liveliness_lease_duration_; 231 : 232 0 : if (expires_at <= now) { 233 : // let all instances know this write is not alive. 234 0 : guard.release(); 235 0 : reader->writer_became_dead(*this); 236 0 : expires_at = MonotonicTimePoint::max_value; 237 : } 238 : } 239 : 240 0 : return expires_at; 241 0 : } 242 : 243 : void 244 0 : WriterInfo::removed() 245 : { 246 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 247 0 : WriterInfoListener_rch reader = reader_.lock(); 248 0 : guard.release(); 249 0 : if (reader) { 250 0 : reader->writer_removed(*this); 251 : } 252 0 : } 253 : 254 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 255 : Coherent_State 256 0 : WriterInfo::coherent_change_received() 257 : { 258 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 259 0 : if (writer_coherent_samples_.num_samples_ == 0) { 260 0 : return NOT_COMPLETED_YET; 261 : } 262 : 263 0 : if (!coherent_sample_sequence_.disjoint() 264 0 : && (coherent_sample_sequence_.high() 265 0 : == writer_coherent_samples_.last_sample_)) { 266 0 : return COMPLETED; 267 : } 268 : 269 0 : if (coherent_sample_sequence_.high() > 270 0 : writer_coherent_samples_.last_sample_) { 271 0 : return REJECTED; 272 : } 273 : 274 0 : return NOT_COMPLETED_YET; 275 0 : } 276 : 277 : void 278 0 : WriterInfo::reset_coherent_info() 279 : { 280 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 281 0 : coherent_samples_ = 0; 282 0 : group_coherent_ = false; 283 0 : publisher_id_ = GUID_UNKNOWN; 284 0 : coherent_sample_sequence_.reset(); 285 0 : writer_coherent_samples_.reset(); 286 0 : group_coherent_samples_.clear(); 287 0 : } 288 : 289 : 290 : void 291 0 : WriterInfo::set_group_info(const CoherentChangeControl& info) 292 : { 293 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 294 0 : if (!(publisher_id_ == info.publisher_id_) 295 0 : || group_coherent_ != info.group_coherent_) { 296 0 : WriterInfoListener_rch reader = reader_.lock(); 297 0 : ACE_ERROR((LM_ERROR, 298 : ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()") 299 : ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"), 300 : reader ? LogGuid(reader->subscription_id_).c_str() : "", 301 : LogGuid(writer_id_).c_str())); 302 0 : } 303 : 304 0 : writer_coherent_samples_ = info.coherent_samples_; 305 0 : group_coherent_samples_ = info.group_coherent_samples_; 306 0 : } 307 : 308 : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 309 : 310 : } 311 : } 312 : 313 : OPENDDS_END_VERSIONED_NAMESPACE_DECL