OpenDDS  Snapshot(2023/04/28-20:55)
ThreadPerConnectionSendTask.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
10 #include "TransportQueueElement.h"
11 #include "TransportSendElement.h"
12 #include "DataLink.h"
14 #include "DirectPriorityMapper.h"
15 #include "EntryExit.h"
18 
19 #include "ace/Auto_Ptr.h"
20 
22 
23 namespace OpenDDS {
24 namespace DCPS {
25 
27  : lock_()
28  , work_available_(lock_)
29  , shutdown_initiated_(false)
30  , opened_(false)
31  , thr_id_(ACE_OS::NULL_thread)
32  , link_(link)
33 {
    // Constructor (its signature line is absent from this listing).
    // The initializer list above ties the work_available_ condition to lock_,
    // starts with both the shutdown_initiated_ and opened_ flags cleared,
    // sets thr_id_ to ACE_OS::NULL_thread, and stores the `link` argument in
    // link_. No thread is started here; the body only invokes the
    // DBG_ENTRY_LVL macro.
34  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
35 }
36 
38 {
    // Destructor (its signature line is absent from this listing).
    // The visible body performs no cleanup of its own; it only invokes the
    // DBG_ENTRY_LVL macro.
39  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
40 }
41 
43  TransportQueueElement* element)
44 {
    // Queue a send request consisting of the operation `op` and the
    // (possibly null) `element` for the worker thread.
    //
    // Returns -1 without queueing if shutdown has already been initiated;
    // otherwise returns the result of queue_.put() (0 is treated as success).
    //
    // NOTE: the first line of the signature and the declaration of `req` are
    // absent from this listing. From its use (get()/release()), `req` appears
    // to be an owning smart pointer to a SendRequest -- confirm against the
    // real source.
45  DBG_ENTRY("ThreadPerConnectionSendTask", "add");
46 
48  req->op_ = op;
49  req->element_ = element;
50 
51  int result = -1;
52  { // guard scope
53  GuardType guard(lock_);
54 
    // Reject new work once close() has set the shutdown flag.
55  if (shutdown_initiated_) {
56  return -1;
57  }
58 
    // The queue stores the raw pointer; ownership is only given up further
    // below, after the put has been confirmed to succeed.
59  result = queue_.put(req.get());
60 
    // The statement inside this branch is absent from this listing;
    // presumably it wakes the worker through work_available_ once a complete
    // batch (terminated by SEND_STOP) has been queued -- TODO confirm.
61  if (result == 0 && op == SEND_STOP) {
63  }
64  }
65 
66  if (result == 0) {
    // The queue now holds the request, so `req` must not free it.
67  req.release();
68  } else {
    // The opening of this logging statement is absent from this listing.
70  ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
71  ACE_TEXT("put")));
72  }
73 
74  return result;
75 }
76 
78 {
    // Activate this task with a single worker thread.
    //
    // Holds lock_ for the whole call. Fails if the task was already opened
    // or if activate() returns non-zero; the -1 arguments below suggest both
    // failure paths return -1 (the macro openers are absent from this
    // listing -- confirm). Returns 0 on success and sets opened_.
    //
    // NOTE: the signature line and the declaration of `mapper` are absent
    // from this listing.
79  DBG_ENTRY("ThreadPerConnectionSendTask", "open");
80 
81  GuardType guard(lock_);
82 
83  // We can assume that we are in the proper state to handle this open()
84  // call as long as we haven't been open()'ed before.
85  if (opened_) {
87  "(%P|%t) ThreadPerConnectionSendTask failed to open. "
88  "Task has previously been open()'ed.\n"),
89  -1);
90  }
91 
    // Thread priority for the worker comes from `mapper` (declared on a line
    // that is absent from this listing).
93  int priority = mapper.thread_priority();
94 
95  long flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
96  long policy = TheServiceParticipant->scheduler();
97 
    // A non-negative scheduler value is OR-ed into the thread flags;
    // otherwise the new thread is asked to inherit scheduling
    // (THR_INHERIT_SCHED).
