60 , historic_samples_timer_(NO_TIMER)
62 , waiting_for_end_historic_samples_(
false)
63 , delivering_historic_samples_(
false)
64 , delivering_historic_samples_cv_(mutex_)
67 , writer_id_(writer_id)
68 , writer_qos_(writer_qos)
71 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 72 reset_coherent_info();
77 ACE_TEXT(
"(%P|%t) WriterInfo::WriterInfo: ")
78 ACE_TEXT(
"writer %C added to reader %C.\n"),
96 ACE_TEXT(
"%d is either invalid or not recognized.\n"),
98 return "Invalid state";
106 const void* arg =
reinterpret_cast<const void*
>(
this);
107 historic_samples_timer_ = sweeper->
reactor()->schedule_timer(sweeper, arg, ten_seconds);
115 sweeper->
reactor()->cancel_timer(historic_samples_timer_);
124 while (delivering_historic_samples_) {
127 if (waiting_for_end_historic_samples_) {
130 if (!historic_samples_.empty()) {
131 last_historic_seq_ = historic_samples_.rbegin()->first;
132 delivering_historic_samples_ =
true;
133 to_deliver.swap(historic_samples_);
147 last_historic_seq = last_historic_seq_;
148 if (waiting_for_end_historic_samples_) {
149 historic_samples_.insert(std::make_pair(seq, sample));
159 delivering_historic_samples_ =
false;
160 delivering_historic_samples_cv_.notify_all();
163 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 168 if (coherent_samples_ == 0) {
171 coherent_sample_sequence_.reset();
172 coherent_sample_sequence_.insert(resetRange);
175 coherent_sample_sequence_.insert(seq);
183 group_coherent_ = group_coherent;
184 publisher_id_ = publisher_id;
193 owner_evaluated_.clear();
201 (!flag && owner_evaluated_.find(instance) != owner_evaluated_.end())) {
202 owner_evaluated_[instance] = flag;
210 OwnerEvaluateFlags::iterator iter = owner_evaluated_.find(instance);
211 if (iter == owner_evaluated_.end()) {
212 owner_evaluated_.insert(OwnerEvaluateFlags::value_type(instance,
false));
229 if (state_ == ALIVE && reader && !reader->liveliness_lease_duration_.is_zero()) {
230 expires_at = last_liveliness_activity_time_ + reader->liveliness_lease_duration_;
232 if (expires_at <= now) {
235 reader->writer_became_dead(*
this);
250 reader->writer_removed(*
this);
254 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 259 if (writer_coherent_samples_.num_samples_ == 0) {
263 if (!coherent_sample_sequence_.disjoint()
264 && (coherent_sample_sequence_.high()
265 == writer_coherent_samples_.last_sample_)) {
269 if (coherent_sample_sequence_.high() >
270 writer_coherent_samples_.last_sample_) {
281 coherent_samples_ = 0;
282 group_coherent_ =
false;
284 coherent_sample_sequence_.reset();
285 writer_coherent_samples_.reset();
286 group_coherent_samples_.clear();
298 ACE_TEXT(
"(%P|%t) ERROR: WriterInfo::set_group_info()")
299 ACE_TEXT(
" reader %C writer %C incorrect coherent info !\n"),
300 reader ?
LogGuid(reader->subscription_id_).
c_str() :
"",
308 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
RcHandle< T > rchandle_from(T *pointer)
void cancel_historic_samples_timer(EndHistoricSamplesMissedSweeper *sweeper)
GroupCoherentSamples group_coherent_samples_
const InstanceHandle_t HANDLE_NIL
bool is_owner_evaluated(DDS::InstanceHandle_t instance)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
void set_group_info(const CoherentChangeControl &info)
void schedule_historic_samples_timer(EndHistoricSamplesMissedSweeper *sweeper, const ACE_Time_Value &ten_seconds)
const char * c_str() const
virtual void writer_became_dead(WriterInfo &info)
WriterCoherentSample coherent_samples_
Coherent_State coherent_change_received()
MonotonicTimePoint check_activity(const MonotonicTimePoint &now)
static TimePoint_T< MonotonicClock > now()
bool check_historic(const SequenceNumber &seq, const ReceivedDataSample &sample, SequenceNumber &last_historic_seq)
const char * get_state_str() const
virtual void reactor(ACE_Reactor *reactor)
Holds a data sample received by the transport.
void set_owner_evaluated(DDS::InstanceHandle_t instance, bool flag)
static const TimePoint_T< MonotonicClock > max_value
End Coherent Change message.
HANDLE_TYPE_NATIVE InstanceHandle_t
void coherent_change(bool group_coherent, const GUID_t &publisher_id)
void finished_delivering_historic()
std::pair< SequenceNumber, SequenceNumber > SequenceRange
void add_coherent_samples(const SequenceNumber &seq)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
void removed()
update liveliness when remove_association is called.
Sequence number abstraction. Only allows positive 64 bit values.
WriterInfo(const WriterInfoListener_rch &reader, const GUID_t &writer_id, const DDS::DataWriterQos &writer_qos)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
void clear_owner_evaluated()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual ~WriterInfoListener()
void reset_coherent_info()
bool check_end_historic_samples(EndHistoricSamplesMissedSweeper *sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&to_deliver)
virtual void writer_became_alive(WriterInfo &info, const MonotonicTimePoint &when)
virtual void writer_removed(WriterInfo &info)
#define TheServiceParticipant
Keeps track of a DataWriter's liveliness for a DataReader.
The Internal API and Implementation of OpenDDS.
void cancel_timer(WriterInfo_rch &info)
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.