ThreadPerConnectionSendTask.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ThreadPerConnectionSendTask.h"
00010 #include "TransportQueueElement.h"
00011 #include "TransportSendElement.h"
00012 #include "DataLink.h"
00013 #include "ThreadPerConRemoveVisitor.h"
00014 #include "DirectPriorityMapper.h"
00015 #include "dds/DCPS/transport/framework/EntryExit.h"
00016 #include "dds/DCPS/DataSampleElement.h"
00017 #include "dds/DCPS/Service_Participant.h"
00018 
00019 #include "ace/Auto_Ptr.h"
00020 
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 
00024 ThreadPerConnectionSendTask::ThreadPerConnectionSendTask(DataLink* link)
00025   : lock_()
00026   , queue_(1, 10)
00027   , work_available_(lock_)
00028   , shutdown_initiated_(false)
00029   , opened_(false)
00030   , thr_id_(ACE_OS::NULL_thread)
00031   , link_(link)
00032 {
00033   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
00034 }
00035 
00036 ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask()
00037 {
00038   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
00039 }
00040 
00041 int ThreadPerConnectionSendTask::add_request(SendStrategyOpType op,
00042                                              TransportQueueElement* element)
00043 {
00044   DBG_ENTRY("ThreadPerConnectionSendTask", "add");
00045 
00046   ACE_Auto_Ptr<SendRequest> req(new SendRequest);
00047   req->op_ = op;
00048   req->element_ = element;
00049 
00050   int result = -1;
00051   { // guard scope
00052     GuardType guard(this->lock_);
00053 
00054     if (this->shutdown_initiated_) {
00055       return -1;
00056     }
00057 
00058     result = this->queue_.put(req.get());
00059 
00060     if (result == 0) {
00061       this->work_available_.signal();
00062       req.release();
00063 
00064     } else {
00065       ACE_ERROR((LM_ERROR,
00066                  ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
00067                  ACE_TEXT("put")));
00068     }
00069   }
00070 
00071   return result;
00072 }
00073 
00074 int ThreadPerConnectionSendTask::open(void*)
00075 {
00076   DBG_ENTRY("ThreadPerConnectionSendTask", "open");
00077 
00078   GuardType guard(this->lock_);
00079 
00080   // We can assume that we are in the proper state to handle this open()
00081   // call as long as we haven't been open()'ed before.
00082   if (this->opened_) {
00083     ACE_ERROR_RETURN((LM_ERROR,
00084                       "(%P|%t) ThreadPerConnectionSendTask failed to open.  "
00085                       "Task has previously been open()'ed.\n"),
00086                      -1);
00087   }
00088 
00089   DirectPriorityMapper mapper(this->link_->transport_priority());
00090   int priority = mapper.thread_priority();
00091 
00092   long flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
00093   int policy = TheServiceParticipant->scheduler();
00094 
00095   if (policy >= 0) {
00096     flags |= policy;
00097   } else {
00098     flags |= THR_INHERIT_SCHED;
00099   }
00100 
00101   if (DCPS_debug_level > 0) {
00102     ACE_DEBUG((LM_DEBUG,
00103                ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
00104                ACE_TEXT("activating thread with flags 0x%08.8x ")
00105                ACE_TEXT("and priority %d.\n"),
00106                flags,
00107                priority));
00108   }
00109 
00110   // Activate this task object with one worker thread.
00111   if (this->activate(flags, 1, 0, priority) != 0) {
00112     // Assumes that when activate returns non-zero return code that
00113     // no threads were activated.
00114     ACE_ERROR_RETURN((LM_ERROR,
00115                       "(%P|%t) ThreadPerConnectionSendTask failed to activate "
00116                       "the worker threads.\n"),
00117                      -1);
00118   }
00119 
00120   // Now we have past the point where we can say we've been open()'ed before.
00121   this->opened_ = true;
00122 
00123   return 0;
00124 }
00125 
00126 int ThreadPerConnectionSendTask::svc()
00127 {
00128   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
00129 
00130   this->thr_id_ = ACE_OS::thr_self();
00131 
00132   // Ignore all signals to avoid
00133   //     ERROR: <something descriptive> Interrupted system call
00134   // The main thread will handle signals.
00135   sigset_t set;
00136   ACE_OS::sigfillset(&set);
00137   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00138 
00139   // Start the "GetWork-And-PerformWork" loop for the current worker thread.
00140   while (!this->shutdown_initiated_) {
00141     SendRequest* req;
00142     {
00143       GuardType guard(this->lock_);
00144 
00145       if (this->queue_.size() == 0) {
00146         this->work_available_.wait();
00147       }
00148 
00149       if (this->shutdown_initiated_) {
00150         break;
00151       }
00152 
00153       req = queue_.get();
00154 
00155       if (req == 0) {
00156         //I'm not sure why this thread got more signals than actual signals
00157         //when using thread_per_connection and the user application thread
00158         //send requests without interval. We just need ignore the dequeue
00159         //failure.
00160         //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc  %p\n",
00161         //  ACE_TEXT("dequeue_head")));
00162         continue;
00163       }
00164     }
00165 
00166     this->execute(*req);
00167     delete req;
00168   }
00169 
00170   // This will never get executed.
00171   return 0;
00172 }
00173 
00174 int ThreadPerConnectionSendTask::close(u_long flag)
00175 {
00176   DBG_ENTRY("ThreadPerConnectionSendTask","close");
00177 
00178   if (flag == 0) {
00179     return 0;
00180   }
00181 
00182   {
00183     GuardType guard(this->lock_);
00184 
00185     if (this->shutdown_initiated_) {
00186       return 0;
00187     }
00188 
00189     // Set the shutdown flag to true.
00190     this->shutdown_initiated_ = true;
00191     this->work_available_.signal();
00192   }
00193 
00194   if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
00195     this->wait();
00196   }
00197 
00198   return 0;
00199 }
00200 
00201 RemoveResult
00202 ThreadPerConnectionSendTask::remove_sample(const DataSampleElement* element)
00203 {
00204   DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
00205 
00206   GuardType guard(this->lock_);
00207 
00208   ACE_Message_Block* payload = element->get_sample()->cont();
00209   ThreadPerConRemoveVisitor visitor(payload);
00210 
00211   this->queue_.accept_visitor(visitor);
00212 
00213   return visitor.status();
00214 }
00215 
00216 void ThreadPerConnectionSendTask::execute(SendRequest& req)
00217 {
00218   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
00219 
00220 
00221 
00222   switch (req.op_) {
00223   case SEND_START:
00224     this->link_->send_start_i();
00225     break;
00226   case SEND:
00227     this->link_->send_i(req.element_);
00228     break;
00229   case SEND_STOP:
00230     //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
00231     //the control element will be a null element with only the op_ set.  Thus pass in GUID_UNKNOWN which will
00232     //allow send_stop to call send_delayed_notifications without a match.  In the case of ThreadPerConnectionSendTask
00233     //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
00234     this->link_->send_stop_i(GUID_UNKNOWN);
00235     break;
00236   default:
00237     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
00238                req.op_));
00239     break;
00240   }
00241 }
00242 
00243 }
00244 }

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7