ThreadPerConnectionSendTask.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ThreadPerConnectionSendTask.h"
00010 #include "TransportQueueElement.h"
00011 #include "TransportSendElement.h"
00012 #include "DataLink.h"
00013 #include "ThreadPerConRemoveVisitor.h"
00014 #include "DirectPriorityMapper.h"
00015 #include "dds/DCPS/transport/framework/EntryExit.h"
00016 #include "dds/DCPS/DataSampleElement.h"
00017 #include "dds/DCPS/Service_Participant.h"
00018
00019 #include "ace/Auto_Ptr.h"
00020
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 namespace OpenDDS {
00024 namespace DCPS {
00025
00026 ThreadPerConnectionSendTask::ThreadPerConnectionSendTask(DataLink* link)
00027 : lock_()
00028 , work_available_(lock_)
00029 , shutdown_initiated_(false)
00030 , opened_(false)
00031 , thr_id_(ACE_OS::NULL_thread)
00032 , link_(link)
00033 {
00034 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
00035 }
00036
00037 ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask()
00038 {
00039 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
00040 }
00041
00042 int ThreadPerConnectionSendTask::add_request(SendStrategyOpType op,
00043 TransportQueueElement* element)
00044 {
00045 DBG_ENTRY("ThreadPerConnectionSendTask", "add");
00046
00047 ACE_Auto_Ptr<SendRequest> req(new SendRequest);
00048 req->op_ = op;
00049 req->element_ = element;
00050
00051 int result = -1;
00052 {
00053 GuardType guard(this->lock_);
00054
00055 if (this->shutdown_initiated_) {
00056 return -1;
00057 }
00058
00059 result = this->queue_.put(req.get());
00060
00061 if (result == 0) {
00062 this->work_available_.signal();
00063 req.release();
00064
00065 } else {
00066 ACE_ERROR((LM_ERROR,
00067 ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
00068 ACE_TEXT("put")));
00069 }
00070 }
00071
00072 return result;
00073 }
00074
00075 int ThreadPerConnectionSendTask::open(void*)
00076 {
00077 DBG_ENTRY("ThreadPerConnectionSendTask", "open");
00078
00079 GuardType guard(this->lock_);
00080
00081
00082
00083 if (this->opened_) {
00084 ACE_ERROR_RETURN((LM_ERROR,
00085 "(%P|%t) ThreadPerConnectionSendTask failed to open. "
00086 "Task has previously been open()'ed.\n"),
00087 -1);
00088 }
00089
00090 DirectPriorityMapper mapper(this->link_->transport_priority());
00091 int priority = mapper.thread_priority();
00092
00093 long flags = THR_NEW_LWP | THR_JOINABLE ;
00094 int policy = TheServiceParticipant->scheduler();
00095
00096 if (policy >= 0) {
00097 flags |= policy;
00098 } else {
00099 flags |= THR_INHERIT_SCHED;
00100 }
00101
00102 if (DCPS_debug_level > 0) {
00103 ACE_DEBUG((LM_DEBUG,
00104 ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
00105 ACE_TEXT("activating thread with flags 0x%08.8x ")
00106 ACE_TEXT("and priority %d.\n"),
00107 flags,
00108 priority));
00109 }
00110
00111
00112 if (this->activate(flags, 1, 0, priority) != 0) {
00113
00114
00115 ACE_ERROR_RETURN((LM_ERROR,
00116 "(%P|%t) ThreadPerConnectionSendTask failed to activate "
00117 "the worker threads.\n"),
00118 -1);
00119 }
00120
00121
00122 this->opened_ = true;
00123
00124 return 0;
00125 }
00126
00127 int ThreadPerConnectionSendTask::svc()
00128 {
00129 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
00130
00131 this->thr_id_ = ACE_OS::thr_self();
00132
00133
00134
00135
00136 sigset_t set;
00137 ACE_OS::sigfillset(&set);
00138 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00139
00140
00141 while (!this->shutdown_initiated_) {
00142 SendRequest* req;
00143 {
00144 GuardType guard(this->lock_);
00145
00146 if (this->queue_.size() == 0) {
00147 this->work_available_.wait();
00148 }
00149
00150 if (this->shutdown_initiated_) {
00151 break;
00152 }
00153
00154 req = queue_.get();
00155
00156 if (req == 0) {
00157
00158
00159
00160
00161
00162
00163 continue;
00164 }
00165 }
00166
00167 this->execute(*req);
00168 delete req;
00169 }
00170
00171
00172 return 0;
00173 }
00174
00175 int ThreadPerConnectionSendTask::close(u_long flag)
00176 {
00177 DBG_ENTRY("ThreadPerConnectionSendTask","close");
00178
00179 if (flag == 0) {
00180 return 0;
00181 }
00182
00183 {
00184 GuardType guard(this->lock_);
00185
00186 if (this->shutdown_initiated_) {
00187 return 0;
00188 }
00189
00190
00191 this->shutdown_initiated_ = true;
00192 this->work_available_.signal();
00193 }
00194
00195 if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
00196 this->wait();
00197 }
00198
00199 return 0;
00200 }
00201
00202 RemoveResult
00203 ThreadPerConnectionSendTask::remove_sample(const DataSampleElement* element)
00204 {
00205 DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
00206
00207 GuardType guard(this->lock_);
00208
00209 ACE_Message_Block* payload = element->get_sample()->cont();
00210 ThreadPerConRemoveVisitor visitor(payload);
00211
00212 this->queue_.accept_visitor(visitor);
00213
00214 return visitor.status();
00215 }
00216
00217 void ThreadPerConnectionSendTask::execute(SendRequest& req)
00218 {
00219 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
00220
00221
00222
00223 switch (req.op_) {
00224 case SEND_START:
00225 this->link_->send_start_i();
00226 break;
00227 case SEND:
00228 this->link_->send_i(req.element_);
00229 break;
00230 case SEND_STOP:
00231
00232
00233
00234
00235 this->link_->send_stop_i(GUID_UNKNOWN);
00236 break;
00237 default:
00238 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
00239 req.op_));
00240 break;
00241 }
00242 }
00243
00244 }
00245 }
00246
00247 OPENDDS_END_VERSIONED_NAMESPACE_DECL