OpenDDS::DCPS::ThreadPerConnectionSendTask Class Reference

Executes requests to send a sample or control message.

#include <ThreadPerConnectionSendTask.h>


Public Member Functions

 ThreadPerConnectionSendTask (DataLink *link)
virtual ~ThreadPerConnectionSendTask ()
int add_request (SendStrategyOpType op, TransportQueueElement *element=0)
virtual int open (void *=0)
 Activate the worker threads.
virtual int svc ()
 The "mainline" executed by the worker thread.
virtual int close (u_long flag=0)
 Called when the thread exits.
RemoveResult remove_sample (const DataSampleElement *element)
 Removes a sample from the thread-per-connection queue.

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef ACE_Condition< LockType > ConditionType
typedef BasicQueue< SendRequest > QueueType

Private Member Functions

virtual void execute (SendRequest &req)
 Handle the request.

Private Attributes

LockType lock_
 Lock to protect the "state" (all of the data members) of this object.
QueueType queue_
 The request queue.
ConditionType work_available_
bool shutdown_initiated_
 Flag used to initiate a shutdown request to all worker threads.
bool opened_
 Flag used to avoid multiple open() calls.
ACE_thread_t thr_id_
 The id of the thread created by this task.
DataLink * link_
 The datalink to send the samples or control messages.

Detailed Description

Executes requests to send a sample or control message.

This task implements execute(), which dispatches each queued request (start a send, send an element, or stop a send) to the DataLink. Requests are queued by add_request() and executed on a single worker thread.

Definition at line 58 of file ThreadPerConnectionSendTask.h.


Member Typedef Documentation

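typedef ACE_Condition<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::ConditionType [private]

Definition at line 88 of file ThreadPerConnectionSendTask.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::GuardType [private]

Definition at line 87 of file ThreadPerConnectionSendTask.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::ThreadPerConnectionSendTask::LockType [private]

Definition at line 86 of file ThreadPerConnectionSendTask.h.

typedef BasicQueue<SendRequest> OpenDDS::DCPS::ThreadPerConnectionSendTask::QueueType [private]

Definition at line 90 of file ThreadPerConnectionSendTask.h.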


Constructor & Destructor Documentation

OpenDDS::DCPS::ThreadPerConnectionSendTask::ThreadPerConnectionSendTask ( DataLink * link  ) 

Definition at line 26 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL, and NULL_thread.

00027   : lock_()
00028   , work_available_(lock_)
00029   , shutdown_initiated_(false)
00030   , opened_(false)
00031   , thr_id_(ACE_OS::NULL_thread)
00032   , link_(link)
00033 {
00034   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
00035 }

OpenDDS::DCPS::ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask (  )  [virtual]

Definition at line 37 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL.

00038 {
00039   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
00040 }


Member Function Documentation

int OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request ( SendStrategyOpType  op,
TransportQueueElement * element = 0 
)

Puts the request on the request queue. Returns 0 on success and -1 otherwise (the request was rejected or this task has been shut down).

Definition at line 42 of file ThreadPerConnectionSendTask.cpp.

References ACE_TEXT(), DBG_ENTRY, ACE_Auto_Basic_Ptr< X >::get(), LM_ERROR, lock_, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, ACE_Auto_Basic_Ptr< X >::release(), shutdown_initiated_, ACE_Condition< MUTEX >::signal(), and work_available_.

00044 {
00045   DBG_ENTRY("ThreadPerConnectionSendTask", "add");
00046 
00047   ACE_Auto_Ptr<SendRequest> req(new SendRequest);
00048   req->op_ = op;
00049   req->element_ = element;
00050 
00051   int result = -1;
00052   { // guard scope
00053     GuardType guard(this->lock_);
00054 
00055     if (this->shutdown_initiated_) {
00056       return -1;
00057     }
00058 
00059     result = this->queue_.put(req.get());
00060 
00061     if (result == 0) {
00062       this->work_available_.signal();
00063       req.release();
00064 
00065     } else {
00066       ACE_ERROR((LM_ERROR,
00067                  ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
00068                  ACE_TEXT("put")));
00069     }
00070   }
00071 
00072   return result;
00073 }
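
The ownership handoff in the listing above (the smart pointer gives up the request only after put() succeeds) can be sketched with the C++ standard library. This is a minimal sketch, not the OpenDDS code: RequestSketch, BoundedQueueSketch, and ProducerSketch are hypothetical names, std::unique_ptr stands in for ACE_Auto_Ptr, and the capacity limit is an assumption added only to give put() a way to fail.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>

// Hypothetical stand-in for SendRequest; not the OpenDDS type.
struct RequestSketch {
  int op;
};

// Hypothetical bounded queue of raw pointers.
// put() takes ownership of the request only when it returns 0.
class BoundedQueueSketch {
public:
  explicit BoundedQueueSketch(std::size_t capacity) : capacity_(capacity) {}
  BoundedQueueSketch(const BoundedQueueSketch&) = delete;
  BoundedQueueSketch& operator=(const BoundedQueueSketch&) = delete;
  ~BoundedQueueSketch() {
    for (RequestSketch* r : items_) delete r;
  }

  int put(RequestSketch* r) {
    assert(r != nullptr);
    if (items_.size() >= capacity_) return -1;  // rejected: caller still owns r
    items_.push_back(r);
    return 0;
  }

  std::size_t size() const { return items_.size(); }

private:
  std::size_t capacity_;
  std::deque<RequestSketch*> items_;
};

class ProducerSketch {
public:
  explicit ProducerSketch(std::size_t capacity) : queue_(capacity) {}

  // Returns 0 when queued, -1 when rejected or after shutdown().
  int add_request(int op) {
    std::unique_ptr<RequestSketch> req(new RequestSketch{op});
    std::lock_guard<std::mutex> guard(lock_);
    if (shutdown_) return -1;                  // unique_ptr frees the request
    const int result = queue_.put(req.get());
    if (result == 0) req.release();            // the queue owns the request now
    return result;                             // on failure unique_ptr frees it
  }

  void shutdown() {
    std::lock_guard<std::mutex> guard(lock_);
    shutdown_ = true;
  }

  std::size_t queued() {
    std::lock_guard<std::mutex> guard(lock_);
    return queue_.size();
  }

private:
  std::mutex lock_;
  BoundedQueueSketch queue_;
  bool shutdown_ = false;
};
```

Releasing the pointer only on the success path means every early return, including the shutdown check, frees the request without explicit cleanup code.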


int OpenDDS::DCPS::ThreadPerConnectionSendTask::close ( u_long  flag = 0  )  [virtual]

Called when the thread exits.

Reimplemented from ACE_Task_Base.

Definition at line 175 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY, lock_, opened_, shutdown_initiated_, ACE_Condition< MUTEX >::signal(), ACE_OS::thr_equal(), thr_id_, ACE_OS::thr_self(), ACE_Task_Base::wait(), and work_available_.

00176 {
00177   DBG_ENTRY("ThreadPerConnectionSendTask","close");
00178 
00179   if (flag == 0) {
00180     return 0;
00181   }
00182 
00183   {
00184     GuardType guard(this->lock_);
00185 
00186     if (this->shutdown_initiated_) {
00187       return 0;
00188     }
00189 
00190     // Set the shutdown flag to true.
00191     this->shutdown_initiated_ = true;
00192     this->work_available_.signal();
00193   }
00194 
00195   if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
00196     this->wait();
00197   }
00198 
00199   return 0;
00200 }
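
The thread-identity check before wait() in the listing above guards against the worker thread waiting on itself. A minimal standard-library sketch of that guard follows; SelfClosingWorkerSketch and its members are hypothetical, and std::thread::join() stands in for ACE_Task_Base::wait().

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

class SelfClosingWorkerSketch {
public:
  ~SelfClosingWorkerSketch() { close(); }

  // May be called once; starts the single worker thread.
  void open() {
    assert(!thread_.joinable());
    thread_ = std::thread([this] { run(); });
  }

  // Safe to call from any thread, including the worker itself.
  // Returns true only when this call joined the worker thread.
  bool close() {
    std::thread::id worker;
    {
      std::lock_guard<std::mutex> guard(lock_);
      shutdown_ = true;
      worker = worker_id_;
    }
    if (worker == std::this_thread::get_id()) return false;  // never join yourself
    if (!thread_.joinable()) return false;  // never opened, or already joined
    thread_.join();
    return true;
  }

  // -1 until the worker has run; afterwards 1 or 0 for the result of the
  // close() call the worker made on itself.
  int self_close_result() const { return self_close_result_.load(); }

private:
  void run() {
    {
      // Record the worker's id first, as svc() does with thr_id_.
      std::lock_guard<std::mutex> guard(lock_);
      worker_id_ = std::this_thread::get_id();
    }
    // Simulate the worker calling close() on its own task.
    self_close_result_.store(close() ? 1 : 0);
  }

  std::mutex lock_;
  bool shutdown_ = false;
  std::thread::id worker_id_;
  std::atomic<int> self_close_result_{-1};
  std::thread thread_;
};
```

In this sketch the worker's own close() returns without joining, while a close() from another thread blocks until the worker has finished.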


void OpenDDS::DCPS::ThreadPerConnectionSendTask::execute ( SendRequest & req  )  [private, virtual]

Handle the request.

Definition at line 217 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::SendRequest::element_, OpenDDS::DCPS::GUID_UNKNOWN, link_, LM_ERROR, OpenDDS::DCPS::SendRequest::op_, OpenDDS::DCPS::SEND, OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::SEND_START, OpenDDS::DCPS::DataLink::send_start_i(), OpenDDS::DCPS::SEND_STOP, and OpenDDS::DCPS::DataLink::send_stop_i().

Referenced by svc().

00218 {
00219   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
00220 
00221 
00222 
00223   switch (req.op_) {
00224   case SEND_START:
00225     this->link_->send_start_i();
00226     break;
00227   case SEND:
00228     this->link_->send_i(req.element_);
00229     break;
00230   case SEND_STOP:
00231     //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
00232     //the control element will be a null element with only the op_ set.  Thus pass in GUID_UNKNOWN which will
00233     //allow send_stop to call send_delayed_notifications without a match.  In the case of ThreadPerConnectionSendTask
00234     //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
00235     this->link_->send_stop_i(GUID_UNKNOWN);
00236     break;
00237   default:
00238     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
00239                req.op_));
00240     break;
00241   }
00242 }
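
The dispatch in the listing above maps each operation code to one call on the link and reports anything else as an error. The following is a minimal sketch of that shape; OpSketch, SendRequestSketch, and RecordingLinkSketch are hypothetical stand-ins for SendStrategyOpType, SendRequest, and DataLink, and the bool return value is an addition made here so the unknown-command path can be observed.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical operation codes; a fixed underlying type keeps any int value
// representable, so an unknown command can be modelled safely.
enum class OpSketch : int { SendStart, Send, SendStop };

// Hypothetical request: an operation plus the element to send, if any.
struct SendRequestSketch {
  OpSketch op;
  int element;
};

// Hypothetical link that only records the calls it receives.
struct RecordingLinkSketch {
  std::vector<std::string> calls;
  void send_start() { calls.push_back("start"); }
  void send(int element) { calls.push_back("send:" + std::to_string(element)); }
  void send_stop() { calls.push_back("stop"); }
};

// Dispatches one request to the link; returns false for an unknown command.
bool execute(RecordingLinkSketch& link, const SendRequestSketch& req) {
  switch (req.op) {
  case OpSketch::SendStart:
    link.send_start();
    return true;
  case OpSketch::Send:
    link.send(req.element);
    return true;
  case OpSketch::SendStop:
    link.send_stop();  // a control request: the element is not used
    return true;
  default:
    return false;
  }
}
```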


int OpenDDS::DCPS::ThreadPerConnectionSendTask::open ( void *  = 0  )  [virtual]

Activate the worker threads.

Reimplemented from ACE_Task_Base.

Definition at line 75 of file ThreadPerConnectionSendTask.cpp.

References ACE_TEXT(), ACE_Task_Base::activate(), DBG_ENTRY, OpenDDS::DCPS::DCPS_debug_level, link_, LM_DEBUG, LM_ERROR, lock_, opened_, ACE_Event_Handler::priority(), TheServiceParticipant, and OpenDDS::DCPS::DataLink::transport_priority().

00076 {
00077   DBG_ENTRY("ThreadPerConnectionSendTask", "open");
00078 
00079   GuardType guard(this->lock_);
00080 
00081   // We can assume that we are in the proper state to handle this open()
00082   // call as long as we haven't been open()'ed before.
00083   if (this->opened_) {
00084     ACE_ERROR_RETURN((LM_ERROR,
00085                       "(%P|%t) ThreadPerConnectionSendTask failed to open.  "
00086                       "Task has previously been open()'ed.\n"),
00087                      -1);
00088   }
00089 
00090   DirectPriorityMapper mapper(this->link_->transport_priority());
00091   int priority = mapper.thread_priority();
00092 
00093   long flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
00094   int policy = TheServiceParticipant->scheduler();
00095 
00096   if (policy >= 0) {
00097     flags |= policy;
00098   } else {
00099     flags |= THR_INHERIT_SCHED;
00100   }
00101 
00102   if (DCPS_debug_level > 0) {
00103     ACE_DEBUG((LM_DEBUG,
00104                ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
00105                ACE_TEXT("activating thread with flags 0x%08.8x ")
00106                ACE_TEXT("and priority %d.\n"),
00107                flags,
00108                priority));
00109   }
00110 
00111   // Activate this task object with one worker thread.
00112   if (this->activate(flags, 1, 0, priority) != 0) {
00113     // Assumes that when activate returns non-zero return code that
00114     // no threads were activated.
00115     ACE_ERROR_RETURN((LM_ERROR,
00116                       "(%P|%t) ThreadPerConnectionSendTask failed to activate "
00117                       "the worker threads.\n"),
00118                      -1);
00119   }
00120 
00121   // Now we have passed the point where we can say we've been open()'ed before.
00122   this->opened_ = true;
00123 
00124   return 0;
00125 }
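
The open-once guard in the listing above sets opened_ only after the worker thread has started, so a failed activation leaves the task in a state where open() can be attempted again. A minimal sketch of that ordering follows, assuming std::thread in place of ACE_Task_Base::activate(); OpenOnceTaskSketch is a hypothetical name, and thread priority and scheduling flags are left out.

```cpp
#include <cassert>
#include <mutex>
#include <system_error>
#include <thread>

class OpenOnceTaskSketch {
public:
  ~OpenOnceTaskSketch() {
    if (worker_.joinable()) worker_.join();
  }

  // Returns 0 on the first successful call, -1 on any later call or when
  // the thread cannot be started.
  int open() {
    std::lock_guard<std::mutex> guard(lock_);
    if (opened_) return -1;  // already open()'ed
    assert(!worker_.joinable());
    try {
      worker_ = std::thread([] { /* the worker body would go here */ });
    } catch (const std::system_error&) {
      return -1;  // opened_ stays false, so a later open() may retry
    }
    opened_ = true;  // set only after the thread exists
    return 0;
  }

  bool opened() {
    std::lock_guard<std::mutex> guard(lock_);
    return opened_;
  }

private:
  std::mutex lock_;
  bool opened_ = false;
  std::thread worker_;
};
```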


RemoveResult OpenDDS::DCPS::ThreadPerConnectionSendTask::remove_sample ( const DataSampleElement * element  ) 

Removes a sample from the thread-per-connection queue.

Definition at line 203 of file ThreadPerConnectionSendTask.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), ACE_Message_Block::cont(), DBG_ENTRY, OpenDDS::DCPS::DataSampleElement::get_sample(), lock_, queue_, and OpenDDS::DCPS::ThreadPerConRemoveVisitor::status().

00204 {
00205   DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
00206 
00207   GuardType guard(this->lock_);
00208 
00209   ACE_Message_Block* payload = element->get_sample()->cont();
00210   ThreadPerConRemoveVisitor visitor(payload);
00211 
00212   this->queue_.accept_visitor(visitor);
00213 
00214   return visitor.status();
00215 }
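
The listing above removes a queued sample by handing a visitor to the queue and reading the visitor's status afterwards. The sketch below illustrates that general pattern only. The actual behaviour of BasicQueue::accept_visitor() and ThreadPerConRemoveVisitor is not shown on this page, so the removal rule (drop every request whose payload pointer matches) and all names here are assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical queued request identified by its payload pointer.
struct QueuedRequestSketch {
  const void* payload;
};

// Hypothetical result type; the real RemoveResult values are not shown here.
enum class RemoveStatusSketch { NotFound, Removed };

class RemoveByPayloadVisitorSketch {
public:
  explicit RemoveByPayloadVisitorSketch(const void* payload)
    : payload_(payload), status_(RemoveStatusSketch::NotFound) {
    assert(payload != nullptr);
  }

  // Returns true when the visited request should be dropped from the queue.
  bool visit(const QueuedRequestSketch& request) {
    if (request.payload != payload_) return false;
    status_ = RemoveStatusSketch::Removed;
    return true;
  }

  RemoveStatusSketch status() const { return status_; }

private:
  const void* payload_;
  RemoveStatusSketch status_;
};

// Walks the queue once and erases every request the visitor selects.
template <typename Visitor>
void accept_visitor(std::deque<QueuedRequestSketch>& queue, Visitor& visitor) {
  for (auto it = queue.begin(); it != queue.end();) {
    if (visitor.visit(*it)) {
      it = queue.erase(it);
    } else {
      ++it;
    }
  }
}
```

As in the listing, a caller would hold the queue lock for the whole traversal so the worker thread cannot dequeue while requests are being erased.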


int OpenDDS::DCPS::ThreadPerConnectionSendTask::svc ( void   )  [virtual]

The "mainline" executed by the worker thread.

Reimplemented from ACE_Task_Base.

Definition at line 127 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL, execute(), OpenDDS::DCPS::BasicQueue< T >::get(), lock_, queue_, shutdown_initiated_, SIG_SETMASK, ACE_OS::sigfillset(), sigset_t, OpenDDS::DCPS::BasicQueue< T >::size(), thr_id_, ACE_OS::thr_self(), ACE_OS::thr_sigsetmask(), ACE_Condition< MUTEX >::wait(), and work_available_.

00128 {
00129   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
00130 
00131   this->thr_id_ = ACE_OS::thr_self();
00132 
00133   // Ignore all signals to avoid
00134   //     ERROR: <something descriptive> Interrupted system call
00135   // The main thread will handle signals.
00136   sigset_t set;
00137   ACE_OS::sigfillset(&set);
00138   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00139 
00140   // Start the "GetWork-And-PerformWork" loop for the current worker thread.
00141   while (!this->shutdown_initiated_) {
00142     SendRequest* req;
00143     {
00144       GuardType guard(this->lock_);
00145 
00146       if (this->queue_.size() == 0) {
00147         this->work_available_.wait();
00148       }
00149 
00150       if (this->shutdown_initiated_) {
00151         break;
00152       }
00153 
00154       req = queue_.get();
00155 
00156       if (req == 0) {
00157         //I'm not sure why this thread got more signals than actual signals
00158         //when using thread_per_connection and the user application thread
00159         //send requests without interval. We just need to ignore the dequeue
00160         //failure.
00161         //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc  %p\n",
00162         //  ACE_TEXT("dequeue_head")));
00163         continue;
00164       }
00165     }
00166 
00167     this->execute(*req);
00168     delete req;
00169   }
00170 
00171   // Reached once shutdown has been initiated and the loop above exits.
00172   return 0;
00173 }
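
The "GetWork-And-PerformWork" loop above can be sketched with std::condition_variable. This is a minimal sketch under stated assumptions, not the OpenDDS implementation: SingleWorkerQueueSketch and its members are hypothetical, requests are plain ints, and "executing" a request just records it. The sketch waits with a predicate, which covers the same case the listing handles by ignoring an empty dequeue after a wakeup. As in the listing, requests still queued when shutdown begins are not executed.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

class SingleWorkerQueueSketch {
public:
  ~SingleWorkerQueueSketch() { close(); }

  // May be called once; starts the single worker thread.
  void open() {
    assert(!worker_.joinable());
    worker_ = std::thread([this] { svc(); });
  }

  // Returns 0 when queued, -1 once shutdown has begun.
  int add_request(int value) {
    std::lock_guard<std::mutex> guard(lock_);
    if (shutdown_) return -1;
    queue_.push_back(value);
    work_available_.notify_one();
    return 0;
  }

  // Must not be called from the worker thread itself.
  void close() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      shutdown_ = true;
      work_available_.notify_all();
    }
    if (worker_.joinable()) worker_.join();
  }

  std::vector<int> processed() {
    std::lock_guard<std::mutex> guard(done_lock_);
    return processed_;
  }

private:
  void svc() {
    for (;;) {
      int value;
      {
        std::unique_lock<std::mutex> guard(lock_);
        // The predicate is re-checked after every wakeup, spurious or not.
        work_available_.wait(guard, [this] { return shutdown_ || !queue_.empty(); });
        if (shutdown_) break;
        value = queue_.front();
        queue_.pop_front();
      }
      // Execute outside lock_ so producers are not blocked while work runs.
      std::lock_guard<std::mutex> done(done_lock_);
      processed_.push_back(value);
    }
  }

  std::mutex lock_;
  std::condition_variable work_available_;
  std::deque<int> queue_;
  bool shutdown_ = false;
  std::mutex done_lock_;
  std::vector<int> processed_;
  std::thread worker_;
};
```

A caller would open() the task, queue work with add_request(), and call close() from a thread other than the worker to stop it.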



Member Data Documentation

DataLink* OpenDDS::DCPS::ThreadPerConnectionSendTask::link_ [private]

The datalink used to send the samples or control messages.

Definition at line 114 of file ThreadPerConnectionSendTask.h.

Referenced by execute(), and open().

LockType OpenDDS::DCPS::ThreadPerConnectionSendTask::lock_ [private]

Lock to protect the "state" (all of the data members) of this object.

Definition at line 93 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), close(), open(), remove_sample(), and svc().

bool OpenDDS::DCPS::ThreadPerConnectionSendTask::opened_ [private]

Flag used to avoid multiple open() calls.

Definition at line 108 of file ThreadPerConnectionSendTask.h.

Referenced by close(), and open().

QueueType OpenDDS::DCPS::ThreadPerConnectionSendTask::queue_ [private]

The request queue.

Definition at line 96 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), remove_sample(), and svc().

bool OpenDDS::DCPS::ThreadPerConnectionSendTask::shutdown_initiated_ [private]

Flag used to initiate a shutdown request to all worker threads.

Definition at line 105 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), close(), and svc().

ACE_thread_t OpenDDS::DCPS::ThreadPerConnectionSendTask::thr_id_ [private]

The id of the thread created by this task.

Definition at line 111 of file ThreadPerConnectionSendTask.h.

Referenced by close(), and svc().

ConditionType OpenDDS::DCPS::ThreadPerConnectionSendTask::work_available_ [private]

Condition used to signal the worker threads that they may be able to find a request in queue_ that needs to be executed. This condition is signaled each time a request is added to queue_, and also when this task is shut down.

Definition at line 102 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), close(), and svc().


The documentation for this class was generated from the following files:
 ThreadPerConnectionSendTask.h
 ThreadPerConnectionSendTask.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1