OpenDDS::DCPS::WriterInfo Class Reference

Keeps track of a DataWriter's liveliness for a DataReader. More...

#include <WriterInfo.h>

Inheritance diagram for OpenDDS::DCPS::WriterInfo:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::WriterInfo:

Collaboration graph
[legend]
List of all members.

Public Types

typedef std::pair< SequenceNumber,
ACE_Time_Value > 
SeqDeadlinePair
 Times after which we no longer need to respond to a REQUEST_ACK message.
 NOT_SET
 ALIVE
 DEAD
 NO_TIMER = -1
enum  WriterState { NOT_SET, ALIVE, DEAD }
enum  HistoricSamplesState { NO_TIMER = -1 }

Public Member Functions

 WriterInfo (WriterInfoListener *reader, const PublicationId &writer_id, const ::DDS::DataWriterQos &writer_qos)
ACE_Time_Value check_activity (const ACE_Time_Value &now)
int received_activity (const ACE_Time_Value &when)
 called when a sample or other activity is received from this writer.
WriterState get_state ()
 returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
OPENDDS_STRING get_state_str () const
void removed ()
 update liveliness when remove_association is called.
void ack_sequence (SequenceNumber value)
 Update the last observed sequence number.
SequenceNumber ack_sequence () const
 Return the most recently observed contiguous sequence number.
bool active (ACE_Time_Value default_participant_timeout) const
Coherent_State coherent_change_received ()
void reset_coherent_info ()
void set_group_info (const CoherentChangeControl &info)
void clear_owner_evaluated ()
void set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag)
bool is_owner_evaluated (::DDS::InstanceHandle_t instance)
typedef OPENDDS_LIST (SeqDeadlinePair) DeadlineList
 OPENDDS_MAP (SequenceNumber, ReceivedDataSample) historic_samples_
typedef OPENDDS_MAP (::DDS::InstanceHandle_t, bool) OwnerEvaluateFlags
 Is this writer evaluated for owner ?

Public Attributes

ACE_Time_Value last_liveliness_activity_time_
 Timestamp of last write/dispose/assert_liveliness from this DataWriter.
DeadlineList ack_deadlines_
DisjointSequence ack_sequence_
bool seen_data_
long historic_samples_timer_
long remove_association_timer_
ACE_Time_Value removal_deadline_
SequenceNumber last_historic_seq_
 After receiving END_HISTORIC_SAMPLES, check for duplicates.
bool waiting_for_end_historic_samples_
bool scheduled_for_removal_
bool notify_lost_
WriterState state_
 State of the writer.
WriterInfoListenerreader_
 The DataReader owning this WriterInfo.
PublicationId writer_id_
 DCPSInfoRepo ID of the DataWriter.
::DDS::DataWriterQos writer_qos_
 Writer qos.
::DDS::InstanceHandle_t handle_
 The publication entity instance handle.
ACE_Atomic_Op< ACE_Thread_Mutex,
ACE_UINT32 > 
coherent_samples_
 Number of received coherent changes in active change set.
OwnerEvaluateFlags owner_evaluated_
bool group_coherent_
 Data to support GROUP access scope.
RepoId publisher_id_
DisjointSequence coherent_sample_sequence_
WriterCoherentSample writer_coherent_samples_
GroupCoherentSamples group_coherent_samples_

Friends

class WriteInfoListner

Detailed Description

Keeps track of a DataWriter's liveliness for a DataReader.

Definition at line 71 of file WriterInfo.h.


Member Typedef Documentation

typedef std::pair<SequenceNumber, ACE_Time_Value> OpenDDS::DCPS::WriterInfo::SeqDeadlinePair

Times after which we no longer need to respond to a REQUEST_ACK message.

Definition at line 129 of file WriterInfo.h.


Member Enumeration Documentation

enum OpenDDS::DCPS::WriterInfo::HistoricSamplesState

Enumerator:
NO_TIMER 

Definition at line 76 of file WriterInfo.h.

00076 { NO_TIMER = -1 };

enum OpenDDS::DCPS::WriterInfo::WriterState

Enumerator:
NOT_SET 
ALIVE 
DEAD 

Definition at line 75 of file WriterInfo.h.

