OpenDDS::DCPS::ThreadPerConnectionSendTask Class Reference

Executes requests to send a sample or control message.

#include <ThreadPerConnectionSendTask.h>

Public Member Functions

 ThreadPerConnectionSendTask (DataLink *link)
 Constructor.
virtual ~ThreadPerConnectionSendTask ()
 Virtual Destructor.
int add_request (SendStrategyOpType op, TransportQueueElement *element=0)
virtual int open (void *=0)
 Activate the single worker thread.
virtual int svc ()
 The "mainline" executed by the worker thread.
virtual int close (u_long flag=0)
 Called when the thread exits.
RemoveResult remove_sample (const DataSampleElement *element)
 Remove sample from the thread per connection queue.

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef ACE_Condition< LockType > ConditionType
typedef BasicQueue< SendRequest > QueueType

Private Member Functions

virtual void execute (SendRequest &req)
 Handle the request.

Private Attributes

LockType lock_
 Lock to protect the "state" (all of the data members) of this object.
QueueType queue_
 The request queue.
ConditionType work_available_
bool shutdown_initiated_
 Flag used to initiate a shutdown request to all worker threads.
bool opened_
 Flag used to avoid multiple open() calls.
ACE_thread_t thr_id_
 The id of the thread created by this task.
DataLink * link_
 The datalink to send the samples or control messages.

Detailed Description

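Executes requests to send a sample or control message.

This task implements the execute() method, which handles each step of sending a sample or control message. Requests are queued by add_request() and processed by a single worker thread, which dispatches SEND_START, SEND, and SEND_STOP operations to the associated DataLink.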

Definition at line 55 of file ThreadPerConnectionSendTask.h.


Member Typedef Documentation

typedef ACE_Condition<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::ConditionType [private]

Definition at line 88 of file ThreadPerConnectionSendTask.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::ThreadPerConnectionSendTask::GuardType [private]

Definition at line 87 of file ThreadPerConnectionSendTask.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::ThreadPerConnectionSendTask::LockType [private]

Definition at line 86 of file ThreadPerConnectionSendTask.h.

typedef BasicQueue<SendRequest> OpenDDS::DCPS::ThreadPerConnectionSendTask::QueueType [private]

Definition at line 90 of file ThreadPerConnectionSendTask.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ThreadPerConnectionSendTask::ThreadPerConnectionSendTask ( DataLink * link  ) 

Constructor.

Definition at line 24 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL.

00025   : lock_()
00026   , queue_(1, 10)
00027   , work_available_(lock_)
00028   , shutdown_initiated_(false)
00029   , opened_(false)
00030   , thr_id_(ACE_OS::NULL_thread)
00031   , link_(link)
00032 {
00033   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
00034 }

OpenDDS::DCPS::ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask (  )  [virtual]

Virtual Destructor.

Definition at line 36 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL.

00037 {
00038   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
00039 }


Member Function Documentation

int OpenDDS::DCPS::ThreadPerConnectionSendTask::add_request ( SendStrategyOpType  op,
TransportQueueElement * element = 0 
)

Puts the request on the request queue and signals the worker thread. Returns 0 if successful, -1 otherwise (the queue rejected the request or this task has been shut down).

Definition at line 41 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, and work_available_.

Referenced by OpenDDS::DCPS::DataLink::send(), OpenDDS::DCPS::DataLink::send_start(), and OpenDDS::DCPS::DataLink::send_stop().

00043 {
00044   DBG_ENTRY("ThreadPerConnectionSendTask", "add");
00045 
00046   ACE_Auto_Ptr<SendRequest> req(new SendRequest);
00047   req->op_ = op;
00048   req->element_ = element;
00049 
00050   int result = -1;
00051   { // guard scope
00052     GuardType guard(this->lock_);
00053 
00054     if (this->shutdown_initiated_) {
00055       return -1;
00056     }
00057 
00058     result = this->queue_.put(req.get());
00059 
00060     if (result == 0) {
00061       this->work_available_.signal();
00062       req.release();
00063 
00064     } else {
00065       ACE_ERROR((LM_ERROR,
00066                  ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
00067                  ACE_TEXT("put")));
00068     }
00069   }
00070 
00071   return result;
00072 }
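
The listing above hands the request to the queue only when put() succeeds; otherwise the smart pointer still owns it and frees it on return. A minimal sketch of that hand-off using the standard library rather than ACE follows. BoundedQueue, Request, and enqueue are hypothetical names, and the capacity check is only an assumed reason for put() to fail.

```cpp
#include <cstddef>
#include <deque>
#include <memory>

// Hypothetical stand-in for SendRequest.
struct Request { int op; };

// Hypothetical queue of raw pointers whose put() returns 0 or -1.
class BoundedQueue {
public:
  explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}
  ~BoundedQueue() { for (Request* r : items_) delete r; }
  BoundedQueue(const BoundedQueue&) = delete;
  BoundedQueue& operator=(const BoundedQueue&) = delete;

  int put(Request* r) {
    if (items_.size() >= capacity_) return -1;  // assumed rejection rule
    items_.push_back(r);
    return 0;
  }
  std::size_t size() const { return items_.size(); }

private:
  std::size_t capacity_;
  std::deque<Request*> items_;
};

// Returns the result of put(); the request is leaked in neither case.
int enqueue(BoundedQueue& q, int op) {
  std::unique_ptr<Request> req(new Request{op});
  int result = q.put(req.get());
  if (result == 0) {
    req.release();  // the queue owns the request from here on
  }
  return result;    // on failure, req deletes the request here
}
```

Releasing only after a successful put() keeps exactly one owner at every point, which is the same reason the listing calls req.release() inside the success branch.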

int OpenDDS::DCPS::ThreadPerConnectionSendTask::close ( u_long  flag = 0  )  [virtual]

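Called when the thread exits. When flag is 0 (the value used when the thread exits) it returns immediately. When called with a nonzero flag it initiates shutdown: it sets shutdown_initiated_, signals work_available_, and, if the task was opened and the caller is not the worker thread itself, waits for the worker thread to exit.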

Definition at line 174 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY, shutdown_initiated_, and work_available_.

Referenced by OpenDDS::DCPS::DataLink::pre_stop_i(), and OpenDDS::DCPS::DataLink::~DataLink().

00175 {
00176   DBG_ENTRY("ThreadPerConnectionSendTask","close");
00177 
00178   if (flag == 0) {
00179     return 0;
00180   }
00181 
00182   {
00183     GuardType guard(this->lock_);
00184 
00185     if (this->shutdown_initiated_) {
00186       return 0;
00187     }
00188 
00189     // Set the shutdown flag to true.
00190     this->shutdown_initiated_ = true;
00191     this->work_available_.signal();
00192   }
00193 
00194   if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
00195     this->wait();
00196   }
00197 
00198   return 0;
00199 }
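
The thr_equal() check above exists because a thread that waits for itself would never return. The same guard can be sketched with std::thread; Worker, start(), and stop() are hypothetical names, and the busy loop merely stands in for real work.

```cpp
#include <atomic>
#include <thread>

class Worker {
public:
  ~Worker() { stop(); }

  void start() { thread_ = std::thread([this] { run(); }); }

  // Returns true if this call joined the worker thread, false if there was
  // nothing to join or the caller is the worker thread itself.
  bool stop() {
    stop_requested_ = true;
    if (!thread_.joinable()) return false;
    if (thread_.get_id() == std::this_thread::get_id()) {
      return false;  // joining our own thread would deadlock
    }
    thread_.join();
    return true;
  }

private:
  void run() {
    while (!stop_requested_) std::this_thread::yield();
  }

  std::atomic<bool> stop_requested_{false};
  std::thread thread_;
};
```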

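void OpenDDS::DCPS::ThreadPerConnectionSendTask::execute ( SendRequest & req  )  [private, virtual]

Handle the request by dispatching on its operation type to the corresponding DataLink method: send_start_i(), send_i(), or send_stop_i(). An unknown operation is logged as an error.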

Definition at line 216 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::SendRequest::element_, OpenDDS::DCPS::GUID_UNKNOWN, link_, OpenDDS::DCPS::SendRequest::op_, OpenDDS::DCPS::SEND, OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::SEND_START, OpenDDS::DCPS::DataLink::send_start_i(), OpenDDS::DCPS::SEND_STOP, and OpenDDS::DCPS::DataLink::send_stop_i().

Referenced by svc().

00217 {
00218   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
00219 
00220 
00221 
00222   switch (req.op_) {
00223   case SEND_START:
00224     this->link_->send_start_i();
00225     break;
00226   case SEND:
00227     this->link_->send_i(req.element_);
00228     break;
00229   case SEND_STOP:
00230     //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
00231     //the control element will be a null element with only the op_ set.  Thus pass in GUID_UNKNOWN which will
00232     //allow send_stop to call send_delayed_notifications without a match.  In the case of ThreadPerConnectionSendTask
00233     //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
00234     this->link_->send_stop_i(GUID_UNKNOWN);
00235     break;
00236   default:
00237     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
00238                req.op_));
00239     break;
00240   }
00241 }
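
The dispatch above can be exercised in isolation with a recording stand-in for the link. In this sketch RecordingLink and its string log are hypothetical, the element is reduced to an int, and an unknown operation is reported through the return value instead of ACE_ERROR.

```cpp
#include <string>
#include <vector>

// Operation codes named after those in the listing; the fixed underlying
// type makes out-of-range values well defined for the default branch.
enum Op : int { SEND_START, SEND, SEND_STOP };

// Hypothetical stand-in for SendRequest.
struct Request { Op op; int element; };

// Hypothetical link that records calls instead of sending anything.
struct RecordingLink {
  std::vector<std::string> calls;
  void send_start_i() { calls.push_back("start"); }
  void send_i(int element) { calls.push_back("send:" + std::to_string(element)); }
  void send_stop_i() { calls.push_back("stop"); }
};

// Returns true if the operation was recognised and forwarded to the link.
bool execute(RecordingLink& link, const Request& req) {
  switch (req.op) {
  case SEND_START:
    link.send_start_i();
    return true;
  case SEND:
    link.send_i(req.element);
    return true;
  case SEND_STOP:
    link.send_stop_i();
    return true;
  default:
    return false;  // unknown command
  }
}
```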

int OpenDDS::DCPS::ThreadPerConnectionSendTask::open ( void *  = 0  )  [virtual]

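Activate the single worker thread. Returns 0 on success, or -1 if the task has already been opened or the thread could not be activated. The thread priority is derived from the link's transport priority, and the scheduling policy comes from TheServiceParticipant->scheduler() when that value is non-negative.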

Definition at line 74 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY, OpenDDS::DCPS::DCPS_debug_level, opened_, and TheServiceParticipant.

00075 {
00076   DBG_ENTRY("ThreadPerConnectionSendTask", "open");
00077 
00078   GuardType guard(this->lock_);
00079 
00080   // We can assume that we are in the proper state to handle this open()
00081   // call as long as we haven't been open()'ed before.
00082   if (this->opened_) {
00083     ACE_ERROR_RETURN((LM_ERROR,
00084                       "(%P|%t) ThreadPerConnectionSendTask failed to open.  "
00085                       "Task has previously been open()'ed.\n"),
00086                      -1);
00087   }
00088 
00089   DirectPriorityMapper mapper(this->link_->transport_priority());
00090   int priority = mapper.thread_priority();
00091 
00092   long flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
00093   int policy = TheServiceParticipant->scheduler();
00094 
00095   if (policy >= 0) {
00096     flags |= policy;
00097   } else {
00098     flags |= THR_INHERIT_SCHED;
00099   }
00100 
00101   if (DCPS_debug_level > 0) {
00102     ACE_DEBUG((LM_DEBUG,
00103                ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
00104                ACE_TEXT("activating thread with flags 0x%08.8x ")
00105                ACE_TEXT("and priority %d.\n"),
00106                flags,
00107                priority));
00108   }
00109 
00110   // Activate this task object with one worker thread.
00111   if (this->activate(flags, 1, 0, priority) != 0) {
00112     // Assumes that when activate returns non-zero return code that
00113     // no threads were activated.
00114     ACE_ERROR_RETURN((LM_ERROR,
00115                       "(%P|%t) ThreadPerConnectionSendTask failed to activate "
00116                       "the worker threads.\n"),
00117                      -1);
00118   }
00119 
00120   // Now we have passed the point where we can say we've been open()'ed before.
00121   this->opened_ = true;
00122 
00123   return 0;
00124 }
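
Note that opened_ is set only after activation succeeds, so a failed open() can be retried. A minimal sketch of that ordering follows, with a hypothetical activate hook standing in for ACE_Task::activate(); OnceOpened is likewise an illustrative name.

```cpp
#include <functional>
#include <mutex>
#include <utility>

class OnceOpened {
public:
  // `activate` is a hypothetical hook that returns 0 on success.
  explicit OnceOpened(std::function<int()> activate)
    : activate_(std::move(activate)) {}

  // Returns 0 on the first successful call, -1 on failure or repeat calls.
  int open() {
    std::lock_guard<std::mutex> guard(lock_);
    if (opened_) return -1;           // already opened
    if (activate_() != 0) return -1;  // activation failed, stay unopened
    opened_ = true;                   // mark opened only after success
    return 0;
  }

private:
  std::mutex lock_;
  bool opened_ = false;
  std::function<int()> activate_;
};
```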

RemoveResult OpenDDS::DCPS::ThreadPerConnectionSendTask::remove_sample ( const DataSampleElement * element  ) 

Remove sample from the thread per connection queue.

Definition at line 202 of file ThreadPerConnectionSendTask.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), DBG_ENTRY, OpenDDS::DCPS::DataSampleElement::get_sample(), queue_, and OpenDDS::DCPS::ThreadPerConRemoveVisitor::status().

Referenced by OpenDDS::DCPS::DataLink::remove_sample().

00203 {
00204   DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
00205 
00206   GuardType guard(this->lock_);
00207 
00208   ACE_Message_Block* payload = element->get_sample()->cont();
00209   ThreadPerConRemoveVisitor visitor(payload);
00210 
00211   this->queue_.accept_visitor(visitor);
00212 
00213   return visitor.status();
00214 }
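
The listing delegates the search to a visitor that compares each queued request against the sample's payload and reports a status afterwards. The sketch below shows the general shape of that pattern over a std::deque. It does not reproduce the BasicQueue or ThreadPerConRemoveVisitor interfaces; Request, Status, RemoveByPayload, and this accept_visitor are all hypothetical.

```cpp
#include <algorithm>
#include <deque>

// Hypothetical queued request identified by its payload pointer.
struct Request { const void* payload; };

enum class Status { NotFound, Removed };

// Visitor that asks for removal of every request carrying a given payload.
class RemoveByPayload {
public:
  explicit RemoveByPayload(const void* payload) : payload_(payload) {}

  // Returns true when the visited request should be dropped from the queue.
  bool visit(const Request& r) {
    if (r.payload != payload_) return false;
    status_ = Status::Removed;
    return true;
  }

  Status status() const { return status_; }

private:
  const void* payload_;
  Status status_ = Status::NotFound;
};

// Visits every element once and erases those the visitor asks to remove.
template <typename Visitor>
void accept_visitor(std::deque<Request>& q, Visitor& v) {
  q.erase(std::remove_if(q.begin(), q.end(),
                         [&v](const Request& r) { return v.visit(r); }),
          q.end());
}
```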

int OpenDDS::DCPS::ThreadPerConnectionSendTask::svc (  )  [virtual]

The "mainline" executed by the worker thread.

Definition at line 126 of file ThreadPerConnectionSendTask.cpp.

References DBG_ENTRY_LVL, execute(), OpenDDS::DCPS::BasicQueue< T >::get(), queue_, and thr_id_.

00127 {
00128   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
00129 
00130   this->thr_id_ = ACE_OS::thr_self();
00131 
00132   // Ignore all signals to avoid
00133   //     ERROR: <something descriptive> Interrupted system call
00134   // The main thread will handle signals.
00135   sigset_t set;
00136   ACE_OS::sigfillset(&set);
00137   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00138 
00139   // Start the "GetWork-And-PerformWork" loop for the current worker thread.
00140   while (!this->shutdown_initiated_) {
00141     SendRequest* req;
00142     {
00143       GuardType guard(this->lock_);
00144 
00145       if (this->queue_.size() == 0) {
00146         this->work_available_.wait();
00147       }
00148 
00149       if (this->shutdown_initiated_) {
00150         break;
00151       }
00152 
00153       req = queue_.get();
00154 
00155       if (req == 0) {
00156         //It is unclear why this thread can be signalled more often than
00157         //requests are queued when using thread_per_connection and the user
00158         //application thread sends requests without pause. We just need to
00159         //ignore the dequeue failure.
00160         //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc  %p\n",
00161         //  ACE_TEXT("dequeue_head")));
00162         continue;
00163       }
00164     }
00165 
00166     this->execute(*req);
00167     delete req;
00168   }
00169 
00170   // Reached once shutdown has been initiated and the loop above exits.
00171   return 0;
00172 }
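
The loop above waits once when the queue is empty and then tolerates a null result from get(). A common alternative is to wait on a predicate that is rechecked after every wakeup. The following is a minimal sketch of the same single-worker design using the standard library instead of ACE; SingleWorkerTask, Request, and the handled log are hypothetical, and "executing" a request is reduced to recording its operation code.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for SendRequest; only the operation code is kept.
struct Request { int op; };

class SingleWorkerTask {
public:
  ~SingleWorkerTask() { close(); }

  // Start the one worker thread (the role open() plays in the listing).
  void open() { worker_ = std::thread([this] { svc(); }); }

  // Queue a request. Returns 0 on success, -1 once shutdown has begun.
  int add_request(int op) {
    std::lock_guard<std::mutex> guard(lock_);
    if (shutdown_) return -1;
    queue_.push_back(Request{op});
    work_available_.notify_one();
    return 0;
  }

  // Request shutdown and wait for the worker thread to finish.
  void close() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      shutdown_ = true;
      work_available_.notify_one();
    }
    if (worker_.joinable()) worker_.join();
  }

  // Operation codes handled so far; read only after close() has returned.
  std::vector<int> handled;

private:
  void svc() {
    for (;;) {
      Request req{};
      {
        std::unique_lock<std::mutex> guard(lock_);
        // The predicate is rechecked after every wakeup, so a spurious or
        // surplus signal never leads to dequeuing from an empty queue.
        work_available_.wait(guard, [this] {
          return shutdown_ || !queue_.empty();
        });
        if (shutdown_) return;  // as in the listing, pending requests are dropped
        req = queue_.front();
        queue_.pop_front();
      }
      handled.push_back(req.op);  // "execute" outside the lock
    }
  }

  std::mutex lock_;
  std::condition_variable work_available_;
  std::deque<Request> queue_;
  bool shutdown_ = false;
  std::thread worker_;
};
```

As in the listing, the request is executed after the lock is released, so producers calling add_request() are not blocked while a send is in progress.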


Member Data Documentation

DataLink* OpenDDS::DCPS::ThreadPerConnectionSendTask::link_ [private]

The datalink to send the samples or control messages.

Definition at line 114 of file ThreadPerConnectionSendTask.h.

Referenced by execute().

LockType OpenDDS::DCPS::ThreadPerConnectionSendTask::lock_ [private]

Lock to protect the "state" (all of the data members) of this object.

Definition at line 93 of file ThreadPerConnectionSendTask.h.

bool OpenDDS::DCPS::ThreadPerConnectionSendTask::opened_ [private]

Flag used to avoid multiple open() calls.

Definition at line 108 of file ThreadPerConnectionSendTask.h.

Referenced by open().

QueueType OpenDDS::DCPS::ThreadPerConnectionSendTask::queue_ [private]

The request queue.

Definition at line 96 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), remove_sample(), and svc().

bool OpenDDS::DCPS::ThreadPerConnectionSendTask::shutdown_initiated_ [private]

Flag used to initiate a shutdown request to all worker threads.

Definition at line 105 of file ThreadPerConnectionSendTask.h.

Referenced by close().

ACE_thread_t OpenDDS::DCPS::ThreadPerConnectionSendTask::thr_id_ [private]

The id of the thread created by this task.

Definition at line 111 of file ThreadPerConnectionSendTask.h.

Referenced by svc().

ConditionType OpenDDS::DCPS::ThreadPerConnectionSendTask::work_available_ [private]

Condition used to signal the worker thread that it may be able to find a request in the queue_ that needs to be executed. This condition is signal()'ed each time a request is added to the queue_, and also when this task is shut down.

Definition at line 102 of file ThreadPerConnectionSendTask.h.

Referenced by add_request(), and close().


The documentation for this class was generated from the following files:

ThreadPerConnectionSendTask.h
ThreadPerConnectionSendTask.cpp

Generated on Fri Feb 12 20:06:29 2016 for OpenDDS by  doxygen 1.4.7