#include <ThreadPerConnectionSendTask.h>
Inheritance diagram for OpenDDS::DCPS::ThreadPerConnectionSendTask:
Public Member Functions | |
ThreadPerConnectionSendTask (DataLink *link) | |
Constructor. | |
virtual | ~ThreadPerConnectionSendTask () |
Virtual Destructor. | |
int | add_request (SendStrategyOpType op, TransportQueueElement *element=0) |
virtual int | open (void *=0) |
Activate the worker threads. | |
virtual int | svc () |
The "mainline" executed by the worker thread. | |
virtual int | close (u_long flag=0) |
Called when the thread exits. | |
RemoveResult | remove_sample (const DataSampleElement *element) |
Remove sample from the thread per connection queue. | |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
typedef BasicQueue< SendRequest > | QueueType |
Private Member Functions | |
virtual void | execute (SendRequest &req) |
Handle the request. | |
Private Attributes | |
LockType | lock_ |
Lock to protect the "state" (all of the data members) of this object. | |
QueueType | queue_ |
The request queue. | |
ConditionType | work_available_ |
bool | shutdown_initiated_ |
Flag used to initiate a shutdown request to all worker threads. | |
bool | opened_ |
Flag used to avoid multiple open() calls. | |
ACE_thread_t | thr_id_ |
The id of the thread created by this task. | |
DataLink * | link_ |
The datalink to send the samples or control messages. |
This task implements the request execute method which handles each step of sending a sample or control message.
Definition at line 55 of file ThreadPerConnectionSendTask.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::ConditionType [private] |
Definition at line 88 of file ThreadPerConnectionSendTask.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::GuardType [private] |
Definition at line 87 of file ThreadPerConnectionSendTask.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::ThreadPerConnectionSendTask::LockType [private] |
Definition at line 86 of file ThreadPerConnectionSendTask.h.
typedef BasicQueue<SendRequest> OpenDDS::DCPS::ThreadPerConnectionSendTask::QueueType [private] |
Definition at line 90 of file ThreadPerConnectionSendTask.h.
OpenDDS::DCPS::ThreadPerConnectionSendTask::ThreadPerConnectionSendTask | ( | DataLink * | link | ) |
Constructor.
Definition at line 24 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL.
00025 : lock_() 00026 , queue_(1, 10) 00027 , work_available_(lock_) 00028 , shutdown_initiated_(false) 00029 , opened_(false) 00030 , thr_id_(ACE_OS::NULL_thread) 00031 , link_(link) 00032 { 00033 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6); 00034 }
OpenDDS::DCPS::ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask | ( | ) | [virtual] |
Virtual Destructor.
Definition at line 36 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL.
00037 { 00038 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6); 00039 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request | ( | SendStrategyOpType | op, | |
TransportQueueElement * | element = 0 | |||
) |
Put the request to the request queue. Returns 0 if successful, -1 otherwise (it has been "rejected" or this task is shutdown).
Definition at line 41 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, and work_available_.
Referenced by OpenDDS::DCPS::DataLink::send(), OpenDDS::DCPS::DataLink::send_start(), and OpenDDS::DCPS::DataLink::send_stop().
00043 { 00044 DBG_ENTRY("ThreadPerConnectionSendTask", "add"); 00045 00046 ACE_Auto_Ptr<SendRequest> req(new SendRequest); 00047 req->op_ = op; 00048 req->element_ = element; 00049 00050 int result = -1; 00051 { // guard scope 00052 GuardType guard(this->lock_); 00053 00054 if (this->shutdown_initiated_) { 00055 return -1; 00056 } 00057 00058 result = this->queue_.put(req.get()); 00059 00060 if (result == 0) { 00061 this->work_available_.signal(); 00062 req.release(); 00063 00064 } else { 00065 ACE_ERROR((LM_ERROR, 00066 ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"), 00067 ACE_TEXT("put"))); 00068 } 00069 } 00070 00071 return result; 00072 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::close | ( | u_long | flag = 0 |
) | [virtual] |
Called when the thread exits.
Definition at line 174 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY, shutdown_initiated_, and work_available_.
Referenced by OpenDDS::DCPS::DataLink::pre_stop_i(), and OpenDDS::DCPS::DataLink::~DataLink().
00175 { 00176 DBG_ENTRY("ThreadPerConnectionSendTask","close"); 00177 00178 if (flag == 0) { 00179 return 0; 00180 } 00181 00182 { 00183 GuardType guard(this->lock_); 00184 00185 if (this->shutdown_initiated_) { 00186 return 0; 00187 } 00188 00189 // Set the shutdown flag to true. 00190 this->shutdown_initiated_ = true; 00191 this->work_available_.signal(); 00192 } 00193 00194 if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) { 00195 this->wait(); 00196 } 00197 00198 return 0; 00199 }
void OpenDDS::DCPS::ThreadPerConnectionSendTask::execute | ( | SendRequest & | req | ) | [private, virtual] |
Handle the request.
Definition at line 216 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::SendRequest::element_, OpenDDS::DCPS::GUID_UNKNOWN, link_, OpenDDS::DCPS::SendRequest::op_, OpenDDS::DCPS::SEND, OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::SEND_START, OpenDDS::DCPS::DataLink::send_start_i(), OpenDDS::DCPS::SEND_STOP, and OpenDDS::DCPS::DataLink::send_stop_i().
Referenced by svc().
00217 { 00218 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6); 00219 00220 00221 00222 switch (req.op_) { 00223 case SEND_START: 00224 this->link_->send_start_i(); 00225 break; 00226 case SEND: 00227 this->link_->send_i(req.element_); 00228 break; 00229 case SEND_STOP: 00230 //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask 00231 //the control element will be a null element with only the op_ set. Thus pass in GUID_UNKNOWN which will 00232 //allow send_stop to call send_delayed_notifications without a match. In the case of ThreadPerConnectionSendTask 00233 //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications() 00234 this->link_->send_stop_i(GUID_UNKNOWN); 00235 break; 00236 default: 00237 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n", 00238 req.op_)); 00239 break; 00240 } 00241 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::open | ( | void * | = 0 |
) | [virtual] |
Activate the worker threads.
Definition at line 74 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY, OpenDDS::DCPS::DCPS_debug_level, opened_, and TheServiceParticipant.
00075 { 00076 DBG_ENTRY("ThreadPerConnectionSendTask", "open"); 00077 00078 GuardType guard(this->lock_); 00079 00080 // We can assume that we are in the proper state to handle this open() 00081 // call as long as we haven't been open()'ed before. 00082 if (this->opened_) { 00083 ACE_ERROR_RETURN((LM_ERROR, 00084 "(%P|%t) ThreadPerConnectionSendTask failed to open. " 00085 "Task has previously been open()'ed.\n"), 00086 -1); 00087 } 00088 00089 DirectPriorityMapper mapper(this->link_->transport_priority()); 00090 int priority = mapper.thread_priority(); 00091 00092 long flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD; 00093 int policy = TheServiceParticipant->scheduler(); 00094 00095 if (policy >= 0) { 00096 flags |= policy; 00097 } else { 00098 flags |= THR_INHERIT_SCHED; 00099 } 00100 00101 if (DCPS_debug_level > 0) { 00102 ACE_DEBUG((LM_DEBUG, 00103 ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ") 00104 ACE_TEXT("activating thread with flags 0x%08.8x ") 00105 ACE_TEXT("and priority %d.\n"), 00106 flags, 00107 priority)); 00108 } 00109 00110 // Activate this task object with one worker thread. 00111 if (this->activate(flags, 1, 0, priority) != 0) { 00112 // Assumes that when activate returns non-zero return code that 00113 // no threads were activated. 00114 ACE_ERROR_RETURN((LM_ERROR, 00115 "(%P|%t) ThreadPerConnectionSendTask failed to activate " 00116 "the worker threads.\n"), 00117 -1); 00118 } 00119 00120 // Now we have past the point where we can say we've been open()'ed before. 00121 this->opened_ = true; 00122 00123 return 0; 00124 }
RemoveResult OpenDDS::DCPS::ThreadPerConnectionSendTask::remove_sample | ( | const DataSampleElement * | element | ) |
Remove sample from the thread per connection queue.
Definition at line 202 of file ThreadPerConnectionSendTask.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), DBG_ENTRY, OpenDDS::DCPS::DataSampleElement::get_sample(), queue_, and OpenDDS::DCPS::ThreadPerConRemoveVisitor::status().
Referenced by OpenDDS::DCPS::DataLink::remove_sample().
00203 { 00204 DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample"); 00205 00206 GuardType guard(this->lock_); 00207 00208 ACE_Message_Block* payload = element->get_sample()->cont(); 00209 ThreadPerConRemoveVisitor visitor(payload); 00210 00211 this->queue_.accept_visitor(visitor); 00212 00213 return visitor.status(); 00214 }
int OpenDDS::DCPS::ThreadPerConnectionSendTask::svc | ( | ) | [virtual] |
The "mainline" executed by the worker thread.
Definition at line 126 of file ThreadPerConnectionSendTask.cpp.
References DBG_ENTRY_LVL, execute(), OpenDDS::DCPS::BasicQueue< T >::get(), queue_, and thr_id_.
00127 { 00128 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6); 00129 00130 this->thr_id_ = ACE_OS::thr_self(); 00131 00132 // Ignore all signals to avoid 00133 // ERROR: <something descriptive> Interrupted system call 00134 // The main thread will handle signals. 00135 sigset_t set; 00136 ACE_OS::sigfillset(&set); 00137 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 00138 00139 // Start the "GetWork-And-PerformWork" loop for the current worker thread. 00140 while (!this->shutdown_initiated_) { 00141 SendRequest* req; 00142 { 00143 GuardType guard(this->lock_); 00144 00145 if (this->queue_.size() == 0) { 00146 this->work_available_.wait(); 00147 } 00148 00149 if (this->shutdown_initiated_) { 00150 break; 00151 } 00152 00153 req = queue_.get(); 00154 00155 if (req == 0) { 00156 //I'm not sure why this thread got more signals than actual signals 00157 //when using thread_per_connection and the user application thread 00158 //send requests without interval. We just need ignore the dequeue 00159 //failure. 00160 //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc %p\n", 00161 // ACE_TEXT("dequeue_head"))); 00162 continue; 00163 } 00164 } 00165 00166 this->execute(*req); 00167 delete req; 00168 } 00169 00170 // This will never get executed. 00171 return 0; 00172 }
The datalink to send the samples or control messages.
Definition at line 114 of file ThreadPerConnectionSendTask.h.
Referenced by execute().
Lock to protect the "state" (all of the data members) of this object.
Definition at line 93 of file ThreadPerConnectionSendTask.h.
bool OpenDDS::DCPS::ThreadPerConnectionSendTask::opened_ [private] |
Flag used to avoid multiple open() calls.
Definition at line 108 of file ThreadPerConnectionSendTask.h.
Referenced by open().
The request queue.
Definition at line 96 of file ThreadPerConnectionSendTask.h.
Referenced by add_request(), remove_sample(), and svc().
Flag used to initiate a shutdown request to all worker threads.
Definition at line 105 of file ThreadPerConnectionSendTask.h.
Referenced by close().
ACE_thread_t OpenDDS::DCPS::ThreadPerConnectionSendTask::thr_id_ [private] |
The id of the thread created by this task.
Definition at line 111 of file ThreadPerConnectionSendTask.h.
Referenced by svc().
Condition used to signal the worker threads that they may be able to find a request in the queue_ that needs to be executed. This condition will be signal()'ed each time a request is added to the queue_, and also when this task is shutdown.
Definition at line 102 of file ThreadPerConnectionSendTask.h.
Referenced by add_request(), and close().