00075 { NOT_SET, ALIVE, DEAD };


Constructor & Destructor Documentation

OpenDDS::DCPS::WriterInfo::WriterInfo ( WriterInfoListener reader,
const PublicationId writer_id,
const ::DDS::DataWriterQos writer_qos 
)

Definition at line 57 of file WriterInfo.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, reset_coherent_info(), and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

00060   : last_liveliness_activity_time_(ACE_OS::gettimeofday()),
00061   seen_data_(false),
00062   historic_samples_timer_(NO_TIMER),
00063   remove_association_timer_(NO_TIMER),
00064   removal_deadline_(ACE_Time_Value::zero),
00065   last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00066   waiting_for_end_historic_samples_(false),
00067   scheduled_for_removal_(false),
00068   notify_lost_(false),
00069   state_(NOT_SET),
00070   reader_(reader),
00071   writer_id_(writer_id),
00072   writer_qos_(writer_qos),
00073   handle_(DDS::HANDLE_NIL)
00074 {
00075 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00076   this->reset_coherent_info();
00077 #endif
00078 
00079   if (DCPS_debug_level >= 5) {
00080     GuidConverter writer_converter(writer_id);
00081     GuidConverter reader_converter(reader->subscription_id_);
00082     ACE_DEBUG((LM_DEBUG,
00083                ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ")
00084                ACE_TEXT("writer %C added to reader %C.\n"),
00085                OPENDDS_STRING(writer_converter).c_str(),
00086                OPENDDS_STRING(reader_converter).c_str()));
00087   }
00088 }


Member Function Documentation

SequenceNumber OpenDDS::DCPS::WriterInfo::ack_sequence (  )  const

Return the most recently observed contiguous sequence number.

Definition at line 174 of file WriterInfo.cpp.

References ack_sequence_, and OpenDDS::DCPS::DisjointSequence::cumulative_ack().

00175 {
00176   // sample_lock_ is held by the caller.
00177   return this->ack_sequence_.cumulative_ack();
00178 }

void OpenDDS::DCPS::WriterInfo::ack_sequence ( SequenceNumber  value  ) 

Update the last observed sequence number.

Definition at line 167 of file WriterInfo.cpp.

References ack_sequence_, and OpenDDS::DCPS::DisjointSequence::insert().

00168 {
00169   // sample_lock_ is held by the caller.
00170   this->ack_sequence_.insert(value);
00171 }

bool OpenDDS::DCPS::WriterInfo::active ( ACE_Time_Value  default_participant_timeout  )  const

Checks to see if writer has registered activity in either liveliness_lease_duration or DCPSPendingTimeout duration to allow it to finish before reader removes it

Definition at line 181 of file WriterInfo.cpp.

References OpenDDS::DCPS::duration_to_time_value(), last_liveliness_activity_time_, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, reader_, DDS::DataWriterQos::reliability, and writer_qos_.

00182 {
00183   // Need some period of time by which to decide if a writer the
00184   // DataReaderImpl knows about has gone 'inactive'.  Used to determine
00185   // if a remove_associations should remove immediately or wait to let
00186   // reader process more information that may have queued up from the writer
00187   // Over-arching max wait time for removal is controlled in the
00188   // RemoveAssociationSweeper (10 seconds), but on a per writer basis set
00189   // activity_wait_period based on:
00190   //     1) Reader's liveliness_lease_duration
00191   //     2) DCPSPendingTimeout value (if not zero)
00192   //     3) Writer's max blocking time (could be infinite, in which case
00193   //        RemoveAssociationSweeper will remove after its max wait)
00194   //     4) Zero - don't wait, simply remove association
00195   ACE_Time_Value activity_wait_period(default_participant_timeout);
00196   if (reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00197     activity_wait_period = reader_->liveliness_lease_duration_;
00198   }
00199   if (activity_wait_period == ACE_Time_Value::zero) {
00200       activity_wait_period = duration_to_time_value(writer_qos_.reliability.max_blocking_time);
00201   }
00202   if (activity_wait_period == ACE_Time_Value::zero) {
00203     return false;
00204   }
00205   return (ACE_OS::gettimeofday() - last_liveliness_activity_time_) <= activity_wait_period;
00206 }

