OpenDDS  Snapshot(2023/04/28-20:55)
MessageTracker.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
7 
8 #include "MessageTracker.h"
9 #include "Service_Participant.h"
10 
11 #include <ace/Synch.h>
12 #include <ace/ACE.h>
13 #include <ace/Guard_T.h>
14 #include <ace/OS_NS_time.h>
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
22 : msg_src_(msg_src)
23 , dropped_count_(0)
24 , delivered_count_(0)
25 , sent_count_(0)
26 , done_condition_(lock_)
27 {
28 }
29 
30 bool
32 {
33  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
34  return pending_messages_i();
35 }
36 
37 bool
39 {
41 }
42 
43 void
45 {
47  ++sent_count_;
48 }
49 
50 void
52 {
55 
56  if (!pending_messages_i()) {
58  }
59 }
60 
61 void
63 {
66 
67  if (!pending_messages_i()) {
69  }
70 }
71 
72 void MessageTracker::wait_messages_pending(const char* caller)
73 {
74  const TimeDuration pending_timeout(TheServiceParticipant->pending_timeout());
75  wait_messages_pending(caller, pending_timeout.is_zero() ?
76  MonotonicTimePoint() : MonotonicTimePoint::now() + pending_timeout);
77 }
78 
79 void MessageTracker::wait_messages_pending(const char* caller, const MonotonicTimePoint& deadline)
80 {
81  const bool use_deadline = deadline.is_zero();
82  ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
83  const bool report = DCPS_debug_level > 0 && pending_messages_i();
84  if (report) {
85  if (use_deadline) {
87  ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ")
88  ACE_TEXT("from source=%C will wait until %#T.\n"),
89  msg_src_.c_str(), &deadline.value()));
90  } else {
92  ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ")
93  ACE_TEXT("from source=%C will wait with no timeout.\n"),
94  msg_src_.c_str()));
95  }
96  }
97  bool loop = true;
98  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
99  while (loop && pending_messages_i()) {
100  switch (done_condition_.wait_until(deadline, thread_status_manager)) {
101  case CvStatus_Timeout:
104  "(%P|%t) MessageTracker::wait_messages_pending: "
105  "Timed out waiting for messages to be transported (caller: %C)\n",
106  caller));
107  }
108  loop = false;
109  break;
110 
111  case CvStatus_NoTimeout:
112  break;
113 
114  case CvStatus_Error:
115  if (DCPS_debug_level) {
116  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MessageTracker::wait_messages_pending: "
117  "error in wait_until\n"));
118  }
119  loop = false;
120  return;
121  }
122  }
123  if (report) {
124  ACE_DEBUG((LM_DEBUG, "(%P|%t) MessageTracker::wait_messages_pending %T done\n"));
125  }
126 }
127 
128 int
130 {
132  return dropped_count_;
133 }
134 
135 } // namespace DCPS
136 } // namespace OpenDDS
137 
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
MessageTracker(const OPENDDS_STRING &msg_src)
ACE_Guard< ACE_Thread_Mutex > lock_
ConditionVariableType done_condition_
const OPENDDS_STRING msg_src_
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define OPENDDS_STRING
LM_DEBUG
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
bool notify_all()
Unblock all of the threads waiting on this condition.
ACE_TEXT("TCP_Factory")
The wait has returned because of a timeout.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void wait_messages_pending(const char *caller)
The wait has returned because it was woken up.
#define TheServiceParticipant
LM_ERROR
const ACE_Time_Value_T< AceClock > & value() const
Definition: TimePoint_T.inl:49
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28