00001
00002
00003
00004
00005
00006
00007
00008
00009 #ifndef OPENDDS_DCPS_WRITERINFO_H
00010 #define OPENDDS_DCPS_WRITERINFO_H
00011
00012 #include "dds/DCPS/PoolAllocator.h"
00013 #include "dds/DdsDcpsInfoUtilsC.h"
00014
00015 #include "RcObject_T.h"
00016 #include "Definitions.h"
00017 #include "CoherentChangeControl.h"
00018 #include "DisjointSequence.h"
00019 #include "transport/framework/ReceivedDataSample.h"
00020
00021 namespace OpenDDS {
00022 namespace DCPS {
00023
00024 class WriterInfo;
00025
00026 class OpenDDS_Dcps_Export WriterInfoListener
00027 {
00028 public:
00029 WriterInfoListener();
00030 virtual ~WriterInfoListener();
00031
00032 RepoId subscription_id_;
00033
00034
00035
00036
00037
00038
00039
00040 ACE_Time_Value liveliness_lease_duration_;
00041
00042
00043
00044
00045 virtual void writer_became_alive(WriterInfo& info,
00046 const ACE_Time_Value& when);
00047
00048
00049
00050
00051 virtual void writer_became_dead(WriterInfo& info,
00052 const ACE_Time_Value& when);
00053
00054
00055
00056 virtual void writer_removed(WriterInfo& info);
00057 };
00058
00059
00060 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00061 enum Coherent_State {
00062 NOT_COMPLETED_YET,
00063 COMPLETED,
00064 REJECTED
00065 };
00066 #endif
00067
00068
00069
00070
00071 class OpenDDS_Dcps_Export WriterInfo : public RcObject<ACE_SYNCH_MUTEX> {
00072 friend class WriteInfoListner;
00073
00074 public:
00075 enum WriterState { NOT_SET, ALIVE, DEAD };
00076 enum HistoricSamplesState { NO_TIMER = -1 };
00077
00078 WriterInfo(WriterInfoListener* reader,
00079 const PublicationId& writer_id,
00080 const ::DDS::DataWriterQos& writer_qos);
00081
00082
00083
00084
00085
00086
00087 ACE_Time_Value check_activity(const ACE_Time_Value& now);
00088
00089
00090 int received_activity(const ACE_Time_Value& when);
00091
00092
00093 WriterState get_state() {
00094 return state_;
00095 };
00096
00097 OPENDDS_STRING get_state_str() const;
00098
00099
00100 void removed();
00101
00102
00103 void ack_sequence(SequenceNumber value);
00104
00105
00106 SequenceNumber ack_sequence() const;
00107
00108
00109
00110
00111 bool active(ACE_Time_Value default_participant_timeout) const;
00112
00113 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00114 Coherent_State coherent_change_received ();
00115 void reset_coherent_info ();
00116 void set_group_info (const CoherentChangeControl& info);
00117 #endif
00118
00119 void clear_owner_evaluated ();
00120 void set_owner_evaluated (::DDS::InstanceHandle_t instance, bool flag);
00121 bool is_owner_evaluated (::DDS::InstanceHandle_t instance);
00122
00123
00124
00125
00126 ACE_Time_Value last_liveliness_activity_time_;
00127
00128
00129 typedef std::pair<SequenceNumber, ACE_Time_Value> SeqDeadlinePair;
00130 typedef OPENDDS_LIST(SeqDeadlinePair) DeadlineList;
00131 DeadlineList ack_deadlines_;
00132
00133 DisjointSequence ack_sequence_;
00134
00135 bool seen_data_;
00136
00137
00138 long historic_samples_timer_;
00139 long remove_association_timer_;
00140 ACE_Time_Value removal_deadline_;
00141
00142
00143
00144 OPENDDS_MAP(SequenceNumber, ReceivedDataSample) historic_samples_;
00145
00146
00147 SequenceNumber last_historic_seq_;
00148
00149 bool waiting_for_end_historic_samples_;
00150
00151 bool scheduled_for_removal_;
00152 bool notify_lost_;
00153
00154
00155 WriterState state_;
00156
00157
00158 WriterInfoListener* reader_;
00159
00160
00161 PublicationId writer_id_;
00162
00163
00164 ::DDS::DataWriterQos writer_qos_;
00165
00166
00167 ::DDS::InstanceHandle_t handle_;
00168
00169
00170 ACE_Atomic_Op<ACE_Thread_Mutex, ACE_UINT32> coherent_samples_;
00171
00172
00173 typedef OPENDDS_MAP( ::DDS::InstanceHandle_t, bool) OwnerEvaluateFlags;
00174 OwnerEvaluateFlags owner_evaluated_;
00175
00176
00177 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00178 bool group_coherent_;
00179 RepoId publisher_id_;
00180 DisjointSequence coherent_sample_sequence_;
00181 WriterCoherentSample writer_coherent_samples_;
00182 GroupCoherentSamples group_coherent_samples_;
00183 #endif
00184
00185 };
00186
00187 inline
00188 int
00189 OpenDDS::DCPS::WriterInfo::received_activity(const ACE_Time_Value& when)
00190 {
00191 last_liveliness_activity_time_ = when;
00192
00193 if (state_ != ALIVE) {
00194 reader_->writer_became_alive(*this, when);
00195 return 0;
00196 }
00197
00198
00199 return 1;
00200 }
00201
00202 }
00203 }
00204
00205 #endif