WriterInfo.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007 #include "DCPS/DdsDcps_pch.h"
00008 #include "dcps_export.h"
00009
00010 #include "GuidConverter.h"
00011 #include "WriterInfo.h"
00012 #include "Time_Helper.h"
00013 #include "Service_Participant.h"
00014
00015 #include "ace/OS_NS_sys_time.h"
00016
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 namespace OpenDDS {
00020 namespace DCPS {
00021
00022 WriterInfoListener::WriterInfoListener()
00023 : subscription_id_(GUID_UNKNOWN),
00024 liveliness_lease_duration_(ACE_Time_Value::zero)
00025 {
00026 }
00027
00028 WriterInfoListener::~WriterInfoListener()
00029 {
00030 }
00031
00032
00033
00034
00035 void
00036 WriterInfoListener::writer_became_alive(WriterInfo&,
00037 const ACE_Time_Value& )
00038 {
00039 }
00040
00041
00042
00043
00044 void
00045 WriterInfoListener::writer_became_dead(WriterInfo&,
00046 const ACE_Time_Value& )
00047 {
00048 }
00049
00050
00051
00052 void
00053 WriterInfoListener::writer_removed(WriterInfo& )
00054 {
00055 }
00056
00057
00058
00059 WriterInfo::WriterInfo(WriterInfoListener* reader,
00060 const PublicationId& writer_id,
00061 const ::DDS::DataWriterQos& writer_qos)
00062 : last_liveliness_activity_time_(ACE_OS::gettimeofday()),
00063 historic_samples_timer_(NO_TIMER),
00064 remove_association_timer_(NO_TIMER),
00065 removal_deadline_(ACE_Time_Value::zero),
00066 last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00067 waiting_for_end_historic_samples_(false),
00068 scheduled_for_removal_(false),
00069 notify_lost_(false),
00070 state_(NOT_SET),
00071 reader_(reader),
00072 writer_id_(writer_id),
00073 writer_qos_(writer_qos),
00074 handle_(DDS::HANDLE_NIL)
00075 {
00076 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00077 this->reset_coherent_info();
00078 #endif
00079
00080 if (DCPS_debug_level >= 5) {
00081 GuidConverter writer_converter(writer_id);
00082 GuidConverter reader_converter(reader->subscription_id_);
00083 ACE_DEBUG((LM_DEBUG,
00084 ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ")
00085 ACE_TEXT("writer %C added to reader %C.\n"),
00086 OPENDDS_STRING(writer_converter).c_str(),
00087 OPENDDS_STRING(reader_converter).c_str()));
00088 }
00089 }
00090
00091 OPENDDS_STRING
00092 WriterInfo::get_state_str() const
00093 {
00094 OPENDDS_STRING ret;
00095 switch (this->state_) {
00096 case WriterInfo::NOT_SET:
00097 ret += "NOT_SET";
00098 break;
00099 case WriterInfo::ALIVE:
00100 ret += "ALIVE";
00101 break;
00102 case WriterInfo::DEAD:
00103 ret += "DEAD";
00104 break;
00105 default:
00106 ret += "UNSPECIFIED(";
00107 ret += to_dds_string(int(this->state_));
00108 ret += ")";
00109 }
00110 return ret.c_str();
00111 }
00112
00113 void
00114 WriterInfo::clear_owner_evaluated ()
00115 {
00116 this->owner_evaluated_.clear ();
00117 }
00118
00119 void
00120 WriterInfo::set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag)
00121 {
00122 if (flag ||
00123 (!flag && owner_evaluated_.find (instance) != owner_evaluated_.end ())) {
00124 this->owner_evaluated_ [instance] = flag;
00125 }
00126 }
00127
00128 bool
00129 WriterInfo::is_owner_evaluated (::DDS::InstanceHandle_t instance)
00130 {
00131 OwnerEvaluateFlags::iterator iter = owner_evaluated_.find (instance);
00132 if (iter == owner_evaluated_.end ()) {
00133 this->owner_evaluated_.insert (OwnerEvaluateFlags::value_type (instance, false));
00134 return false;
00135 }
00136 else
00137 return iter->second;
00138 }
00139
00140 ACE_Time_Value
00141 WriterInfo::check_activity(const ACE_Time_Value& now)
00142 {
00143 ACE_Time_Value expires_at = ACE_Time_Value::max_time;
00144
00145
00146
00147 if (state_ == ALIVE && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00148 expires_at = this->last_liveliness_activity_time_ +
00149 reader_->liveliness_lease_duration_;
00150
00151 if (expires_at <= now) {
00152
00153 reader_->writer_became_dead(*this, now);
00154 expires_at = ACE_Time_Value::max_time;
00155 }
00156 }
00157
00158 return expires_at;
00159 }
00160
00161 void
00162 WriterInfo::removed()
00163 {
00164 reader_->writer_removed(*this);
00165 }
00166
00167 ACE_Time_Value WriterInfo::activity_wait_period() const
00168 {
00169 ACE_Time_Value activity_wait_period(TheServiceParticipant->pending_timeout());
00170 if (reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00171 activity_wait_period = reader_->liveliness_lease_duration_;
00172 }
00173 if (activity_wait_period == ACE_Time_Value::zero) {
00174 activity_wait_period = duration_to_time_value(writer_qos_.reliability.max_blocking_time);
00175 }
00176
00177 return activity_wait_period;
00178 }
00179
00180
00181 bool
00182 WriterInfo::active() const
00183 {
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196 ACE_Time_Value activity_wait_period = this->activity_wait_period();
00197
00198 if (activity_wait_period == ACE_Time_Value::zero) {
00199 return false;
00200 }
00201 return (ACE_OS::gettimeofday() - last_liveliness_activity_time_) <= activity_wait_period;
00202 }
00203
00204
00205 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00206 Coherent_State
00207 WriterInfo::coherent_change_received()
00208 {
00209 if (this->writer_coherent_samples_.num_samples_ == 0) {
00210 return NOT_COMPLETED_YET;
00211 }
00212
00213 if (!this->coherent_sample_sequence_.disjoint()
00214 && (this->coherent_sample_sequence_.high()
00215 == this->writer_coherent_samples_.last_sample_)) {
00216 return COMPLETED;
00217 }
00218
00219 if (this->coherent_sample_sequence_.high() >
00220 this->writer_coherent_samples_.last_sample_) {
00221 return REJECTED;
00222 }
00223
00224 return NOT_COMPLETED_YET;
00225 }
00226
00227 void
00228 WriterInfo::reset_coherent_info()
00229 {
00230 this->coherent_samples_ = 0;
00231 this->group_coherent_ = false;
00232 this->publisher_id_ = GUID_UNKNOWN;
00233 this->coherent_sample_sequence_.reset();
00234 this->writer_coherent_samples_.reset();
00235 this->group_coherent_samples_.clear();
00236 }
00237
00238
00239 void
00240 WriterInfo::set_group_info(const CoherentChangeControl& info)
00241 {
00242 if (!(this->publisher_id_ == info.publisher_id_)
00243 || this->group_coherent_ != info.group_coherent_) {
00244 GuidConverter sub_id(this->reader_->subscription_id_);
00245 GuidConverter pub_id(this->writer_id_);
00246 ACE_ERROR((LM_ERROR,
00247 ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()")
00248 ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"),
00249 OPENDDS_STRING(sub_id).c_str(),
00250 OPENDDS_STRING(pub_id).c_str()));
00251 }
00252
00253 this->writer_coherent_samples_ = info.coherent_samples_;
00254 this->group_coherent_samples_ = info.group_coherent_samples_;
00255 }
00256
00257 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00258
00259 }
00260 }
00261
00262 OPENDDS_END_VERSIONED_NAMESPACE_DECL