#include <WriterInfo.h>
Inheritance diagram for OpenDDS::DCPS::WriterInfo:
Public Types | |
typedef std::pair< SequenceNumber, ACE_Time_Value > | SeqDeadlinePair |
Times after which we no longer need to respond to a REQUEST_ACK message. | |
NOT_SET | |
ALIVE | |
DEAD | |
NO_TIMER = -1 | |
enum | WriterState { NOT_SET, ALIVE, DEAD } |
enum | HistoricSamplesState { NO_TIMER = -1 } |
Public Member Functions | |
WriterInfo (WriterInfoListener *reader, const PublicationId &writer_id, const ::DDS::DataWriterQos &writer_qos) | |
ACE_Time_Value | check_activity (const ACE_Time_Value &now) |
int | received_activity (const ACE_Time_Value &when) |
called when a sample or other activity is received from this writer. | |
WriterState | get_state () |
returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0. | |
OPENDDS_STRING | get_state_str () const |
void | removed () |
update liveliness when remove_association is called. | |
void | ack_sequence (SequenceNumber value) |
Update the last observed sequence number. | |
SequenceNumber | ack_sequence () const |
Return the most recently observed contiguous sequence number. | |
bool | active (ACE_Time_Value default_participant_timeout) const |
Coherent_State | coherent_change_received () |
void | reset_coherent_info () |
void | set_group_info (const CoherentChangeControl &info) |
void | clear_owner_evaluated () |
void | set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag) |
bool | is_owner_evaluated (::DDS::InstanceHandle_t instance) |
typedef | OPENDDS_LIST (SeqDeadlinePair) DeadlineList |
OPENDDS_MAP (SequenceNumber, ReceivedDataSample) historic_samples_ | |
typedef | OPENDDS_MAP (::DDS::InstanceHandle_t, bool) OwnerEvaluateFlags |
Is this writer evaluated for owner ? | |
Public Attributes | |
ACE_Time_Value | last_liveliness_activity_time_ |
Timestamp of last write/dispose/assert_liveliness from this DataWriter. | |
DeadlineList | ack_deadlines_ |
DisjointSequence | ack_sequence_ |
bool | seen_data_ |
long | historic_samples_timer_ |
long | remove_association_timer_ |
ACE_Time_Value | removal_deadline_ |
SequenceNumber | last_historic_seq_ |
After receiving END_HISTORIC_SAMPLES, check for duplicates. | |
bool | waiting_for_end_historic_samples_ |
bool | scheduled_for_removal_ |
bool | notify_lost_ |
WriterState | state_ |
State of the writer. | |
WriterInfoListener * | reader_ |
The DataReader owning this WriterInfo. | |
PublicationId | writer_id_ |
DCPSInfoRepo ID of the DataWriter. | |
::DDS::DataWriterQos | writer_qos_ |
Writer qos. | |
::DDS::InstanceHandle_t | handle_ |
The publication entity instance handle. | |
ACE_Atomic_Op< ACE_Thread_Mutex, ACE_UINT32 > | coherent_samples_ |
Number of received coherent changes in active change set. | |
OwnerEvaluateFlags | owner_evaluated_ |
bool | group_coherent_ |
Data to support GROUP access scope. | |
RepoId | publisher_id_ |
DisjointSequence | coherent_sample_sequence_ |
WriterCoherentSample | writer_coherent_samples_ |
GroupCoherentSamples | group_coherent_samples_ |
Friends | |
class | WriteInfoListner |
Definition at line 71 of file WriterInfo.h.
typedef std::pair<SequenceNumber, ACE_Time_Value> OpenDDS::DCPS::WriterInfo::SeqDeadlinePair |
Times after which we no longer need to respond to a REQUEST_ACK message.
Definition at line 129 of file WriterInfo.h.
OpenDDS::DCPS::WriterInfo::WriterInfo | ( | WriterInfoListener * | reader, | |
const PublicationId & | writer_id, | |||
const ::DDS::DataWriterQos & | writer_qos | |||
) |
Definition at line 57 of file WriterInfo.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, reset_coherent_info(), and OpenDDS::DCPS::WriterInfoListener::subscription_id_.
00060 : last_liveliness_activity_time_(ACE_OS::gettimeofday()), 00061 seen_data_(false), 00062 historic_samples_timer_(NO_TIMER), 00063 remove_association_timer_(NO_TIMER), 00064 removal_deadline_(ACE_Time_Value::zero), 00065 last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()), 00066 waiting_for_end_historic_samples_(false), 00067 scheduled_for_removal_(false), 00068 notify_lost_(false), 00069 state_(NOT_SET), 00070 reader_(reader), 00071 writer_id_(writer_id), 00072 writer_qos_(writer_qos), 00073 handle_(DDS::HANDLE_NIL) 00074 { 00075 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00076 this->reset_coherent_info(); 00077 #endif 00078 00079 if (DCPS_debug_level >= 5) { 00080 GuidConverter writer_converter(writer_id); 00081 GuidConverter reader_converter(reader->subscription_id_); 00082 ACE_DEBUG((LM_DEBUG, 00083 ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ") 00084 ACE_TEXT("writer %C added to reader %C.\n"), 00085 OPENDDS_STRING(writer_converter).c_str(), 00086 OPENDDS_STRING(reader_converter).c_str())); 00087 } 00088 }
SequenceNumber OpenDDS::DCPS::WriterInfo::ack_sequence | ( | ) | const |
Return the most recently observed contiguous sequence number.
Definition at line 174 of file WriterInfo.cpp.
References ack_sequence_, and OpenDDS::DCPS::DisjointSequence::cumulative_ack().
00175 { 00176 // sample_lock_ is held by the caller. 00177 return this->ack_sequence_.cumulative_ack(); 00178 }
void OpenDDS::DCPS::WriterInfo::ack_sequence | ( | SequenceNumber | value | ) |
Update the last observed sequence number.
Definition at line 167 of file WriterInfo.cpp.
References ack_sequence_, and OpenDDS::DCPS::DisjointSequence::insert().
00168 { 00169 // sample_lock_ is held by the caller. 00170 this->ack_sequence_.insert(value); 00171 }
bool OpenDDS::DCPS::WriterInfo::active | ( | ACE_Time_Value | default_participant_timeout | ) | const |
Checks to see if writer has registered activity in either liveliness_lease_duration or DCPSPendingTimeout duration to allow it to finish before reader removes it
Definition at line 181 of file WriterInfo.cpp.
References OpenDDS::DCPS::duration_to_time_value(), last_liveliness_activity_time_, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, reader_, DDS::DataWriterQos::reliability, and writer_qos_.
00182 { 00183 // Need some period of time by which to decide if a writer the 00184 // DataReaderImpl knows about has gone 'inactive'. Used to determine 00185 // if a remove_associations should remove immediately or wait to let 00186 // reader process more information that may have queued up from the writer 00187 // Over-arching max wait time for removal is controlled in the 00188 // RemoveAssociationSweeper (10 seconds), but on a per writer basis set 00189 // activity_wait_period based on: 00190 // 1) Reader's liveliness_lease_duration 00191 // 2) DCPSPendingTimeout value (if not zero) 00192 // 3) Writer's max blocking time (could be infinite, in which case 00193 // RemoveAssociationSweeper will remove after its max wait) 00194 // 4) Zero - don't wait, simply remove association 00195 ACE_Time_Value activity_wait_period(default_participant_timeout); 00196 if (reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) { 00197 activity_wait_period = reader_->liveliness_lease_duration_; 00198 } 00199 if (activity_wait_period == ACE_Time_Value::zero) { 00200 activity_wait_period = duration_to_time_value(writer_qos_.reliability.max_blocking_time); 00201 } 00202 if (activity_wait_period == ACE_Time_Value::zero) { 00203 return false; 00204 } 00205 return (ACE_OS::gettimeofday() - last_liveliness_activity_time_) <= activity_wait_period; 00206 }
ACE_Time_Value OpenDDS::DCPS::WriterInfo::check_activity | ( | const ACE_Time_Value & | now | ) |
check to see if this writer is alive (called by handle_timeout).
now | next time this DataWriter will become not active (not alive) if no sample or liveliness message is received. |
Definition at line 140 of file WriterInfo.cpp.
References ALIVE, last_liveliness_activity_time_, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, reader_, state_, and OpenDDS::DCPS::WriterInfoListener::writer_became_dead().
00141 { 00142 ACE_Time_Value expires_at = ACE_Time_Value::max_time; 00143 00144 // We only need check the liveliness with the non-zero liveliness_lease_duration_. 00145 // if (state_ != DEAD && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) 00146 if (state_ == ALIVE && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) { 00147 expires_at = this->last_liveliness_activity_time_ + 00148 reader_->liveliness_lease_duration_; 00149 00150 if (expires_at <= now) { 00151 // let all instances know this write is not alive. 00152 reader_->writer_became_dead(*this, now); 00153 expires_at = ACE_Time_Value::max_time; 00154 } 00155 } 00156 00157 return expires_at; 00158 }
void OpenDDS::DCPS::WriterInfo::clear_owner_evaluated | ( | ) |
Definition at line 113 of file WriterInfo.cpp.
References owner_evaluated_.
Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().
00114 { 00115 this->owner_evaluated_.clear (); 00116 }
Coherent_State OpenDDS::DCPS::WriterInfo::coherent_change_received | ( | ) |
Definition at line 211 of file WriterInfo.cpp.
References coherent_sample_sequence_, OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::WriterCoherentSample::last_sample_, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, and writer_coherent_samples_.
Referenced by OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
00212 { 00213 if (this->writer_coherent_samples_.num_samples_ == 0) { 00214 return NOT_COMPLETED_YET; 00215 } 00216 00217 if (!this->coherent_sample_sequence_.disjoint() 00218 && (this->coherent_sample_sequence_.high() 00219 == this->writer_coherent_samples_.last_sample_)) { 00220 return COMPLETED; 00221 } 00222 00223 if (this->coherent_sample_sequence_.high() > 00224 this->writer_coherent_samples_.last_sample_) { 00225 return REJECTED; 00226 } 00227 00228 return NOT_COMPLETED_YET; 00229 }
WriterState OpenDDS::DCPS::WriterInfo::get_state | ( | ) | [inline] |
returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
Definition at line 93 of file WriterInfo.h.
00093 { 00094 return state_; 00095 };
OPENDDS_STRING OpenDDS::DCPS::WriterInfo::get_state_str | ( | ) | const |
Definition at line 91 of file WriterInfo.cpp.
References ALIVE, DEAD, NOT_SET, OPENDDS_STRING, and OpenDDS::DCPS::to_dds_string().
Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), and OpenDDS::DCPS::DataReaderImpl::writer_became_dead().
00092 { 00093 OPENDDS_STRING ret; 00094 switch (this->state_) { 00095 case WriterInfo::NOT_SET: 00096 ret += "NOT_SET"; 00097 break; 00098 case WriterInfo::ALIVE: 00099 ret += "ALIVE"; 00100 break; 00101 case WriterInfo::DEAD: 00102 ret += "DEAD"; 00103 break; 00104 default: 00105 ret += "UNSPECIFIED("; 00106 ret += to_dds_string(int(this->state_)); 00107 ret += ")"; 00108 } 00109 return ret.c_str(); 00110 }
bool OpenDDS::DCPS::WriterInfo::is_owner_evaluated | ( | ::DDS::InstanceHandle_t | instance | ) |
Definition at line 128 of file WriterInfo.cpp.
References owner_evaluated_.
00129 { 00130 OwnerEvaluateFlags::iterator iter = owner_evaluated_.find (instance); 00131 if (iter == owner_evaluated_.end ()) { 00132 this->owner_evaluated_.insert (OwnerEvaluateFlags::value_type (instance, false)); 00133 return false; 00134 } 00135 else 00136 return iter->second; 00137 }
typedef OpenDDS::DCPS::WriterInfo::OPENDDS_LIST | ( | SeqDeadlinePair | ) |
typedef OpenDDS::DCPS::WriterInfo::OPENDDS_MAP | ( | ::DDS::InstanceHandle_t | , | |
bool | ||||
) |
Is this writer evaluated for owner ?
OpenDDS::DCPS::WriterInfo::OPENDDS_MAP | ( | SequenceNumber | , | |
ReceivedDataSample | ||||
) |
Temporary holding place for samples received before the END_HISTORIC_SAMPLES control message.
int OpenDDS::DCPS::WriterInfo::received_activity | ( | const ACE_Time_Value & | when | ) | [inline] |
called when a sample or other activity is received from this writer.
Definition at line 189 of file WriterInfo.h.
References ALIVE, last_liveliness_activity_time_, reader_, state_, and OpenDDS::DCPS::WriterInfoListener::writer_became_alive().
00190 { 00191 last_liveliness_activity_time_ = when; 00192 00193 if (state_ != ALIVE) { // NOT_SET || DEAD 00194 reader_->writer_became_alive(*this, when); 00195 return 0; 00196 } 00197 00198 //TBD - is the "was alive" return value used? 00199 return 1; 00200 }
void OpenDDS::DCPS::WriterInfo::removed | ( | ) |
update liveliness when remove_association is called.
Definition at line 161 of file WriterInfo.cpp.
References reader_, and OpenDDS::DCPS::WriterInfoListener::writer_removed().
00162 { 00163 reader_->writer_removed(*this); 00164 }
void OpenDDS::DCPS::WriterInfo::reset_coherent_info | ( | ) |
Definition at line 232 of file WriterInfo.cpp.
References coherent_sample_sequence_, coherent_samples_, group_coherent_, group_coherent_samples_, OpenDDS::DCPS::GUID_UNKNOWN, publisher_id_, OpenDDS::DCPS::WriterCoherentSample::reset(), OpenDDS::DCPS::DisjointSequence::reset(), and writer_coherent_samples_.
Referenced by OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion(), and WriterInfo().
00233 { 00234 this->coherent_samples_ = 0; 00235 this->group_coherent_ = false; 00236 this->publisher_id_ = GUID_UNKNOWN; 00237 this->coherent_sample_sequence_.reset(); 00238 this->writer_coherent_samples_.reset(); 00239 this->group_coherent_samples_.clear(); 00240 }
void OpenDDS::DCPS::WriterInfo::set_group_info | ( | const CoherentChangeControl & | info | ) |
Definition at line 244 of file WriterInfo.cpp.
References OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, group_coherent_samples_, OPENDDS_STRING, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, and writer_coherent_samples_.
00245 { 00246 if (!(this->publisher_id_ == info.publisher_id_) 00247 || this->group_coherent_ != info.group_coherent_) { 00248 GuidConverter sub_id(this->reader_->subscription_id_); 00249 GuidConverter pub_id(this->writer_id_); 00250 ACE_ERROR((LM_ERROR, 00251 ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()") 00252 ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"), 00253 OPENDDS_STRING(sub_id).c_str(), 00254 OPENDDS_STRING(pub_id).c_str())); 00255 } 00256 00257 this->writer_coherent_samples_ = info.coherent_samples_; 00258 this->group_coherent_samples_ = info.group_coherent_samples_; 00259 }
void OpenDDS::DCPS::WriterInfo::set_owner_evaluated | ( | ::DDS::InstanceHandle_t | instance, | |
bool | flag | |||
) |
Definition at line 119 of file WriterInfo.cpp.
References owner_evaluated_.
00120 { 00121 if (flag || 00122 (!flag && owner_evaluated_.find (instance) != owner_evaluated_.end ())) { 00123 this->owner_evaluated_ [instance] = flag; 00124 } 00125 }
friend class WriteInfoListner [friend] |
Definition at line 72 of file WriterInfo.h.
DeadlineList OpenDDS::DCPS::WriterInfo::ack_deadlines_ |
Definition at line 131 of file WriterInfo.h.
Definition at line 180 of file WriterInfo.h.
Referenced by coherent_change_received(), and reset_coherent_info().
ACE_Atomic_Op<ACE_Thread_Mutex, ACE_UINT32> OpenDDS::DCPS::WriterInfo::coherent_samples_ |
Number of received coherent changes in active change set.
Definition at line 170 of file WriterInfo.h.
Referenced by reset_coherent_info().
Data to support GROUP access scope.
Definition at line 178 of file WriterInfo.h.
Referenced by reset_coherent_info(), and OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
GroupCoherentSamples OpenDDS::DCPS::WriterInfo::group_coherent_samples_ |
Definition at line 182 of file WriterInfo.h.
Referenced by reset_coherent_info(), and set_group_info().
The publication entity instance handle.
Definition at line 167 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().
Definition at line 138 of file WriterInfo.h.
After receiving END_HISTORIC_SAMPLES, check for duplicates.
Definition at line 147 of file WriterInfo.h.
ACE_Time_Value OpenDDS::DCPS::WriterInfo::last_liveliness_activity_time_ |
Timestamp of last write/dispose/assert_liveliness from this DataWriter.
Definition at line 126 of file WriterInfo.h.
Referenced by active(), check_activity(), and received_activity().
Definition at line 152 of file WriterInfo.h.
OwnerEvaluateFlags OpenDDS::DCPS::WriterInfo::owner_evaluated_ |
Definition at line 174 of file WriterInfo.h.
Referenced by clear_owner_evaluated(), is_owner_evaluated(), and set_owner_evaluated().
Definition at line 179 of file WriterInfo.h.
Referenced by reset_coherent_info(), and OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
The DataReader owning this WriterInfo.
Definition at line 158 of file WriterInfo.h.
Referenced by active(), check_activity(), received_activity(), and removed().
ACE_Time_Value OpenDDS::DCPS::WriterInfo::removal_deadline_ |
Definition at line 140 of file WriterInfo.h.
Definition at line 139 of file WriterInfo.h.
Definition at line 151 of file WriterInfo.h.
Definition at line 135 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_dead().
State of the writer.
Definition at line 155 of file WriterInfo.h.
Referenced by check_activity(), received_activity(), OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().
Definition at line 149 of file WriterInfo.h.
Definition at line 181 of file WriterInfo.h.
Referenced by coherent_change_received(), reset_coherent_info(), and set_group_info().
DCPSInfoRepo ID of the DataWriter.
Definition at line 161 of file WriterInfo.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update(), OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion(), OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().