InstanceState.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ace/Event_Handler.h"
00010 #include "ace/Reactor.h"
00011 #include "InstanceState.h"
00012 #include "DataReaderImpl.h"
00013 #include "SubscriptionInstance.h"
00014 #include "ReceivedDataElementList.h"
00015 #include "Time_Helper.h"
00016 #include "DomainParticipantImpl.h"
00017 #include "GuidConverter.h"
00018
00019 #if !defined (__ACE_INLINE__)
00020 # include "InstanceState.inl"
00021 #endif
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 OpenDDS::DCPS::InstanceState::InstanceState(DataReaderImpl* reader,
00026 ACE_Recursive_Thread_Mutex& lock,
00027 DDS::InstanceHandle_t handle)
00028 : lock_(lock),
00029 instance_state_(0),
00030 view_state_(0),
00031 disposed_generation_count_(0),
00032 no_writers_generation_count_(0),
00033 empty_(true),
00034 release_pending_(false),
00035 release_timer_id_(-1),
00036 reader_(reader),
00037 handle_(handle),
00038 owner_(GUID_UNKNOWN),
00039 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00040 exclusive_(reader->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS),
00041 #endif
00042 registered_ (false)
00043 {}
00044
00045 OpenDDS::DCPS::InstanceState::~InstanceState()
00046 {
00047 cancel_release();
00048 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00049 if (registered_) {
00050 DataReaderImpl::OwnershipManagerPtr om = reader_->ownership_manager();
00051 if (om) om->remove_instance(this);
00052 }
00053 #endif
00054 }
00055
00056 void OpenDDS::DCPS::InstanceState::sample_info(DDS::SampleInfo& si,
00057 const ReceivedDataElement* de)
00058 {
00059 si.sample_state = de->sample_state_;
00060 si.view_state = view_state_;
00061 si.instance_state = instance_state_;
00062 si.disposed_generation_count =
00063 static_cast<CORBA::Long>(disposed_generation_count_);
00064 si.no_writers_generation_count =
00065 static_cast<CORBA::Long>(no_writers_generation_count_);
00066 si.source_timestamp = de->source_timestamp_;
00067 si.instance_handle = handle_;
00068 RcHandle<DomainParticipantImpl> participant = this->reader_->participant_servant_.lock();
00069 si.publication_handle = participant ? participant->id_to_handle(de->pub_) : 0;
00070 si.valid_data = de->registered_data_ != 0;
00071
00072
00073
00074 si.sample_rank = 0;
00075
00076
00077
00078
00079
00080 si.generation_rank =
00081 static_cast<CORBA::Long>(de->disposed_generation_count_ +
00082 de->no_writers_generation_count_);
00083 si.absolute_generation_rank =
00084 static_cast<CORBA::Long>(de->disposed_generation_count_ +
00085 de->no_writers_generation_count_);
00086
00087 si.opendds_reserved_publication_seq = de->sequence_.getValue();
00088 }
00089
00090
00091
00092 int
00093 OpenDDS::DCPS::InstanceState::handle_timeout(const ACE_Time_Value& ,
00094 const void* )
00095 {
00096 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00097 ACE_DEBUG((LM_NOTICE,
00098 ACE_TEXT("(%P|%t) NOTICE:")
00099 ACE_TEXT(" InstanceState::handle_timeout:")
00100 ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"),
00101 this->handle_));
00102 }
00103 this->release();
00104
00105 return 0;
00106 }
00107
00108 bool
00109 OpenDDS::DCPS::InstanceState::dispose_was_received(const PublicationId& writer_id)
00110 {
00111 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00112 guard, this->lock_, false);
00113
00114 writers_.erase(writer_id);
00115
00116
00117
00118
00119
00120
00121 if (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00122 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00123 DataReaderImpl::OwnershipManagerPtr owner_manager = this->reader_->ownership_manager();
00124 if (! this->exclusive_
00125 || (owner_manager && owner_manager->is_owner (this->handle_, writer_id))) {
00126 #endif
00127 this->instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
00128 schedule_release();
00129 return true;
00130 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00131 }
00132 #endif
00133 }
00134
00135 return false;
00136 }
00137
00138 bool
00139 OpenDDS::DCPS::InstanceState::unregister_was_received(const PublicationId& writer_id)
00140 {
00141 if (OpenDDS::DCPS::DCPS_debug_level > 1) {
00142 GuidConverter conv(writer_id);
00143 ACE_DEBUG((LM_DEBUG,
00144 ACE_TEXT(
00145 "(%P|%t) InstanceState::unregister_was_received on %C\n"
00146 ),
00147 OPENDDS_STRING(conv).c_str()
00148 ));
00149 }
00150
00151 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00152 guard, this->lock_, false);
00153 writers_.erase(writer_id);
00154 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00155 if (this->exclusive_) {
00156
00157
00158 DataReaderImpl::OwnershipManagerPtr owner_manager = this->reader_->ownership_manager();
00159 if (owner_manager)
00160 owner_manager->remove_writer (this->handle_, writer_id);
00161 }
00162 #endif
00163
00164 if (writers_.empty() && (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE)) {
00165 this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00166 schedule_release();
00167 return true;
00168 }
00169
00170 return false;
00171 }
00172
00173 void
00174 OpenDDS::DCPS::InstanceState::writer_became_dead(
00175 const PublicationId& writer_id,
00176 int ,
00177 const ACE_Time_Value& )
00178 {
00179 if (OpenDDS::DCPS::DCPS_debug_level > 1) {
00180 GuidConverter conv(writer_id);
00181 ACE_DEBUG((LM_DEBUG,
00182 ACE_TEXT(
00183 "(%P|%t) InstanceState::writer_became_dead on %C\n"
00184 ),
00185 OPENDDS_STRING(conv).c_str()
00186 ));
00187 }
00188
00189 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00190 guard, this->lock_);
00191 writers_.erase(writer_id);
00192
00193 if (writers_.empty() && this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00194 this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00195 schedule_release();
00196 }
00197 }
00198
00199 void
00200 OpenDDS::DCPS::InstanceState::schedule_pending()
00201 {
00202 this->release_pending_ = true;
00203 }
00204
00205 void
00206 OpenDDS::DCPS::InstanceState::schedule_release()
00207 {
00208 DDS::DataReaderQos qos;
00209 this->reader_->get_qos(qos);
00210
00211 DDS::Duration_t delay;
00212
00213 switch (this->instance_state_) {
00214 case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
00215 delay = qos.reader_data_lifecycle.autopurge_nowriter_samples_delay;
00216 break;
00217
00218 case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
00219 delay = qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
00220 break;
00221
00222 default:
00223 ACE_ERROR((LM_ERROR,
00224 ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00225 ACE_TEXT(" Unsupported instance state: %d!\n"),
00226 this->instance_state_));
00227 return;
00228 }
00229
00230 if (delay.sec != DDS::DURATION_INFINITE_SEC &&
00231 delay.nanosec != DDS::DURATION_INFINITE_NSEC) {
00232 cancel_release();
00233
00234 ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00235
00236 this->release_timer_id_ =
00237 reactor->schedule_timer(this, 0, duration_to_time_value(delay));
00238
00239 if (this->release_timer_id_ == -1) {
00240 ACE_ERROR((LM_ERROR,
00241 ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00242 ACE_TEXT(" Unable to schedule timer!\n")));
00243 }
00244
00245 } else {
00246
00247
00248
00249
00250 schedule_pending();
00251 }
00252 }
00253
00254 void
00255 OpenDDS::DCPS::InstanceState::cancel_release()
00256 {
00257 this->release_pending_ = false;
00258
00259 if (this->release_timer_id_ != -1) {
00260 ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00261 reactor->cancel_timer(this->release_timer_id_);
00262
00263 this->release_timer_id_ = -1;
00264 }
00265 }
00266
00267 bool
00268 OpenDDS::DCPS::InstanceState::release_if_empty()
00269 {
00270 bool released = false;
00271 if (this->empty_ && this->writers_.empty()) {
00272 release();
00273 released = true;
00274 } else {
00275 schedule_pending();
00276 }
00277 return released;
00278 }
00279
00280 void
00281 OpenDDS::DCPS::InstanceState::release()
00282 {
00283 this->reader_->release_instance(this->handle_);
00284 }
00285
00286 void
00287 OpenDDS::DCPS::InstanceState::set_owner (const PublicationId& owner)
00288 {
00289 this->owner_ = owner;
00290 }
00291
00292 OpenDDS::DCPS::PublicationId&
00293 OpenDDS::DCPS::InstanceState::get_owner ()
00294 {
00295 return this->owner_;
00296 }
00297
00298 bool
00299 OpenDDS::DCPS::InstanceState::is_exclusive () const
00300 {
00301 return this->exclusive_;
00302 }
00303
00304 bool
00305 OpenDDS::DCPS::InstanceState::registered()
00306 {
00307 bool ret = this->registered_;
00308 this->registered_ = true;
00309 return ret;
00310 }
00311
00312 void
00313 OpenDDS::DCPS::InstanceState::registered (bool flag)
00314 {
00315 this->registered_ = flag;
00316 }
00317
00318 void
00319 OpenDDS::DCPS::InstanceState::reset_ownership (::DDS::InstanceHandle_t instance)
00320 {
00321 this->owner_ = GUID_UNKNOWN;
00322 this->registered_ = false;
00323
00324 this->reader_->reset_ownership(instance);
00325 }
00326
00327 OPENDDS_STRING
00328 OpenDDS::DCPS::InstanceState::instance_state_string(DDS::InstanceStateKind value)
00329 {
00330 switch (value) {
00331 case DDS::ALIVE_INSTANCE_STATE:
00332 return OPENDDS_STRING("ALIVE_INSTANCE_STATE");
00333 case DDS::NOT_ALIVE_INSTANCE_STATE:
00334 return OPENDDS_STRING("NOT_ALIVE_INSTANCE_STATE");
00335 case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
00336 return OPENDDS_STRING("NOT_ALIVE_DISPOSED_INSTANCE_STATE");
00337 case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
00338 return OPENDDS_STRING("NOT_ALIVE_NO_WRITERS_INSTANCE_STATE");
00339 case DDS::ANY_INSTANCE_STATE:
00340 return OPENDDS_STRING("ANY_INSTANCE_STATE");
00341 default:
00342 ACE_ERROR((LM_ERROR,
00343 ACE_TEXT(
00344 "(%P|%t) ERROR: OpenDDS::DCPS::InstanceState::instance_state_string(): "
00345 "%d is either completely invalid or at least not defined in this function.\n"
00346 ),
00347 value
00348 ));
00349
00350 return OPENDDS_STRING("(Unknown Instance State: ") + to_dds_string(value) + ")";
00351 }
00352 }
00353
00354 OPENDDS_END_VERSIONED_NAMESPACE_DECL