ACE_Time_Value OpenDDS::DCPS::WriterInfo::check_activity ( const ACE_Time_Value &  now  ) 

check to see if this writer is alive (called by handle_timeout).

Parameters:
now next time this DataWriter will become not active (not alive) if no sample or liveliness message is received.
Returns:
absolute time when the Writer will become not active (if no activity) of ACE_Time_Value::zero if the writer is already or became not alive

Definition at line 140 of file WriterInfo.cpp.

References ALIVE, last_liveliness_activity_time_, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, reader_, state_, and OpenDDS::DCPS::WriterInfoListener::writer_became_dead().

00141 {
00142   ACE_Time_Value expires_at = ACE_Time_Value::max_time;
00143 
00144   // We only need check the liveliness with the non-zero liveliness_lease_duration_.
00145   // if (state_ != DEAD && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero)
00146   if (state_ == ALIVE && reader_->liveliness_lease_duration_ != ACE_Time_Value::zero) {
00147     expires_at = this->last_liveliness_activity_time_ +
00148                  reader_->liveliness_lease_duration_;
00149 
00150     if (expires_at <= now) {
00151       // let all instances know this write is not alive.
00152       reader_->writer_became_dead(*this, now);
00153       expires_at = ACE_Time_Value::max_time;
00154     }
00155   }
00156 
00157   return expires_at;
00158 }

void OpenDDS::DCPS::WriterInfo::clear_owner_evaluated (  ) 

Definition at line 113 of file WriterInfo.cpp.

References owner_evaluated_.

Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().

00114 {
00115   this->owner_evaluated_.clear ();
00116 }

Coherent_State OpenDDS::DCPS::WriterInfo::coherent_change_received (  ) 

Definition at line 211 of file WriterInfo.cpp.

References coherent_sample_sequence_, OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::WriterCoherentSample::last_sample_, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, and writer_coherent_samples_.

Referenced by OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().

00212 {
00213   if (this->writer_coherent_samples_.num_samples_ == 0) {
00214     return NOT_COMPLETED_YET;
00215   }
00216 
00217   if (!this->coherent_sample_sequence_.disjoint()
00218       && (this->coherent_sample_sequence_.high()
00219           == this->writer_coherent_samples_.last_sample_)) {
00220     return COMPLETED;
00221   }
00222 
00223   if (this->coherent_sample_sequence_.high() >
00224       this->writer_coherent_samples_.last_sample_) {
00225     return REJECTED;
00226   }
00227 
00228   return NOT_COMPLETED_YET;
00229 }

WriterState OpenDDS::DCPS::WriterInfo::get_state (  )  [inline]

returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.

Definition at line 93 of file WriterInfo.h.

00093                           {
00094     return state_;
00095   };

OPENDDS_STRING OpenDDS::DCPS::WriterInfo::get_state_str (  )  const

Definition at line 91 of file WriterInfo.cpp.

References ALIVE, DEAD, NOT_SET, OPENDDS_STRING, and OpenDDS::DCPS::to_dds_string().

Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), and OpenDDS::DCPS::DataReaderImpl::writer_became_dead().

00092 {
00093   OPENDDS_STRING ret;
00094   switch (this->state_) {
00095   case WriterInfo::NOT_SET:
00096     ret += "NOT_SET";
00097     break;
00098   case WriterInfo::ALIVE:
00099     ret += "ALIVE";
00100     break;
00101   case WriterInfo::DEAD:
00102     ret += "DEAD";
00103     break;
00104   default:
00105     ret += "UNSPECIFIED(";
00106     ret += to_dds_string(int(this->state_));
00107     ret += ")";
00108   }
00109   return ret.c_str();
00110 }

bool OpenDDS::DCPS::WriterInfo::is_owner_evaluated ( ::DDS::InstanceHandle_t  instance  ) 

Definition at line 128 of file WriterInfo.cpp.

References owner_evaluated_.

00129 {
00130   OwnerEvaluateFlags::iterator iter = owner_evaluated_.find (instance);
00131   if (iter == owner_evaluated_.end ()) {
00132     this->owner_evaluated_.insert (OwnerEvaluateFlags::value_type (instance, false));
00133     return false;
00134   }
00135   else
00136     return iter->second;
00137 }

