Keeps track of a DataWriter's liveliness for a DataReader. More...
#include <WriterInfo.h>
Keeps track of a DataWriter's liveliness for a DataReader.
Definition at line 73 of file WriterInfo.h.
Definition at line 77 of file WriterInfo.h.
OpenDDS::DCPS::WriterInfo::WriterInfo | ( | WriterInfoListener * | reader, | |
const PublicationId & | writer_id, | |||
const DDS::DataWriterQos & | writer_qos | |||
) |
bool OpenDDS::DCPS::WriterInfo::active | ( | void | ) | const |
Checks whether the writer has registered activity within either the liveliness_lease_duration or the DCPSPendingTimeout duration, so that it can be allowed to finish before the reader removes it.
Definition at line 182 of file WriterInfo.cpp.
References activity_wait_period(), ACE_OS::gettimeofday(), last_liveliness_activity_time_, and ACE_Time_Value::zero.
00183 { 00184 // Need some period of time by which to decide if a writer the 00185 // DataReaderImpl knows about has gone 'inactive'. Used to determine 00186 // if a remove_associations should remove immediately or wait to let 00187 // reader process more information that may have queued up from the writer 00188 // Over-arching max wait time for removal is controlled in the 00189 // RemoveAssociationSweeper (10 seconds), but on a per writer basis set 00190 // activity_wait_period based on: 00191 // 1) Reader's liveliness_lease_duration 00192 // 2) DCPSPendingTimeout value (if not zero) 00193 // 3) Writer's max blocking time (could be infinite, in which case 00194 // RemoveAssociationSweeper will remove after its max wait) 00195 // 4) Zero - don't wait, simply remove association 00196 ACE_Time_Value activity_wait_period = this->activity_wait_period(); 00197 00198 if (activity_wait_period == ACE_Time_Value::zero) { 00199 return false; 00200 } 00201 return (ACE_OS::gettimeofday() - last_liveliness_activity_time_) <= activity_wait_period; 00202 }
ACE_Time_Value OpenDDS::DCPS::WriterInfo::activity_wait_period | ( | ) | const |
Definition at line 167 of file WriterInfo.cpp.
References OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, reader_, DDS::DataWriterQos::reliability, TheServiceParticipant, writer_qos_, and ACE_Time_Value::zero.
Referenced by active().
00168 { 00169 ACE_Time_Value activity_wait_period(TheServiceParticipant->pending_timeout()); 00170 if (reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) { 00171 activity_wait_period = reader_->liveliness_lease_duration_; 00172 } 00173 if (activity_wait_period == ACE_Time_Value::zero) { 00174 activity_wait_period = duration_to_time_value(writer_qos_.reliability.max_blocking_time); 00175 } 00176 00177 return activity_wait_period; 00178 }
ACE_Time_Value OpenDDS::DCPS::WriterInfo::check_activity | ( | const ACE_Time_Value & | now | ) |
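Checks whether this writer is still alive (called by handle_timeout).
now | the current time, against which the writer's liveliness expiration is checked. The return value is the next time this DataWriter will become not active (not alive) if no sample or liveliness message is received, or ACE_Time_Value::max_time if the writer is not ALIVE, the liveliness lease duration is zero, or the writer has just been declared dead. |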
Definition at line 141 of file WriterInfo.cpp.
References ALIVE, last_liveliness_activity_time_, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, ACE_Time_Value::max_time, reader_, state_, OpenDDS::DCPS::WriterInfoListener::writer_became_dead(), and ACE_Time_Value::zero.
00142 { 00143 ACE_Time_Value expires_at = ACE_Time_Value::max_time; 00144 00145 // We only need check the liveliness with the non-zero liveliness_lease_duration_. 00146 // if (state_ != DEAD && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) 00147 if (state_ == ALIVE && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) { 00148 expires_at = this->last_liveliness_activity_time_ + 00149 reader_->liveliness_lease_duration_; 00150 00151 if (expires_at <= now) { 00152 // let all instances know this write is not alive. 00153 reader_->writer_became_dead(*this, now); 00154 expires_at = ACE_Time_Value::max_time; 00155 } 00156 } 00157 00158 return expires_at; 00159 }
void OpenDDS::DCPS::WriterInfo::clear_owner_evaluated | ( | ) |
Definition at line 114 of file WriterInfo.cpp.
References owner_evaluated_.
Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().
00115 { 00116 this->owner_evaluated_.clear (); 00117 }
Coherent_State OpenDDS::DCPS::WriterInfo::coherent_change_received | ( | ) |
Definition at line 207 of file WriterInfo.cpp.
References coherent_sample_sequence_, OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::WriterCoherentSample::last_sample_, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::WriterCoherentSample::num_samples_, OpenDDS::DCPS::REJECTED, and writer_coherent_samples_.
Referenced by OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
00208 { 00209 if (this->writer_coherent_samples_.num_samples_ == 0) { 00210 return NOT_COMPLETED_YET; 00211 } 00212 00213 if (!this->coherent_sample_sequence_.disjoint() 00214 && (this->coherent_sample_sequence_.high() 00215 == this->writer_coherent_samples_.last_sample_)) { 00216 return COMPLETED; 00217 } 00218 00219 if (this->coherent_sample_sequence_.high() > 00220 this->writer_coherent_samples_.last_sample_) { 00221 return REJECTED; 00222 } 00223 00224 return NOT_COMPLETED_YET; 00225 }
WriterState OpenDDS::DCPS::WriterInfo::get_state | ( | ) | [inline] |
Returns the writer's current WriterState: 1 (ALIVE) if the DataWriter is lively; 2 (DEAD) if dead; otherwise returns 0 (NOT_SET).
Definition at line 95 of file WriterInfo.h.
00095 { 00096 return state_; 00097 };
OPENDDS_STRING OpenDDS::DCPS::WriterInfo::get_state_str | ( | ) | const |
Definition at line 92 of file WriterInfo.cpp.
References ALIVE, DEAD, NOT_SET, OPENDDS_STRING, state_, and OpenDDS::DCPS::to_dds_string().
Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), and OpenDDS::DCPS::DataReaderImpl::writer_became_dead().
00093 { 00094 OPENDDS_STRING ret; 00095 switch (this->state_) { 00096 case WriterInfo::NOT_SET: 00097 ret += "NOT_SET"; 00098 break; 00099 case WriterInfo::ALIVE: 00100 ret += "ALIVE"; 00101 break; 00102 case WriterInfo::DEAD: 00103 ret += "DEAD"; 00104 break; 00105 default: 00106 ret += "UNSPECIFIED("; 00107 ret += to_dds_string(int(this->state_)); 00108 ret += ")"; 00109 } 00110 return ret.c_str(); 00111 }
bool OpenDDS::DCPS::WriterInfo::is_owner_evaluated | ( | DDS::InstanceHandle_t | instance | ) |
typedef OpenDDS::DCPS::WriterInfo::OPENDDS_MAP | ( | DDS::InstanceHandle_t | , | |
bool | ||||
) |
Has this writer been evaluated for ownership?
OpenDDS::DCPS::WriterInfo::OPENDDS_MAP | ( | SequenceNumber | , | |
ReceivedDataSample | ||||
) |
Temporary holding place for samples received before the END_HISTORIC_SAMPLES control message.
int OpenDDS::DCPS::WriterInfo::received_activity | ( | const ACE_Time_Value & | when | ) | [inline] |
Called when a sample or other activity is received from this writer.
Definition at line 178 of file WriterInfo.h.
References ALIVE, last_liveliness_activity_time_, reader_, state_, and OpenDDS::DCPS::WriterInfoListener::writer_became_alive().
00179 { 00180 last_liveliness_activity_time_ = when; 00181 00182 if (state_ != ALIVE) { // NOT_SET || DEAD 00183 reader_->writer_became_alive(*this, when); 00184 return 0; 00185 } 00186 00187 //TBD - is the "was alive" return value used? 00188 return 1; 00189 }
void OpenDDS::DCPS::WriterInfo::removed | ( | ) |
Updates liveliness when remove_association is called.
Definition at line 162 of file WriterInfo.cpp.
References reader_, and OpenDDS::DCPS::WriterInfoListener::writer_removed().
00163 { 00164 reader_->writer_removed(*this); 00165 }
void OpenDDS::DCPS::WriterInfo::reset_coherent_info | ( | ) |
Definition at line 228 of file WriterInfo.cpp.
References coherent_sample_sequence_, coherent_samples_, group_coherent_, group_coherent_samples_, OpenDDS::DCPS::GUID_UNKNOWN, publisher_id_, OpenDDS::DCPS::WriterCoherentSample::reset(), OpenDDS::DCPS::DisjointSequence::reset(), and writer_coherent_samples_.
Referenced by OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
00229 { 00230 this->coherent_samples_ = 0; 00231 this->group_coherent_ = false; 00232 this->publisher_id_ = GUID_UNKNOWN; 00233 this->coherent_sample_sequence_.reset(); 00234 this->writer_coherent_samples_.reset(); 00235 this->group_coherent_samples_.clear(); 00236 }
void OpenDDS::DCPS::WriterInfo::set_group_info | ( | const CoherentChangeControl & | info | ) |
Definition at line 240 of file WriterInfo.cpp.
References ACE_TEXT(), OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, group_coherent_samples_, LM_ERROR, OPENDDS_STRING, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, publisher_id_, reader_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, writer_coherent_samples_, and writer_id_.
00241 { 00242 if (!(this->publisher_id_ == info.publisher_id_) 00243 || this->group_coherent_ != info.group_coherent_) { 00244 GuidConverter sub_id(this->reader_->subscription_id_); 00245 GuidConverter pub_id(this->writer_id_); 00246 ACE_ERROR((LM_ERROR, 00247 ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()") 00248 ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"), 00249 OPENDDS_STRING(sub_id).c_str(), 00250 OPENDDS_STRING(pub_id).c_str())); 00251 } 00252 00253 this->writer_coherent_samples_ = info.coherent_samples_; 00254 this->group_coherent_samples_ = info.group_coherent_samples_; 00255 }
void OpenDDS::DCPS::WriterInfo::set_owner_evaluated | ( | DDS::InstanceHandle_t | instance, | |
bool | flag | |||
) |
friend class WriteInfoListner [friend] |
Definition at line 74 of file WriterInfo.h.
Definition at line 169 of file WriterInfo.h.
Referenced by coherent_change_received(), and reset_coherent_info().
Number of received coherent changes in active change set.
Definition at line 159 of file WriterInfo.h.
Referenced by reset_coherent_info().
Data to support GROUP access scope.
Definition at line 167 of file WriterInfo.h.
Referenced by reset_coherent_info(), and OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
GroupCoherentSamples OpenDDS::DCPS::WriterInfo::group_coherent_samples_ |
Definition at line 171 of file WriterInfo.h.
Referenced by reset_coherent_info(), and set_group_info().
The publication entity instance handle.
Definition at line 156 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().
Definition at line 127 of file WriterInfo.h.
After receiving END_HISTORIC_SAMPLES, check for duplicates.
Definition at line 136 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::resume_sample_processing().
Timestamp of last write/dispose/assert_liveliness from this DataWriter.
Definition at line 124 of file WriterInfo.h.
Referenced by active(), check_activity(), and received_activity().
Definition at line 141 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::RecorderImpl::remove_publication().
OwnerEvaluateFlags OpenDDS::DCPS::WriterInfo::owner_evaluated_ |
Definition at line 163 of file WriterInfo.h.
Referenced by clear_owner_evaluated().
Definition at line 168 of file WriterInfo.h.
Referenced by reset_coherent_info(), set_group_info(), and OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
The DataReader owning this WriterInfo.
Definition at line 147 of file WriterInfo.h.
Referenced by activity_wait_period(), check_activity(), received_activity(), removed(), and set_group_info().
Definition at line 129 of file WriterInfo.h.
Definition at line 128 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::RemoveAssociationSweeper< T >::handle_timeout().
Definition at line 140 of file WriterInfo.h.
State of the writer.
Definition at line 144 of file WriterInfo.h.
Referenced by check_activity(), get_state_str(), received_activity(), OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().
Definition at line 138 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::resume_sample_processing().
Definition at line 170 of file WriterInfo.h.
Referenced by coherent_change_received(), reset_coherent_info(), and set_group_info().
DCPSInfoRepo ID of the DataWriter.
Definition at line 150 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::RemoveAssociationSweeper< T >::handle_timeout(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::handle_timeout(), OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update(), set_group_info(), OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion(), OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().