OpenDDS Snapshot (2023/04/28-20:55)
DcpsUpcalls.cpp
Go to the documentation of this file.
1 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
2 
3 #include "DcpsUpcalls.h"
4 
5 #include "Service_Participant.h"
6 #include "ThreadStatusManager.h"
8 
9 namespace OpenDDS {
10 namespace DCPS {
11 
14  const GUID_t& reader,
15  const WriterAssociation& wa,
16  bool active,
18  : drr_(drr)
19  , reader_(reader)
20  , wa_(wa)
21  , active_(active)
22  , dwr_(dwr)
23  , reader_done_(false)
24  , writer_done_(false)
25  , cnd_(mtx_)
26 {
27 }
28 
30 {
31  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
32  const TimeDuration thread_status_interval = thread_status_manager.thread_status_interval();
33  const bool update_thread_status = thread_status_manager.update_thread_status();
34 
35  ThreadStatusManager::Start s(thread_status_manager, "DcpsUpcalls");
36 
37  MonotonicTimePoint expire = MonotonicTimePoint::now() + thread_status_interval;
38 
40  if (!drr) {
41  return 0;
42  }
43  drr->add_association(reader_, wa_, active_);
44 
45  {
47  reader_done_ = true;
48  cnd_.notify_one();
49  while (!writer_done_) {
50  if (update_thread_status) {
51  switch (cnd_.wait_until(expire, thread_status_manager)) {
52  case CvStatus_NoTimeout:
53  break;
54 
55  case CvStatus_Timeout:
56  expire = MonotonicTimePoint::now() + thread_status_interval;
57  break;
58 
59  case CvStatus_Error:
60  if (DCPS_debug_level) {
61  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DcpsUpcalls::svc: error in wait_utill\n"));
62  }
63  return -1;
64  }
65  } else if (cnd_.wait(thread_status_manager) == CvStatus_Error) {
66  if (DCPS_debug_level) {
67  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DcpsUpcalls::svc: error in wait\n"));
68  }
69  return -1;
70  }
71  }
72  }
73 
74  return 0;
75 }
76 
78 {
79  {
81  writer_done_ = true;
82  cnd_.notify_one();
83  }
84 
85  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
86  ThreadStatusManager::Sleeper sleeper(thread_status_manager);
87  wait(); // ACE_Task_Base::wait does not accept a timeout
88 }
89 
90 } // namespace DCPS
91 } // namespace OpenDDS
92 
void thread_status_interval(const TimeDuration &thread_status_interval)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ConditionVariable< ACE_Thread_Mutex > cnd_
Definition: DcpsUpcalls.h:44
const GUID_t & reader_
Definition: DcpsUpcalls.h:38
bool notify_one()
Unblock one of the threads waiting on this condition.
ACE_Thread_Mutex mtx_
Definition: DcpsUpcalls.h:43
const WriterAssociation & wa_
Definition: DcpsUpcalls.h:39
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DataReaderCallbacks_wrch drr_
Definition: DcpsUpcalls.h:37
DcpsUpcalls(DataReaderCallbacks_rch drr, const GUID_t &reader, const WriterAssociation &wa, bool active, DataWriterCallbacks_rch dwr)
Definition: DcpsUpcalls.cpp:12
virtual int wait(void)
CvStatus_Timeout
The wait has returned because of a timeout.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
RcHandle< T > lock() const
Definition: RcObject.h:188
CvStatus_NoTimeout
The wait has returned because it was woken up.
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28