OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Types | Private Attributes | List of all members
OpenDDS::DCPS::PerConnectionSynch Class Reference

#include <PerConnectionSynch.h>

Inheritance diagram for OpenDDS::DCPS::PerConnectionSynch:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::PerConnectionSynch:
Collaboration graph
[legend]

Public Member Functions

 PerConnectionSynch (ThreadSynchResource *synch_resource, long priority, long scheduler)
 
virtual ~PerConnectionSynch ()
 
virtual void work_available ()
 
virtual int open (void *)
 
virtual int svc ()
 
virtual int close (u_long)
 
- Public Member Functions inherited from ACE_Task_Base
 ACE_Task_Base (ACE_Thread_Manager *=0)
 
virtual ~ACE_Task_Base (void)
 
virtual int module_closed (void)
 
virtual int put (ACE_Message_Block *, ACE_Time_Value *=0)
 
virtual int activate (long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
 
virtual int wait (void)
 
virtual int suspend (void)
 
virtual int resume (void)
 
int grp_id (void) const
 
void grp_id (int)
 
ACE_Thread_Manager * thr_mgr (void) const
 
void thr_mgr (ACE_Thread_Manager *)
 
int is_reader (void) const
 
int is_writer (void) const
 
size_t thr_count (void) const
 
ACE_thread_t last_thread (void) const
 
- Public Member Functions inherited from ACE_Service_Object
 ACE_Service_Object (ACE_Reactor *=0)
 
virtual ~ACE_Service_Object (void)
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
virtual Reference_Count add_reference (void)
 
virtual Reference_Count remove_reference (void)
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from ACE_Shared_Object
 ACE_Shared_Object (void)
 
virtual ~ACE_Shared_Object (void)
 
virtual int init (int argc, ACE_TCHAR *argv[])
 
virtual int fini (void)
 
virtual int info (ACE_TCHAR **info_string, size_t length=0) const
 
- Public Member Functions inherited from OpenDDS::DCPS::ThreadSynch
virtual ~ThreadSynch ()
 
int register_worker (ThreadSynchWorker &worker)
 
void unregister_worker ()
 Our owner, the worker_, is breaking our relationship. More...
 

Protected Member Functions

virtual int register_worker_i ()
 
virtual void unregister_worker_i ()
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::ThreadSynch
 ThreadSynch (ThreadSynchResource *resource)
 
int wait_on_clogged_resource ()
 
WeakRcHandle< ThreadSynchWorker > worker ()
 Access the worker implementation directly. More...
 

Private Types

typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockType > GuardType
 
typedef ConditionVariable< LockType > ConditionVariableType
 

Private Attributes

LockType lock_
 
ConditionVariableType condition_
 
int work_available_
 
int shutdown_
 
long dds_priority_
 
long scheduler_
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Task_Base
static ACE_THR_FUNC_RETURN svc_run (void *)
 
static void cleanup (void *object, void *params)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Service_Object
 ACE_ALLOC_HOOK_DECLARE
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Attributes inherited from ACE_Task_Base
size_t thr_count_
 
ACE_Thread_Manager * thr_mgr_
 
u_long flags_
 
int grp_id_
 
ACE_thread_t last_thread_id_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 23 of file PerConnectionSynch.h.

Member Typedef Documentation

◆ ConditionVariableType

Definition at line 47 of file PerConnectionSynch.h.

◆ GuardType

Definition at line 46 of file PerConnectionSynch.h.

◆ LockType

Definition at line 45 of file PerConnectionSynch.h.

Constructor & Destructor Documentation

◆ PerConnectionSynch()

ACE_INLINE OpenDDS::DCPS::PerConnectionSynch::PerConnectionSynch ( ThreadSynchResource * synch_resource,
long  priority,
long  scheduler 
)

Definition at line 11 of file PerConnectionSynch.inl.

References DBG_ENTRY_LVL.

15  : ThreadSynch(synch_resource),
16  condition_(this->lock_),
17  work_available_(0),
18  shutdown_(0),
19  dds_priority_(priority),
20  scheduler_(scheduler)
21 {
22  DBG_ENTRY_LVL("PerConnectionSynch","PerConnectionSynch",6);
23 }
ThreadSynch(ThreadSynchResource *resource)
Definition: ThreadSynch.inl:14
virtual int priority(void) const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ ~PerConnectionSynch()

OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch ( )
virtual

Definition at line 19 of file PerConnectionSynch.cpp.

References DBG_ENTRY_LVL.

20 {
21  DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6);
22 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ close()

int OpenDDS::DCPS::PerConnectionSynch::close ( u_long  )
virtual

Reimplemented from ACE_Task_Base.

Definition at line 170 of file PerConnectionSynch.cpp.

References DBG_ENTRY_LVL.

171 {
172  DBG_ENTRY_LVL("PerConnectionSynch","close",6);
173  return 0;
174 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ open()

int OpenDDS::DCPS::PerConnectionSynch::open ( void *  )
virtual

Reimplemented from ACE_Task_Base.

Definition at line 34 of file PerConnectionSynch.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_Task_Base::activate(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_priority_, LM_DEBUG, scheduler_, and shutdown_.

Referenced by register_worker_i().

35 {
36  DBG_ENTRY_LVL("PerConnectionSynch","open",6);
37  // Activate this object to start a new thread that will call
38  // our svc() method, and then our close() method.
39  this->shutdown_ = 0;
40 
41  long flags;
42  flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
43 
44  if (this->scheduler_ >= 0) {
45  flags |= THR_EXPLICIT_SCHED | this->scheduler_;
46 
47  } else {
48  flags |= THR_INHERIT_SCHED;
49  }
50 
51  if (DCPS_debug_level > 0) {
52  ACE_DEBUG((LM_DEBUG,
53  ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ")
54  ACE_TEXT("activating thread with flags 0x%08.8x ")
55  ACE_TEXT("and priority %d.\n"),
56  flags,
57  this->dds_priority_));
58  }
59 
60  return this->activate(flags, 1, 0, this->dds_priority_);
61 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)

◆ register_worker_i()

int OpenDDS::DCPS::PerConnectionSynch::register_worker_i ( )
protectedvirtual

The default implementation is to do nothing here. The subclass may override the implementation in order to do something when the worker registers. Returns 0 for success, -1 for failure.

Reimplemented from OpenDDS::DCPS::ThreadSynch.

Definition at line 177 of file PerConnectionSynch.cpp.

References DBG_ENTRY_LVL, and open().

178 {
179  DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6);
180  return this->open(0);
181 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ svc()

int OpenDDS::DCPS::PerConnectionSynch::svc ( void  )
virtual

Reimplemented from ACE_Task_Base.

Definition at line 64 of file PerConnectionSynch.cpp.

References condition_, DBG_ENTRY_LVL, LM_DEBUG, lock_, shutdown_, SIG_SETMASK, ACE_OS::sigfillset(), TheServiceParticipant, ACE_OS::thr_sigsetmask(), VDBG, VDBG_LVL, OpenDDS::DCPS::ConditionVariable< Mutex >::wait(), OpenDDS::DCPS::ThreadSynch::wait_on_clogged_resource(), work_available_, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynch::worker().

65 {
66  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
67  ThreadStatusManager::Start s(thread_status_manager, "PerConnectionSynch");
68 
69  DBG_ENTRY_LVL("PerConnectionSynch","svc",6);
70 
71  // Ignore all signals to avoid
72  // ERROR: ACE::handle_write_ready return -1 while waiting to unclog. handle_write_ready: Interrupted system call
73  // The main thread will handle signals.
74  sigset_t set;
75  ACE_OS::sigfillset(&set);
76  ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
77 
78  ThreadSynchWorker::WorkOutcome work_outcome =
79  ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
80 
81 
82  RcHandle<ThreadSynchWorker> worker = this->worker().lock();
83 
84  if (! worker)
85  return 0;
86 
87  // Loop until we honor the shutdown_ flag.
88  while (true) {
89  VDBG((LM_DEBUG,"(%P|%t) DBG: "
90  "Top of infinite svc() loop\n"));
91 
92  {
93  GuardType guard(this->lock_);
94 
95  VDBG((LM_DEBUG,"(%P|%t) DBG: "
96  "Lock acquired. Check to see what to do next.\n"));
97 
98  // We will wait on the condition_ if all of the following are true:
99  //
100  // 1) The last time the perform_work() method was called, it
101  // indicated that there was no more work to do.
102  // 2) Since we last invoked perform_work(), we have not been
103  // informed that there is work_available().
104  // 3) We have not been asked to shutdown_ the svc() loop.
105  //
106  while ((work_outcome ==
107  ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) &&
108  (this->work_available_ == 0) &&
109  (this->shutdown_ == 0)) {
110  VDBG((LM_DEBUG,"(%P|%t) DBG: "
111  "No work to do. Just wait on the condition.\n"));
112  this->condition_.wait(thread_status_manager);
113  VDBG((LM_DEBUG,"(%P|%t) DBG: "
114  "We are awake from waiting on the condition.\n"));
115  }
116 
117  // Maybe we have been asked to shutdown_ the svc() loop.
118  if (this->shutdown_ == 1) {
119  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
120  "Honoring the shutdown request.\n"), 5);
121  // We are honoring the request to shutdown_ the svc() loop.
122  break;
123  }
124 
125  // Or, perhaps we experienced a fatal error on our last call to
126  // perform_work().
127  if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) {
128  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
129  "Fatal error - Broken SynchResounce.\n"), 5);
130  // Stop the svc() loop.
131  break;
132  }
133 
134  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
135  "Reset our work_available_ flag to 0, and release lock.\n"), 5);
136 
137  // Set our work_available_ flag to false (0) before we release the
138  // lock so that we will only count any work_available() calls that
139  // happen after this point.
140  this->work_available_ = 0;
141  }
142 
143  if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) {
144  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Need to wait for clogged resources to open up.\n"), 5);
145 
146  // Ask the ThreadSynchResource to block us until the clog situation
147  // clears up.
148  if (this->wait_on_clogged_resource() == -1) {
149  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
150  "Fatal error - wait_on_clogged_resource fails.\n"), 5);
151  break;
152  }
153  }
154 
155  VDBG((LM_DEBUG,"(%P|%t) DBG: "
156  "Call perform_work()\n"));
157 
158  // Without the lock, ask the worker to perform some work. It tells
159  // us if it completed with more work to still be performed (or not).
160  work_outcome = worker->perform_work();
161 
162  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
163  "call to perform_work() returned %d\n",work_outcome), 5);
164  }
165 
166  return 0;
167 }
WeakRcHandle< ThreadSynchWorker > worker()
Access the worker implementation directly.
Definition: ThreadSynch.inl:22
#define VDBG(DBG_ARGS)
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define TheServiceParticipant
int sigfillset(sigset_t *s)