typedef OpenDDS::DCPS::WriterInfo::OPENDDS_LIST ( SeqDeadlinePair   ) 

typedef OpenDDS::DCPS::WriterInfo::OPENDDS_MAP ( ::DDS::InstanceHandle_t  ,
bool   
)

Is this writer evaluated for owner ?

OpenDDS::DCPS::WriterInfo::OPENDDS_MAP ( SequenceNumber  ,
ReceivedDataSample   
)

Temporary holding place for samples received before the END_HISTORIC_SAMPLES control message.

int OpenDDS::DCPS::WriterInfo::received_activity ( const ACE_Time_Value &  when  )  [inline]

called when a sample or other activity is received from this writer.

Definition at line 189 of file WriterInfo.h.

References ALIVE, last_liveliness_activity_time_, reader_, state_, and OpenDDS::DCPS::WriterInfoListener::writer_became_alive().

00190 {
00191   last_liveliness_activity_time_ = when;
00192 
00193   if (state_ != ALIVE) { // NOT_SET || DEAD
00194     reader_->writer_became_alive(*this, when);
00195     return 0;
00196   }
00197 
00198   //TBD - is the "was alive" return value used?
00199   return 1;
00200 }

void OpenDDS::DCPS::WriterInfo::removed (  ) 

update liveliness when remove_association is called.

Definition at line 161 of file WriterInfo.cpp.

References reader_, and OpenDDS::DCPS::WriterInfoListener::writer_removed().

00162 {
00163   reader_->writer_removed(*this);
00164 }

void OpenDDS::DCPS::WriterInfo::reset_coherent_info (  ) 

Definition at line 232 of file WriterInfo.cpp.

References coherent_sample_sequence_, coherent_samples_, group_coherent_, group_coherent_samples_, OpenDDS::DCPS::GUID_UNKNOWN, publisher_id_, OpenDDS::DCPS::WriterCoherentSample::reset(), OpenDDS::DCPS::DisjointSequence::reset(), and writer_coherent_samples_.

Referenced by OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion(), and WriterInfo().

00233 {
00234   this->coherent_samples_ = 0;
00235   this->group_coherent_ = false;
00236   this->publisher_id_ = GUID_UNKNOWN;
00237   this->coherent_sample_sequence_.reset();
00238   this->writer_coherent_samples_.reset();
00239   this->group_coherent_samples_.clear();
00240 }

void OpenDDS::DCPS::WriterInfo::set_group_info ( const CoherentChangeControl info  ) 

Definition at line 244 of file WriterInfo.cpp.

References OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, group_coherent_samples_, OPENDDS_STRING, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, and writer_coherent_samples_.

00245 {
00246   if (!(this->publisher_id_ == info.publisher_id_)
00247       || this->group_coherent_ != info.group_coherent_) {
00248     GuidConverter sub_id(this->reader_->subscription_id_);
00249     GuidConverter pub_id(this->writer_id_);
00250     ACE_ERROR((LM_ERROR,
00251                ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()")
00252                ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"),
00253                OPENDDS_STRING(sub_id).c_str(),
00254                OPENDDS_STRING(pub_id).c_str()));
00255   }
00256 
00257   this->writer_coherent_samples_ = info.coherent_samples_;
00258   this->group_coherent_samples_ = info.group_coherent_samples_;
00259 }

void OpenDDS::DCPS::WriterInfo::set_owner_evaluated ( ::DDS::InstanceHandle_t  instance,
bool  flag 
)

Definition at line 119 of file WriterInfo.cpp.

References owner_evaluated_.

00120 {
00121   if (flag ||
00122       (!flag && owner_evaluated_.find (instance) != owner_evaluated_.end ())) {
00123     this->owner_evaluated_ [instance] = flag;
00124   }
00125 }


Friends And Related Function Documentation

friend class WriteInfoListner [friend]

Definition at line 72 of file WriterInfo.h.


Member Data Documentation

DeadlineList OpenDDS::DCPS::WriterInfo::ack_deadlines_

Definition at line 131 of file WriterInfo.h.

DisjointSequence OpenDDS::DCPS::WriterInfo::ack_sequence_

Definition at line 133 of file WriterInfo.h.

Referenced by ack_sequence().

