ThreadPerConnectionSendTask.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ThreadPerConnectionSendTask.h"
00010 #include "TransportQueueElement.h"
00011 #include "TransportSendElement.h"
00012 #include "DataLink.h"
00013 #include "ThreadPerConRemoveVisitor.h"
00014 #include "DirectPriorityMapper.h"
00015 #include "dds/DCPS/transport/framework/EntryExit.h"
00016 #include "dds/DCPS/DataSampleElement.h"
00017 #include "dds/DCPS/Service_Participant.h"
00018 
00019 #include "ace/Auto_Ptr.h"
00020 
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 ThreadPerConnectionSendTask::ThreadPerConnectionSendTask(DataLink* link)
00027   : lock_()
00028   , work_available_(lock_)
00029   , shutdown_initiated_(false)
00030   , opened_(false)
00031   , thr_id_(ACE_OS::NULL_thread)
00032   , link_(link)
00033 {
00034   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
00035 }
00036 
00037 ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask()
00038 {
00039   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
00040 }
00041 
00042 int ThreadPerConnectionSendTask::add_request(SendStrategyOpType op,
00043                                              TransportQueueElement* element)
00044 {
00045   DBG_ENTRY("ThreadPerConnectionSendTask", "add");
00046 
00047   ACE_Auto_Ptr<SendRequest> req(new SendRequest);
00048   req->op_ = op;
00049   req->element_ = element;
00050 
00051   int result = -1;
00052   { // guard scope
00053     GuardType guard(this->lock_);
00054 
00055     if (this->shutdown_initiated_) {
00056       return -1;
00057     }
00058 
00059     result = this->queue_.put(req.get());
00060 
00061     if (result == 0) {
00062       this->work_available_.signal();
00063       req.release();
00064 
00065     } else {
00066       ACE_ERROR((LM_ERROR,
00067                  ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
00068                  ACE_TEXT("put")));
00069     }
00070   }
00071 
00072   return result;
00073 }
00074 
00075 int ThreadPerConnectionSendTask::open(void*)
00076 {
00077   DBG_ENTRY("ThreadPerConnectionSendTask", "open");
00078 
00079   GuardType guard(this->lock_);
00080 
00081   // We can assume that we are in the proper state to handle this open()
00082   // call as long as we haven't been open()'ed before.
00083   if (this->opened_) {
00084     ACE_ERROR_RETURN((LM_ERROR,
00085                       "(%P|%t) ThreadPerConnectionSendTask failed to open.  "
00086                       "Task has previously been open()'ed.\n"),
00087                      -1);
00088   }
00089 
00090   DirectPriorityMapper mapper(this->link_->transport_priority());
00091   int priority = mapper.thread_priority();
00092 
00093   long flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
00094   int policy = TheServiceParticipant->scheduler();
00095 
00096   if (policy >= 0) {
00097     flags |= policy;
00098   } else {
00099     flags |= THR_INHERIT_SCHED;
00100   }
00101 
00102   if (DCPS_debug_level > 0) {
00103     ACE_DEBUG((LM_DEBUG,
00104                ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
00105                ACE_TEXT("activating thread with flags 0x%08.8x ")
00106                ACE_TEXT("and priority %d.\n"),
00107                flags,
00108                priority));
00109   }
00110 
00111   // Activate this task object with one worker thread.
00112   if (this->activate(flags, 1, 0, priority) != 0) {
00113     // Assumes that when activate returns non-zero return code that
00114     // no threads were activated.
00115     ACE_ERROR_RETURN((LM_ERROR,
00116                       "(%P|%t) ThreadPerConnectionSendTask failed to activate "
00117                       "the worker threads.\n"),
00118                      -1);
00119   }
00120 
00121   // Now we have past the point where we can say we've been open()'ed before.
00122   this->opened_ = true;
00123 
00124   return 0;
00125 }
00126 
00127 int ThreadPerConnectionSendTask::svc()
00128 {
00129   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
00130 
00131   this->thr_id_ = ACE_OS::thr_self();
00132 
00133   // Ignore all signals to avoid
00134   //     ERROR: <something descriptive> Interrupted system call
00135   // The main thread will handle signals.
00136   sigset_t set;
00137   ACE_OS::sigfillset(&set);
00138   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00139 
00140   // Start the "GetWork-And-PerformWork" loop for the current worker thread.
00141   while (!this->shutdown_initiated_) {
00142     SendRequest* req;
00143     {
00144       GuardType guard(this->lock_);
00145 
00146       if (this->queue_.size() == 0) {
00147         this->work_available_.wait();
00148       }
00149 
00150       if (this->shutdown_initiated_) {
00151         break;
00152       }
00153 
00154       req = queue_.get();
00155 
00156       if (req == 0) {
00157         //I'm not sure why this thread got more signals than actual signals
00158         //when using thread_per_connection and the user application thread
00159         //send requests without interval. We just need ignore the dequeue
00160         //failure.
00161         //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc  %p\n",
00162         //  ACE_TEXT("dequeue_head")));
00163         continue;
00164       }
00165     }
00166 
00167     this->execute(*req);
00168     delete req;
00169   }
00170 
00171   // This will never get executed.
00172   return 0;
00173 }
00174 
00175 int ThreadPerConnectionSendTask::close(u_long flag)
00176 {
00177   DBG_ENTRY("ThreadPerConnectionSendTask","close");
00178 
00179   if (flag == 0) {
00180     return 0;
00181   }
00182 
00183   {
00184     GuardType guard(this->lock_);
00185 
00186     if (this->shutdown_initiated_) {
00187       return 0;
00188     }
00189 
00190     // Set the shutdown flag to true.
00191     this->shutdown_initiated_ = true;
00192     this->work_available_.signal();
00193   }
00194 
00195   if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
00196     this->wait();
00197   }
00198 
00199   return 0;
00200 }
00201 
00202 RemoveResult
00203 ThreadPerConnectionSendTask::remove_sample(const DataSampleElement* element)
00204 {
00205   DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
00206 
00207   GuardType guard(this->lock_);
00208 
00209   ACE_Message_Block* payload = element->get_sample()->cont();
00210   ThreadPerConRemoveVisitor visitor(payload);
00211 
00212   this->queue_.accept_visitor(visitor);
00213 
00214   return visitor.status();
00215 }
00216 
00217 void ThreadPerConnectionSendTask::execute(SendRequest& req)
00218 {
00219   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
00220 
00221 
00222 
00223   switch (req.op_) {
00224   case SEND_START:
00225     this->link_->send_start_i();
00226     break;
00227   case SEND:
00228     this->link_->send_i(req.element_);
00229     break;
00230   case SEND_STOP:
00231     //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
00232     //the control element will be a null element with only the op_ set.  Thus pass in GUID_UNKNOWN which will
00233     //allow send_stop to call send_delayed_notifications without a match.  In the case of ThreadPerConnectionSendTask
00234     //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
00235     this->link_->send_stop_i(GUID_UNKNOWN);
00236     break;
00237   default:
00238     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
00239                req.op_));
00240     break;
00241   }
00242 }
00243 
00244 }
00245 }
00246 
00247 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1