WriterInfo.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00008 #include "dcps_export.h"
00009 
00010 #include "GuidConverter.h"
00011 #include "WriterInfo.h"
00012 #include "Qos_Helper.h"
00013 
00014 #include "ace/OS_NS_sys_time.h"
00015 
00016 
00017 namespace OpenDDS {
00018 namespace DCPS {
00019 
00020 WriterInfoListener::WriterInfoListener()
00021   : subscription_id_(GUID_UNKNOWN),
00022   liveliness_lease_duration_(ACE_Time_Value::zero)
00023 {
00024 }
00025 
00026 WriterInfoListener::~WriterInfoListener()
00027 {
00028 }
00029 
00030 /// tell instances when a DataWriter transitions to being alive
00031 /// The writer state is inout parameter, it has to be set ALIVE before
00032 /// handle_timeout is called since some subroutine use the state.
00033 void
00034 WriterInfoListener::writer_became_alive(WriterInfo&,
00035                                         const ACE_Time_Value& )
00036 {
00037 }
00038 
00039 /// tell instances when a DataWriter transitions to DEAD
00040 /// The writer state is inout parameter, the state is set to DEAD
00041 /// when it returns.
00042 void
00043 WriterInfoListener::writer_became_dead(WriterInfo&,
00044                                        const ACE_Time_Value& )
00045 {
00046 }
00047 
00048 /// tell instance when a DataWriter is removed.
00049 /// The liveliness status need update.
00050 void
00051 WriterInfoListener::writer_removed(WriterInfo& )
00052 {
00053 }
00054 
00055 
00056 
00057 WriterInfo::WriterInfo(WriterInfoListener*         reader,
00058                        const PublicationId&        writer_id,
00059                        const ::DDS::DataWriterQos& writer_qos)
00060   : last_liveliness_activity_time_(ACE_OS::gettimeofday()),
00061   seen_data_(false),
00062   historic_samples_timer_(NO_TIMER),
00063   remove_association_timer_(NO_TIMER),
00064   removal_deadline_(ACE_Time_Value::zero),
00065   last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00066   waiting_for_end_historic_samples_(false),
00067   scheduled_for_removal_(false),
00068   notify_lost_(false),
00069   state_(NOT_SET),
00070   reader_(reader),
00071   writer_id_(writer_id),
00072   writer_qos_(writer_qos),
00073   handle_(DDS::HANDLE_NIL)
00074 {
00075 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00076   this->reset_coherent_info();
00077 #endif
00078 
00079   if (DCPS_debug_level >= 5) {
00080     GuidConverter writer_converter(writer_id);
00081     GuidConverter reader_converter(reader->subscription_id_);
00082     ACE_DEBUG((LM_DEBUG,
00083                ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ")
00084                ACE_TEXT("writer %C added to reader %C.\n"),
00085                OPENDDS_STRING(writer_converter).c_str(),
00086                OPENDDS_STRING(reader_converter).c_str()));
00087   }
00088 }
00089 
00090 OPENDDS_STRING
00091 WriterInfo::get_state_str() const
00092 {
00093   OPENDDS_STRING ret;
00094   switch (this->state_) {
00095   case WriterInfo::NOT_SET:
00096     ret += "NOT_SET";
00097     break;
00098   case WriterInfo::ALIVE:
00099     ret += "ALIVE";
00100     break;
00101   case WriterInfo::DEAD:
00102     ret += "DEAD";
00103     break;
00104   default:
00105     ret += "UNSPECIFIED(";
00106     ret += to_dds_string(int(this->state_));
00107     ret += ")";
00108   }
00109   return ret.c_str();
00110 }
00111 
00112 void
00113 WriterInfo::clear_owner_evaluated ()
00114 {
00115   this->owner_evaluated_.clear ();
00116 }
00117 
00118 void
00119 WriterInfo::set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag)
00120 {
00121   if (flag ||
00122       (!flag && owner_evaluated_.find (instance) != owner_evaluated_.end ())) {
00123     this->owner_evaluated_ [instance] = flag;
00124   }
00125 }
00126 
00127 bool
00128 WriterInfo::is_owner_evaluated (::DDS::InstanceHandle_t instance)
00129 {
00130   OwnerEvaluateFlags::iterator iter = owner_evaluated_.find (instance);
00131   if (iter == owner_evaluated_.end ()) {
00132     this->owner_evaluated_.insert (OwnerEvaluateFlags::value_type (instance, false));
00133     return false;
00134   }
00135   else
00136     return iter->second;
00137 }
00138 
00139 ACE_Time_Value
00140 WriterInfo::check_activity(const ACE_Time_Value& now)
00141 {
00142   ACE_Time_Value expires_at = ACE_Time_Value::max_time;
00143 
00144   // We only need check the liveliness with the non-zero liveliness_lease_duration_.
00145   // if (state_ != DEAD && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero)
00146   if (state_ == ALIVE && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00147     expires_at = this->last_liveliness_activity_time_ +
00148                  reader_->liveliness_lease_duration_;
00149 
00150     if (expires_at <= now) {
00151       // let all instances know this write is not alive.
00152       reader_->writer_became_dead(*this, now);
00153       expires_at = ACE_Time_Value::max_time;
00154     }
00155   }
00156 
00157   return expires_at;
00158 }
00159 
00160 void
00161 WriterInfo::removed()
00162 {
00163   reader_->writer_removed(*this);
00164 }
00165 
00166 void
00167 WriterInfo::ack_sequence(SequenceNumber value)
00168 {
00169   // sample_lock_ is held by the caller.
00170   this->ack_sequence_.insert(value);
00171 }
00172 
00173 SequenceNumber
00174 WriterInfo::ack_sequence() const
00175 {
00176   // sample_lock_ is held by the caller.
00177   return this->ack_sequence_.cumulative_ack();
00178 }
00179 
00180 bool
00181 WriterInfo::active(ACE_Time_Value default_participant_timeout) const
00182 {
00183   // Need some period of time by which to decide if a writer the
00184   // DataReaderImpl knows about has gone 'inactive'.  Used to determine
00185   // if a remove_associations should remove immediately or wait to let
00186   // reader process more information that may have queued up from the writer
00187   // Over-arching max wait time for removal is controlled in the
00188   // RemoveAssociationSweeper (10 seconds), but on a per writer basis set
00189   // activity_wait_period based on:
00190   //     1) Reader's liveliness_lease_duration
00191   //     2) DCPSPendingTimeout value (if not zero)
00192   //     3) Writer's max blocking time (could be infinite, in which case
00193   //        RemoveAssociationSweeper will remove after its max wait)
00194   //     4) Zero - don't wait, simply remove association
00195   ACE_Time_Value activity_wait_period(default_participant_timeout);
00196   if (reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00197     activity_wait_period = reader_->liveliness_lease_duration_;
00198   }
00199   if (activity_wait_period == ACE_Time_Value::zero) {
00200       activity_wait_period = duration_to_time_value(writer_qos_.reliability.max_blocking_time);
00201   }
00202   if (activity_wait_period == ACE_Time_Value::zero) {
00203     return false;
00204   }
00205   return (ACE_OS::gettimeofday() - last_liveliness_activity_time_) <= activity_wait_period;
00206 }
00207 
00208 
00209 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00210 Coherent_State
00211 WriterInfo::coherent_change_received()
00212 {
00213   if (this->writer_coherent_samples_.num_samples_ == 0) {
00214     return NOT_COMPLETED_YET;
00215   }
00216 
00217   if (!this->coherent_sample_sequence_.disjoint()
00218       && (this->coherent_sample_sequence_.high()
00219           == this->writer_coherent_samples_.last_sample_)) {
00220     return COMPLETED;
00221   }
00222 
00223   if (this->coherent_sample_sequence_.high() >
00224       this->writer_coherent_samples_.last_sample_) {
00225     return REJECTED;
00226   }
00227 
00228   return NOT_COMPLETED_YET;
00229 }
00230 
00231 void
00232 WriterInfo::reset_coherent_info()
00233 {
00234   this->coherent_samples_ = 0;
00235   this->group_coherent_ = false;
00236   this->publisher_id_ = GUID_UNKNOWN;
00237   this->coherent_sample_sequence_.reset();
00238   this->writer_coherent_samples_.reset();
00239   this->group_coherent_samples_.clear();
00240 }
00241 
00242 
00243 void
00244 WriterInfo::set_group_info(const CoherentChangeControl& info)
00245 {
00246   if (!(this->publisher_id_ == info.publisher_id_)
00247       || this->group_coherent_ != info.group_coherent_) {
00248     GuidConverter sub_id(this->reader_->subscription_id_);
00249     GuidConverter pub_id(this->writer_id_);
00250     ACE_ERROR((LM_ERROR,
00251                ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()")
00252                ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"),
00253                OPENDDS_STRING(sub_id).c_str(),
00254                OPENDDS_STRING(pub_id).c_str()));
00255   }
00256 
00257   this->writer_coherent_samples_ = info.coherent_samples_;
00258   this->group_coherent_samples_ = info.group_coherent_samples_;
00259 }
00260 
00261 #endif  // OPENDDS_NO_OBJECT_MODEL_PROFILE
00262 
00263 }
00264 }

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7