OpenDDS  Snapshot(2023/04/28-20:55)
PerConnectionSynch.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include <DCPS/DdsDcps_pch.h> //Only the _pch include should start with DCPS/
9 
10 #include "PerConnectionSynch.h"
11 
13 #include <dds/DCPS/debug.h>
14 
15 #if !defined (__ACE_INLINE__)
16 #include "PerConnectionSynch.inl"
17 #endif /* __ACE_INLINE__ */
18 
20 {
21  DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6);
22 }
23 
24 void
26 {
27  DBG_ENTRY_LVL("PerConnectionSynch","work_available",6);
28  GuardType guard(this->lock_);
29  this->work_available_ = 1;
31 }
32 
33 int
35 {
36  DBG_ENTRY_LVL("PerConnectionSynch","open",6);
37  // Activate this object to start a new thread that will call
38  // our svc() method, and then our close() method.
39  this->shutdown_ = 0;
40 
41  long flags;
42  flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
43 
44  if (this->scheduler_ >= 0) {
45  flags |= THR_EXPLICIT_SCHED | this->scheduler_;
46 
47  } else {
48  flags |= THR_INHERIT_SCHED;
49  }
50 
51  if (DCPS_debug_level > 0) {
53  ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ")
54  ACE_TEXT("activating thread with flags 0x%08.8x ")
55  ACE_TEXT("and priority %d.\n"),
56  flags,
57  this->dds_priority_));
58  }
59 
60  return this->activate(flags, 1, 0, this->dds_priority_);
61 }
62 
63 int
65 {
66  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
67  ThreadStatusManager::Start s(thread_status_manager, "PerConnectionSynch");
68 
69  DBG_ENTRY_LVL("PerConnectionSynch","svc",6);
70 
71  // Ignore all signals to avoid
72  // ERROR: ACE::handle_write_ready return -1 while waiting to unclog. handle_write_ready: Interrupted system call
73  // The main thread will handle signals.
74  sigset_t set;
75  ACE_OS::sigfillset(&set);
77 
78  ThreadSynchWorker::WorkOutcome work_outcome =
80 
81 
82  RcHandle<ThreadSynchWorker> worker = this->worker().lock();
83 
84  if (! worker)
86 
87  // Loop until we honor the shutdown_ flag.
88  while (true) {
89  VDBG((LM_DEBUG,"(%P|%t) DBG: "
90  "Top of infinite svc() loop\n"));
91 
92  {
93  GuardType guard(this->lock_);
94 
95  VDBG((LM_DEBUG,"(%P|%t) DBG: "
96  "Lock acquired. Check to see what to do next.\n"));
97 
98  // We will wait on the condition_ if all of the following are true:
99  //
100  // 1) The last time the perform_work() method was called, it
101  // indicated that there was no more work to do.
102  // 2) Since we last invoked perform_work(), we have not been
103  // informed that there is work_available().
104  // 3) We have not been asked to shutdown_ the svc() loop.
105  //
106  while ((work_outcome ==
108  (this->work_available_ == 0) &&
109  (this->shutdown_ == 0)) {
110  VDBG((LM_DEBUG,"(%P|%t) DBG: "
111  "No work to do. Just wait on the condition.\n"));
112  this->condition_.wait(thread_status_manager);
113  VDBG((LM_DEBUG,"(%P|%t) DBG: "
114  "We are awake from waiting on the condition.\n"));
115  }
116 
117  // Maybe we have been asked to shutdown_ the svc() loop.
118  if (this->shutdown_ == 1) {
119  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
120  "Honoring the shutdown request.\n"), 5);
121  // We are honoring the request to shutdown_ the svc() loop.
122  break;
123  }
124 
125  // Or, perhaps we experienced a fatal error on our last call to
126  // perform_work().
128  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
129  "Fatal error - Broken SynchResounce.\n"), 5);
130  // Stop the svc() loop.
131  break;
132  }
133 
134  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
135  "Reset our work_available_ flag to 0, and release lock.\n"), 5);
136 
137  // Set our work_available_ flag to false (0) before we release the
138  // lock so that we will only count any work_available() calls that
139  // happen after this point.
140  this->work_available_ = 0;
141  }
142 
144  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Need to wait for clogged resources to open up.\n"), 5);
145 
146  // Ask the ThreadSynchResource to block us until the clog situation
147  // clears up.
148  if (this->wait_on_clogged_resource() == -1) {
149  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
150  "Fatal error - wait_on_clogged_resource fails.\n"), 5);
151  break;
152  }
153  }
154 
155  VDBG((LM_DEBUG,"(%P|%t) DBG: "
156  "Call perform_work()\n"));
157 
158  // Without the lock, ask the worker to perform some work. It tells
159  // us if it completed with more work to still be performed (or not).
160  work_outcome = worker->perform_work();
161 
162  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
163  "call to perform_work() returned %d\n",work_outcome), 5);
164  }
165 
166  return 0;
167 }
168 
169 int
171 {
172  DBG_ENTRY_LVL("PerConnectionSynch","close",6);
173  return 0;
174 }
175 
176 int
178 {
179  DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6);
180  return this->open(0);
181 }
182 
183 void
185 {
186  DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6);
187  // It is at this point that we need to stop the thread that
188  // was activated when our open() method was called.
189  {
190  // Acquire the lock
191  GuardType guard(this->lock_);
192 
193  // Set the shutdown_ flag to false to shutdown the svc() method loop.
194  this->shutdown_ = 1;
195 
196  // Signal the condition_ object in case the svc() method is currently
197  // blocked wait()'ing on the condition.
199  }
200 
201  // Wait for all threads running this task (there should just be one thread)
202  // to finish.
203 
204  ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());
205  this->wait();
206 }
WeakRcHandle< ThreadSynchWorker > worker()
Access the worker implementation directly.
Definition: ThreadSynch.inl:22
#define ACE_DEBUG(X)
bool notify_one()
Unblock one of the threads waiting on this condition.
LM_DEBUG
#define VDBG(DBG_ARGS)
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
virtual int wait(void)
ACE_TEXT("TCP_Factory")
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
#define SIG_SETMASK
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define VDBG_LVL(DBG_ARGS, LEVEL)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
#define TheServiceParticipant
int sigfillset(sigset_t *s)