◆ unregister_worker_i()

void OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i ( )
protectedvirtual

The default implementation is to do nothing here. The subclass may override the implementation in order to do something when the worker unregisters.

Reimplemented from OpenDDS::DCPS::ThreadSynch.

Definition at line 184 of file PerConnectionSynch.cpp.

References condition_, DBG_ENTRY_LVL, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_one(), shutdown_, TheServiceParticipant, and ACE_Task_Base::wait().

185 {
186  DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6);
187  // It is at this point that we need to stop the thread that
188  // was activated when our open() method was called.
189  {
190  // Acquire the lock
191  GuardType guard(this->lock_);
192 
193  // Set the shutdown_ flag to true (1) to shutdown the svc() method loop.
194  this->shutdown_ = 1;
195 
196  // Signal the condition_ object in case the svc() method is currently
197  // blocked wait()'ing on the condition.
198  this->condition_.notify_one();
199  }
200 
201  // Wait for all threads running this task (there should just be one thread)
202  // to finish.
203 
204  ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());
205  this->wait();
206 }
virtual int wait(void)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
bool notify_one()
Unblock one of the threads waiting on this condition.
#define TheServiceParticipant

◆ work_available()

void OpenDDS::DCPS::PerConnectionSynch::work_available ( )
virtual

The ThreadSynchWorker would like to have its perform_work() called from the appropriate thread once the ThreadSynchResource claims that it is_ready_for_work().

