WriterInfo.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00008 #include "dcps_export.h"
00009 
00010 #include "GuidConverter.h"
00011 #include "WriterInfo.h"
00012 #include "Time_Helper.h"
00013 #include "Service_Participant.h"
00014 
00015 #include "ace/OS_NS_sys_time.h"
00016 
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 namespace OpenDDS {
00020 namespace DCPS {
00021 
00022 WriterInfoListener::WriterInfoListener()
00023   : subscription_id_(GUID_UNKNOWN),
00024   liveliness_lease_duration_(ACE_Time_Value::zero)
00025 {
00026 }
00027 
00028 WriterInfoListener::~WriterInfoListener()
00029 {
00030 }
00031 
00032 /// tell instances when a DataWriter transitions to being alive
00033 /// The writer state is inout parameter, it has to be set ALIVE before
00034 /// handle_timeout is called since some subroutine use the state.
00035 void
00036 WriterInfoListener::writer_became_alive(WriterInfo&,
00037                                         const ACE_Time_Value& )
00038 {
00039 }
00040 
00041 /// tell instances when a DataWriter transitions to DEAD
00042 /// The writer state is inout parameter, the state is set to DEAD
00043 /// when it returns.
00044 void
00045 WriterInfoListener::writer_became_dead(WriterInfo&,
00046                                        const ACE_Time_Value& )
00047 {
00048 }
00049 
00050 /// tell instance when a DataWriter is removed.
00051 /// The liveliness status need update.
00052 void
00053 WriterInfoListener::writer_removed(WriterInfo& )
00054 {
00055 }
00056 
00057 
00058 
00059 WriterInfo::WriterInfo(WriterInfoListener*         reader,
00060                        const PublicationId&        writer_id,
00061                        const ::DDS::DataWriterQos& writer_qos)
00062   : last_liveliness_activity_time_(ACE_OS::gettimeofday()),
00063   historic_samples_timer_(NO_TIMER),
00064   remove_association_timer_(NO_TIMER),
00065   removal_deadline_(ACE_Time_Value::zero),
00066   last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00067   waiting_for_end_historic_samples_(false),
00068   scheduled_for_removal_(false),
00069   notify_lost_(false),
00070   state_(NOT_SET),
00071   reader_(reader),
00072   writer_id_(writer_id),
00073   writer_qos_(writer_qos),
00074   handle_(DDS::HANDLE_NIL)
00075 {
00076 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00077   this->reset_coherent_info();
00078 #endif
00079 
00080   if (DCPS_debug_level >= 5) {
00081     GuidConverter writer_converter(writer_id);
00082     GuidConverter reader_converter(reader->subscription_id_);
00083     ACE_DEBUG((LM_DEBUG,
00084                ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ")
00085                ACE_TEXT("writer %C added to reader %C.\n"),
00086                OPENDDS_STRING(writer_converter).c_str(),
00087                OPENDDS_STRING(reader_converter).c_str()));
00088   }
00089 }
00090 
00091 OPENDDS_STRING
00092 WriterInfo::get_state_str() const
00093 {
00094   OPENDDS_STRING ret;
00095   switch (this->state_) {
00096   case WriterInfo::NOT_SET:
00097     ret += "NOT_SET";
00098     break;
00099   case WriterInfo::ALIVE:
00100     ret += "ALIVE";
00101     break;
00102   case WriterInfo::DEAD:
00103     ret += "DEAD";
00104     break;
00105   default:
00106     ret += "UNSPECIFIED(";
00107     ret += to_dds_string(int(this->state_));
00108     ret += ")";
00109   }
00110   return ret.c_str();
00111 }
00112 
00113 void
00114 WriterInfo::clear_owner_evaluated ()
00115 {
00116   this->owner_evaluated_.clear ();
00117 }
00118 
00119 void
00120 WriterInfo::set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag)
00121 {
00122   if (flag ||
00123       (!flag && owner_evaluated_.find (instance) != owner_evaluated_.end ())) {
00124     this->owner_evaluated_ [instance] = flag;
00125   }
00126 }
00127 
00128 bool
00129 WriterInfo::is_owner_evaluated (::DDS::InstanceHandle_t instance)
00130 {
00131   OwnerEvaluateFlags::iterator iter = owner_evaluated_.find (instance);
00132   if (iter == owner_evaluated_.end ()) {
00133     this->owner_evaluated_.insert (OwnerEvaluateFlags::value_type (instance, false));
00134     return false;
00135   }
00136   else
00137     return iter->second;
00138 }
00139 
00140 ACE_Time_Value
00141 WriterInfo::check_activity(const ACE_Time_Value& now)
00142 {
00143   ACE_Time_Value expires_at = ACE_Time_Value::max_time;
00144 
00145   // We only need check the liveliness with the non-zero liveliness_lease_duration_.
00146   // if (state_ != DEAD && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero)
00147   if (state_ == ALIVE && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00148     expires_at = this->last_liveliness_activity_time_ +
00149                  reader_->liveliness_lease_duration_;
00150 
00151     if (expires_at <= now) {
00152       // let all instances know this write is not alive.
00153       reader_->writer_became_dead(*this, now);
00154       expires_at = ACE_Time_Value::max_time;
00155     }
00156   }
00157 
00158   return expires_at;
00159 }
00160 
00161 void
00162 WriterInfo::removed()
00163 {
00164   reader_->writer_removed(*this);
00165 }
00166 
00167 ACE_Time_Value WriterInfo::activity_wait_period() const
00168 {
00169   ACE_Time_Value activity_wait_period(TheServiceParticipant->pending_timeout());
00170   if (reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00171     activity_wait_period = reader_->liveliness_lease_duration_;
00172   }
00173   if (activity_wait_period == ACE_Time_Value::zero) {
00174       activity_wait_period = duration_to_time_value(writer_qos_.reliability.max_blocking_time);
00175   }
00176 
00177   return activity_wait_period;
00178 }
00179 
00180 
00181 bool
00182 WriterInfo::active() const
00183 {
00184   // Need some period of time by which to decide if a writer the
00185   // DataReaderImpl knows about has gone 'inactive'.  Used to determine
00186   // if a remove_associations should remove immediately or wait to let
00187   // reader process more information that may have queued up from the writer
00188   // Over-arching max wait time for removal is controlled in the
00189   // RemoveAssociationSweeper (10 seconds), but on a per writer basis set
00190   // activity_wait_period based on:
00191   //     1) Reader's liveliness_lease_duration
00192   //     2) DCPSPendingTimeout value (if not zero)
00193   //     3) Writer's max blocking time (could be infinite, in which case
00194   //        RemoveAssociationSweeper will remove after its max wait)
00195   //     4) Zero - don't wait, simply remove association
00196   ACE_Time_Value activity_wait_period = this->activity_wait_period();
00197 
00198   if (activity_wait_period == ACE_Time_Value::zero) {
00199     return false;
00200   }
00201   return (ACE_OS::gettimeofday() - last_liveliness_activity_time_) <= activity_wait_period;
00202 }
00203 
00204 
00205 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00206 Coherent_State
00207 WriterInfo::coherent_change_received()
00208 {
00209   if (this->writer_coherent_samples_.num_samples_ == 0) {
00210     return NOT_COMPLETED_YET;
00211   }
00212 
00213   if (!this->coherent_sample_sequence_.disjoint()
00214       && (this->coherent_sample_sequence_.high()
00215           == this->writer_coherent_samples_.last_sample_)) {
00216     return COMPLETED;
00217   }
00218 
00219   if (this->coherent_sample_sequence_.high() >
00220       this->writer_coherent_samples_.last_sample_) {
00221     return REJECTED;
00222   }
00223 
00224   return NOT_COMPLETED_YET;
00225 }
00226 
00227 void
00228 WriterInfo::reset_coherent_info()
00229 {
00230   this->coherent_samples_ = 0;
00231   this->group_coherent_ = false;
00232   this->publisher_id_ = GUID_UNKNOWN;
00233   this->coherent_sample_sequence_.reset();
00234   this->writer_coherent_samples_.reset();
00235   this->group_coherent_samples_.clear();
00236 }
00237 
00238 
00239 void
00240 WriterInfo::set_group_info(const CoherentChangeControl& info)
00241 {
00242   if (!(this->publisher_id_ == info.publisher_id_)
00243       || this->group_coherent_ != info.group_coherent_) {
00244     GuidConverter sub_id(this->reader_->subscription_id_);
00245     GuidConverter pub_id(this->writer_id_);
00246     ACE_ERROR((LM_ERROR,
00247                ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()")
00248                ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"),
00249                OPENDDS_STRING(sub_id).c_str(),
00250                OPENDDS_STRING(pub_id).c_str()));
00251   }
00252 
00253   this->writer_coherent_samples_ = info.coherent_samples_;
00254   this->group_coherent_samples_ = info.group_coherent_samples_;
00255 }
00256 
00257 #endif  // OPENDDS_NO_OBJECT_MODEL_PROFILE
00258 
00259 }
00260 }
00261 
00262 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1