LCOV - code coverage report
Current view: top level - DCPS/transport/framework - ThreadPerConnectionSendTask.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 109 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 9 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : #include "ThreadPerConnectionSendTask.h"
      10             : #include "TransportQueueElement.h"
      11             : #include "TransportSendElement.h"
      12             : #include "DataLink.h"
      13             : #include "ThreadPerConRemoveVisitor.h"
      14             : #include "DirectPriorityMapper.h"
      15             : #include "EntryExit.h"
      16             : #include "dds/DCPS/DataSampleElement.h"
      17             : #include "dds/DCPS/Service_Participant.h"
      18             : 
      19             : #include "ace/Auto_Ptr.h"
      20             : 
      21             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      22             : 
      23             : namespace OpenDDS {
      24             : namespace DCPS {
      25             : 
      26           0 : ThreadPerConnectionSendTask::ThreadPerConnectionSendTask(DataLink* link)
      27           0 :   : lock_()
      28           0 :   , work_available_(lock_)
      29           0 :   , shutdown_initiated_(false)
      30           0 :   , opened_(false)
      31           0 :   , thr_id_(ACE_OS::NULL_thread)
      32           0 :   , link_(link)
      33             : {
      34             :   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "ThreadPerConnectionSendTask", 6);
      35           0 : }
      36             : 
      37           0 : ThreadPerConnectionSendTask::~ThreadPerConnectionSendTask()
      38             : {
      39             :   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "~ThreadPerConnectionSendTask", 6);
      40           0 : }
      41             : 
      42           0 : int ThreadPerConnectionSendTask::add_request(SendStrategyOpType op,
      43             :                                              TransportQueueElement* element)
      44             : {
      45             :   DBG_ENTRY("ThreadPerConnectionSendTask", "add");
      46             : 
      47           0 :   ACE_Auto_Ptr<SendRequest> req(new SendRequest);
      48           0 :   req->op_ = op;
      49           0 :   req->element_ = element;
      50             : 
      51           0 :   int result = -1;
      52             :   { // guard scope
      53           0 :     GuardType guard(lock_);
      54             : 
      55           0 :     if (shutdown_initiated_) {
      56           0 :       return -1;
      57             :     }
      58             : 
      59           0 :     result = queue_.put(req.get());
      60             : 
      61           0 :     if (result == 0 && op == SEND_STOP) {
      62           0 :       work_available_.notify_one();
      63             :     }
      64           0 :   }
      65             : 
      66           0 :   if (result == 0) {
      67           0 :     req.release();
      68             :   } else {
      69           0 :     ACE_ERROR((LM_ERROR,
      70             :                ACE_TEXT("(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
      71             :                ACE_TEXT("put")));
      72             :   }
      73             : 
      74           0 :   return result;
      75           0 : }
      76             : 
      77           0 : int ThreadPerConnectionSendTask::open(void*)
      78             : {
      79             :   DBG_ENTRY("ThreadPerConnectionSendTask", "open");
      80             : 
      81           0 :   GuardType guard(lock_);
      82             : 
      83             :   // We can assume that we are in the proper state to handle this open()
      84             :   // call as long as we haven't been open()'ed before.
      85           0 :   if (opened_) {
      86           0 :     ACE_ERROR_RETURN((LM_ERROR,
      87             :                       "(%P|%t) ThreadPerConnectionSendTask failed to open.  "
      88             :                       "Task has previously been open()'ed.\n"),
      89             :                      -1);
      90             :   }
      91             : 
      92           0 :   DirectPriorityMapper mapper(link_->transport_priority());
      93           0 :   int priority = mapper.thread_priority();
      94             : 
      95           0 :   long flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
      96           0 :   long policy = TheServiceParticipant->scheduler();
      97             : 
      98           0 :   if (policy >= 0) {
      99           0 :     flags |= policy;
     100             :   } else {
     101           0 :     flags |= THR_INHERIT_SCHED;
     102             :   }
     103             : 
     104           0 :   if (DCPS_debug_level > 0) {
     105           0 :     ACE_DEBUG((LM_DEBUG,
     106             :                ACE_TEXT("(%P|%t) ThreadPerConnectionSendTask::open(): ")
     107             :                ACE_TEXT("activating thread with flags 0x%08.8x ")
     108             :                ACE_TEXT("and priority %d.\n"),
     109             :                flags,
     110             :                priority));
     111             :   }
     112             : 
     113             :   // Activate this task object with one worker thread.
     114           0 :   if (activate(flags, 1, 0, priority) != 0) {
     115             :     // Assumes that when activate returns non-zero return code that
     116             :     // no threads were activated.
     117           0 :     ACE_ERROR_RETURN((LM_ERROR,
     118             :                       "(%P|%t) ThreadPerConnectionSendTask failed to activate "
     119             :                       "the worker threads.\n"),
     120             :                      -1);
     121             :   }
     122             : 
     123             :   // Now we have past the point where we can say we've been open()'ed before.
     124           0 :   opened_ = true;
     125             : 
     126           0 :   return 0;
     127           0 : }
     128             : 
     129           0 : int ThreadPerConnectionSendTask::svc()
     130             : {
     131           0 :   ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
     132           0 :   ThreadStatusManager::Start s(thread_status_manager, "ThreadPerConnectionSendTask");
     133             : 
     134             :   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "svc", 6);
     135             : 
     136           0 :   thr_id_ = ACE_OS::thr_self();
     137             : 
     138             :   // Ignore all signals to avoid
     139             :   //     ERROR: <something descriptive> Interrupted system call
     140             :   // The main thread will handle signals.
     141             :   sigset_t set;
     142           0 :   ACE_OS::sigfillset(&set);
     143           0 :   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
     144             : 
     145             :   SendRequest* req;
     146           0 :   OPENDDS_VECTOR(SendRequest*) reqs;
     147             : 
     148           0 :   ACE_Reverse_Lock<LockType> rev_lock(lock_);
     149           0 :   GuardType guard(lock_);
     150             : 
     151             :   // Start the "GetWork-And-PerformWork" loop for the current worker thread.
     152           0 :   while (!shutdown_initiated_) {
     153             : 
     154           0 :     if (queue_.size() == 0) {
     155           0 :       work_available_.wait(thread_status_manager);
     156             :     }
     157             : 
     158           0 :     if (shutdown_initiated_) {
     159           0 :       break;
     160             :     }
     161             : 
     162           0 :     while (queue_.size() != 0 && (reqs.empty() || reqs.back()->op_ != SEND_STOP)) {
     163           0 :       req = queue_.get();
     164             : 
     165           0 :       if (req == 0) {
     166             :         //I'm not sure why this thread got more signals than actual signals
     167             :         //when using thread_per_connection and the user application thread
     168             :         //send requests without interval. We just need ignore the dequeue
     169             :         //failure.
     170             :         //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::svc  %p\n",
     171             :         //  ACE_TEXT("dequeue_head")));
     172           0 :         continue;
     173             :       }
     174           0 :       reqs.push_back(req);
     175             :     }
     176             : 
     177           0 :     ACE_Guard<ACE_Reverse_Lock<LockType> > rev_guard(rev_lock);
     178             : 
     179           0 :     if (!reqs.empty() && reqs.back()->op_ == SEND_STOP) {
     180           0 :       for (size_t i = 0; i < reqs.size(); ++i) {
     181           0 :         execute(*reqs[i]);
     182           0 :         delete reqs[i];
     183             :       }
     184           0 :       reqs.clear();
     185             :     }
     186           0 :   }
     187             : 
     188           0 :   return 0;
     189           0 : }
     190             : 
     191           0 : int ThreadPerConnectionSendTask::close(u_long flag)
     192             : {
     193             :   DBG_ENTRY("ThreadPerConnectionSendTask","close");
     194             : 
     195           0 :   if (flag == 0) {
     196           0 :     return 0;
     197             :   }
     198             : 
     199             :   {
     200           0 :     GuardType guard(lock_);
     201             : 
     202           0 :     if (shutdown_initiated_) {
     203           0 :       return 0;
     204             :     }
     205             : 
     206             :     // Set the shutdown flag to true.
     207           0 :     shutdown_initiated_ = true;
     208           0 :     work_available_.notify_all();
     209           0 :   }
     210             : 
     211           0 :   if (opened_ && !ACE_OS::thr_equal(thr_id_, ACE_OS::thr_self())) {
     212           0 :     ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());
     213           0 :     wait();
     214           0 :   }
     215             : 
     216           0 :   return 0;
     217             : }
     218             : 
     219             : RemoveResult
     220           0 : ThreadPerConnectionSendTask::remove_sample(const DataSampleElement* element)
     221             : {
     222             :   DBG_ENTRY("ThreadPerConnectionSendTask", "remove_sample");
     223             : 
     224           0 :   ACE_Message_Block* payload = element->get_sample()->cont();
     225           0 :   ThreadPerConRemoveVisitor visitor(payload);
     226             : 
     227           0 :   GuardType guard(lock_);
     228             : 
     229           0 :   queue_.accept_visitor(visitor);
     230             : 
     231           0 :   return visitor.status();
     232           0 : }
     233             : 
     234           0 : void ThreadPerConnectionSendTask::execute(SendRequest& req)
     235             : {
     236             :   DBG_ENTRY_LVL("ThreadPerConnectionSendTask", "execute", 6);
     237             : 
     238             : 
     239             : 
     240           0 :   switch (req.op_) {
     241           0 :   case SEND_START:
     242           0 :     link_->send_start_i();
     243           0 :     break;
     244           0 :   case SEND:
     245           0 :     link_->send_i(req.element_);
     246           0 :     break;
     247           0 :   case SEND_STOP:
     248             :     //DataLink::send_stop_i expects the RepoId of the message sender, however, in ThreadPerConnectionSendTask
     249             :     //the control element will be a null element with only the op_ set.  Thus pass in GUID_UNKNOWN which will
     250             :     //allow send_stop to call send_delayed_notifications without a match.  In the case of ThreadPerConnectionSendTask
     251             :     //this is allowable because only one thread will be managing the sending thus no deadlock down in send_delayed_notifications()
     252           0 :     link_->send_stop_i(GUID_UNKNOWN);
     253           0 :     break;
     254           0 :   default:
     255           0 :     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
     256             :                req.op_));
     257           0 :     break;
     258             :   }
     259           0 : }
     260             : 
     261             : }
     262             : }
     263             : 
     264             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16