Implements OpenDDS::DCPS::ThreadSynch.

Definition at line 25 of file PerConnectionSynch.cpp.

References condition_, DBG_ENTRY_LVL, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_one(), and work_available_.

26 {
27  DBG_ENTRY_LVL("PerConnectionSynch","work_available",6);
28  GuardType guard(this->lock_);
29  this->work_available_ = 1;
30  this->condition_.notify_one();
31 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
bool notify_one()
Unblock one of the threads waiting on this condition.

Member Data Documentation

◆ condition_

ConditionVariableType OpenDDS::DCPS::PerConnectionSynch::condition_
private

Definition at line 50 of file PerConnectionSynch.h.

Referenced by svc(), unregister_worker_i(), and work_available().

◆ dds_priority_

long OpenDDS::DCPS::PerConnectionSynch::dds_priority_
private

Definition at line 53 of file PerConnectionSynch.h.

Referenced by open().

◆ lock_

LockType OpenDDS::DCPS::PerConnectionSynch::lock_
private

Definition at line 49 of file PerConnectionSynch.h.

Referenced by svc(), unregister_worker_i(), and work_available().

◆ scheduler_

long OpenDDS::DCPS::PerConnectionSynch::scheduler_
private

Definition at line 54 of file PerConnectionSynch.h.

Referenced by open().

◆ shutdown_

int OpenDDS::DCPS::PerConnectionSynch::shutdown_
private

Definition at line 52 of file PerConnectionSynch.h.

Referenced by open(), svc(), and unregister_worker_i().

◆ work_available_

int OpenDDS::DCPS::PerConnectionSynch::work_available_
private

Definition at line 51 of file PerConnectionSynch.h.

Referenced by svc(), and work_available().


The documentation for this class was generated from the following files: