#include <MessageTracker.h>
Public Member Functions | |
MessageTracker (const OPENDDS_STRING &msg_src) | |
void | message_sent () |
void | message_delivered () |
void | message_dropped () |
bool | pending_messages () |
void | wait_messages_pending (OPENDDS_STRING &caller_message) |
int | dropped_count () |
Static Public Member Functions | |
static ACE_TCHAR * | timestamp (const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t date_and_timelen) |
Private Attributes | |
const OPENDDS_STRING | msg_src_ |
int | dropped_count_ |
int | delivered_count_ |
int | sent_count_ |
ACE_Thread_Mutex | lock_ |
ACE_Condition_Thread_Mutex | done_condition_ |
All messages have been transported condition variable. |
Definition at line 24 of file MessageTracker.h.
MessageTracker::MessageTracker | ( | const OPENDDS_STRING & | msg_src | ) |
Definition at line 13 of file MessageTracker.cpp.
00014 : msg_src_(msg_src) 00015 , dropped_count_(0) 00016 , delivered_count_(0) 00017 , sent_count_(0) 00018 , done_condition_(lock_) 00019 { 00020 }
int MessageTracker::dropped_count | ( | ) |
For testing.
Definition at line 143 of file MessageTracker.cpp.
References dropped_count_.
00144 { 00145 return dropped_count_; 00146 }
void MessageTracker::message_delivered | ( | ) |
Indicate that a message has been delivered by the transport layer.
Definition at line 39 of file MessageTracker.cpp.
References delivered_count_, done_condition_, lock_, and pending_messages().
Referenced by OpenDDS::DCPS::SendResponseListener::control_delivered(), OpenDDS::DCPS::DataWriterImpl::control_delivered(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), and OpenDDS::DCPS::SendResponseListener::data_delivered().
00040 { 00041 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00042 ++delivered_count_; 00043 00044 if (!pending_messages()) 00045 done_condition_.broadcast(); 00046 }
void MessageTracker::message_dropped | ( | ) |
Indicate that a message has been dropped by the transport layer.
Definition at line 49 of file MessageTracker.cpp.
References done_condition_, dropped_count_, lock_, and pending_messages().
Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), OpenDDS::DCPS::SendResponseListener::control_dropped(), OpenDDS::DCPS::DataWriterImpl::control_dropped(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), OpenDDS::DCPS::SendResponseListener::data_dropped(), and OpenDDS::DCPS::DataWriterImpl::send_control().
00050 { 00051 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00052 ++dropped_count_; 00053 00054 if (!pending_messages()) 00055 done_condition_.broadcast(); 00056 }
void MessageTracker::message_sent | ( | ) |
Indicate that a message has been to the transport layer.
Definition at line 32 of file MessageTracker.cpp.
References lock_, and sent_count_.
Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::DCPS::DataWriterImpl::send_control(), and OpenDDS::DCPS::SendResponseListener::track_message().
00033 { 00034 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00035 ++sent_count_; 00036 }
bool MessageTracker::pending_messages | ( | ) |
Answer if there are any messages that have not been accounted for.
Definition at line 23 of file MessageTracker.cpp.
References delivered_count_, dropped_count_, and sent_count_.
Referenced by message_delivered(), message_dropped(), OpenDDS::DCPS::DataWriterImpl::pending_control(), and wait_messages_pending().
00024 { 00025 if (sent_count_ > delivered_count_ + dropped_count_) { 00026 return true; 00027 } 00028 return false; 00029 }
ACE_TCHAR * MessageTracker::timestamp | ( | const ACE_Time_Value & | time_value, | |
ACE_TCHAR | date_and_time[], | |||
size_t | date_and_timelen | |||
) | [static] |
Provide a timestamp for the passed in time. DEPRECATED: remove and replace with ACE::timestamp once TAO 1.6a is no longer supported
Definition at line 107 of file MessageTracker.cpp.
Referenced by wait_messages_pending(), and OpenDDS::DCPS::WriteDataContainer::wait_pending().
00110 { 00111 //ACE_TRACE ("ACE::timestamp"); 00112 00113 // This magic number is from the formatting statement 00114 // farther down this routine. 00115 if (date_and_timelen < 27) 00116 { 00117 errno = EINVAL; 00118 return 0; 00119 } 00120 00121 ACE_Time_Value cur_time = 00122 (time_value == ACE_Time_Value::zero) ? 00123 ACE_Time_Value (ACE_OS::gettimeofday ()) : time_value; 00124 time_t secs = cur_time.sec (); 00125 struct tm tms; 00126 ACE_OS::localtime_r (&secs, &tms); 00127 ACE_OS::snprintf (date_and_time, 00128 date_and_timelen, 00129 ACE_TEXT ("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%06ld"), 00130 tms.tm_year + 1900, 00131 tms.tm_mon + 1, 00132 tms.tm_mday, 00133 tms.tm_hour, 00134 tms.tm_min, 00135 tms.tm_sec, 00136 static_cast<long> (cur_time.usec())); 00137 date_and_time[date_and_timelen - 1] = '\0'; 00138 return &date_and_time[11]; 00139 }
void MessageTracker::wait_messages_pending | ( | OPENDDS_STRING & | caller_message | ) |
Block until all messages have been account for.
Definition at line 59 of file MessageTracker.cpp.
References OpenDDS::DCPS::DCPS_debug_level, done_condition_, msg_src_, pending_messages(), TheServiceParticipant, and timestamp().
Referenced by OpenDDS::DCPS::DataWriterImpl::wait_control_pending(), and OpenDDS::DCPS::SendResponseListener::~SendResponseListener().
00060 { 00061 ACE_Time_Value pending_timeout = 00062 TheServiceParticipant->pending_timeout(); 00063 00064 ACE_Time_Value* pTimeout = 0; 00065 00066 if (pending_timeout != ACE_Time_Value::zero) { 00067 pTimeout = &pending_timeout; 00068 pending_timeout += ACE_OS::gettimeofday(); 00069 } 00070 00071 ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_); 00072 const bool report = DCPS_debug_level > 0 && pending_messages(); 00073 if (report) { 00074 ACE_TCHAR date_time[50]; 00075 ACE_TCHAR* const time = 00076 MessageTracker::timestamp(pending_timeout, 00077 date_time, 00078 50); 00079 ACE_DEBUG((LM_DEBUG, 00080 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ") 00081 ACE_TEXT("from source=%C will wait until %s.\n"), 00082 msg_src_.c_str(), 00083 (pTimeout == 0 ? ACE_TEXT("(no timeout)") : time))); 00084 } 00085 while (true) { 00086 if (!pending_messages()) 00087 break; 00088 00089 if (done_condition_.wait(pTimeout) == -1 && pending_messages()) { 00090 if (DCPS_debug_level) { 00091 ACE_DEBUG((LM_INFO, 00092 ACE_TEXT("(%P|%t) %T MessageTracker::") 00093 ACE_TEXT("wait_messages_pending (Redmine Issue# 1446) %p (caller: %s)\n"), 00094 ACE_TEXT("Timed out waiting for messages to be transported"), 00095 caller_message.c_str())); 00096 } 00097 break; 00098 } 00099 } 00100 if (report) { 00101 ACE_DEBUG((LM_DEBUG, 00102 "%T (%P|%t) MessageTracker::wait_messages_pending done\n")); 00103 } 00104 }
int OpenDDS::DCPS::MessageTracker::delivered_count_ [private] |
Definition at line 72 of file MessageTracker.h.
Referenced by message_delivered(), and pending_messages().
ACE_Condition_Thread_Mutex OpenDDS::DCPS::MessageTracker::done_condition_ [private] |
All messages have been transported condition variable.
Definition at line 78 of file MessageTracker.h.
Referenced by message_delivered(), message_dropped(), and wait_messages_pending().
int OpenDDS::DCPS::MessageTracker::dropped_count_ [private] |
Definition at line 71 of file MessageTracker.h.
Referenced by dropped_count(), message_dropped(), and pending_messages().
ACE_Thread_Mutex OpenDDS::DCPS::MessageTracker::lock_ [private] |
Definition at line 75 of file MessageTracker.h.
Referenced by message_delivered(), message_dropped(), and message_sent().
const OPENDDS_STRING OpenDDS::DCPS::MessageTracker::msg_src_ [private] |
int OpenDDS::DCPS::MessageTracker::sent_count_ [private] |
Definition at line 73 of file MessageTracker.h.
Referenced by message_sent(), and pending_messages().