OpenDDS  Snapshot(2023/04/28-20:55)
Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::ThreadPerConnectionSendTask Class Reference

Execute the requests of sending a sample or control message. More...

#include <ThreadPerConnectionSendTask.h>

Inheritance diagram for OpenDDS::DCPS::ThreadPerConnectionSendTask:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ThreadPerConnectionSendTask:
Collaboration graph
[legend]

Public Member Functions

 ThreadPerConnectionSendTask (DataLink *link)
 
virtual ~ThreadPerConnectionSendTask ()
 
int add_request (SendStrategyOpType op, TransportQueueElement *element=0)
 
virtual int open (void *=0)
 Activate the worker threads. More...
 
virtual int svc ()
 The "mainline" executed by the worker thread. More...
 
virtual int close (u_long flag=0)
 Called when the thread exits. More...
 
RemoveResult remove_sample (const DataSampleElement *element)
 Remove sample from the thread per connection queue. More...
 
- Public Member Functions inherited from ACE_Task_Base
 ACE_Task_Base (ACE_Thread_Manager *=0)
 
virtual ~ACE_Task_Base (void)
 
virtual int module_closed (void)
 
virtual int put (ACE_Message_Block *, ACE_Time_Value *=0)
 
virtual int activate (long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
 
virtual int wait (void)
 
virtual int suspend (void)
 
virtual int resume (void)
 
int grp_id (void) const
 
void grp_id (int)
 
ACE_Thread_Manager * thr_mgr (void) const
 
void thr_mgr (ACE_Thread_Manager *)
 
int is_reader (void) const
 
int is_writer (void) const
 
size_t thr_count (void) const
 
ACE_thread_t last_thread (void) const
 
- Public Member Functions inherited from ACE_Service_Object
 ACE_Service_Object (ACE_Reactor *=0)
 
virtual ~ACE_Service_Object (void)
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
virtual Reference_Count add_reference (void)
 
virtual Reference_Count remove_reference (void)
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from ACE_Shared_Object
 ACE_Shared_Object (void)
 
virtual ~ACE_Shared_Object (void)
 
virtual int init (int argc, ACE_TCHAR *argv[])
 
virtual int fini (void)
 
virtual int info (ACE_TCHAR **info_string, size_t length=0) const
 

Private Types

typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockType > GuardType
 
typedef ConditionVariable< LockType > ConditionVariableType
 
typedef BasicQueue< SendRequest > QueueType
 

Private Member Functions

virtual void execute (SendRequest &req)
 Handle the request. More...
 

Private Attributes

LockType lock_
 Lock to protect the "state" (all of the data members) of this object. More...
 
QueueType queue_
 The request queue. More...
 
ConditionVariableType work_available_
 
bool shutdown_initiated_
 Flag used to initiate a shutdown request to all worker threads. More...
 
bool opened_
 Flag used to avoid multiple open() calls. More...
 
ACE_thread_t thr_id_
 The id of the thread created by this task. More...
 
DataLink * link_
 The datalink to send the samples or control messages. More...
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Task_Base
static ACE_THR_FUNC_RETURN svc_run (void *)
 
static void cleanup (void *object, void *params)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Service_Object
 ACE_ALLOC_HOOK_DECLARE
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from ACE_Task_Base
size_t thr_count_
 
ACE_Thread_Manager * thr_mgr_
 
u_long flags_
 
int grp_id_
 
ACE_thread_t last_thread_id_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Execute the requests of sending a sample or control message.

This task implements the request execute method which handles each step of sending a sample or control message.

Definition at line 57 of file ThreadPerConnectionSendTask.h.

Member Typedef Documentation

◆ ConditionVariableType

Definition at line 87 of file ThreadPerConnectionSendTask.h.

◆ GuardType

Definition at line 86 of file ThreadPerConnectionSendTask.h.

◆ LockType

Definition at line 85 of file ThreadPerConnectionSendTask.h.

◆ QueueType

Definition at line 89 of file ThreadPerConnectionSendTask.h.

Constructor & Destructor Documentation

◆ ThreadPerConnectionSendTask()

OpenDDS::DCPS::ThreadPerConnectionSendTask::ThreadPerConnectionSendTask ( DataLink * link)

Definition at line 26 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL, and NULL_thread.

27  : lock_()
28  , work_available_(lock_)
29  , shutdown_initiated_(false)
30  , opened_(false)
31  , thr_id_(ACE_OS::NULL_thread)
32  , link_(link)
33 {
34  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
35 }
bool opened_
Flag used to avoid multiple open() calls.
ACE_thread_t thr_id_
The id of the thread created by this task.
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
ACE_thread_t NULL_thread
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
DataLink * link_
The datalink to send the samples or control messages.

◆ ~ThreadPerConnectionSendTask()

OpenDDS::DCPS::ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask ( )
virtual

Definition at line 37 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL.

38 {
39  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
40 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ add_request()

int OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request ( SendStrategyOpType  op,
TransportQueueElement * element = 0 
)

Put the request on the request queue. Returns 0 if successful, -1 otherwise (the request has been "rejected" or this task has been shut down).

Definition at line 42 of file ThreadPerConnectionSendTask.cpp.

References ACE_ERROR, ACE_TEXT(), DBG_ENTRY, ACE_Auto_Basic_Ptr< X >::get(), LM_ERROR, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_one(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, ACE_Auto_Basic_Ptr< X >::release(), OpenDDS::DCPS::SEND_STOP, shutdown_initiated_, and work_available_.

44 {
45  DBG_ENTRY("ThreadPerConnectionSendTask", "add");
46 
47  ACE_Auto_Ptr<SendRequest> req(new SendRequest);
48  req->op_ = op;
49  req->element_ = element;
50 
51  int result = -1;
52  { // guard scope
53  GuardType guard(lock_);
54 
55  if (shutdown_initiated_) {
56  return -1;
57  }
58 
59  result = queue_.put(req.get());
60 
61  if (result == 0 && op == SEND_STOP) {
62  work_available_.notify_one();
63  }
64  }
65 
66  if (result == 0) {
67  req.release();
68  } else {
69  ACE_ERROR((LM_ERROR,
70  ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
71  ACE_TEXT("put")));
72  }
73 
74  return result;
75 }
#define ACE_ERROR(X)
bool notify_one()
Unblock one of the threads waiting on this condition.
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
ACE_TEXT("TCP_Factory")
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
int put(T *elem)
Put a pointer to an element (T*) on to the queue.
Definition: BasicQueue_T.h:36

◆ close()

int OpenDDS::DCPS::ThreadPerConnectionSendTask::close ( u_long  flag = 0)
virtual

Called when the thread exits.

Reimplemented from ACE_Task_Base.

Definition at line 191 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), opened_, shutdown_initiated_, TheServiceParticipant, ACE_OS::thr_equal(), thr_id_, ACE_OS::thr_self(), ACE_Task_Base::wait(), and work_available_.

192 {
193  DBG_ENTRY("ThreadPerConnectionSendTask","close");
194 
195  if (flag == 0) {
196  return 0;
197  }
198 
199  {
200  GuardType guard(lock_);
201 
202  if (shutdown_initiated_) {
203  return 0;
204  }
205 
206  // Set the shutdown flag to true.
207  shutdown_initiated_ = true;
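208  work_available_.notify_all();
209  }
210 
211  if (opened_ && !ACE_OS::thr_equal(thr_id_, ACE_OS::thr_self())) {
212  ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());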
213  wait();
214  }
215 
216  return 0;
217 }
bool opened_
Flag used to avoid multiple open() calls.
ACE_thread_t thr_self(void)
ACE_thread_t thr_id_
The id of the thread created by this task.
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
bool notify_all()
Unblock all of the threads waiting on this condition.
virtual int wait(void)
#define TheServiceParticipant
LockType lock_
Lock to protect the "state" (all of the data members) of this object.

◆ execute()

void OpenDDS::DCPS::ThreadPerConnectionSendTask::execute ( SendRequest & req)
private virtual

Handle the request.

Definition at line 234 of file ThreadPerConnectionSendTask.cpp.

References ACE_ERROR, DBG_ENTRY_LVL, OpenDDS::DCPS::SendRequest::element_, OpenDDS::DCPS::GUID_UNKNOWN, link_, LM_ERROR, OpenDDS::DCPS::SendRequest::op_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OpenDDS::DCPS::SEND, OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::SEND_START, OpenDDS::DCPS::DataLink::send_start_i(), OpenDDS::DCPS::SEND_STOP, and OpenDDS::DCPS::DataLink::send_stop_i().

Referenced by svc().

235 {
236  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
237 
238 
239 
240  switch (req.op_) {
241  case SEND_START:
242  link_->send_start_i();
243  break;
244  case SEND:
245  link_->send_i(req.element_);
246  break;
247  case SEND_STOP:
248  //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
249  //the control element will be a null element with only the op_ set. Thus pass in GUID_UNKNOWN which will
250  //allow send_stop to call send_delayed_notifications without a match. In the case of ThreadPerConnectionSendTask
251  //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
252  link_->send_stop_i(GUID_UNKNOWN);
253  break;
254  default:
255  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
256  req.op_));
257  break;
258  }
259 }
#define ACE_ERROR(X)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
void send_stop_i(GUID_t repoId)
Definition: DataLink.inl:147
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
DataLink * link_
The datalink to send the samples or control messages.

◆ open()

int OpenDDS::DCPS::ThreadPerConnectionSendTask::open ( void *  = 0)
virtual

Activate the worker threads.

Reimplemented from ACE_Task_Base.

Definition at line 77 of file ThreadPerConnectionSendTask.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, ACE_TEXT(), ACE_Task_Base::activate(), DBG_ENTRY, OpenDDS::DCPS::DCPS_debug_level, link_, LM_DEBUG, LM_ERROR, lock_, opened_, ACE_Event_Handler::priority(), TheServiceParticipant, and OpenDDS::DCPS::DataLink::transport_priority().

78 {
79  DBG_ENTRY("ThreadPerConnectionSendTask", "open");
80 
81  GuardType guard(lock_);
82 
83  // We can assume that we are in the proper state to handle this open()
84  // call as long as we haven't been open()'ed before.
85  if (opened_) {
86  ACE_ERROR_RETURN((LM_ERROR,
87  "(%P|%t) ThreadPerConnectionSendTask failed to open. "
88  "Task has previously been open()'ed.\n"),
89  -1);
90  }
91 
92  DirectPriorityMapper mapper(link_->transport_priority());
93  int priority = mapper.thread_priority();
94 
95  long flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
96  long policy = TheServiceParticipant->scheduler();
97 
98  if (policy >= 0) {
99  flags |= policy;
100  } else {
101  flags |= THR_INHERIT_SCHED;
102  }
103 
104  if (DCPS_debug_level > 0) {
105  ACE_DEBUG((LM_DEBUG,
106  ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
107  ACE_TEXT("activating thread with flags 0x%08.8x ")
108  ACE_TEXT("and priority %d.\n"),
109  flags,
110  priority));
111  }
112 
113  // Activate this task object with one worker thread.
114  if (activate(flags, 1, 0, priority) != 0) {
115  // Assumes that when activate returns non-zero return code that
116  // no threads were activated.
117  ACE_ERROR_RETURN((LM_ERROR,
118  "(%P|%t) ThreadPerConnectionSendTask failed to activate "
119  "the worker threads.\n"),
120  -1);
121  }
122 
123  // Now we have past the point where we can say we've been open()'ed before.
124  opened_ = true;
125 
126  return 0;
127 }
#define ACE_DEBUG(X)
bool opened_
Flag used to avoid multiple open() calls.
Priority & transport_priority()
Definition: DataLink.inl:21
virtual int priority(void) const
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define ACE_ERROR_RETURN(X, Y)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
#define TheServiceParticipant
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
DataLink * link_
The datalink to send the samples or control messages.

◆ remove_sample()

RemoveResult OpenDDS::DCPS::ThreadPerConnectionSendTask::remove_sample ( const DataSampleElement * element)

Remove sample from the thread per connection queue.

Definition at line 220 of file ThreadPerConnectionSendTask.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), ACE_Message_Block::cont(), DBG_ENTRY, OpenDDS::DCPS::DataSampleElement::get_sample(), lock_, queue_, and OpenDDS::DCPS::ThreadPerConRemoveVisitor::status().

221 {
222  DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
223 
224  ACE_Message_Block* payload = element->get_sample()->cont();
225  ThreadPerConRemoveVisitor visitor(payload);
226 
227  GuardType guard(lock_);
228 
229  queue_.accept_visitor(visitor);
230 
231  return visitor.status();
232 }
ACE_Message_Block * cont(void) const
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
void accept_visitor(VisitorType &visitor) const
Definition: BasicQueue_T.h:74
LockType lock_
Lock to protect the "state" (all of the data members) of this object.

◆ svc()

int OpenDDS::DCPS::ThreadPerConnectionSendTask::svc ( void  )
virtual

The "mainline" executed by the worker thread.

Reimplemented from ACE_Task_Base.

Definition at line 129 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL, execute(), OpenDDS::DCPS::BasicQueue< T >::get(), lock_, OpenDDS::DCPS::OPENDDS_VECTOR(), queue_, OpenDDS::DCPS::SEND_STOP, shutdown_initiated_, SIG_SETMASK, ACE_OS::sigfillset(), OpenDDS::DCPS::BasicQueue< T >::size(), TheServiceParticipant, thr_id_, ACE_OS::thr_self(), ACE_OS::thr_sigsetmask(), OpenDDS::DCPS::ConditionVariable< Mutex >::wait(), and work_available_.

130 {
131  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
132  ThreadStatusManager::Start s(thread_status_manager, "ThreadPerConnectionSendTask");
133 
134  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
135 
136  thr_id_ = ACE_OS::thr_self();
137 
138  // Ignore all signals to avoid
139  // ERROR: <something descriptive> Interrupted system call
140  // The main thread will handle signals.
141  sigset_t set;
142  ACE_OS::sigfillset(&set);
143  ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
144 
145  SendRequest* req;
146  OPENDDS_VECTOR(SendRequest*) reqs;
147 
148  ACE_Reverse_Lock<LockType> rev_lock(lock_);
149  GuardType guard(lock_);
150 
151  // Start the "GetWork-And-PerformWork" loop for the current worker thread.
152  while (!shutdown_initiated_) {
153 
154  if (queue_.size() == 0) {
155  work_available_.wait(thread_status_manager);
156  }
157 
158  if (shutdown_initiated_) {
159  break;
160  }
161 
162  while (queue_.size() != 0 && (reqs.empty() || reqs.back()->op_ != SEND_STOP)) {
163  req = queue_.get();
164 
165  if (req == 0) {
166  //I'm not sure why this thread got more signals than actual signals
167  //when using thread_per_connection and the user application thread
168  //send requests without interval. We just need ignore the dequeue
169  //failure.
170  //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc %p\n",
171  // ACE_TEXT("dequeue_head")));
172  continue;
173  }
174  reqs.push_back(req);
175  }
176 
177  ACE_Guard<ACE_Reverse_Lock<LockType> > rev_guard(rev_lock);
178 
179  if (!reqs.empty() && reqs.back()->op_ == SEND_STOP) {
180  for (size_t i = 0; i < reqs.size(); ++i) {
181  execute(*reqs[i]);
182  delete reqs[i];
183  }
184  reqs.clear();
185  }
186  }
187 
188  return 0;
189 }
ACE_thread_t thr_self(void)
ACE_thread_t thr_id_
The id of the thread created by this task.
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
virtual void execute(SendRequest &req)
Handle the request.
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
size_t size() const
Accessor for the current number of elements in the queue.
Definition: BasicQueue_T.h:65
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
#define TheServiceParticipant
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
int sigfillset(sigset_t *s)

Member Data Documentation

◆ link_

DataLink* OpenDDS::DCPS::ThreadPerConnectionSendTask::link_
private

The datalink to send the samples or control messages.

Definition at line 113 of file ThreadPerConnectionSendTask.h.

Referenced by execute(), and open().

◆ lock_

LockType OpenDDS::DCPS::ThreadPerConnectionSendTask::lock_
private

Lock to protect the "state" (all of the data members) of this object.

Definition at line 92 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), close(), open(), remove_sample(), and svc().

◆ opened_

bool OpenDDS::DCPS::ThreadPerConnectionSendTask::opened_
private

Flag used to avoid multiple open() calls.

Definition at line 107 of file ThreadPerConnectionSendTask.h.

Referenced by close(), and open().

◆ queue_

QueueType OpenDDS::DCPS::ThreadPerConnectionSendTask::queue_
private

The request queue.

Definition at line 95 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), remove_sample(), and svc().

◆ shutdown_initiated_

bool OpenDDS::DCPS::ThreadPerConnectionSendTask::shutdown_initiated_
private

Flag used to initiate a shutdown request to all worker threads.

Definition at line 104 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), close(), and svc().

◆ thr_id_

ACE_thread_t OpenDDS::DCPS::ThreadPerConnectionSendTask::thr_id_
private

The id of the thread created by this task.

Definition at line 110 of file ThreadPerConnectionSendTask.h.

Referenced by close(), and svc().

◆ work_available_

ConditionVariableType OpenDDS::DCPS::ThreadPerConnectionSendTask::work_available_
private

Condition used to signal the worker thread that it may be able to find requests in the queue_ that need to be executed. As the add_request() and close() listings show, this condition is notified when a SEND_STOP request is successfully added to the queue_ (completing a batch of requests), and also when this task is shut down.

Definition at line 101 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), close(), and svc().


The documentation for this class was generated from the following files: