WriterInfo.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 
00009 #ifndef OPENDDS_DCPS_WRITERINFO_H
00010 #define OPENDDS_DCPS_WRITERINFO_H
00011 
00012 #include "dds/DCPS/PoolAllocator.h"
00013 #include "dds/DdsDcpsInfoUtilsC.h"
00014 // #include "RcHandle_T.h"
00015 #include "RcObject_T.h"
00016 #include "Definitions.h"
00017 #include "CoherentChangeControl.h"
00018 #include "DisjointSequence.h"
00019 #include "transport/framework/ReceivedDataSample.h"
00020 
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 
00024 class WriterInfo;
00025 
00026 class OpenDDS_Dcps_Export WriterInfoListener // receives the liveliness callbacks issued through WriterInfo::reader_
00027 {
00028 public:
00029   WriterInfoListener();
00030   virtual ~WriterInfoListener();
00031 
00032   RepoId subscription_id_;
00033 
00034   /// The time interval for checking liveliness.
00035   /// TBD: Should this be initialized with
00036   ///      DDS::DURATION_INFINITE_SEC and DDS::DURATION_INFINITE_NSEC
00037   ///      instead of ACE_Time_Value::zero, to be consistent with the default
00038   ///      duration QoS? Or should we simply use ACE_Time_Value::zero
00039   ///      to indicate an infinite duration?
00040   ACE_Time_Value liveliness_lease_duration_;
00041 
00042   /// Tell instances when a DataWriter transitions to being alive.
00043   /// The writer state is an in/out parameter; it has to be set to ALIVE before
00044   /// handle_timeout is called, since some subroutines use the state.
00045   virtual void writer_became_alive(WriterInfo&           info,
00046                                    const ACE_Time_Value& when);
00047 
00048   /// Tell instances when a DataWriter transitions to DEAD.
00049   /// The writer state is an in/out parameter; the state is set to DEAD
00050   /// when this call returns.
00051   virtual void writer_became_dead(WriterInfo&           info,
00052                                   const ACE_Time_Value& when);
00053 
00054   /// Tell instances when a DataWriter is removed.
00055   /// The liveliness status needs to be updated.
00056   virtual void writer_removed(WriterInfo& info);
00057 };
00058 
00059 
00060 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00061 enum Coherent_State { // return type of WriterInfo::coherent_change_received()
00062   NOT_COMPLETED_YET,
00063   COMPLETED,
00064   REJECTED
00065 };
00066 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00067 
00068 
00069 
00070 /// Keeps track of a DataWriter's liveliness for a DataReader.
00071 class OpenDDS_Dcps_Export WriterInfo : public RcObject<ACE_SYNCH_MUTEX> {
00072   friend class WriterInfoListener;
00073 
00074 public:
00075   enum WriterState { NOT_SET, ALIVE, DEAD };
00076   enum HistoricSamplesState { NO_TIMER = -1 };
00077 
00078   WriterInfo(WriterInfoListener*         reader,
00079              const PublicationId&        writer_id,
00080              const ::DDS::DataWriterQos& writer_qos);
00081 
00082   /// Check to see if this writer is alive (called by handle_timeout).
00083   /// @param now time of the check. NOTE(review): presumably the current time;
00084   ///      the definition is not in this header -- confirm.
00085   /// @returns absolute time when the Writer will become not active (if no sample or
00086   ///          liveliness message is received), or ACE_Time_Value::zero if it is already or became not alive
00087   ACE_Time_Value check_activity(const ACE_Time_Value& now);
00088 
00089   /// Called when a sample or other activity is received from this writer; returns 1 if it was already ALIVE, else 0.
00090   int received_activity(const ACE_Time_Value& when);
00091 
00092   /// Returns the current state: ALIVE if the DataWriter is lively, DEAD if dead, otherwise NOT_SET.
00093   WriterState get_state() {
00094     return state_;
00095   }
00096 
00097   OPENDDS_STRING get_state_str() const;
00098 
00099   /// Update liveliness when remove_association is called.
00100   void removed();
00101 
00102   /// Update the last observed sequence number.
00103   void ack_sequence(SequenceNumber value);
00104 
00105   /// Return the most recently observed contiguous sequence number.
00106   SequenceNumber ack_sequence() const;
00107 
00108   /// Checks to see if writer has registered activity in either
00109   /// liveliness_lease_duration or DCPSPendingTimeout duration
00110   /// to allow it to finish before reader removes it
00111   bool active(ACE_Time_Value default_participant_timeout) const;
00112 
00113 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00114   Coherent_State coherent_change_received ();
00115   void reset_coherent_info ();
00116   void set_group_info (const CoherentChangeControl& info);
00117 #endif
00118 
00119   void clear_owner_evaluated ();
00120   void set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag);
00121   bool is_owner_evaluated (::DDS::InstanceHandle_t instance);
00122 
00123   //private:
00124 
00125   /// Timestamp of last write/dispose/assert_liveliness from this DataWriter
00126   ACE_Time_Value last_liveliness_activity_time_;
00127 
00128   /// Times after which we no longer need to respond to a REQUEST_ACK message.
00129   typedef std::pair<SequenceNumber, ACE_Time_Value> SeqDeadlinePair;
00130   typedef OPENDDS_LIST(SeqDeadlinePair) DeadlineList;
00131   DeadlineList ack_deadlines_;
00132 
00133   DisjointSequence ack_sequence_;
00134 
00135   bool seen_data_;
00136 
00137   // Non-negative if this is a durable writer which has a timer scheduled
00138   long historic_samples_timer_;
00139   long remove_association_timer_;
00140   ACE_Time_Value removal_deadline_;
00141 
00142   /// Temporary holding place for samples received before
00143   /// the END_HISTORIC_SAMPLES control message.
00144   OPENDDS_MAP(SequenceNumber, ReceivedDataSample) historic_samples_;
00145 
00146   /// After receiving END_HISTORIC_SAMPLES, check for duplicates
00147   SequenceNumber last_historic_seq_;
00148 
00149   bool waiting_for_end_historic_samples_;
00150 
00151   bool scheduled_for_removal_;
00152   bool notify_lost_;
00153 
00154   /// State of the writer.
00155   WriterState state_;
00156 
00157   /// The DataReader owning this WriterInfo
00158   WriterInfoListener* reader_;
00159 
00160   /// DCPSInfoRepo ID of the DataWriter
00161   PublicationId writer_id_;
00162 
00163   /// Writer qos
00164   ::DDS::DataWriterQos writer_qos_;
00165 
00166   /// The publication entity instance handle.
00167   ::DDS::InstanceHandle_t handle_;
00168 
00169   /// Number of received coherent changes in active change set.
00170   ACE_Atomic_Op<ACE_Thread_Mutex, ACE_UINT32> coherent_samples_;
00171 
00172   /// Is this writer evaluated for owner ?
00173   typedef OPENDDS_MAP( ::DDS::InstanceHandle_t, bool) OwnerEvaluateFlags;
00174   OwnerEvaluateFlags owner_evaluated_;
00175 
00176   /// Data to support GROUP access scope.
00177 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00178   bool group_coherent_;
00179   RepoId publisher_id_;
00180   DisjointSequence coherent_sample_sequence_;
00181   WriterCoherentSample writer_coherent_samples_;
00182   GroupCoherentSamples group_coherent_samples_;
00183 #endif
00184 
00185 };
00186 
00187 inline // Notes activity from this writer; returns 1 if it was already ALIVE, else 0.
00188 int
00189 OpenDDS::DCPS::WriterInfo::received_activity(const ACE_Time_Value& when)
00190 {
00191   last_liveliness_activity_time_ = when; // always remember the latest activity
00192 
00193   if (state_ == ALIVE) {
00194     //TBD - is the "was alive" return value used?
00195     return 1;
00196   }
00197   // NOT_SET || DEAD: let the owning reader process the transition to alive.
00198   reader_->writer_became_alive(*this, when);
00199   return 0;
00200 }
00201 
00202 } // namespace DCPS
00203 } // namespace OpenDDS
00204 
00205 #endif  /* end of include guard: OPENDDS_DCPS_WRITERINFO_H */

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7