InstanceState.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ace/Event_Handler.h"
00010 #include "ace/Reactor.h"
00011 #include "InstanceState.h"
00012 #include "DataReaderImpl.h"
00013 #include "SubscriptionInstance.h"
00014 #include "ReceivedDataElementList.h"
00015 #include "Qos_Helper.h"
00016 #include "DomainParticipantImpl.h"
00017 
00018 #if !defined (__ACE_INLINE__)
00019 # include "InstanceState.inl"
00020 #endif /* !__ACE_INLINE__ */
00021 
00022 OpenDDS::DCPS::InstanceState::InstanceState(DataReaderImpl* reader,
00023                                             ACE_Recursive_Thread_Mutex& lock,
00024                                             DDS::InstanceHandle_t handle)
00025   : lock_(lock),
00026     instance_state_(0),
00027     view_state_(0),
00028     disposed_generation_count_(0),
00029     no_writers_generation_count_(0),
00030     empty_(true),
00031     release_pending_(false),
00032     release_timer_id_(-1),
00033     reader_(reader),
00034     handle_(handle),
00035     owner_(GUID_UNKNOWN),
00036 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00037     exclusive_(reader->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS),
00038 #endif
00039     registered_ (false)
00040 {}
00041 
00042 OpenDDS::DCPS::InstanceState::~InstanceState()
00043 {
00044   cancel_release();
00045 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00046   if (registered_) {
00047     OwnershipManager* om = reader_->ownership_manager();
00048     if (om) om->remove_instance(this);
00049   }
00050 #endif
00051 }
00052 
00053 void OpenDDS::DCPS::InstanceState::sample_info(DDS::SampleInfo& si,
00054                                                const ReceivedDataElement* de)
00055 {
00056   si.sample_state = de->sample_state_;
00057   si.view_state = view_state_;
00058   si.instance_state = instance_state_;
00059   si.disposed_generation_count =
00060     static_cast<CORBA::Long>(disposed_generation_count_);
00061   si.no_writers_generation_count =
00062     static_cast<CORBA::Long>(no_writers_generation_count_);
00063   si.source_timestamp = de->source_timestamp_;
00064   si.instance_handle = handle_;
00065   si.publication_handle = this->reader_->participant_servant_->id_to_handle(de->pub_);
00066   si.valid_data = de->registered_data_ != 0;
00067   /*
00068    * These are actually calculated later...
00069    */
00070   si.sample_rank = 0;
00071 
00072   // these aren't the real value, they're being saved
00073   // for a later calculation. the actual value is
00074   // calculated in DataReaderImpl::sample_info using
00075   // these values.
00076   si.generation_rank =
00077     static_cast<CORBA::Long>(de->disposed_generation_count_ +
00078                              de->no_writers_generation_count_);
00079   si.absolute_generation_rank =
00080     static_cast<CORBA::Long>(de->disposed_generation_count_ +
00081                              de->no_writers_generation_count_);
00082 
00083   si.opendds_reserved_publication_seq = de->sequence_.getValue();
00084 }
00085 
00086 // cannot ACE_INLINE because of #include loop
00087 
00088 int
00089 OpenDDS::DCPS::InstanceState::handle_timeout(const ACE_Time_Value& /* current_time */,
00090                                              const void* /* arg */)
00091 {
00092   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00093     ACE_DEBUG((LM_NOTICE,
00094                ACE_TEXT("(%P|%t) NOTICE:")
00095                ACE_TEXT(" InstanceState::handle_timeout:")
00096                ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"),
00097                this->handle_));
00098   }
00099   this->release();
00100 
00101   return 0;
00102 }
00103 
00104 bool
00105 OpenDDS::DCPS::InstanceState::dispose_was_received(const PublicationId& writer_id)
00106 {
00107   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00108                    guard, this->lock_, false);
00109 
00110   writers_.erase(writer_id);
00111 
00112   //
00113   // Manage the instance state on disposal here.
00114   //
00115   // If disposed by owner then the owner is not re-elected, it can
00116   // resume if the writer sends message again.
00117   if (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00118 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00119     if (! this->exclusive_
00120       || this->reader_->owner_manager_->is_owner (this->handle_, writer_id)) {
00121 #endif
00122       this->instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
00123       schedule_release();
00124       return true;
00125 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00126     }
00127 #endif
00128   }
00129 
00130   return false;
00131 }
00132 
00133 bool
00134 OpenDDS::DCPS::InstanceState::unregister_was_received(const PublicationId& writer_id)
00135 {
00136   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00137                    guard, this->lock_, false);
00138   writers_.erase(writer_id);
00139 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00140   if (this->exclusive_) {
00141     // If unregistered by owner then the ownership should be transferred to another
00142     // writer.
00143     (void) this->reader_->owner_manager_->remove_writer (
00144              this->handle_, writer_id);
00145   }
00146 #endif
00147 
00148   if (writers_.empty() && (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE)) {
00149     this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00150     schedule_release();
00151     return true;
00152   }
00153 
00154   return false;
00155 }
00156 
00157 void
00158 OpenDDS::DCPS::InstanceState::writer_became_dead(
00159   const PublicationId&  writer_id,
00160   int                   /*num_alive_writers*/,
00161   const ACE_Time_Value& /* when */)
00162 {
00163   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00164             guard, this->lock_);
00165   writers_.erase(writer_id);
00166 
00167   if (writers_.empty() && this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00168     this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00169     schedule_release();
00170   }
00171 }
00172 
00173 void
00174 OpenDDS::DCPS::InstanceState::schedule_pending()
00175 {
00176   this->release_pending_ = true;
00177 }
00178 
00179 void
00180 OpenDDS::DCPS::InstanceState::schedule_release()
00181 {
00182   DDS::DataReaderQos qos;
00183   this->reader_->get_qos(qos);
00184 
00185   DDS::Duration_t delay;
00186 
00187   switch (this->instance_state_) {
00188   case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
00189     delay = qos.reader_data_lifecycle.autopurge_nowriter_samples_delay;
00190     break;
00191 
00192   case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
00193     delay = qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
00194     break;
00195 
00196   default:
00197     ACE_ERROR((LM_ERROR,
00198                ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00199                ACE_TEXT(" Unsupported instance state: %d!\n"),
00200                this->instance_state_));
00201     return;
00202   }
00203 
00204   if (delay.sec != DDS::DURATION_INFINITE_SEC &&
00205       delay.nanosec != DDS::DURATION_INFINITE_NSEC) {
00206     cancel_release();
00207 
00208     ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00209 
00210     this->release_timer_id_ =
00211       reactor->schedule_timer(this, 0, duration_to_time_value(delay));
00212 
00213     if (this->release_timer_id_ == -1) {
00214       ACE_ERROR((LM_ERROR,
00215                  ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00216                  ACE_TEXT(" Unable to schedule timer!\n")));
00217     }
00218 
00219   } else {
00220     // N.B. instance transitions are always followed by a non-valid
00221     // sample being queued to the ReceivedDataElementList; marking
00222     // the release as pending prevents this sample from being lost
00223     // if all samples have been already removed from the instance.
00224     schedule_pending();
00225   }
00226 }
00227 
00228 void
00229 OpenDDS::DCPS::InstanceState::cancel_release()
00230 {
00231   this->release_pending_ = false;
00232 
00233   if (this->release_timer_id_ != -1) {
00234     ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00235     reactor->cancel_timer(this->release_timer_id_);
00236 
00237     this->release_timer_id_ = -1;
00238   }
00239 }
00240 
00241 bool
00242 OpenDDS::DCPS::InstanceState::release_if_empty()
00243 {
00244   bool released = false;
00245   if (this->empty_ && this->writers_.empty()) {
00246     release();
00247     released = true;
00248   } else {
00249     schedule_pending();
00250   }
00251   return released;
00252 }
00253 
00254 void
00255 OpenDDS::DCPS::InstanceState::release()
00256 {
00257   this->reader_->release_instance(this->handle_);
00258 }
00259 
00260 void
00261 OpenDDS::DCPS::InstanceState::set_owner (const PublicationId& owner)
00262 {
00263   this->owner_ = owner;
00264 }
00265 
00266 OpenDDS::DCPS::PublicationId&
00267 OpenDDS::DCPS::InstanceState::get_owner ()
00268 {
00269   return this->owner_;
00270 }
00271 
00272 bool
00273 OpenDDS::DCPS::InstanceState::is_exclusive () const
00274 {
00275   return this->exclusive_;
00276 }
00277 
00278 bool
00279 OpenDDS::DCPS::InstanceState::registered()
00280 {
00281   bool ret = this->registered_;
00282   this->registered_ = true;
00283   return ret;
00284 }
00285 
00286 void
00287 OpenDDS::DCPS::InstanceState::registered (bool flag)
00288 {
00289   this->registered_ = flag;
00290 }
00291 
00292 void
00293 OpenDDS::DCPS::InstanceState::reset_ownership (::DDS::InstanceHandle_t instance)
00294 {
00295   this->owner_ = GUID_UNKNOWN;
00296   this->registered_ = false;
00297 
00298   this->reader_->reset_ownership(instance);
00299 }
00300 
00301 

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7