Executes requests to send a sample or control message. More...
#include <ThreadPerConnectionSendTask.h>
Public Member Functions | |
ThreadPerConnectionSendTask (DataLink *link) | |
virtual | ~ThreadPerConnectionSendTask () |
int | add_request (SendStrategyOpType op, TransportQueueElement *element=0) |
virtual int | open (void *=0) |
Activate the worker threads. | |
virtual int | svc () |
The "mainline" executed by the worker thread. | |
virtual int | close (u_long flag=0) |
Called when the thread exits. | |
RemoveResult | remove_sample (const DataSampleElement *element) |
Remove sample from the thread per connection queue. | |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
typedef BasicQueue< SendRequest > | QueueType |
Private Member Functions | |
virtual void | execute (SendRequest &req) |
Handle the request. | |
Private Attributes | |
LockType | lock_ |
Lock to protect the "state" (all of the data members) of this object. | |
QueueType | queue_ |
The request queue. | |
ConditionType | work_available_ |
bool | shutdown_initiated_ |
Flag used to initiate a shutdown request to all worker threads. | |
bool | opened_ |
Flag used to avoid multiple open() calls. | |
ACE_thread_t | thr_id_ |
The id of the thread created by this task. | |
DataLink * | link_ |
The datalink used to send samples or control messages. |
Executes requests to send a sample or control message.
This task implements the execute() method, which handles each step of sending a sample or control message.
Definition at line 58 of file ThreadPerConnectionSendTask.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::ConditionType [private] |
Definition at line 88 of file ThreadPerConnectionSendTask.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::GuardType [private] |
Definition at line 87 of file ThreadPerConnectionSendTask.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::ThreadPerConnectionSendTask::LockType [private] |
Definition at line 86 of file ThreadPerConnectionSendTask.h.
typedef BasicQueue<SendRequest> OpenDDS::DCPS::ThreadPerConnectionSendTask::QueueType [private] |
Definition at line 90 of file ThreadPerConnectionSendTask.h.
OpenDDS::DCPS::ThreadPerConnectionSendTask::ThreadPerConnectionSendTask | ( | DataLink * | link | ) |
Definition at line 26 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL, and NULL_thread.
00027 : lock_() 00028 , work_available_(lock_) 00029 , shutdown_initiated_(false) 00030 , opened_(false) 00031 , thr_id_(ACE_OS::NULL_thread) 00032 , link_(link) 00033 { 00034 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6); 00035 }
OpenDDS::DCPS::ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask | ( | ) | [virtual] |
Definition at line 37 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL.
00038 { 00039 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6); 00040 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request | ( | SendStrategyOpType | op, | |
TransportQueueElement * | element = 0 | |||
) |
Puts the request on the request queue. Returns 0 if successful, -1 otherwise (the request was "rejected" or this task has been shut down).
Definition at line 42 of file ThreadPerConnectionSendTask.cpp.
References ACE_TEXT(), DBG_ENTRY, ACE_Auto_Basic_Ptr< X >::get(), LM_ERROR, lock_, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, ACE_Auto_Basic_Ptr< X >::release(), shutdown_initiated_, ACE_Condition< MUTEX >::signal(), and work_available_.
00044 { 00045 DBG_ENTRY("ThreadPerConnectionSendTask", "add"); 00046 00047 ACE_Auto_Ptr<SendRequest> req(new SendRequest); 00048 req->op_ = op; 00049 req->element_ = element; 00050 00051 int result = -1; 00052 { // guard scope 00053 GuardType guard(this->lock_); 00054 00055 if (this->shutdown_initiated_) { 00056 return -1; 00057 } 00058 00059 result = this->queue_.put(req.get()); 00060 00061 if (result == 0) { 00062 this->work_available_.signal(); 00063 req.release(); 00064 00065 } else { 00066 ACE_ERROR((LM_ERROR, 00067 ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"), 00068 ACE_TEXT("put"))); 00069 } 00070 } 00071 00072 return result; 00073 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::close | ( | u_long | flag = 0 |
) | [virtual] |
Called when the thread exits.
Reimplemented from ACE_Task_Base.
Definition at line 175 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY, lock_, opened_, shutdown_initiated_, ACE_Condition< MUTEX >::signal(), ACE_OS::thr_equal(), thr_id_, ACE_OS::thr_self(), ACE_Task_Base::wait(), and work_available_.
00176 { 00177 DBG_ENTRY("ThreadPerConnectionSendTask","close"); 00178 00179 if (flag == 0) { 00180 return 0; 00181 } 00182 00183 { 00184 GuardType guard(this->lock_); 00185 00186 if (this->shutdown_initiated_) { 00187 return 0; 00188 } 00189 00190 // Set the shutdown flag to true. 00191 this->shutdown_initiated_ = true; 00192 this->work_available_.signal(); 00193 } 00194 00195 if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) { 00196 this->wait(); 00197 } 00198 00199 return 0; 00200 }
void OpenDDS::DCPS::ThreadPerConnectionSendTask::execute | ( | SendRequest & | req | ) | [private, virtual] |
Handle the request.
Definition at line 217 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::SendRequest::element_, OpenDDS::DCPS::GUID_UNKNOWN, link_, LM_ERROR, OpenDDS::DCPS::SendRequest::op_, OpenDDS::DCPS::SEND, OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::SEND_START, OpenDDS::DCPS::DataLink::send_start_i(), OpenDDS::DCPS::SEND_STOP, and OpenDDS::DCPS::DataLink::send_stop_i().
Referenced by svc().
00218 { 00219 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6); 00220 00221 00222 00223 switch (req.op_) { 00224 case SEND_START: 00225 this->link_->send_start_i(); 00226 break; 00227 case SEND: 00228 this->link_->send_i(req.element_); 00229 break; 00230 case SEND_STOP: 00231 //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask 00232 //the control element will be a null element with only the op_ set. Thus pass in GUID_UNKNOWN which will 00233 //allow send_stop to call send_delayed_notifications without a match. In the case of ThreadPerConnectionSendTask 00234 //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications() 00235 this->link_->send_stop_i(GUID_UNKNOWN); 00236 break; 00237 default: 00238 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n", 00239 req.op_)); 00240 break; 00241 } 00242 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::open | ( | void * | = 0 |
) | [virtual] |
Activate the worker threads.
Reimplemented from ACE_Task_Base.
Definition at line 75 of file ThreadPerConnectionSendTask.cpp.
References ACE_TEXT(), ACE_Task_Base::activate(), DBG_ENTRY, OpenDDS::DCPS::DCPS_debug_level, link_, LM_DEBUG, LM_ERROR, lock_, opened_, ACE_Event_Handler::priority(), TheServiceParticipant, and OpenDDS::DCPS::DataLink::transport_priority().
00076 { 00077 DBG_ENTRY("ThreadPerConnectionSendTask", "open"); 00078 00079 GuardType guard(this->lock_); 00080 00081 // We can assume that we are in the proper state to handle this open() 00082 // call as long as we haven't been open()'ed before. 00083 if (this->opened_) { 00084 ACE_ERROR_RETURN((LM_ERROR, 00085 "(%P|%t) ThreadPerConnectionSendTask failed to open. " 00086 "Task has previously been open()'ed.\n"), 00087 -1); 00088 } 00089 00090 DirectPriorityMapper mapper(this->link_->transport_priority()); 00091 int priority = mapper.thread_priority(); 00092 00093 long flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD; 00094 int policy = TheServiceParticipant->scheduler(); 00095 00096 if (policy >= 0) { 00097 flags |= policy; 00098 } else { 00099 flags |= THR_INHERIT_SCHED; 00100 } 00101 00102 if (DCPS_debug_level > 0) { 00103 ACE_DEBUG((LM_DEBUG, 00104 ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ") 00105 ACE_TEXT("activating thread with flags 0x%08.8x ") 00106 ACE_TEXT("and priority %d.\n"), 00107 flags, 00108 priority)); 00109 } 00110 00111 // Activate this task object with one worker thread. 00112 if (this->activate(flags, 1, 0, priority) != 0) { 00113 // Assumes that when activate returns non-zero return code that 00114 // no threads were activated. 00115 ACE_ERROR_RETURN((LM_ERROR, 00116 "(%P|%t) ThreadPerConnectionSendTask failed to activate " 00117 "the worker threads.\n"), 00118 -1); 00119 } 00120 00121 // Now we have past the point where we can say we've been open()'ed before. 00122 this->opened_ = true; 00123 00124 return 0; 00125 }
RemoveResult OpenDDS::DCPS::ThreadPerConnectionSendTask::remove_sample | ( | const DataSampleElement * | element | ) |
Remove sample from the thread per connection queue.
Definition at line 203 of file ThreadPerConnectionSendTask.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), ACE_Message_Block::cont(), DBG_ENTRY, OpenDDS::DCPS::DataSampleElement::get_sample(), lock_, queue_, and OpenDDS::DCPS::ThreadPerConRemoveVisitor::status().
00204 { 00205 DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample"); 00206 00207 GuardType guard(this->lock_); 00208 00209 ACE_Message_Block* payload = element->get_sample()->cont(); 00210 ThreadPerConRemoveVisitor visitor(payload); 00211 00212 this->queue_.accept_visitor(visitor); 00213 00214 return visitor.status(); 00215 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::svc | ( | void | ) | [virtual] |
The "mainline" executed by the worker thread.
Reimplemented from ACE_Task_Base.
Definition at line 127 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL, execute(), OpenDDS::DCPS::BasicQueue< T >::get(), lock_, queue_, shutdown_initiated_, SIG_SETMASK, ACE_OS::sigfillset(), sigset_t, OpenDDS::DCPS::BasicQueue< T >::size(), thr_id_, ACE_OS::thr_self(), ACE_OS::thr_sigsetmask(), ACE_Condition< MUTEX >::wait(), and work_available_.
00128 { 00129 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6); 00130 00131 this->thr_id_ = ACE_OS::thr_self(); 00132 00133 // Ignore all signals to avoid 00134 // ERROR: <something descriptive> Interrupted system call 00135 // The main thread will handle signals. 00136 sigset_t set; 00137 ACE_OS::sigfillset(&set); 00138 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 00139 00140 // Start the "GetWork-And-PerformWork" loop for the current worker thread. 00141 while (!this->shutdown_initiated_) { 00142 SendRequest* req; 00143 { 00144 GuardType guard(this->lock_); 00145 00146 if (this->queue_.size() == 0) { 00147 this->work_available_.wait(); 00148 } 00149 00150 if (this->shutdown_initiated_) { 00151 break; 00152 } 00153 00154 req = queue_.get(); 00155 00156 if (req == 0) { 00157 //I'm not sure why this thread got more signals than actual signals 00158 //when using thread_per_connection and the user application thread 00159 //send requests without interval. We just need ignore the dequeue 00160 //failure. 00161 //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc %p\n", 00162 // ACE_TEXT("dequeue_head"))); 00163 continue; 00164 } 00165 } 00166 00167 this->execute(*req); 00168 delete req; 00169 } 00170 00171 // This will never get executed. 00172 return 0; 00173 }
DataLink* OpenDDS::DCPS::ThreadPerConnectionSendTask::link_ [private] |
The datalink used to send samples or control messages.
Definition at line 114 of file ThreadPerConnectionSendTask.h.
LockType OpenDDS::DCPS::ThreadPerConnectionSendTask::lock_ [private] |
Lock to protect the "state" (all of the data members) of this object.
Definition at line 93 of file ThreadPerConnectionSendTask.h.
Referenced by add_request(), close(), open(), remove_sample(), and svc().
bool OpenDDS::DCPS::ThreadPerConnectionSendTask::opened_ [private] |
Flag used to avoid multiple open() calls.
Definition at line 108 of file ThreadPerConnectionSendTask.h.
QueueType OpenDDS::DCPS::ThreadPerConnectionSendTask::queue_ [private] |
The request queue.
Definition at line 96 of file ThreadPerConnectionSendTask.h.
Referenced by add_request(), remove_sample(), and svc().
bool OpenDDS::DCPS::ThreadPerConnectionSendTask::shutdown_initiated_ [private] |
Flag used to initiate a shutdown request to all worker threads.
Definition at line 105 of file ThreadPerConnectionSendTask.h.
Referenced by add_request(), close(), and svc().
ACE_thread_t OpenDDS::DCPS::ThreadPerConnectionSendTask::thr_id_ [private] |
The id of the thread created by this task.
Definition at line 111 of file ThreadPerConnectionSendTask.h.
ConditionType OpenDDS::DCPS::ThreadPerConnectionSendTask::work_available_ [private] |
Condition used to signal the worker threads that they may be able to find a request in the queue_ that needs to be executed. This condition is signal()'ed each time a request is added to the queue_, and also when this task is shut down.
Definition at line 102 of file ThreadPerConnectionSendTask.h.
Referenced by add_request(), close(), and svc().