Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : #include "ace/Event_Handler.h" 10 : #include "ace/Reactor.h" 11 : #include "InstanceState.h" 12 : #include "DataReaderImpl.h" 13 : #include "SubscriptionInstance.h" 14 : #include "ReceivedDataElementList.h" 15 : #include "Time_Helper.h" 16 : #include "DomainParticipantImpl.h" 17 : #include "GuidConverter.h" 18 : 19 : #if !defined (__ACE_INLINE__) 20 : # include "InstanceState.inl" 21 : #endif /* !__ACE_INLINE__ */ 22 : 23 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 24 : 25 : namespace OpenDDS { 26 : namespace DCPS { 27 : 28 0 : InstanceState::InstanceState(const DataReaderImpl_rch& reader, 29 : ACE_Recursive_Thread_Mutex& lock, 30 0 : DDS::InstanceHandle_t handle) 31 : : ReactorInterceptor(TheServiceParticipant->reactor(), 32 : TheServiceParticipant->reactor_owner()), 33 0 : lock_(lock), 34 0 : instance_state_(0), 35 0 : view_state_(0), 36 0 : disposed_generation_count_(0), 37 0 : no_writers_generation_count_(0), 38 0 : empty_(true), 39 0 : release_pending_(false), 40 0 : release_timer_id_(-1), 41 0 : reader_(reader), 42 0 : handle_(handle), 43 0 : owner_(GUID_UNKNOWN), 44 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 45 0 : exclusive_(reader->qos_.ownership.kind == DDS::EXCLUSIVE_OWNERSHIP_QOS), 46 : #endif 47 0 : registered_(false) 48 0 : {} 49 : 50 0 : InstanceState::~InstanceState() 51 : { 52 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 53 0 : if (registered_) { 54 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 55 0 : if (reader) { 56 0 : DataReaderImpl::OwnershipManagerPtr om = reader->ownership_manager(); 57 0 : if (om) om->remove_instance(this); 58 0 : } 59 0 : } 60 : #endif 61 0 : } 62 : 63 0 : void InstanceState::sample_info(DDS::SampleInfo& si, const ReceivedDataElement* de) 64 : { 65 0 : si.sample_state = de->sample_state_; 66 0 : si.view_state = view_state_; 67 0 : si.instance_state = instance_state_; 68 0 : si.disposed_generation_count = 69 0 : static_cast<CORBA::Long>(disposed_generation_count_); 70 0 : si.no_writers_generation_count = 71 0 : static_cast<CORBA::Long>(no_writers_generation_count_); 72 0 : si.source_timestamp = de->source_timestamp_; 73 0 : si.instance_handle = handle_; 74 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 75 0 : if (reader) { 76 0 : RcHandle<DomainParticipantImpl> participant = reader->participant_servant_.lock(); 77 0 : si.publication_handle = participant ? participant->lookup_handle(de->pub_) : DDS::HANDLE_NIL; 78 0 : } else { 79 0 : si.publication_handle = DDS::HANDLE_NIL; 80 : } 81 0 : si.valid_data = de->valid_data_; 82 : /* 83 : * These are actually calculated later... 84 : */ 85 0 : si.sample_rank = 0; 86 : 87 : // these aren't the real value, they're being saved 88 : // for a later calculation. the actual value is 89 : // calculated in DataReaderImpl::sample_info using 90 : // these values. 91 0 : si.generation_rank = 92 0 : static_cast<CORBA::Long>(de->disposed_generation_count_ + 93 0 : de->no_writers_generation_count_); 94 0 : si.absolute_generation_rank = 95 0 : static_cast<CORBA::Long>(de->disposed_generation_count_ + 96 0 : de->no_writers_generation_count_); 97 : 98 0 : si.opendds_reserved_publication_seq = de->sequence_.getValue(); 99 0 : } 100 : 101 : // cannot ACE_INLINE because of #include loop 102 : 103 0 : int InstanceState::handle_timeout(const ACE_Time_Value&, const void*) 104 : { 105 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager()); 106 : 107 0 : if (DCPS_debug_level) { 108 0 : ACE_DEBUG((LM_NOTICE, 109 : ACE_TEXT("(%P|%t) NOTICE:") 110 : ACE_TEXT(" InstanceState::handle_timeout:") 111 : ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"), 112 : handle_)); 113 : } 114 0 : release(); 115 : 116 0 : return 0; 117 0 : } 118 : 119 0 : bool InstanceState::dispose_was_received(const GUID_t& writer_id) 120 : { 121 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_, false); 122 0 : writers_.erase(writer_id); 123 : 124 : // 125 : // Manage the instance state on disposal here. 126 : // 127 : // If disposed by owner then the owner is not re-elected, it can 128 : // resume if the writer sends message again. 129 0 : if (instance_state_ & DDS::ALIVE_INSTANCE_STATE) { 130 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 131 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 132 0 : if (reader) { 133 0 : DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager(); 134 0 : if (! exclusive_ 135 0 : || (owner_manager && owner_manager->is_owner (handle_, writer_id))) { 136 : #endif 137 0 : instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE; 138 0 : state_updated(); 139 0 : schedule_release(); 140 0 : return true; 141 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 142 : } 143 0 : } 144 : #endif 145 0 : } 146 : 147 0 : return false; 148 0 : } 149 : 150 0 : bool InstanceState::unregister_was_received(const GUID_t& writer_id) 151 : { 152 0 : if (DCPS_debug_level > 1) { 153 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) InstanceState::unregister_was_received on %C\n"), 154 : LogGuid(writer_id).c_str() 155 : )); 156 : } 157 : 158 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_, false); 159 0 : writers_.erase(writer_id); 160 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 161 0 : if (exclusive_) { 162 : // If unregistered by owner then the ownership should be transferred to another 163 : // writer. 164 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 165 0 : if (reader) { 166 0 : DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager(); 167 0 : if (owner_manager) 168 0 : owner_manager->remove_writer (handle_, writer_id); 169 0 : } 170 0 : } 171 : #endif 172 : 173 0 : if (writers_.empty() && (instance_state_ & DDS::ALIVE_INSTANCE_STATE)) { 174 0 : instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE; 175 0 : state_updated(); 176 0 : schedule_release(); 177 0 : return true; 178 : } 179 : 180 0 : return false; 181 0 : } 182 : 183 0 : void InstanceState::schedule_pending() 184 : { 185 0 : release_pending_ = true; 186 0 : } 187 : 188 0 : void OpenDDS::DCPS::InstanceState::state_updated() const 189 : { 190 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 191 0 : if (reader) { 192 0 : reader->state_updated(handle_); 193 : } 194 0 : } 195 : 196 0 : void InstanceState::schedule_release() 197 : { 198 0 : DDS::DataReaderQos qos; 199 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 200 0 : if (reader) { 201 0 : qos = reader->qos_; 202 : } else { 203 0 : cancel_release(); 204 0 : return; 205 : } 206 : 207 : DDS::Duration_t delay; 208 : 209 0 : switch (instance_state_) { 210 0 : case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE: 211 0 : delay = qos.reader_data_lifecycle.autopurge_nowriter_samples_delay; 212 0 : break; 213 : 214 0 : case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE: 215 0 : delay = qos.reader_data_lifecycle.autopurge_disposed_samples_delay; 216 0 : break; 217 : 218 0 : default: 219 0 : ACE_ERROR((LM_ERROR, 220 : ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:") 221 : ACE_TEXT(" Unsupported instance state: %d!\n"), 222 : instance_state_)); 223 0 : return; 224 : } 225 : 226 0 : if (delay.sec != DDS::DURATION_INFINITE_SEC && 227 0 : delay.nanosec != DDS::DURATION_INFINITE_NSEC) { 228 : 229 0 : execute_or_enqueue(make_rch<ScheduleCommand>(this, TimeDuration(delay))); 230 : 231 : } else { 232 : // N.B. instance transitions are always followed by a non-valid 233 : // sample being queued to the ReceivedDataElementList; marking 234 : // the release as pending prevents this sample from being lost 235 : // if all samples have been already removed from the instance. 236 0 : schedule_pending(); 237 : } 238 0 : } 239 : 240 0 : void InstanceState::cancel_release() 241 : { 242 0 : release_pending_ = false; 243 0 : execute_or_enqueue(make_rch<CancelCommand>(this)); 244 0 : } 245 : 246 0 : bool InstanceState::release_if_empty() 247 : { 248 0 : bool released = false; 249 0 : if (empty_ && writers_.empty()) { 250 0 : release(); 251 0 : released = true; 252 : } else { 253 0 : schedule_pending(); 254 : } 255 0 : return released; 256 : } 257 : 258 0 : void InstanceState::release() 259 : { 260 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 261 0 : if (reader) { 262 0 : reader->release_instance(handle_); 263 : } 264 0 : } 265 : 266 0 : void InstanceState::set_owner(const GUID_t& owner) 267 : { 268 0 : ACE_Guard<ACE_Thread_Mutex> guard(owner_lock_); 269 0 : owner_ = owner; 270 0 : } 271 : 272 0 : GUID_t InstanceState::get_owner() 273 : { 274 0 : ACE_Guard<ACE_Thread_Mutex> guard(owner_lock_); 275 0 : return owner_; 276 0 : } 277 : 278 0 : bool InstanceState::is_exclusive() const 279 : { 280 0 : return exclusive_; 281 : } 282 : 283 0 : bool InstanceState::registered() 284 : { 285 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_); 286 0 : const bool ret = registered_; 287 0 : registered_ = true; 288 0 : return ret; 289 0 : } 290 : 291 0 : void InstanceState::registered(bool flag) 292 : { 293 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_); 294 0 : registered_ = flag; 295 0 : } 296 : 297 0 : void InstanceState::reset_ownership(DDS::InstanceHandle_t instance) 298 : { 299 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_); 300 0 : set_owner(GUID_UNKNOWN); 301 0 : registered_ = false; 302 : 303 0 : RcHandle<DataReaderImpl> reader = reader_.lock(); 304 0 : if (reader) { 305 0 : reader->reset_ownership(instance); 306 : } 307 0 : } 308 : 309 0 : bool InstanceState::most_recent_generation(ReceivedDataElement* item) const 310 : { 311 0 : return item->disposed_generation_count_ == disposed_generation_count_ 312 0 : && item->no_writers_generation_count_ == no_writers_generation_count_; 313 : } 314 : 315 0 : bool InstanceState::reactor_is_shut_down() const 316 : { 317 0 : return TheServiceParticipant->is_shut_down(); 318 : } 319 : 320 0 : void InstanceState::CancelCommand::execute() 321 : { 322 0 : if (instance_state_->release_timer_id_ != -1) { 323 0 : instance_state_->reactor()->cancel_timer(instance_state_); 324 0 : instance_state_->release_timer_id_ = -1; 325 : } 326 0 : } 327 : 328 0 : void InstanceState::ScheduleCommand::execute() 329 : { 330 0 : if (instance_state_->release_timer_id_ != -1) { 331 0 : instance_state_->reactor()->cancel_timer(instance_state_); 332 : } 333 : 334 0 : instance_state_->release_timer_id_ = 335 0 : instance_state_->reactor()->schedule_timer(instance_state_, 0, delay_.value()); 336 : 337 0 : if (instance_state_->release_timer_id_ == -1) { 338 0 : ACE_ERROR((LM_ERROR, 339 : ACE_TEXT("(%P|%t) ERROR: InstanceState::ScheduleCommand::execute:") 340 : ACE_TEXT(" Unable to schedule timer!\n"))); 341 : } 342 0 : } 343 : 344 0 : const char* InstanceState::instance_state_string(DDS::InstanceStateKind value) 345 : { 346 0 : switch (value) { 347 0 : case DDS::ALIVE_INSTANCE_STATE: 348 0 : return "ALIVE_INSTANCE_STATE"; 349 0 : case DDS::NOT_ALIVE_INSTANCE_STATE: 350 0 : return "NOT_ALIVE_INSTANCE_STATE"; 351 0 : case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE: 352 0 : return "NOT_ALIVE_DISPOSED_INSTANCE_STATE"; 353 0 : case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE: 354 0 : return "NOT_ALIVE_NO_WRITERS_INSTANCE_STATE"; 355 0 : case DDS::ANY_INSTANCE_STATE: 356 0 : return "ANY_INSTANCE_STATE"; 357 0 : default: 358 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: InstanceState::instance_state_string: ") 359 : ACE_TEXT("%d is either invalid or not recognized.\n"), 360 : value)); 361 : 362 0 : return "Invalid instance state"; 363 : } 364 : } 365 : 366 0 : OPENDDS_STRING InstanceState::instance_state_mask_string(DDS::InstanceStateMask mask) 367 : { 368 0 : if (mask == DDS::ANY_INSTANCE_STATE) { 369 0 : return instance_state_string(DDS::ANY_INSTANCE_STATE); 370 : } 371 0 : if (mask == DDS::NOT_ALIVE_INSTANCE_STATE) { 372 0 : return instance_state_string(DDS::NOT_ALIVE_INSTANCE_STATE); 373 : } 374 0 : OPENDDS_STRING str; 375 0 : if (mask & DDS::ALIVE_INSTANCE_STATE) { 376 0 : str = instance_state_string(DDS::ALIVE_INSTANCE_STATE); 377 : } 378 0 : if (mask & DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) { 379 0 : if (!str.empty()) str += " | "; 380 0 : str += instance_state_string(DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE); 381 : } 382 0 : if (mask & DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) { 383 0 : if (!str.empty()) str += " | "; 384 0 : str += instance_state_string(DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE); 385 : } 386 0 : return str; 387 0 : } 388 : 389 : } 390 : } 391 : 392 : OPENDDS_END_VERSIONED_NAMESPACE_DECL