00001
00002
00003
00004 #include "DCPS/DdsDcps_pch.h"
00005 #include <dds/DCPS/MessageTracker.h>
00006 #include <dds/DCPS/Service_Participant.h>
00007 #include "ace/ACE.h"
00008 #include "ace/Guard_T.h"
00009 #include "ace/OS_NS_time.h"
00010
00011 using namespace OpenDDS::DCPS;
00012
00013 MessageTracker::MessageTracker(const OPENDDS_STRING& msg_src)
00014 : msg_src_(msg_src)
00015 , dropped_count_(0)
00016 , delivered_count_(0)
00017 , sent_count_(0)
00018 , done_condition_(lock_)
00019 {
00020 }
00021
00022 bool
00023 MessageTracker::pending_messages()
00024 {
00025 if (sent_count_ > delivered_count_ + dropped_count_) {
00026 return true;
00027 }
00028 return false;
00029 }
00030
00031 void
00032 MessageTracker::message_sent()
00033 {
00034 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00035 ++sent_count_;
00036 }
00037
00038 void
00039 MessageTracker::message_delivered()
00040 {
00041 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00042 ++delivered_count_;
00043
00044 if (!pending_messages())
00045 done_condition_.broadcast();
00046 }
00047
00048 void
00049 MessageTracker::message_dropped()
00050 {
00051 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00052 ++dropped_count_;
00053
00054 if (!pending_messages())
00055 done_condition_.broadcast();
00056 }
00057
00058 void
00059 MessageTracker::wait_messages_pending(OPENDDS_STRING& caller_message)
00060 {
00061 ACE_Time_Value pending_timeout =
00062 TheServiceParticipant->pending_timeout();
00063
00064 ACE_Time_Value* pTimeout = 0;
00065
00066 if (pending_timeout != ACE_Time_Value::zero) {
00067 pTimeout = &pending_timeout;
00068 pending_timeout += ACE_OS::gettimeofday();
00069 }
00070
00071 ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
00072 const bool report = DCPS_debug_level > 0 && pending_messages();
00073 if (report) {
00074 ACE_TCHAR date_time[50];
00075 ACE_TCHAR* const time =
00076 MessageTracker::timestamp(pending_timeout,
00077 date_time,
00078 50);
00079 ACE_DEBUG((LM_DEBUG,
00080 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ")
00081 ACE_TEXT("from source=%C will wait until %s.\n"),
00082 msg_src_.c_str(),
00083 (pTimeout == 0 ? ACE_TEXT("(no timeout)") : time)));
00084 }
00085 while (true) {
00086 if (!pending_messages())
00087 break;
00088
00089 if (done_condition_.wait(pTimeout) == -1 && pending_messages()) {
00090 if (DCPS_debug_level) {
00091 ACE_DEBUG((LM_INFO,
00092 ACE_TEXT("(%P|%t) %T MessageTracker::")
00093 ACE_TEXT("wait_messages_pending (Redmine Issue# 1446) %p (caller: %s)\n"),
00094 ACE_TEXT("Timed out waiting for messages to be transported"),
00095 caller_message.c_str()));
00096 }
00097 break;
00098 }
00099 }
00100 if (report) {
00101 ACE_DEBUG((LM_DEBUG,
00102 "%T (%P|%t) MessageTracker::wait_messages_pending done\n"));
00103 }
00104 }
00105
00106 ACE_TCHAR *
00107 MessageTracker::timestamp (const ACE_Time_Value& time_value,
00108 ACE_TCHAR date_and_time[],
00109 size_t date_and_timelen)
00110 {
00111
00112
00113
00114
00115 if (date_and_timelen < 27)
00116 {
00117 errno = EINVAL;
00118 return 0;
00119 }
00120
00121 ACE_Time_Value cur_time =
00122 (time_value == ACE_Time_Value::zero) ?
00123 ACE_Time_Value (ACE_OS::gettimeofday ()) : time_value;
00124 time_t secs = cur_time.sec ();
00125 struct tm tms;
00126 ACE_OS::localtime_r (&secs, &tms);
00127 ACE_OS::snprintf (date_and_time,
00128 date_and_timelen,
00129 ACE_TEXT ("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%06ld"),
00130 tms.tm_year + 1900,
00131 tms.tm_mon + 1,
00132 tms.tm_mday,
00133 tms.tm_hour,
00134 tms.tm_min,
00135 tms.tm_sec,
00136 static_cast<long> (cur_time.usec()));
00137 date_and_time[date_and_timelen - 1] = '\0';
00138 return &date_and_time[11];
00139 }
00140
00141
00142 int
00143 MessageTracker::dropped_count()
00144 {
00145 return dropped_count_;
00146 }