00001
00002
00003
00004
00005
00006
00007 #include "DCPS/DdsDcps_pch.h"
00008 #include "dcps_export.h"
00009
00010 #include "GuidConverter.h"
00011 #include "WriterInfo.h"
00012 #include "Qos_Helper.h"
00013
00014 #include "ace/OS_NS_sys_time.h"
00015
00016
00017 namespace OpenDDS {
00018 namespace DCPS {
00019
00020 WriterInfoListener::WriterInfoListener()
00021 : subscription_id_(GUID_UNKNOWN),
00022 liveliness_lease_duration_(ACE_Time_Value::zero)
00023 {
00024 }
00025
00026 WriterInfoListener::~WriterInfoListener()
00027 {
00028 }
00029
00030
00031
00032
00033 void
00034 WriterInfoListener::writer_became_alive(WriterInfo&,
00035 const ACE_Time_Value& )
00036 {
00037 }
00038
00039
00040
00041
00042 void
00043 WriterInfoListener::writer_became_dead(WriterInfo&,
00044 const ACE_Time_Value& )
00045 {
00046 }
00047
00048
00049
00050 void
00051 WriterInfoListener::writer_removed(WriterInfo& )
00052 {
00053 }
00054
00055
00056
00057 WriterInfo::WriterInfo(WriterInfoListener* reader,
00058 const PublicationId& writer_id,
00059 const ::DDS::DataWriterQos& writer_qos)
00060 : last_liveliness_activity_time_(ACE_OS::gettimeofday()),
00061 seen_data_(false),
00062 historic_samples_timer_(NO_TIMER),
00063 remove_association_timer_(NO_TIMER),
00064 removal_deadline_(ACE_Time_Value::zero),
00065 last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00066 waiting_for_end_historic_samples_(false),
00067 scheduled_for_removal_(false),
00068 notify_lost_(false),
00069 state_(NOT_SET),
00070 reader_(reader),
00071 writer_id_(writer_id),
00072 writer_qos_(writer_qos),
00073 handle_(DDS::HANDLE_NIL)
00074 {
00075 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00076 this->reset_coherent_info();
00077 #endif
00078
00079 if (DCPS_debug_level >= 5) {
00080 GuidConverter writer_converter(writer_id);
00081 GuidConverter reader_converter(reader->subscription_id_);
00082 ACE_DEBUG((LM_DEBUG,
00083 ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ")
00084 ACE_TEXT("writer %C added to reader %C.\n"),
00085 OPENDDS_STRING(writer_converter).c_str(),
00086 OPENDDS_STRING(reader_converter).c_str()));
00087 }
00088 }
00089
00090 OPENDDS_STRING
00091 WriterInfo::get_state_str() const
00092 {
00093 OPENDDS_STRING ret;
00094 switch (this->state_) {
00095 case WriterInfo::NOT_SET:
00096 ret += "NOT_SET";
00097 break;
00098 case WriterInfo::ALIVE:
00099 ret += "ALIVE";
00100 break;
00101 case WriterInfo::DEAD:
00102 ret += "DEAD";
00103 break;
00104 default:
00105 ret += "UNSPECIFIED(";
00106 ret += to_dds_string(int(this->state_));
00107 ret += ")";
00108 }
00109 return ret.c_str();
00110 }
00111
00112 void
00113 WriterInfo::clear_owner_evaluated ()
00114 {
00115 this->owner_evaluated_.clear ();
00116 }
00117
00118 void
00119 WriterInfo::set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag)
00120 {
00121 if (flag ||
00122 (!flag && owner_evaluated_.find (instance) != owner_evaluated_.end ())) {
00123 this->owner_evaluated_ [instance] = flag;
00124 }
00125 }
00126
00127 bool
00128 WriterInfo::is_owner_evaluated (::DDS::InstanceHandle_t instance)
00129 {
00130 OwnerEvaluateFlags::iterator iter = owner_evaluated_.find (instance);
00131 if (iter == owner_evaluated_.end ()) {
00132 this->owner_evaluated_.insert (OwnerEvaluateFlags::value_type (instance, false));
00133 return false;
00134 }
00135 else
00136 return iter->second;
00137 }
00138
00139 ACE_Time_Value
00140 WriterInfo::check_activity(const ACE_Time_Value& now)
00141 {
00142 ACE_Time_Value expires_at = ACE_Time_Value::max_time;
00143
00144
00145
00146 if (state_ == ALIVE && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00147 expires_at = this->last_liveliness_activity_time_ +
00148 reader_->liveliness_lease_duration_;
00149
00150 if (expires_at <= now) {
00151
00152 reader_->writer_became_dead(*this, now);
00153 expires_at = ACE_Time_Value::max_time;
00154 }
00155 }
00156
00157 return expires_at;
00158 }
00159
00160 void
00161 WriterInfo::removed()
00162 {
00163 reader_->writer_removed(*this);
00164 }
00165
00166 void
00167 WriterInfo::ack_sequence(SequenceNumber value)
00168 {
00169
00170 this->ack_sequence_.insert(value);
00171 }
00172
00173 SequenceNumber
00174 WriterInfo::ack_sequence() const
00175 {
00176
00177 return this->ack_sequence_.cumulative_ack();
00178 }
00179
00180 bool
00181 WriterInfo::active(ACE_Time_Value default_participant_timeout) const
00182 {
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195 ACE_Time_Value activity_wait_period(default_participant_timeout);
00196 if (reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00197 activity_wait_period = reader_->liveliness_lease_duration_;
00198 }
00199 if (activity_wait_period == ACE_Time_Value::zero) {
00200 activity_wait_period = duration_to_time_value(writer_qos_.reliability.max_blocking_time);
00201 }
00202 if (activity_wait_period == ACE_Time_Value::zero) {
00203 return false;
00204 }
00205 return (ACE_OS::gettimeofday() - last_liveliness_activity_time_) <= activity_wait_period;
00206 }
00207
00208
00209 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00210 Coherent_State
00211 WriterInfo::coherent_change_received()
00212 {
00213 if (this->writer_coherent_samples_.num_samples_ == 0) {
00214 return NOT_COMPLETED_YET;
00215 }
00216
00217 if (!this->coherent_sample_sequence_.disjoint()
00218 && (this->coherent_sample_sequence_.high()
00219 == this->writer_coherent_samples_.last_sample_)) {
00220 return COMPLETED;
00221 }
00222
00223 if (this->coherent_sample_sequence_.high() >
00224 this->writer_coherent_samples_.last_sample_) {
00225 return REJECTED;
00226 }
00227
00228 return NOT_COMPLETED_YET;
00229 }
00230
00231 void
00232 WriterInfo::reset_coherent_info()
00233 {
00234 this->coherent_samples_ = 0;
00235 this->group_coherent_ = false;
00236 this->publisher_id_ = GUID_UNKNOWN;
00237 this->coherent_sample_sequence_.reset();
00238 this->writer_coherent_samples_.reset();
00239 this->group_coherent_samples_.clear();
00240 }
00241
00242
00243 void
00244 WriterInfo::set_group_info(const CoherentChangeControl& info)
00245 {
00246 if (!(this->publisher_id_ == info.publisher_id_)
00247 || this->group_coherent_ != info.group_coherent_) {
00248 GuidConverter sub_id(this->reader_->subscription_id_);
00249 GuidConverter pub_id(this->writer_id_);
00250 ACE_ERROR((LM_ERROR,
00251 ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()")
00252 ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"),
00253 OPENDDS_STRING(sub_id).c_str(),
00254 OPENDDS_STRING(pub_id).c_str()));
00255 }
00256
00257 this->writer_coherent_samples_ = info.coherent_samples_;
00258 this->group_coherent_samples_ = info.group_coherent_samples_;
00259 }
00260
00261 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00262
00263 }
00264 }