#include <MessageTracker.h>
Public Member Functions | |
MessageTracker (const OPENDDS_STRING &msg_src) | |
void | message_sent () |
void | message_delivered () |
void | message_dropped () |
bool | pending_messages () |
void | wait_messages_pending (OPENDDS_STRING &caller_message) |
int | dropped_count () |
Private Attributes | |
const OPENDDS_STRING | msg_src_ |
int | dropped_count_ |
int | delivered_count_ |
int | sent_count_ |
ACE_Thread_Mutex | lock_ |
ACE_Condition_Thread_Mutex | done_condition_ |
All messages have been transported condition variable. |
A simple message tracker to use to wait until all messages have been accounted for being continuing processing.
Definition at line 25 of file MessageTracker.h.
MessageTracker::MessageTracker | ( | const OPENDDS_STRING & | msg_src | ) |
Definition at line 18 of file MessageTracker.cpp.
00019 : msg_src_(msg_src) 00020 , dropped_count_(0) 00021 , delivered_count_(0) 00022 , sent_count_(0) 00023 , done_condition_(lock_) 00024 { 00025 }
int MessageTracker::dropped_count | ( | ) |
For testing.
Definition at line 112 of file MessageTracker.cpp.
References dropped_count_.
00113 { 00114 return dropped_count_; 00115 }
void MessageTracker::message_delivered | ( | ) |
Indicate that a message has been delivered by the transport layer.
Definition at line 44 of file MessageTracker.cpp.
References ACE_Condition< ACE_Thread_Mutex >::broadcast(), delivered_count_, done_condition_, lock_, and pending_messages().
Referenced by OpenDDS::DCPS::SendResponseListener::control_delivered(), OpenDDS::DCPS::DataWriterImpl::control_delivered(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), and OpenDDS::DCPS::SendResponseListener::data_delivered().
00045 { 00046 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00047 ++delivered_count_; 00048 00049 if (!pending_messages()) 00050 done_condition_.broadcast(); 00051 }
void MessageTracker::message_dropped | ( | ) |
Indicate that a message has been dropped by the transport layer.
Definition at line 54 of file MessageTracker.cpp.
References ACE_Condition< ACE_Thread_Mutex >::broadcast(), done_condition_, dropped_count_, lock_, and pending_messages().
Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), OpenDDS::DCPS::SendResponseListener::control_dropped(), OpenDDS::DCPS::DataWriterImpl::control_dropped(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), OpenDDS::DCPS::SendResponseListener::data_dropped(), and OpenDDS::DCPS::DataWriterImpl::send_control().
00055 { 00056 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00057 ++dropped_count_; 00058 00059 if (!pending_messages()) 00060 done_condition_.broadcast(); 00061 }
void MessageTracker::message_sent | ( | ) |
Indicate that a message has been to the transport layer.
Definition at line 37 of file MessageTracker.cpp.
References lock_, and sent_count_.
Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::DCPS::DataWriterImpl::send_control(), and OpenDDS::DCPS::SendResponseListener::track_message().
00038 { 00039 ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 00040 ++sent_count_; 00041 }
bool MessageTracker::pending_messages | ( | ) |
Answer if there are any messages that have not been accounted for.
Definition at line 28 of file MessageTracker.cpp.
References delivered_count_, dropped_count_, and sent_count_.
Referenced by message_delivered(), message_dropped(), and wait_messages_pending().
00029 { 00030 if (sent_count_ > delivered_count_ + dropped_count_) { 00031 return true; 00032 } 00033 return false; 00034 }
void MessageTracker::wait_messages_pending | ( | OPENDDS_STRING & | caller_message | ) |
Block until all messages have been account for.
Definition at line 64 of file MessageTracker.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, done_condition_, ACE_OS::gettimeofday(), LM_DEBUG, LM_INFO, lock_, msg_src_, pending_messages(), TheServiceParticipant, ACE_Condition< ACE_Thread_Mutex >::wait(), and ACE_Time_Value::zero.
Referenced by OpenDDS::DCPS::DataWriterImpl::wait_control_pending(), and OpenDDS::DCPS::SendResponseListener::~SendResponseListener().
00065 { 00066 ACE_Time_Value pending_timeout = 00067 TheServiceParticipant->pending_timeout(); 00068 00069 ACE_Time_Value* pTimeout = 0; 00070 00071 if (pending_timeout != ACE_Time_Value::zero) { 00072 pTimeout = &pending_timeout; 00073 pending_timeout += ACE_OS::gettimeofday(); 00074 } 00075 00076 ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_); 00077 const bool report = DCPS_debug_level > 0 && pending_messages(); 00078 if (report) { 00079 if (pTimeout != 0) { 00080 ACE_DEBUG((LM_DEBUG, 00081 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ") 00082 ACE_TEXT("from source=%C will wait until %#T.\n"), 00083 msg_src_.c_str(), &pending_timeout)); 00084 } else { 00085 ACE_DEBUG((LM_DEBUG, 00086 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ") 00087 ACE_TEXT("from source=%C will wait with no timeout.\n"))); 00088 } 00089 } 00090 while (true) { 00091 if (!pending_messages()) 00092 break; 00093 00094 if (done_condition_.wait(pTimeout) == -1 && pending_messages()) { 00095 if (DCPS_debug_level) { 00096 ACE_DEBUG((LM_INFO, 00097 ACE_TEXT("(%P|%t) %T MessageTracker::") 00098 ACE_TEXT("wait_messages_pending (Redmine Issue# 1446) %p (caller: %C)\n"), 00099 ACE_TEXT("Timed out waiting for messages to be transported"), 00100 caller_message.c_str())); 00101 } 00102 break; 00103 } 00104 } 00105 if (report) { 00106 ACE_DEBUG((LM_DEBUG, 00107 "%T (%P|%t) MessageTracker::wait_messages_pending done\n")); 00108 } 00109 }
int OpenDDS::DCPS::MessageTracker::delivered_count_ [private] |
Definition at line 62 of file MessageTracker.h.
Referenced by message_delivered(), and pending_messages().
All messages have been transported condition variable.
Definition at line 68 of file MessageTracker.h.
Referenced by message_delivered(), message_dropped(), and wait_messages_pending().
int OpenDDS::DCPS::MessageTracker::dropped_count_ [private] |
Definition at line 61 of file MessageTracker.h.
Referenced by dropped_count(), message_dropped(), and pending_messages().
Definition at line 65 of file MessageTracker.h.
Referenced by message_delivered(), message_dropped(), message_sent(), and wait_messages_pending().
const OPENDDS_STRING OpenDDS::DCPS::MessageTracker::msg_src_ [private] |
Definition at line 60 of file MessageTracker.h.
Referenced by wait_messages_pending().
int OpenDDS::DCPS::MessageTracker::sent_count_ [private] |
Definition at line 63 of file MessageTracker.h.
Referenced by message_sent(), and pending_messages().