Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 : #include "ThreadPerConnectionSendTask.h"
10 : #include "TransportQueueElement.h"
11 : #include "TransportSendElement.h"
12 : #include "DataLink.h"
13 : #include "ThreadPerConRemoveVisitor.h"
14 : #include "DirectPriorityMapper.h"
15 : #include "EntryExit.h"
16 : #include "dds/DCPS/DataSampleElement.h"
17 : #include "dds/DCPS/Service_Participant.h"
18 :
19 : #include "ace/Auto_Ptr.h"
20 :
21 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
22 :
23 : namespace OpenDDS {
24 : namespace DCPS {
25 :
26 0 : ThreadPerConnectionSendTask::ThreadPerConnectionSendTask(DataLink* link)
27 0 : : lock_()
28 0 : , work_available_(lock_)
29 0 : , shutdown_initiated_(false)
30 0 : , opened_(false)
31 0 : , thr_id_(ACE_OS::NULL_thread)
32 0 : , link_(link)
33 : {
34 : DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
35 0 : }
36 :
37 0 : ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask()
38 : {
39 : DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
40 0 : }
41 :
42 0 : int ThreadPerConnectionSendTask::add_request(SendStrategyOpType op,
43 : TransportQueueElement* element)
44 : {
45 : DBG_ENTRY("ThreadPerConnectionSendTask", "add");
46 :
47 0 : ACE_Auto_Ptr<SendRequest> req(new SendRequest);
48 0 : req->op_ = op;
49 0 : req->element_ = element;
50 :
51 0 : int result = -1;
52 : { // guard scope
53 0 : GuardType guard(lock_);
54 :
55 0 : if (shutdown_initiated_) {
56 0 : return -1;
57 : }
58 :
59 0 : result = queue_.put(req.get());
60 :
61 0 : if (result == 0 && op == SEND_STOP) {
62 0 : work_available_.notify_one();
63 : }
64 0 : }
65 :
66 0 : if (result == 0) {
67 0 : req.release();
68 : } else {
69 0 : ACE_ERROR((LM_ERROR,
70 : ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
71 : ACE_TEXT("put")));
72 : }
73 :
74 0 : return result;
75 0 : }
76 :
77 0 : int ThreadPerConnectionSendTask::open(void*)
78 : {
79 : DBG_ENTRY("ThreadPerConnectionSendTask", "open");
80 :
81 0 : GuardType guard(lock_);
82 :
83 : // We can assume that we are in the proper state to handle this open()
84 : // call as long as we haven't been open()'ed before.
85 0 : if (opened_) {
86 0 : ACE_ERROR_RETURN((LM_ERROR,
87 : "(%P|%t) ThreadPerConnectionSendTask failed to open. "
88 : "Task has previously been open()'ed.\n"),
89 : -1);
90 : }
91 :
92 0 : DirectPriorityMapper mapper(link_->transport_priority());
93 0 : int priority = mapper.thread_priority();
94 :
95 0 : long flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
96 0 : long policy = TheServiceParticipant->scheduler();
97 :
98 0 : if (policy >= 0) {
99 0 : flags |= policy;
100 : } else {
101 0 : flags |= THR_INHERIT_SCHED;
102 : }
103 :
104 0 : if (DCPS_debug_level > 0) {
105 0 : ACE_DEBUG((LM_DEBUG,
106 : ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
107 : ACE_TEXT("activating thread with flags 0x%08.8x ")
108 : ACE_TEXT("and priority %d.\n"),
109 : flags,
110 : priority));
111 : }
112 :
113 : // Activate this task object with one worker thread.
114 0 : if (activate(flags, 1, 0, priority) != 0) {
115 : // Assumes that when activate returns non-zero return code that
116 : // no threads were activated.
117 0 : ACE_ERROR_RETURN((LM_ERROR,
118 : "(%P|%t) ThreadPerConnectionSendTask failed to activate "
119 : "the worker threads.\n"),
120 : -1);
121 : }
122 :
123 : // Now we have past the point where we can say we've been open()'ed before.
124 0 : opened_ = true;
125 :
126 0 : return 0;
127 0 : }
128 :
129 0 : int ThreadPerConnectionSendTask::svc()
130 : {
131 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
132 0 : ThreadStatusManager::Start s(thread_status_manager, "ThreadPerConnectionSendTask");
133 :
134 : DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
135 :
136 0 : thr_id_ = ACE_OS::thr_self();
137 :
138 : // Ignore all signals to avoid
139 : // ERROR: <something descriptive> Interrupted system call
140 : // The main thread will handle signals.
141 : sigset_t set;
142 0 : ACE_OS::sigfillset(&set);
143 0 : ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
144 :
145 : SendRequest* req;
146 0 : OPENDDS_VECTOR(SendRequest*) reqs;
147 :
148 0 : ACE_Reverse_Lock<LockType> rev_lock(lock_);
149 0 : GuardType guard(lock_);
150 :
151 : // Start the "GetWork-And-PerformWork" loop for the current worker thread.
152 0 : while (!shutdown_initiated_) {
153 :
154 0 : if (queue_.size() == 0) {
155 0 : work_available_.wait(thread_status_manager);
156 : }
157 :
158 0 : if (shutdown_initiated_) {
159 0 : break;
160 : }
161 :
162 0 : while (queue_.size() != 0 && (reqs.empty() || reqs.back()->op_ != SEND_STOP)) {
163 0 : req = queue_.get();
164 :
165 0 : if (req == 0) {
166 : //I'm not sure why this thread got more signals than actual signals
167 : //when using thread_per_connection and the user application thread
168 : //send requests without interval. We just need ignore the dequeue
169 : //failure.
170 : //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc %p\n",
171 : // ACE_TEXT("dequeue_head")));
172 0 : continue;
173 : }
174 0 : reqs.push_back(req);
175 : }
176 :
177 0 : ACE_Guard<ACE_Reverse_Lock<LockType> > rev_guard(rev_lock);
178 :
179 0 : if (!reqs.empty() && reqs.back()->op_ == SEND_STOP) {
180 0 : for (size_t i = 0; i < reqs.size(); ++i) {
181 0 : execute(*reqs[i]);
182 0 : delete reqs[i];
183 : }
184 0 : reqs.clear();
185 : }
186 0 : }
187 :
188 0 : return 0;
189 0 : }
190 :
191 0 : int ThreadPerConnectionSendTask::close(u_long flag)
192 : {
193 : DBG_ENTRY("ThreadPerConnectionSendTask","close");
194 :
195 0 : if (flag == 0) {
196 0 : return 0;
197 : }
198 :
199 : {
200 0 : GuardType guard(lock_);
201 :
202 0 : if (shutdown_initiated_) {
203 0 : return 0;
204 : }
205 :
206 : // Set the shutdown flag to true.
207 0 : shutdown_initiated_ = true;
208 0 : work_available_.notify_all();
209 0 : }
210 :
211 0 : if (opened_ && !ACE_OS::thr_equal(thr_id_, ACE_OS::thr_self())) {
212 0 : ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());
213 0 : wait();
214 0 : }
215 :
216 0 : return 0;
217 : }
218 :
219 : RemoveResult
220 0 : ThreadPerConnectionSendTask::remove_sample(const DataSampleElement* element)
221 : {
222 : DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
223 :
224 0 : ACE_Message_Block* payload = element->get_sample()->cont();
225 0 : ThreadPerConRemoveVisitor visitor(payload);
226 :
227 0 : GuardType guard(lock_);
228 :
229 0 : queue_.accept_visitor(visitor);
230 :
231 0 : return visitor.status();
232 0 : }
233 :
234 0 : void ThreadPerConnectionSendTask::execute(SendRequest& req)
235 : {
236 : DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
237 :
238 :
239 :
240 0 : switch (req.op_) {
241 0 : case SEND_START:
242 0 : link_->send_start_i();
243 0 : break;
244 0 : case SEND:
245 0 : link_->send_i(req.element_);
246 0 : break;
247 0 : case SEND_STOP:
248 : //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
249 : //the control element will be a null element with only the op_ set. Thus pass in GUID_UNKNOWN which will
250 : //allow send_stop to call send_delayed_notifications without a match. In the case of ThreadPerConnectionSendTask
251 : //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
252 0 : link_->send_stop_i(GUID_UNKNOWN);
253 0 : break;
254 0 : default:
255 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
256 : req.op_));
257 0 : break;
258 : }
259 0 : }
260 :
261 : }
262 : }
263 :
264 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|