00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ThreadPerConnectionSendTask.h"
00010 #include "TransportQueueElement.h"
00011 #include "TransportSendElement.h"
00012 #include "DataLink.h"
00013 #include "ThreadPerConRemoveVisitor.h"
00014 #include "DirectPriorityMapper.h"
00015 #include "dds/DCPS/transport/framework/EntryExit.h"
00016 #include "dds/DCPS/DataSampleElement.h"
00017 #include "dds/DCPS/Service_Participant.h"
00018
00019 #include "ace/Auto_Ptr.h"
00020
00021 namespace OpenDDS {
00022 namespace DCPS {
00023
00024 ThreadPerConnectionSendTask::ThreadPerConnectionSendTask(DataLink* link)
00025 : lock_()
00026 , queue_(1, 10)
00027 , work_available_(lock_)
00028 , shutdown_initiated_(false)
00029 , opened_(false)
00030 , thr_id_(ACE_OS::NULL_thread)
00031 , link_(link)
00032 {
00033 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
00034 }
00035
00036 ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask()
00037 {
00038 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
00039 }
00040
00041 int ThreadPerConnectionSendTask::add_request(SendStrategyOpType op,
00042 TransportQueueElement* element)
00043 {
00044 DBG_ENTRY("ThreadPerConnectionSendTask", "add");
00045
00046 ACE_Auto_Ptr<SendRequest> req(new SendRequest);
00047 req->op_ = op;
00048 req->element_ = element;
00049
00050 int result = -1;
00051 {
00052 GuardType guard(this->lock_);
00053
00054 if (this->shutdown_initiated_) {
00055 return -1;
00056 }
00057
00058 result = this->queue_.put(req.get());
00059
00060 if (result == 0) {
00061 this->work_available_.signal();
00062 req.release();
00063
00064 } else {
00065 ACE_ERROR((LM_ERROR,
00066 ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
00067 ACE_TEXT("put")));
00068 }
00069 }
00070
00071 return result;
00072 }
00073
00074 int ThreadPerConnectionSendTask::open(void*)
00075 {
00076 DBG_ENTRY("ThreadPerConnectionSendTask", "open");
00077
00078 GuardType guard(this->lock_);
00079
00080
00081
00082 if (this->opened_) {
00083 ACE_ERROR_RETURN((LM_ERROR,
00084 "(%P|%t) ThreadPerConnectionSendTask failed to open. "
00085 "Task has previously been open()'ed.\n"),
00086 -1);
00087 }
00088
00089 DirectPriorityMapper mapper(this->link_->transport_priority());
00090 int priority = mapper.thread_priority();
00091
00092 long flags = THR_NEW_LWP | THR_JOINABLE ;
00093 int policy = TheServiceParticipant->scheduler();
00094
00095 if (policy >= 0) {
00096 flags |= policy;
00097 } else {
00098 flags |= THR_INHERIT_SCHED;
00099 }
00100
00101 if (DCPS_debug_level > 0) {
00102 ACE_DEBUG((LM_DEBUG,
00103 ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
00104 ACE_TEXT("activating thread with flags 0x%08.8x ")
00105 ACE_TEXT("and priority %d.\n"),
00106 flags,
00107 priority));
00108 }
00109
00110
00111 if (this->activate(flags, 1, 0, priority) != 0) {
00112
00113
00114 ACE_ERROR_RETURN((LM_ERROR,
00115 "(%P|%t) ThreadPerConnectionSendTask failed to activate "
00116 "the worker threads.\n"),
00117 -1);
00118 }
00119
00120
00121 this->opened_ = true;
00122
00123 return 0;
00124 }
00125
00126 int ThreadPerConnectionSendTask::svc()
00127 {
00128 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
00129
00130 this->thr_id_ = ACE_OS::thr_self();
00131
00132
00133
00134
00135 sigset_t set;
00136 ACE_OS::sigfillset(&set);
00137 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00138
00139
00140 while (!this->shutdown_initiated_) {
00141 SendRequest* req;
00142 {
00143 GuardType guard(this->lock_);
00144
00145 if (this->queue_.size() == 0) {
00146 this->work_available_.wait();
00147 }
00148
00149 if (this->shutdown_initiated_) {
00150 break;
00151 }
00152
00153 req = queue_.get();
00154
00155 if (req == 0) {
00156
00157
00158
00159
00160
00161
00162 continue;
00163 }
00164 }
00165
00166 this->execute(*req);
00167 delete req;
00168 }
00169
00170
00171 return 0;
00172 }
00173
00174 int ThreadPerConnectionSendTask::close(u_long flag)
00175 {
00176 DBG_ENTRY("ThreadPerConnectionSendTask","close");
00177
00178 if (flag == 0) {
00179 return 0;
00180 }
00181
00182 {
00183 GuardType guard(this->lock_);
00184
00185 if (this->shutdown_initiated_) {
00186 return 0;
00187 }
00188
00189
00190 this->shutdown_initiated_ = true;
00191 this->work_available_.signal();
00192 }
00193
00194 if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
00195 this->wait();
00196 }
00197
00198 return 0;
00199 }
00200
00201 RemoveResult
00202 ThreadPerConnectionSendTask::remove_sample(const DataSampleElement* element)
00203 {
00204 DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
00205
00206 GuardType guard(this->lock_);
00207
00208 ACE_Message_Block* payload = element->get_sample()->cont();
00209 ThreadPerConRemoveVisitor visitor(payload);
00210
00211 this->queue_.accept_visitor(visitor);
00212
00213 return visitor.status();
00214 }
00215
00216 void ThreadPerConnectionSendTask::execute(SendRequest& req)
00217 {
00218 DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
00219
00220
00221
00222 switch (req.op_) {
00223 case SEND_START:
00224 this->link_->send_start_i();
00225 break;
00226 case SEND:
00227 this->link_->send_i(req.element_);
00228 break;
00229 case SEND_STOP:
00230
00231
00232
00233
00234 this->link_->send_stop_i(GUID_UNKNOWN);
00235 break;
00236 default:
00237 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
00238 req.op_));
00239 break;
00240 }
00241 }
00242
00243 }
00244 }