InstanceState.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ace/Event_Handler.h"
00010 #include "ace/Reactor.h"
00011 #include "InstanceState.h"
00012 #include "DataReaderImpl.h"
00013 #include "SubscriptionInstance.h"
00014 #include "ReceivedDataElementList.h"
00015 #include "Time_Helper.h"
00016 #include "DomainParticipantImpl.h"
00017 #include "GuidConverter.h"
00018 
00019 #if !defined (__ACE_INLINE__)
00020 # include "InstanceState.inl"
00021 #endif /* !__ACE_INLINE__ */
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 OpenDDS::DCPS::InstanceState::InstanceState(DataReaderImpl* reader,
00026                                             ACE_Recursive_Thread_Mutex& lock,
00027                                             DDS::InstanceHandle_t handle)
00028   : lock_(lock),
00029     instance_state_(0),
00030     view_state_(0),
00031     disposed_generation_count_(0),
00032     no_writers_generation_count_(0),
00033     empty_(true),
00034     release_pending_(false),
00035     release_timer_id_(-1),
00036     reader_(reader),
00037     handle_(handle),
00038     owner_(GUID_UNKNOWN),
00039 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00040     exclusive_(reader->qos_.ownership.kind == ::DDS::EXCLUSIVE_OWNERSHIP_QOS),
00041 #endif
00042     registered_ (false)
00043 {}
00044 
00045 OpenDDS::DCPS::InstanceState::~InstanceState()
00046 {
00047   cancel_release();
00048 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00049   if (registered_) {
00050     DataReaderImpl::OwnershipManagerPtr om = reader_->ownership_manager();
00051     if (om) om->remove_instance(this);
00052   }
00053 #endif
00054 }
00055 
00056 void OpenDDS::DCPS::InstanceState::sample_info(DDS::SampleInfo& si,
00057                                                const ReceivedDataElement* de)
00058 {
00059   si.sample_state = de->sample_state_;
00060   si.view_state = view_state_;
00061   si.instance_state = instance_state_;
00062   si.disposed_generation_count =
00063     static_cast<CORBA::Long>(disposed_generation_count_);
00064   si.no_writers_generation_count =
00065     static_cast<CORBA::Long>(no_writers_generation_count_);
00066   si.source_timestamp = de->source_timestamp_;
00067   si.instance_handle = handle_;
00068   RcHandle<DomainParticipantImpl> participant = this->reader_->participant_servant_.lock();
00069   si.publication_handle = participant ? participant->id_to_handle(de->pub_) : 0;
00070   si.valid_data = de->registered_data_ != 0;
00071   /*
00072    * These are actually calculated later...
00073    */
00074   si.sample_rank = 0;
00075 
00076   // these aren't the real value, they're being saved
00077   // for a later calculation. the actual value is
00078   // calculated in DataReaderImpl::sample_info using
00079   // these values.
00080   si.generation_rank =
00081     static_cast<CORBA::Long>(de->disposed_generation_count_ +
00082                              de->no_writers_generation_count_);
00083   si.absolute_generation_rank =
00084     static_cast<CORBA::Long>(de->disposed_generation_count_ +
00085                              de->no_writers_generation_count_);
00086 
00087   si.opendds_reserved_publication_seq = de->sequence_.getValue();
00088 }
00089 
00090 // cannot ACE_INLINE because of #include loop
00091 
00092 int
00093 OpenDDS::DCPS::InstanceState::handle_timeout(const ACE_Time_Value& /* current_time */,
00094                                              const void* /* arg */)
00095 {
00096   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00097     ACE_DEBUG((LM_NOTICE,
00098                ACE_TEXT("(%P|%t) NOTICE:")
00099                ACE_TEXT(" InstanceState::handle_timeout:")
00100                ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"),
00101                this->handle_));
00102   }
00103   this->release();
00104 
00105   return 0;
00106 }
00107 
00108 bool
00109 OpenDDS::DCPS::InstanceState::dispose_was_received(const PublicationId& writer_id)
00110 {
00111   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00112                    guard, this->lock_, false);
00113 
00114   writers_.erase(writer_id);
00115 
00116   //
00117   // Manage the instance state on disposal here.
00118   //
00119   // If disposed by owner then the owner is not re-elected, it can
00120   // resume if the writer sends message again.
00121   if (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00122 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00123     DataReaderImpl::OwnershipManagerPtr owner_manager = this->reader_->ownership_manager();
00124     if (! this->exclusive_
00125       || (owner_manager && owner_manager->is_owner (this->handle_, writer_id))) {
00126 #endif
00127       this->instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
00128       schedule_release();
00129       return true;
00130 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00131     }
00132 #endif
00133   }
00134 
00135   return false;
00136 }
00137 
00138 bool
00139 OpenDDS::DCPS::InstanceState::unregister_was_received(const PublicationId& writer_id)
00140 {
00141   if (OpenDDS::DCPS::DCPS_debug_level > 1) {
00142     GuidConverter conv(writer_id);
00143     ACE_DEBUG((LM_DEBUG,
00144       ACE_TEXT(
00145         "(%P|%t) InstanceState::unregister_was_received on %C\n"
00146       ),
00147       OPENDDS_STRING(conv).c_str()
00148     ));
00149   }
00150 
00151   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00152                    guard, this->lock_, false);
00153   writers_.erase(writer_id);
00154 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00155   if (this->exclusive_) {
00156     // If unregistered by owner then the ownership should be transferred to another
00157     // writer.
00158     DataReaderImpl::OwnershipManagerPtr owner_manager = this->reader_->ownership_manager();
00159     if (owner_manager)
00160       owner_manager->remove_writer (this->handle_, writer_id);
00161   }
00162 #endif
00163 
00164   if (writers_.empty() && (this->instance_state_ & DDS::ALIVE_INSTANCE_STATE)) {
00165     this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00166     schedule_release();
00167     return true;
00168   }
00169 
00170   return false;
00171 }
00172 
00173 void
00174 OpenDDS::DCPS::InstanceState::writer_became_dead(
00175   const PublicationId&  writer_id,
00176   int                   /*num_alive_writers*/,
00177   const ACE_Time_Value& /* when */)
00178 {
00179   if (OpenDDS::DCPS::DCPS_debug_level > 1) {
00180     GuidConverter conv(writer_id);
00181     ACE_DEBUG((LM_DEBUG,
00182       ACE_TEXT(
00183         "(%P|%t) InstanceState::writer_became_dead on %C\n"
00184       ),
00185       OPENDDS_STRING(conv).c_str()
00186     ));
00187   }
00188 
00189   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00190             guard, this->lock_);
00191   writers_.erase(writer_id);
00192 
00193   if (writers_.empty() && this->instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
00194     this->instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
00195     schedule_release();
00196   }
00197 }
00198 
00199 void
00200 OpenDDS::DCPS::InstanceState::schedule_pending()
00201 {
00202   this->release_pending_ = true;
00203 }
00204 
00205 void
00206 OpenDDS::DCPS::InstanceState::schedule_release()
00207 {
00208   DDS::DataReaderQos qos;
00209   this->reader_->get_qos(qos);
00210 
00211   DDS::Duration_t delay;
00212 
00213   switch (this->instance_state_) {
00214   case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
00215     delay = qos.reader_data_lifecycle.autopurge_nowriter_samples_delay;
00216     break;
00217 
00218   case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
00219     delay = qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
00220     break;
00221 
00222   default:
00223     ACE_ERROR((LM_ERROR,
00224                ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00225                ACE_TEXT(" Unsupported instance state: %d!\n"),
00226                this->instance_state_));
00227     return;
00228   }
00229 
00230   if (delay.sec != DDS::DURATION_INFINITE_SEC &&
00231       delay.nanosec != DDS::DURATION_INFINITE_NSEC) {
00232     cancel_release();
00233 
00234     ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00235 
00236     this->release_timer_id_ =
00237       reactor->schedule_timer(this, 0, duration_to_time_value(delay));
00238 
00239     if (this->release_timer_id_ == -1) {
00240       ACE_ERROR((LM_ERROR,
00241                  ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
00242                  ACE_TEXT(" Unable to schedule timer!\n")));
00243     }
00244 
00245   } else {
00246     // N.B. instance transitions are always followed by a non-valid
00247     // sample being queued to the ReceivedDataElementList; marking
00248     // the release as pending prevents this sample from being lost
00249     // if all samples have been already removed from the instance.
00250     schedule_pending();
00251   }
00252 }
00253 
00254 void
00255 OpenDDS::DCPS::InstanceState::cancel_release()
00256 {
00257   this->release_pending_ = false;
00258 
00259   if (this->release_timer_id_ != -1) {
00260     ACE_Reactor_Timer_Interface* reactor = this->reader_->get_reactor();
00261     reactor->cancel_timer(this->release_timer_id_);
00262 
00263     this->release_timer_id_ = -1;
00264   }
00265 }
00266 
00267 bool
00268 OpenDDS::DCPS::InstanceState::release_if_empty()
00269 {
00270   bool released = false;
00271   if (this->empty_ && this->writers_.empty()) {
00272     release();
00273     released = true;
00274   } else {
00275     schedule_pending();
00276   }
00277   return released;
00278 }
00279 
00280 void
00281 OpenDDS::DCPS::InstanceState::release()
00282 {
00283   this->reader_->release_instance(this->handle_);
00284 }
00285 
00286 void
00287 OpenDDS::DCPS::InstanceState::set_owner (const PublicationId& owner)
00288 {
00289   this->owner_ = owner;
00290 }
00291 
00292 OpenDDS::DCPS::PublicationId&
00293 OpenDDS::DCPS::InstanceState::get_owner ()
00294 {
00295   return this->owner_;
00296 }
00297 
00298 bool
00299 OpenDDS::DCPS::InstanceState::is_exclusive () const
00300 {
00301   return this->exclusive_;
00302 }
00303 
00304 bool
00305 OpenDDS::DCPS::InstanceState::registered()
00306 {
00307   bool ret = this->registered_;
00308   this->registered_ = true;
00309   return ret;
00310 }
00311 
00312 void
00313 OpenDDS::DCPS::InstanceState::registered (bool flag)
00314 {
00315   this->registered_ = flag;
00316 }
00317 
00318 void
00319 OpenDDS::DCPS::InstanceState::reset_ownership (::DDS::InstanceHandle_t instance)
00320 {
00321   this->owner_ = GUID_UNKNOWN;
00322   this->registered_ = false;
00323 
00324   this->reader_->reset_ownership(instance);
00325 }
00326 
00327 OPENDDS_STRING
00328 OpenDDS::DCPS::InstanceState::instance_state_string(DDS::InstanceStateKind value)
00329 {
00330   switch (value) {
00331   case DDS::ALIVE_INSTANCE_STATE:
00332     return OPENDDS_STRING("ALIVE_INSTANCE_STATE");
00333   case DDS::NOT_ALIVE_INSTANCE_STATE:
00334     return OPENDDS_STRING("NOT_ALIVE_INSTANCE_STATE");
00335   case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
00336     return OPENDDS_STRING("NOT_ALIVE_DISPOSED_INSTANCE_STATE");
00337   case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
00338     return OPENDDS_STRING("NOT_ALIVE_NO_WRITERS_INSTANCE_STATE");
00339   case DDS::ANY_INSTANCE_STATE:
00340     return OPENDDS_STRING("ANY_INSTANCE_STATE");
00341   default:
00342     ACE_ERROR((LM_ERROR,
00343       ACE_TEXT(
00344         "(%P|%t) ERROR: OpenDDS::DCPS::InstanceState::instance_state_string(): "
00345         "%d is either completely invalid or at least not defined in this function.\n"
00346       ),
00347       value
00348     ));
00349 
00350     return OPENDDS_STRING("(Unknown Instance State: ") + to_dds_string(value) + ")";
00351   }
00352 }
00353 
00354 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1