LCOV - code coverage report
Current view: top level - DCPS - MessageTracker.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 24 65 36.9 %
Date: 2023-04-30 01:32:43 Functions: 4 9 44.4 %

          Line data    Source code
       1             : /*
       2             :  * Distributed under the OpenDDS License.
       3             :  * See: http://www.opendds.org/license.html
       4             :  */
       5             : 
       6             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       7             : 
       8             : #include "MessageTracker.h"
       9             : #include "Service_Participant.h"
      10             : 
      11             : #include <ace/Synch.h>
      12             : #include <ace/ACE.h>
      13             : #include <ace/Guard_T.h>
      14             : #include <ace/OS_NS_time.h>
      15             : 
      16             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      17             : 
      18             : namespace OpenDDS {
      19             : namespace DCPS {
      20             : 
      21           3 : MessageTracker::MessageTracker(const OPENDDS_STRING& msg_src)
      22           3 : : msg_src_(msg_src)
      23           3 : , dropped_count_(0)
      24           3 : , delivered_count_(0)
      25           3 : , sent_count_(0)
      26           3 : , done_condition_(lock_)
      27             : {
      28           3 : }
      29             : 
      30             : bool
      31           0 : MessageTracker::pending_messages() const
      32             : {
      33           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
      34           0 :   return pending_messages_i();
      35           0 : }
      36             : 
      37             : bool
      38           3 : MessageTracker::pending_messages_i() const
      39             : {
      40           3 :   return sent_count_ > delivered_count_ + dropped_count_;
      41             : }
      42             : 
      43             : void
      44           0 : MessageTracker::message_sent()
      45             : {
      46           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
      47           0 :   ++sent_count_;
      48           0 : }
      49             : 
      50             : void
      51           0 : MessageTracker::message_delivered()
      52             : {
      53           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
      54           0 :   ++delivered_count_;
      55             : 
      56           0 :   if (!pending_messages_i()) {
      57           0 :     done_condition_.notify_all();
      58             :   }
      59           0 : }
      60             : 
      61             : void
      62           0 : MessageTracker::message_dropped()
      63             : {
      64           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
      65           0 :   ++dropped_count_;
      66             : 
      67           0 :   if (!pending_messages_i()) {
      68           0 :     done_condition_.notify_all();
      69             :   }
      70           0 : }
      71             : 
      72           3 : void MessageTracker::wait_messages_pending(const char* caller)
      73             : {
      74           3 :   const TimeDuration pending_timeout(TheServiceParticipant->pending_timeout());
      75           3 :   wait_messages_pending(caller, pending_timeout.is_zero() ?
      76           3 :     MonotonicTimePoint() : MonotonicTimePoint::now() + pending_timeout);
      77           3 : }
      78             : 
      79           3 : void MessageTracker::wait_messages_pending(const char* caller, const MonotonicTimePoint& deadline)
      80             : {
      81           3 :   const bool use_deadline = deadline.is_zero();
      82           3 :   ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
      83           3 :   const bool report = DCPS_debug_level > 0 && pending_messages_i();
      84           3 :   if (report) {
      85           0 :     if (use_deadline) {
      86           0 :       ACE_DEBUG((LM_DEBUG,
      87             :                 ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ")
      88             :                 ACE_TEXT("from source=%C will wait until %#T.\n"),
      89             :                 msg_src_.c_str(), &deadline.value()));
      90             :     } else {
      91           0 :       ACE_DEBUG((LM_DEBUG,
      92             :                 ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ")
      93             :                 ACE_TEXT("from source=%C will wait with no timeout.\n"),
      94             :                 msg_src_.c_str()));
      95             :     }
      96             :   }
      97           3 :   bool loop = true;
      98           3 :   ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
      99           3 :   while (loop && pending_messages_i()) {
     100           0 :     switch (done_condition_.wait_until(deadline, thread_status_manager)) {
     101           0 :     case CvStatus_Timeout:
     102           0 :       if (DCPS_debug_level && pending_messages_i()) {
     103           0 :         ACE_DEBUG((LM_DEBUG,
     104             :                    "(%P|%t) MessageTracker::wait_messages_pending: "
     105             :                    "Timed out waiting for messages to be transported (caller: %C)\n",
     106             :                    caller));
     107             :       }
     108           0 :       loop = false;
     109           0 :       break;
     110             : 
     111           0 :     case CvStatus_NoTimeout:
     112           0 :       break;
     113             : 
     114           0 :     case CvStatus_Error:
     115           0 :       if (DCPS_debug_level) {
     116           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MessageTracker::wait_messages_pending: "
     117             :           "error in wait_until\n"));
     118             :       }
     119           0 :       loop = false;
     120           0 :       return;
     121             :     }
     122             :   }
     123           3 :   if (report) {
     124           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) MessageTracker::wait_messages_pending %T done\n"));
     125             :   }
     126           3 : }
     127             : 
     128             : int
     129           0 : MessageTracker::dropped_count() const
     130             : {
     131           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, 0);
     132           0 :   return dropped_count_;
     133           0 : }
     134             : 
     135             : } // namespace DCPS
     136             : } // namespace OpenDDS
     137             : 
     138             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16