OpenDDS  Snapshot(2023/04/28-20:55)
ThreadStatusManager.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
9 
10 #include "ThreadStatusManager.h"
11 #include "SafetyProfileStreams.h"
12 #include "debug.h"
13 
15 
16 namespace OpenDDS {
17 namespace DCPS {
18 
20  const SystemTimePoint& s_now,
21  ThreadStatus next_status,
22  const TimeDuration& bucket_limit,
23  bool nested)
24 {
 // Records a status transition for one thread: stores the wall-clock stamp,
 // charges the time elapsed since last_update_ to the running totals, and then
 // switches status_ to next_status.
 // NOTE(review): this listing omits the first line of the signature and
 // several body lines (the case bodies of the first switch, the body of the
 // bucket_limit check, and the first case of the second switch). The comments
 // here describe only the statements that are visible.
 // m_now is read below but declared on a line not shown; presumably it is the
 // leading monotonic-clock parameter - confirm against the header.

 // Stored unconditionally, even when no time is charged below.
25  timestamp_ = s_now;
26 
27  if (nested) {
 // Case bodies are not visible here; presumably they adjust nesting_depth_,
 // which the condition below reads - TODO confirm.
28  switch(next_status) {
31  break;
32  case ThreadStatus_Idle:
34  break;
35  }
36  }
37 
 // Time is charged only for non-nested calls, for an Active transition seen at
 // nesting depth 1, or for an Idle transition seen at nesting depth 0.
38  if (!nested ||
39  (next_status == ThreadStatus_Active && nesting_depth_ == 1) ||
40  (next_status == ThreadStatus_Idle && nesting_depth_ == 0)) {
 // Triggered when the current bucket's active + idle time exceeds bucket_limit.
 // The body is not visible in this listing; presumably it moves on to another
 // bucket - TODO confirm.
41  if (bucket_[current_bucket_].active_time + bucket_[current_bucket_].idle_time > bucket_limit) {
47  }
48 
 // Elapsed monotonic time since the previous charged update.
49  const TimeDuration t = m_now - last_update_;
50 
 // The elapsed time is charged according to status_, i.e. the status held up to
 // now, not next_status. The label preceding the active_time line is not
 // visible; presumably it is the Active case - TODO confirm.
51  switch (status_) {
54  total_.active_time += t;
55  break;
56  case ThreadStatus_Idle:
58  total_.idle_time += t;
59  break;
60  }
61 
62  last_update_ = m_now;
63  status_ = next_status;
64  }
65 }
66 
68 {
 // Computes the active share of the accounted time: active / (active + idle).
 // NOTE(review): the signature line is not shown in this listing; `now` is
 // presumably the function's time-point parameter - confirm.
 //
 // When `now` is later than last_update_, the time elapsed since last_update_
 // is provisionally added to whichever of active/idle matches the current
 // status_, without modifying any member. Otherwise both bonuses are zero.
69  const TimeDuration active_bonus = (now > last_update_ && status_ == ThreadStatus_Active) ? (now - last_update_) : TimeDuration::zero_value;
70  const TimeDuration idle_bonus = (now > last_update_ && status_ == ThreadStatus_Idle) ? (now - last_update_) : TimeDuration::zero_value;
71  const TimeDuration denom = total_.active_time + active_bonus + total_.idle_time + idle_bonus;
72 
 // Guard against dividing by a zero duration; in that case 0 is returned.
73  if (!denom.is_zero()) {
74  return (total_.active_time + active_bonus) / denom;
75  }
76  return 0;
77 }
78 
80 {
 // Returns an identifier for the calling thread; how it is obtained is chosen
 // at compile time.
 // NOTE(review): the signature line is not shown in this listing, so the exact
 // return type cannot be confirmed here. The branches return differently typed
 // values (an unsigned cast, the thr_gettid() result, a String), so the return
 // type presumably varies with the same macros - confirm in the header.
81 #if defined (ACE_WIN32)
 // Windows: the ACE thread handle value cast to unsigned.
82  return static_cast<unsigned>(ACE_Thread::self());
83 #else
84 # ifdef ACE_HAS_GETTID
 // Platforms with gettid support: whatever ACE_OS::thr_gettid() reports.
85  return ACE_OS::thr_gettid();
86 # else
 // Fallback: ask ACE to write the thread id as text into a 32-byte buffer and
 // return that text.
87  char buffer[32];
 // NOTE(review): the result is stored in a size_t. If ACE_OS::thr_id can return
 // a negative value on failure, len would wrap to a huge value and the String
 // construction below would read past buffer - confirm the return contract.
88  const size_t len = ACE_OS::thr_id(buffer, 32);
89  return String(buffer, len);
90 # endif
91 #endif /* ACE_WIN32 */
92 }
93 
95 {
 // Registers the calling thread in map_ under a display key built from its
 // thread id and, when given, a name.
 // NOTE(review): the signature line is not shown in this listing; `name` is
 // presumably a String parameter (it is used with length() and operator+) -
 // confirm.

 // Nothing is recorded when update_thread_status() reports false.
96  if (!update_thread_status()) {
97  return;
98  }
99 
100  const ThreadId thread_id = get_thread_id();
101 
 // Key text: the thread id as a string, followed by " (<name>)" when name is
 // non-empty.
102  String bit_key = to_dds_string(thread_id);
103 
104  if (name.length()) {
105  bit_key += " (" + name + ")";
106  }
107 
108  if (DCPS_debug_level > 4) {
109  ACE_DEBUG((LM_DEBUG, "(%P|%t) ThreadStatusManager::add_thread: "
110  "adding thread %C\n", bit_key.c_str()));
111  }
112 
 // NOTE(review): one line between the logging above and the insert below is
 // absent from this listing; presumably it acquires the manager's lock - confirm.
 // If map_ has std::map insert semantics, an entry already present for this
 // thread id is left unchanged - TODO confirm the container type.
114  map_.insert(std::make_pair(thread_id, Thread(bit_key)));
115 }
116 
118 {
 // Marks the calling thread as Active in its map_ entry, then prunes old
 // entries via cleanup().
 // NOTE(review): the signature line is not shown in this listing; `nested` is
 // presumably a bool parameter, mirroring idle(bool nested) - confirm.

 // Nothing is recorded when update_thread_status() reports false.
119  if (!update_thread_status()) {
120  return;
121  }
122 
 // NOTE(review): lines are missing from the listing around here. m_now is used
 // below but its declaration is not visible; presumably it is a monotonic
 // "now" sample, and a lock guard may also be taken here - confirm.
124 
126  const SystemTimePoint s_now = SystemTimePoint::now();
127  const ThreadId thread_id = get_thread_id();
128 
 // Threads with no entry in map_ are silently ignored.
129  const Map::iterator pos = map_.find(thread_id);
130  if (pos != map_.end()) {
131  pos->second.update(m_now, s_now, Thread::ThreadStatus_Active, bucket_limit_, nested);
132  }
133 
 // Runs whether or not the calling thread was found.
134  cleanup(m_now);
135 }
136 
// Marks the calling thread as Idle in its map_ entry, then prunes old entries
// via cleanup(). `nested` is forwarded unchanged to the entry's update().
137 void ThreadStatusManager::idle(bool nested)
138 {
 // Nothing is recorded when update_thread_status() reports false.
139  if (!update_thread_status()) {
140  return;
141  }
142 
 // NOTE(review): lines are missing from the listing around here. m_now is used
 // below but its declaration is not visible; presumably it is a monotonic
 // "now" sample, and a lock guard may also be taken here - confirm.
144 
146  const SystemTimePoint s_now = SystemTimePoint::now();
147  const ThreadId thread_id = get_thread_id();
148 
 // Threads with no entry in map_ are silently ignored.
149  const Map::iterator pos = map_.find(thread_id);
150  if (pos != map_.end()) {
151  pos->second.update(m_now, s_now, Thread::ThreadStatus_Idle, bucket_limit_, nested);
152  }
153 
 // Runs whether or not the calling thread was found.
154  cleanup(m_now);
155 }
156 
158 {
 // Retires the calling thread: its entry gets a final Idle update, is appended
 // to list_, and is removed from map_. cleanup() then prunes old entries.
 // NOTE(review): the signature line is not shown in this listing, so the
 // method's name and parameters cannot be confirmed from here.

 // Nothing is recorded when update_thread_status() reports false.
159  if (!update_thread_status()) {
160  return;
161  }
162 
 // NOTE(review): lines are missing from the listing around here. m_now is used
 // below but its declaration is not visible; presumably it is a monotonic
 // "now" sample, and a lock guard may also be taken here - confirm.
164 
166  const SystemTimePoint s_now = SystemTimePoint::now();
167  const ThreadId thread_id = get_thread_id();
168 
169  const Map::iterator pos = map_.find(thread_id);
170  if (pos != map_.end()) {
 // The final update is always passed nested == false.
171  pos->second.update(m_now, s_now, Thread::ThreadStatus_Idle, bucket_limit_, false);
172 
 // Copy the entry into list_ before erasing it; pos is not used after the erase.
173  list_.push_back(pos->second);
174  map_.erase(pos);
175  }
176 
 // Runs whether or not the calling thread was found.
177  cleanup(m_now);
178 }
179 
181  ThreadStatusManager::List& running,
182  ThreadStatusManager::List& finished) const
183 {
 // Appends copies of recently updated thread entries to the two output lists:
 // entries from map_ go to `running`, entries from list_ go to `finished`.
 // Only entries whose last_update() is strictly later than `start` are copied.
 // Neither output list is cleared first.
 // NOTE(review): the first line of the signature (which presumably declares
 // `start`) and one line at the top of the body are absent from this listing;
 // the missing body line is presumably a lock guard - confirm.
185 
186  for (Map::const_iterator pos = map_.begin(), limit = map_.end(); pos != limit; ++pos) {
187  if (pos->second.last_update() > start) {
188  running.push_back(pos->second);
189  }
190  }
191 
192  for (List::const_iterator pos = list_.begin(), limit = list_.end(); pos != limit; ++pos) {
193  if (pos->last_update() > start) {
194  finished.push_back(*pos);
195  }
196  }
197 }
198 
200 {
 // Drops entries from the front of list_ whose last_update() is earlier than
 // a cutoff of ten thread_status_interval_ periods before `now`.
 // NOTE(review): the signature line is not shown in this listing; `now` is
 // presumably a MonotonicTimePoint parameter - confirm.
201  const MonotonicTimePoint cutoff = now - 10 * thread_status_interval_;
202 
 // Only the front element is examined, and the loop stops at the first entry
 // that is not older than the cutoff. Entries behind it are never inspected,
 // so this presumably relies on list_ being ordered oldest-first - confirm.
203  while (!list_.empty() && list_.front().last_update() < cutoff) {
204  list_.pop_front();
205  }
206 }
207 
208 
209 } // namespace DCPS
210 } // namespace OpenDDS
211 
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
pid_t thr_gettid()
std::string String
ssize_t thr_id(char buffer[], size_t buffer_length)
String to_dds_string(unsigned short to_convert)
double utilization(const MonotonicTimePoint &now) const
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
LM_DEBUG
static ACE_thread_t self(void)
void harvest(const MonotonicTimePoint &start, List &running, List &finished) const
const char *const name
Definition: debug.cpp:60
static const TimeDuration zero_value
Definition: TimeDuration.h:31
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void cleanup(const MonotonicTimePoint &now)
void update(const MonotonicTimePoint &m_now, const SystemTimePoint &s_now, ThreadStatus next_status, const TimeDuration &bucket_limit, bool nested)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28