DisjointSequence OpenDDS::DCPS::WriterInfo::coherent_sample_sequence_

Definition at line 180 of file WriterInfo.h.

Referenced by coherent_change_received(), and reset_coherent_info().

ACE_Atomic_Op<ACE_Thread_Mutex, ACE_UINT32> OpenDDS::DCPS::WriterInfo::coherent_samples_

Number of received coherent changes in active change set.

Definition at line 170 of file WriterInfo.h.

Referenced by reset_coherent_info().

bool OpenDDS::DCPS::WriterInfo::group_coherent_

Data to support GROUP access scope.

Definition at line 178 of file WriterInfo.h.

Referenced by reset_coherent_info(), and OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().

GroupCoherentSamples OpenDDS::DCPS::WriterInfo::group_coherent_samples_

Definition at line 182 of file WriterInfo.h.

Referenced by reset_coherent_info(), and set_group_info().

::DDS::InstanceHandle_t OpenDDS::DCPS::WriterInfo::handle_

The publication entity instance handle.

Definition at line 167 of file WriterInfo.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().

long OpenDDS::DCPS::WriterInfo::historic_samples_timer_

Definition at line 138 of file WriterInfo.h.

SequenceNumber OpenDDS::DCPS::WriterInfo::last_historic_seq_

After receiving END_HISTORIC_SAMPLES, check for duplicates.

Definition at line 147 of file WriterInfo.h.

ACE_Time_Value OpenDDS::DCPS::WriterInfo::last_liveliness_activity_time_

Timestamp of last write/dispose/assert_liveliness from this DataWriter.

Definition at line 126 of file WriterInfo.h.

Referenced by active(), check_activity(), and received_activity().

bool OpenDDS::DCPS::WriterInfo::notify_lost_

Definition at line 152 of file WriterInfo.h.

OwnerEvaluateFlags OpenDDS::DCPS::WriterInfo::owner_evaluated_

Definition at line 174 of file WriterInfo.h.

Referenced by clear_owner_evaluated(), is_owner_evaluated(), and set_owner_evaluated().

RepoId OpenDDS::DCPS::WriterInfo::publisher_id_

Definition at line 179 of file WriterInfo.h.

Referenced by reset_coherent_info(), and OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().

WriterInfoListener* OpenDDS::DCPS::WriterInfo::reader_

The DataReader owning this WriterInfo.

Definition at line 158 of file WriterInfo.h.

Referenced by active(), check_activity(), received_activity(), and removed().

ACE_Time_Value OpenDDS::DCPS::WriterInfo::removal_deadline_

Definition at line 140 of file WriterInfo.h.

long OpenDDS::DCPS::WriterInfo::remove_association_timer_

Definition at line 139 of file WriterInfo.h.

bool OpenDDS::DCPS::WriterInfo::scheduled_for_removal_

Definition at line 151 of file WriterInfo.h.

bool OpenDDS::DCPS::WriterInfo::seen_data_

Definition at line 135 of file WriterInfo.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::writer_became_dead().

WriterState OpenDDS::DCPS::WriterInfo::state_

State of the writer.

Definition at line 155 of file WriterInfo.h.

Referenced by check_activity(), received_activity(), OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().

bool OpenDDS::DCPS::WriterInfo::waiting_for_end_historic_samples_

Definition at line 149 of file WriterInfo.h.

WriterCoherentSample OpenDDS::DCPS::WriterInfo::writer_coherent_samples_

Definition at line 181 of file WriterInfo.h.

Referenced by coherent_change_received(), reset_coherent_info(), and set_group_info().

PublicationId OpenDDS::DCPS::WriterInfo::writer_id_

DCPSInfoRepo ID of the DataWriter.

Definition at line 161 of file WriterInfo.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update(), OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion(), OpenDDS::DCPS::DataReaderImpl::writer_became_alive(), OpenDDS::DCPS::DataReaderImpl::writer_became_dead(), and OpenDDS::DCPS::DataReaderImpl::writer_removed().

::DDS::DataWriterQos OpenDDS::DCPS::WriterInfo::writer_qos_

Writer qos.

Definition at line 164 of file WriterInfo.h.

Referenced by active().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:41 2016 for OpenDDS by  doxygen 1.4.7