00001 
00002 
00003 
00004 #include "DCPS/DdsDcps_pch.h" 
00005 #include <dds/DCPS/MessageTracker.h>
00006 #include <dds/DCPS/Service_Participant.h>
00007 #include "ace/ACE.h"
00008 #include "ace/Guard_T.h"
00009 #include "ace/OS_NS_time.h"
00010 
00011 using namespace OpenDDS::DCPS;
00012 
00013 MessageTracker::MessageTracker(const OPENDDS_STRING& msg_src)
00014 : msg_src_(msg_src)
00015 , dropped_count_(0)
00016 , delivered_count_(0)
00017 , sent_count_(0)
00018 , done_condition_(lock_)
00019 {
00020 }
00021 
00022 bool
00023 MessageTracker::pending_messages()
00024 {
00025   if (sent_count_ > delivered_count_ + dropped_count_) {
00026     return true;
00027   }
00028   return false;
00029 }
00030 
00031 void
00032 MessageTracker::message_sent()
00033 {
00034   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00035   ++sent_count_;
00036 }
00037 
00038 void
00039 MessageTracker::message_delivered()
00040 {
00041   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00042   ++delivered_count_;
00043 
00044   if (!pending_messages())
00045     done_condition_.broadcast();
00046 }
00047 
00048 void
00049 MessageTracker::message_dropped()
00050 {
00051   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00052   ++dropped_count_;
00053 
00054   if (!pending_messages())
00055     done_condition_.broadcast();
00056 }
00057 
00058 void
00059 MessageTracker::wait_messages_pending(OPENDDS_STRING& caller_message)
00060 {
00061   ACE_Time_Value pending_timeout =
00062     TheServiceParticipant->pending_timeout();
00063 
00064   ACE_Time_Value* pTimeout = 0;
00065 
00066   if (pending_timeout != ACE_Time_Value::zero) {
00067     pTimeout = &pending_timeout;
00068     pending_timeout += ACE_OS::gettimeofday();
00069   }
00070 
00071   ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
00072   const bool report = DCPS_debug_level > 0 && pending_messages();
00073   if (report) {
00074     ACE_TCHAR date_time[50];
00075     ACE_TCHAR* const time =
00076       MessageTracker::timestamp(pending_timeout,
00077                                 date_time,
00078                                 50);
00079     ACE_DEBUG((LM_DEBUG,
00080                ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ")
00081                ACE_TEXT("from source=%C will wait until %s.\n"),
00082                msg_src_.c_str(),
00083                (pTimeout == 0 ? ACE_TEXT("(no timeout)") : time)));
00084   }
00085   while (true) {
00086     if (!pending_messages())
00087       break;
00088 
00089     if (done_condition_.wait(pTimeout) == -1 && pending_messages()) {
00090       if (DCPS_debug_level) {
00091         ACE_DEBUG((LM_INFO,
00092                    ACE_TEXT("(%P|%t) %T MessageTracker::")
00093                    ACE_TEXT("wait_messages_pending (Redmine Issue# 1446) %p (caller: %s)\n"),
00094                    ACE_TEXT("Timed out waiting for messages to be transported"),
00095                    caller_message.c_str()));
00096       }
00097       break;
00098     }
00099   }
00100   if (report) {
00101     ACE_DEBUG((LM_DEBUG,
00102                "%T (%P|%t) MessageTracker::wait_messages_pending done\n"));
00103   }
00104 }
00105 
00106 ACE_TCHAR *
00107 MessageTracker::timestamp (const ACE_Time_Value& time_value,
00108                            ACE_TCHAR date_and_time[],
00109                            size_t date_and_timelen)
00110 {
00111   
00112 
00113   
00114   
00115   if (date_and_timelen < 27)
00116     {
00117       errno = EINVAL;
00118       return 0;
00119     }
00120 
00121   ACE_Time_Value cur_time =
00122     (time_value == ACE_Time_Value::zero) ?
00123         ACE_Time_Value (ACE_OS::gettimeofday ()) : time_value;
00124   time_t secs = cur_time.sec ();
00125   struct tm tms;
00126   ACE_OS::localtime_r (&secs, &tms);
00127   ACE_OS::snprintf (date_and_time,
00128                     date_and_timelen,
00129                     ACE_TEXT ("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%06ld"),
00130                     tms.tm_year + 1900,
00131                     tms.tm_mon + 1,
00132                     tms.tm_mday,
00133                     tms.tm_hour,
00134                     tms.tm_min,
00135                     tms.tm_sec,
00136                     static_cast<long> (cur_time.usec()));
00137   date_and_time[date_and_timelen - 1] = '\0';
00138   return &date_and_time[11];
00139 }
00140 
00141 
00142 int
00143 MessageTracker::dropped_count()
00144 {
00145   return dropped_count_;
00146 }