MessageTracker.cpp

Go to the documentation of this file.
00001 /*
00002  * Distributed under the OpenDDS License.
00003  * See: http://www.opendds.org/license.html
00004  */
00005 
00006 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00007 #include "ace/Synch.h"
00008 #include <dds/DCPS/MessageTracker.h>
00009 #include <dds/DCPS/Service_Participant.h>
00010 #include "ace/ACE.h"
00011 #include "ace/Guard_T.h"
00012 #include "ace/OS_NS_time.h"
00013 
00014 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00015 
00016 using namespace OpenDDS::DCPS;
00017 
00018 MessageTracker::MessageTracker(const OPENDDS_STRING& msg_src)
00019 : msg_src_(msg_src)
00020 , dropped_count_(0)
00021 , delivered_count_(0)
00022 , sent_count_(0)
00023 , done_condition_(lock_)
00024 {
00025 }
00026 
00027 bool
00028 MessageTracker::pending_messages()
00029 {
00030   if (sent_count_ > delivered_count_ + dropped_count_) {
00031     return true;
00032   }
00033   return false;
00034 }
00035 
00036 void
00037 MessageTracker::message_sent()
00038 {
00039   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00040   ++sent_count_;
00041 }
00042 
00043 void
00044 MessageTracker::message_delivered()
00045 {
00046   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00047   ++delivered_count_;
00048 
00049   if (!pending_messages())
00050     done_condition_.broadcast();
00051 }
00052 
00053 void
00054 MessageTracker::message_dropped()
00055 {
00056   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00057   ++dropped_count_;
00058 
00059   if (!pending_messages())
00060     done_condition_.broadcast();
00061 }
00062 
00063 void
00064 MessageTracker::wait_messages_pending(OPENDDS_STRING& caller_message)
00065 {
00066   ACE_Time_Value pending_timeout =
00067     TheServiceParticipant->pending_timeout();
00068 
00069   ACE_Time_Value* pTimeout = 0;
00070 
00071   if (pending_timeout != ACE_Time_Value::zero) {
00072     pTimeout = &pending_timeout;
00073     pending_timeout += ACE_OS::gettimeofday();
00074   }
00075 
00076   ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
00077   const bool report = DCPS_debug_level > 0 && pending_messages();
00078   if (report) {
00079     if (pTimeout != 0) {
00080       ACE_DEBUG((LM_DEBUG,
00081                 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ")
00082                 ACE_TEXT("from source=%C will wait until %#T.\n"),
00083                 msg_src_.c_str(), &pending_timeout));
00084     } else {
00085       ACE_DEBUG((LM_DEBUG,
00086                 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ")
00087                 ACE_TEXT("from source=%C will wait with no timeout.\n")));
00088     }
00089   }
00090   while (true) {
00091     if (!pending_messages())
00092       break;
00093 
00094     if (done_condition_.wait(pTimeout) == -1 && pending_messages()) {
00095       if (DCPS_debug_level) {
00096         ACE_DEBUG((LM_INFO,
00097                    ACE_TEXT("(%P|%t) %T MessageTracker::")
00098                    ACE_TEXT("wait_messages_pending (Redmine Issue# 1446) %p (caller: %C)\n"),
00099                    ACE_TEXT("Timed out waiting for messages to be transported"),
00100                    caller_message.c_str()));
00101       }
00102       break;
00103     }
00104   }
00105   if (report) {
00106     ACE_DEBUG((LM_DEBUG,
00107                "%T (%P|%t) MessageTracker::wait_messages_pending done\n"));
00108   }
00109 }
00110 
00111 int
00112 MessageTracker::dropped_count()
00113 {
00114   return dropped_count_;
00115 }
00116 
00117 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1