00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ace/Event_Handler.h"
00010 #include "ace/Reactor.h"
00011 #include "InstanceState.h"
00012 #include "DataReaderImpl.h"
00013 #include "SubscriptionInstance.h"
00014 #include "ReceivedDataElementList.h"
00015 #include "Qos_Helper.h"
00016 #include "DomainParticipantImpl.h"
00017
00018 #if !defined (__ACE_INLINE__)
00019 # include "InstanceState.inl"
00020 #endif
00021
00022 OpenDDS::DCPS::InstanceState::InstanceState(DataReaderImpl* reader,
00023 ACE_Recursive_Thread_Mutex& lock,
00024 DDS::InstanceHandle_t handle)
00025 : lock_(lock),
00026 instance_state_(0),
00027 view_state_(0),
00028 disposed_generation_count_(0),
00029 no_writers_generation_count_(0),
00030 empty_(true),
00031 release_pending_(false),
00032 release_timer_id_(-1),
00033 reader_(reader),
00034 handle_(handle),
00035 owner_(GUID_UNKNOWN),
00036 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00037 exclusive_(reader->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS),
00038 #endif
00039 registered_ (false)
00040 {}
00041
00042 OpenDDS::DCPS::InstanceState::~InstanceState()
00043 {
00044 cancel_release();
00045 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00046 if (registered_) {
00047 OwnershipManager* om = reader_->ownership_manager();
00048 if (om) om->remove_instance(this);
00049 }
00050 #endif
00051 }
00052
00053 void OpenDDS::DCPS::InstanceState::sample_info(DDS::SampleInfo& si,
00054 const ReceivedDataElement* de)
00055 {
00056 si.sample_state = de->sample_state_;
00057 si.view_state = view_state_;
00058 si.instance_state = instance_state_;
00059 si.disposed_generation_count =
00060 static_cast<CORBA::Long>(disposed_generation_count_);
00061 si.no_writers_generation_count =
00062 static_cast<CORBA::Long>(no_writers_generation_count_);
00063 si.source_timestamp = de->source_timestamp_;
00064 si.instance_handle = handle_;
00065 si.publication_handle = this->reader_->participant_servant_->id_to_handle(de->pub_);
00066 si.valid_data = de->registered_data_ != 0;
00067
00068
00069
00070 si.sample_rank = 0;
00071
00072
00073
00074
00075
00076 si.generation_rank =
00077 static_cast<CORBA::Long>(de->disposed_generation_count_ +
00078 de->no_writers_generation_count_);
00079 si.absolute_generation_rank =
00080 static_cast<CORBA::Long>(de->disposed_generation_count_ +
00081 de->no_writers_generation_count_);
00082
00083 si.opendds_reserved_publication_seq = de->sequence_.getValue();
00084 }
00085
00086
00087
00088 int
00089 OpenDDS::DCPS::InstanceState::handle_timeout(const ACE_Time_Value& ,
00090 const void* )
00091 {
00092 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00093 ACE_DEBUG((LM_NOTICE,
00094 ACE_TEXT("(%P|%t) NOTICE:")
00095 ACE_TEXT(" InstanceState::handle_timeout:")
00096 ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"),
00097 this->handle_));
00098 }
00099 this->release();
00100
00101 return 0;
00102 }
00103
00104 bool
00105 OpenDDS::DCPS::InstanceState::dispose_was_received(const PublicationId& writer_id)
00106 {
00107 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00108 guard, this->lock_, false);
00109
00110 writers_.erase(writer_id);
00111
00112
00113
00114
00115
00116
00117 if (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00118 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00119 if (! this->exclusive_
00120 || this->reader_->owner_manager_->is_owner (this->handle_, writer_id)) {
00121 #endif
00122 this->instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
00123 schedule_release();
00124 return true;
00125 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00126 }
00127 #endif
00128 }
00129
00130 return false;
00131 }
00132
00133 bool
00134 OpenDDS::DCPS::InstanceState::unregister_was_received(const PublicationId& writer_id)
00135 {
00136 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00137 guard, this->lock_, false);
00138 writers_.erase(writer_id);
00139 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00140 if (this->exclusive_) {
00141
00142
00143 (void) this->reader_->owner_manager_->remove_writer (
00144 this->handle_, writer_id);
00145 }
00146 #endif
00147
00148 if (writers_.empty() && (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE)) {
00149 this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00150 schedule_release();
00151 return true;
00152 }
00153
00154 return false;
00155 }
00156
00157 void
00158 OpenDDS::DCPS::InstanceState::writer_became_dead(
00159 const PublicationId& writer_id,
00160 int ,
00161 const ACE_Time_Value& )
00162 {
00163 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00164 guard, this->lock_);
00165 writers_.erase(writer_id);
00166
00167 if (writers_.empty() && this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00168 this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00169 schedule_release();
00170 }
00171 }
00172
00173 void
00174 OpenDDS::DCPS::InstanceState::schedule_pending()
00175 {
00176 this->release_pending_ = true;
00177 }
00178
00179 void
00180 OpenDDS::DCPS::InstanceState::schedule_release()
00181 {
00182 DDS::DataReaderQos qos;
00183 this->reader_->get_qos(qos);
00184
00185 DDS::Duration_t delay;
00186
00187 switch (this->instance_state_) {
00188 case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
00189 delay = qos.reader_data_lifecycle.autopurge_nowriter_samples_delay;
00190 break;
00191
00192 case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
00193 delay = qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
00194 break;
00195
00196 default:
00197 ACE_ERROR((LM_ERROR,
00198 ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00199 ACE_TEXT(" Unsupported instance state: %d!\n"),
00200 this->instance_state_));
00201 return;
00202 }
00203
00204 if (delay.sec != DDS::DURATION_INFINITE_SEC &&
00205 delay.nanosec != DDS::DURATION_INFINITE_NSEC) {
00206 cancel_release();
00207
00208 ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00209
00210 this->release_timer_id_ =
00211 reactor->schedule_timer(this, 0, duration_to_time_value(delay));
00212
00213 if (this->release_timer_id_ == -1) {
00214 ACE_ERROR((LM_ERROR,
00215 ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00216 ACE_TEXT(" Unable to schedule timer!\n")));
00217 }
00218
00219 } else {
00220
00221
00222
00223
00224 schedule_pending();
00225 }
00226 }
00227
00228 void
00229 OpenDDS::DCPS::InstanceState::cancel_release()
00230 {
00231 this->release_pending_ = false;
00232
00233 if (this->release_timer_id_ != -1) {
00234 ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00235 reactor->cancel_timer(this->release_timer_id_);
00236
00237 this->release_timer_id_ = -1;
00238 }
00239 }
00240
00241 bool
00242 OpenDDS::DCPS::InstanceState::release_if_empty()
00243 {
00244 bool released = false;
00245 if (this->empty_ && this->writers_.empty()) {
00246 release();
00247 released = true;
00248 } else {
00249 schedule_pending();
00250 }
00251 return released;
00252 }
00253
00254 void
00255 OpenDDS::DCPS::InstanceState::release()
00256 {
00257 this->reader_->release_instance(this->handle_);
00258 }
00259
00260 void
00261 OpenDDS::DCPS::InstanceState::set_owner (const PublicationId& owner)
00262 {
00263 this->owner_ = owner;
00264 }
00265
00266 OpenDDS::DCPS::PublicationId&
00267 OpenDDS::DCPS::InstanceState::get_owner ()
00268 {
00269 return this->owner_;
00270 }
00271
00272 bool
00273 OpenDDS::DCPS::InstanceState::is_exclusive () const
00274 {
00275 return this->exclusive_;
00276 }
00277
00278 bool
00279 OpenDDS::DCPS::InstanceState::registered()
00280 {
00281 bool ret = this->registered_;
00282 this->registered_ = true;
00283 return ret;
00284 }
00285
00286 void
00287 OpenDDS::DCPS::InstanceState::registered (bool flag)
00288 {
00289 this->registered_ = flag;
00290 }
00291
00292 void
00293 OpenDDS::DCPS::InstanceState::reset_ownership (::DDS::InstanceHandle_t instance)
00294 {
00295 this->owner_ = GUID_UNKNOWN;
00296 this->registered_ = false;
00297
00298 this->reader_->reset_ownership(instance);
00299 }
00300
00301