98  if (policy >= 0) {
99  flags |= policy;
100  } else {
101  flags |= THR_INHERIT_SCHED;
102  }
103 
104  if (DCPS_debug_level > 0) {
    // NOTE(review): `flags` is a long but is formatted with %x below --
    // confirm the format specifier matches on all supported platforms.
106  ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
107  ACE_TEXT("activating thread with flags 0x%08.8x ")
108  ACE_TEXT("and priority %d.\n"),
109  flags,
110  priority));
111  }
112 
113  // Activate this task object with one worker thread.
114  if (activate(flags, 1, 0, priority) != 0) {
115  // Assumes that when activate returns non-zero return code that
116  // no threads were activated.
118  "(%P|%t) ThreadPerConnectionSendTask failed to activate "
119  "the worker threads.\n"),
120  -1);
121  }
122 
123  // Now we have passed the point where we can say we've been open()'ed before.
124  opened_ = true;
125 
126  return 0;
127 }
128 
130 {
    // Worker-thread main loop: collect queued SendRequests into a batch and
    // execute the batch once it ends with a SEND_STOP request. Runs until
    // shutdown_initiated_ is set, then returns 0.
    //
    // NOTE: the signature line and the declarations of `reqs` and `rev_lock`
    // are absent from this listing. From their use, `reqs` is a vector-like
    // container of SendRequest pointers and `rev_lock` a reverse lock over
    // lock_ -- confirm against the real source.
131  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
    // Presumably reports this thread to the status manager under the given
    // name for the lifetime of `s` -- TODO confirm.
132  ThreadStatusManager::Start s(thread_status_manager, "ThreadPerConnectionSendTask");
133 
134  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
135 
137 
138  // Ignore all signals to avoid
139  // ERROR: <something descriptive> Interrupted system call
140  // The main thread will handle signals.
141  sigset_t set;
142  ACE_OS::sigfillset(&set);
143  ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
144 
145  SendRequest* req;
147 
    // lock_ is held from here on, except where the reverse guard below
    // is in scope.
149  GuardType guard(lock_);
150 
151  // Start the "GetWork-And-PerformWork" loop for the current worker thread.
152  while (!shutdown_initiated_) {
153 
    // Nothing queued: block on the condition until notified.
154  if (queue_.size() == 0) {
155  work_available_.wait(thread_status_manager);
156  }
157 
    // Re-check after waking, since shutdown may have been requested
    // while this thread was waiting.
158  if (shutdown_initiated_) {
159  break;
160  }
161 
    // Drain the queue into the batch, stopping as soon as the most recently
    // collected request is a SEND_STOP or the queue is empty.
162  while (queue_.size() != 0 && (reqs.empty() || reqs.back()->op_ != SEND_STOP)) {
163  req = queue_.get();
164 
165  if (req == 0) {
166  //I'm not sure why this thread got more signals than actual signals
167  //when using thread_per_connection and the user application thread
168  //send requests without interval. We just need ignore the dequeue
169  //failure.
170  //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc %p\n",
171  // ACE_TEXT("dequeue_head")));
172  continue;
173  }
174  reqs.push_back(req);
175  }
176 
    // Reverse guard: presumably releases lock_ while the batch is executed
    // and re-acquires it when rev_guard goes out of scope at the end of this
    // loop iteration -- confirm against ACE_Reverse_Lock.
177  ACE_Guard<ACE_Reverse_Lock<LockType> > rev_guard(rev_lock);
178 
    // Only a complete batch (one ending in SEND_STOP) is executed; an
    // incomplete batch is left in reqs untouched by this iteration.
    // NOTE(review): requests still held in reqs when the loop exits on
    // shutdown are not deleted in the visible code -- confirm whether that
    // is handled elsewhere.
179  if (!reqs.empty() && reqs.back()->op_ == SEND_STOP) {
180  for (size_t i = 0; i < reqs.size(); ++i) {
181  execute(*reqs[i]);
182  delete reqs[i];
183  }
184  reqs.clear();
185  }
186  }
187 
188  return 0;
189 }
190 
192 {
    // Request shutdown of the worker thread and wait for it to finish.
    //
    // A `flag` of 0 is a no-op that returns 0. A second call after shutdown
    // has been initiated also returns 0 without doing anything further.
    // Always returns 0.
    //
    // NOTE: the signature line and two interior lines are absent from this
    // listing (see the notes below).
193  DBG_ENTRY("ThreadPerConnectionSendTask","close");
194 
195  if (flag == 0) {
196  return 0;
197  }
198 
199  {
200  GuardType guard(lock_);
201 
202  if (shutdown_initiated_) {
203  return 0;
204  }
205 
206  // Set the shutdown flag to true.
207  shutdown_initiated_ = true;
    // A statement is absent from this listing here; presumably it notifies
    // work_available_ so a waiting worker observes the flag -- TODO confirm.
209  }
210 
    // The line that opens the block closed below is absent from this
    // listing; presumably it is a condition guarding the wait() -- TODO
    // confirm.
212  ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());
213  wait();
214  }
215 
216  return 0;
217 }
218 
221 {
    // Try to remove the sample described by `element` from the pending queue.
    //
    // Builds a ThreadPerConRemoveVisitor from the continuation block of the
    // element's sample, runs it over queue_ while holding lock_, and returns
    // the visitor's status().
    //
    // NOTE: the signature line is absent from this listing. `element` and
    // element->get_sample() are dereferenced without a null check, so both
    // must be non-null on entry.
222  DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
223 
224  ACE_Message_Block* payload = element->get_sample()->cont();
225  ThreadPerConRemoveVisitor visitor(payload);
226 
    // The lock is taken only after the visitor is built, so it covers just
    // the queue traversal.
227  GuardType guard(lock_);
228 
229  queue_.accept_visitor(visitor);
230 
231  return visitor.status();
232 }
233 
235 {
    // Perform a single queued request by dispatching on req.op_ to the
    // corresponding operation on link_. An unrecognized op_ is logged as an
    // error and otherwise ignored.
    //
    // NOTE: the signature line is absent from this listing. link_ is
    // dereferenced without a null check.
236  DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
237 
238 
239 
240  switch (req.op_) {
241  case SEND_START:
242  link_->send_start_i();
243  break;
244  case SEND:
245  link_->send_i(req.element_);
246  break;
247  case SEND_STOP:
248  //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
249  //the control element will be a null element with only the op_ set. Thus pass in GUID_UNKNOWN which will
250  //allow send_stop to call send_delayed_notifications without a match. In the case of ThreadPerConnectionSendTask
251  //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
    // The call itself is absent from this listing; per the comment above it
    // is presumably link_->send_stop_i(GUID_UNKNOWN) -- TODO confirm.
253  break;
254  default:
255  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
256  req.op_));
257  break;
258  }
259 }
260 
261 }
262 }
263 
X * get(void) const
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool opened_
Flag used to avoid multiple open() calls.
bool notify_one()
Unblock one of the threads waiting on this condition.
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: DataLink.inl:119
ACE_thread_t thr_self(void)
ACE_thread_t thr_id_
The id of the thread created by this task.
ACE_Guard< ACE_Thread_Mutex > lock_
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
ACE_thread_t NULL_thread
LM_DEBUG
virtual int svc()
The "mainline" executed by the worker thread.
virtual void execute(SendRequest &req)
Handle the request.
virtual int open(void *=0)
Activate the worker threads.
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
Priority & transport_priority()
Definition: DataLink.inl:21
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
ACE_Message_Block * cont(void) const
virtual int priority(void) const
void send_stop_i(GUID_t repoId)
Definition: DataLink.inl:147
size_t size() const
Accessor for the current number of elements in the queue.
Definition: BasicQueue_T.h:65
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
bool notify_all()
Unblock all of the threads waiting on this condition.
virtual int wait(void)
ACE_TEXT("TCP_Factory")
map TRANSPORT_PRIORITY values directly.
RemoveResult remove_sample(const DataSampleElement *element)
Remove sample from the thread per connection queue.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
#define SIG_SETMASK
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RemoveResult status() const
True if the visitor found and removed the sample.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void accept_visitor(VisitorType &visitor) const
Definition: BasicQueue_T.h:74
#define ACE_ERROR_RETURN(X, Y)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
#define TheServiceParticipant
int add_request(SendStrategyOpType op, TransportQueueElement *element=0)
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
int sigfillset(sigset_t *s)
LM_ERROR
virtual int close(u_long flag=0)
Called when the thread exits.
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
DataLink * link_
The datalink to send the samples or control messages.
int put(T *elem)
Put a pointer to an element (T*) on to the queue.
Definition: BasicQueue_T.h:36
Base wrapper class around a data/control sample to be sent.