OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::MessageTracker Class Reference

#include <MessageTracker.h>

Collaboration diagram for OpenDDS::DCPS::MessageTracker:
Collaboration graph
[legend]

Public Member Functions

 MessageTracker (const OPENDDS_STRING &msg_src)
 
void message_sent ()
 
void message_delivered ()
 
void message_dropped ()
 
bool pending_messages () const
 
void wait_messages_pending (const char *caller)
 
void wait_messages_pending (const char *caller, const MonotonicTimePoint &deadline)
 
int dropped_count () const
 

Private Types

typedef ConditionVariable< ACE_Thread_Mutex > ConditionVariableType
 All messages have been transported condition variable. More...
 

Private Member Functions

bool pending_messages_i () const
 

Private Attributes

const OPENDDS_STRING msg_src_
 
int dropped_count_
 
int delivered_count_
 
int sent_count_
 
ACE_Thread_Mutex lock_
 
ConditionVariableType done_condition_
 

Detailed Description

A simple message tracker used to wait until all messages have been accounted for before continuing processing.

Definition at line 27 of file MessageTracker.h.

Member Typedef Documentation

◆ ConditionVariableType

All messages have been transported condition variable.

Definition at line 79 of file MessageTracker.h.

Constructor & Destructor Documentation

◆ MessageTracker()

OpenDDS::DCPS::MessageTracker::MessageTracker ( const OPENDDS_STRING & msg_src)

Definition at line 21 of file MessageTracker.cpp.

Member Function Documentation

◆ dropped_count()

int OpenDDS::DCPS::MessageTracker::dropped_count ( ) const

For testing.

Definition at line 129 of file MessageTracker.cpp.

References ACE_GUARD_RETURN, dropped_count_, lock_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

130 {
131  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, 0);
132  return dropped_count_;
133 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ message_delivered()

void OpenDDS::DCPS::MessageTracker::message_delivered ( )

◆ message_dropped()

void OpenDDS::DCPS::MessageTracker::message_dropped ( )

◆ message_sent()

void OpenDDS::DCPS::MessageTracker::message_sent ( )

◆ pending_messages()

bool OpenDDS::DCPS::MessageTracker::pending_messages ( ) const

Answer if there are any messages that have not been accounted for.

Definition at line 31 of file MessageTracker.cpp.

References ACE_GUARD_RETURN, lock_, and pending_messages_i().

32 {
33  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
34  return pending_messages_i();
35 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ pending_messages_i()

bool OpenDDS::DCPS::MessageTracker::pending_messages_i ( ) const
private

◆ wait_messages_pending() [1/2]

void OpenDDS::DCPS::MessageTracker::wait_messages_pending ( const char *  caller)

Block until all messages have been accounted for or the wait times out based on PendingTimeout.

Definition at line 72 of file MessageTracker.cpp.

References OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), and TheServiceParticipant.

Referenced by OpenDDS::DCPS::DataWriterImpl::wait_pending(), and OpenDDS::DCPS::SendResponseListener::~SendResponseListener().

73 {
74  const TimeDuration pending_timeout(TheServiceParticipant->pending_timeout());
75  wait_messages_pending(caller, pending_timeout.is_zero() ?
76  MonotonicTimePoint() : MonotonicTimePoint::now() + pending_timeout);
77 }
void wait_messages_pending(const char *caller)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
#define TheServiceParticipant

◆ wait_messages_pending() [2/2]

void OpenDDS::DCPS::MessageTracker::wait_messages_pending ( const char *  caller,
const MonotonicTimePoint & deadline 
)

Block until all messages have been accounted for or the deadline supplied has passed. Blocks indefinitely if deadline is zero.

Definition at line 79 of file MessageTracker.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, OpenDDS::DCPS::DCPS_debug_level, done_condition_, OpenDDS::DCPS::TimePoint_T< AceClock >::is_zero(), LM_DEBUG, LM_ERROR, lock_, msg_src_, pending_messages_i(), TheServiceParticipant, OpenDDS::DCPS::TimePoint_T< AceClock >::value(), and OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until().

80 {
81  const bool use_deadline = !deadline.is_zero();
82  ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
83  const bool report = DCPS_debug_level > 0 && pending_messages_i();
84  if (report) {
85  if (use_deadline) {
86  ACE_DEBUG((LM_DEBUG,
87  ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ")
88  ACE_TEXT("from source=%C will wait until %#T.\n"),
89  msg_src_.c_str(), &deadline.value()));
90  } else {
91  ACE_DEBUG((LM_DEBUG,
92  ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ")
93  ACE_TEXT("from source=%C will wait with no timeout.\n"),
94  msg_src_.c_str()));
95  }
96  }
97  bool loop = true;
98  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
99  while (loop && pending_messages_i()) {
100  switch (done_condition_.wait_until(deadline, thread_status_manager)) {
101  case CvStatus_Timeout:
102  if (DCPS_debug_level) {
103  ACE_DEBUG((LM_DEBUG,
104  "(%P|%t) MessageTracker::wait_messages_pending: "
105  "Timed out waiting for messages to be transported (caller: %C)\n",
106  caller));
107  }
108  loop = false;
109  break;
110 
111  case CvStatus_NoTimeout:
112  break;
113 
114  case CvStatus_Error:
115  if (DCPS_debug_level) {
116  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MessageTracker::wait_messages_pending: "
117  "error in wait_until\n"));
118  }
119  loop = false;
120  return;
121  }
122  }
123  if (report) {
124  ACE_DEBUG((LM_DEBUG, "(%P|%t) MessageTracker::wait_messages_pending %T done\n"));
125  }
126 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const OPENDDS_STRING msg_src_
ConditionVariableType done_condition_
The wait has returned because of a timeout.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define TheServiceParticipant
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
The wait has returned because it was woken up.

Member Data Documentation

◆ delivered_count_

int OpenDDS::DCPS::MessageTracker::delivered_count_
private

Definition at line 73 of file MessageTracker.h.

Referenced by message_delivered(), and pending_messages_i().

◆ done_condition_

ConditionVariableType OpenDDS::DCPS::MessageTracker::done_condition_
private

Definition at line 80 of file MessageTracker.h.

Referenced by message_delivered(), message_dropped(), and wait_messages_pending().

◆ dropped_count_

int OpenDDS::DCPS::MessageTracker::dropped_count_
private

Definition at line 72 of file MessageTracker.h.

Referenced by dropped_count(), message_dropped(), and pending_messages_i().

◆ lock_

ACE_Thread_Mutex OpenDDS::DCPS::MessageTracker::lock_
mutable private

◆ msg_src_

const OPENDDS_STRING OpenDDS::DCPS::MessageTracker::msg_src_
private

Definition at line 71 of file MessageTracker.h.

Referenced by wait_messages_pending().

◆ sent_count_

int OpenDDS::DCPS::MessageTracker::sent_count_
private

Definition at line 74 of file MessageTracker.h.

Referenced by message_sent(), and pending_messages_i().


The documentation for this class was generated from the following files: