WriterInfo.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 
00009 #ifndef OPENDDS_DCPS_WRITERINFO_H
00010 #define OPENDDS_DCPS_WRITERINFO_H
00011 
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "dds/DdsDcpsCoreC.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 #include "RcObject.h"
00016 #include "Definitions.h"
00017 #include "CoherentChangeControl.h"
00018 #include "DisjointSequence.h"
00019 #include "transport/framework/ReceivedDataSample.h"
00020 
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 class WriterInfo;
00027 
00028 class OpenDDS_Dcps_Export WriterInfoListener
00029 {
00030 public:
00031   WriterInfoListener();
00032   virtual ~WriterInfoListener();
00033 
00034   RepoId subscription_id_;
00035 
00036   /// The time interval for checking liveliness.
00037   /// TBD: Should this be initialized with
00038   ///      DDS::DURATION_INFINITE_SEC and DDS::DURATION_INFINITE_NSEC
00039   ///      instead of ACE_Time_Value::zero to be consistent with default
00040   ///      duration qos ? Or should we simply use the ACE_Time_Value::zero
00041   ///      to indicate the INFINITY duration ?
00042   ACE_Time_Value liveliness_lease_duration_;
00043 
00044   /// tell instances when a DataWriter transitions to being alive
00045   /// The writer state is inout parameter, it has to be set ALIVE before
00046   /// handle_timeout is called since some subroutine use the state.
00047   virtual void writer_became_alive(WriterInfo&           info,
00048                                    const ACE_Time_Value& when);
00049 
00050   /// tell instances when a DataWriter transitions to DEAD
00051   /// The writer state is inout parameter, the state is set to DEAD
00052   /// when it returns.
00053   virtual void writer_became_dead(WriterInfo&           info,
00054                                   const ACE_Time_Value& when);
00055 
00056   /// tell instance when a DataWriter is removed.
00057   /// The liveliness status need update.
00058   virtual void writer_removed(WriterInfo& info);
00059 };
00060 
00061 
00062 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00063 enum Coherent_State {
00064   NOT_COMPLETED_YET,
00065   COMPLETED,
00066   REJECTED
00067 };
00068 #endif
00069 
00070 
00071 
00072 /// Keeps track of a DataWriter's liveliness for a DataReader.
00073 class OpenDDS_Dcps_Export WriterInfo : public RcObject {
00074   friend class WriteInfoListner;
00075 
00076 public:
00077   enum WriterState { NOT_SET, ALIVE, DEAD };
00078   enum HistoricSamplesState { NO_TIMER = -1 };
00079 
00080   WriterInfo(WriterInfoListener*         reader,
00081              const PublicationId&        writer_id,
00082              const DDS::DataWriterQos& writer_qos);
00083 
00084   /// check to see if this writer is alive (called by handle_timeout).
00085   /// @param now next time this DataWriter will become not active (not alive)
00086   ///      if no sample or liveliness message is received.
00087   /// @returns absolute time when the Writer will become not active (if no activity)
00088   ///          of ACE_Time_Value::zero if the writer is already or became not alive
00089   ACE_Time_Value check_activity(const ACE_Time_Value& now);
00090 
00091   /// called when a sample or other activity is received from this writer.
00092   int received_activity(const ACE_Time_Value& when);
00093 
00094   /// returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
00095   WriterState get_state() {
00096     return state_;
00097   };
00098 
00099   OPENDDS_STRING get_state_str() const;
00100 
00101   /// update liveliness when remove_association is called.
00102   void removed();
00103 
00104   ACE_Time_Value activity_wait_period() const;
00105 
00106   /// Checks to see if writer has registered activity in either
00107   /// liveliness_lease_duration or DCPSPendingTimeout duration
00108   /// to allow it to finish before reader removes it
00109   bool active() const;
00110 
00111 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00112   Coherent_State coherent_change_received ();
00113   void reset_coherent_info ();
00114   void set_group_info (const CoherentChangeControl& info);
00115 #endif
00116 
00117   void clear_owner_evaluated ();
00118   void set_owner_evaluated (DDS::InstanceHandle_t instance, bool flag);
00119   bool is_owner_evaluated (DDS::InstanceHandle_t instance);
00120 
00121   //private:
00122 
00123   /// Timestamp of last write/dispose/assert_liveliness from this DataWriter
00124   ACE_Time_Value last_liveliness_activity_time_;
00125 
00126   // Non-negative if this a durable writer which has a timer scheduled
00127   long historic_samples_timer_;
00128   long remove_association_timer_;
00129   ACE_Time_Value removal_deadline_;
00130 
00131   /// Temporary holding place for samples received before
00132   /// the END_HISTORIC_SAMPLES control message.
00133   OPENDDS_MAP(SequenceNumber, ReceivedDataSample) historic_samples_;
00134 
00135   /// After receiving END_HISTORIC_SAMPLES, check for duplicates
00136   SequenceNumber last_historic_seq_;
00137 
00138   bool waiting_for_end_historic_samples_;
00139 
00140   bool scheduled_for_removal_;
00141   bool notify_lost_;
00142 
00143   /// State of the writer.
00144   WriterState state_;
00145 
00146   /// The DataReader owning this WriterInfo
00147   WriterInfoListener* reader_;
00148 
00149   /// DCPSInfoRepo ID of the DataWriter
00150   PublicationId writer_id_;
00151 
00152   /// Writer qos
00153   DDS::DataWriterQos writer_qos_;
00154 
00155   /// The publication entity instance handle.
00156   DDS::InstanceHandle_t handle_;
00157 
00158   /// Number of received coherent changes in active change set.
00159   ACE_Atomic_Op<ACE_Thread_Mutex, ACE_UINT32> coherent_samples_;
00160 
00161   /// Is this writer evaluated for owner ?
00162   typedef OPENDDS_MAP(DDS::InstanceHandle_t, bool) OwnerEvaluateFlags;
00163   OwnerEvaluateFlags owner_evaluated_;
00164 
00165   /// Data to support GROUP access scope.
00166 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00167   bool group_coherent_;
00168   RepoId publisher_id_;
00169   DisjointSequence coherent_sample_sequence_;
00170   WriterCoherentSample writer_coherent_samples_;
00171   GroupCoherentSamples group_coherent_samples_;
00172 #endif
00173 
00174 };
00175 
00176 inline
00177 int
00178 OpenDDS::DCPS::WriterInfo::received_activity(const ACE_Time_Value& when)
00179 {
00180   last_liveliness_activity_time_ = when;
00181 
00182   if (state_ != ALIVE) { // NOT_SET || DEAD
00183     reader_->writer_became_alive(*this, when);
00184     return 0;
00185   }
00186 
00187   //TBD - is the "was alive" return value used?
00188   return 1;
00189 }
00190 
00191 } // namespace DCPS
00192 } // namespace
00193 
00194 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00195 
00196 #endif  /* end of include guard: OPENDDS_DCPS_WRITERINFO_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1