MessageTracker.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006 #include "DCPS/DdsDcps_pch.h"
00007 #include "ace/Synch.h"
00008 #include <dds/DCPS/MessageTracker.h>
00009 #include <dds/DCPS/Service_Participant.h>
00010 #include "ace/ACE.h"
00011 #include "ace/Guard_T.h"
00012 #include "ace/OS_NS_time.h"
00013
00014 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00015
00016 using namespace OpenDDS::DCPS;
00017
00018 MessageTracker::MessageTracker(const OPENDDS_STRING& msg_src)
00019 : msg_src_(msg_src)
00020 , dropped_count_(0)
00021 , delivered_count_(0)
00022 , sent_count_(0)
00023 , done_condition_(lock_)
00024 {
00025 }
00026
00027 bool
00028 MessageTracker::pending_messages()
00029 {
00030 if (sent_count_ > delivered_count_ + dropped_count_) {
00031 return true;
00032 }
00033 return false;
00034 }
00035
00036 void
00037 MessageTracker::message_sent()
00038 {
00039 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00040 ++sent_count_;
00041 }
00042
00043 void
00044 MessageTracker::message_delivered()
00045 {
00046 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00047 ++delivered_count_;
00048
00049 if (!pending_messages())
00050 done_condition_.broadcast();
00051 }
00052
00053 void
00054 MessageTracker::message_dropped()
00055 {
00056 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00057 ++dropped_count_;
00058
00059 if (!pending_messages())
00060 done_condition_.broadcast();
00061 }
00062
00063 void
00064 MessageTracker::wait_messages_pending(OPENDDS_STRING& caller_message)
00065 {
00066 ACE_Time_Value pending_timeout =
00067 TheServiceParticipant->pending_timeout();
00068
00069 ACE_Time_Value* pTimeout = 0;
00070
00071 if (pending_timeout != ACE_Time_Value::zero) {
00072 pTimeout = &pending_timeout;
00073 pending_timeout += ACE_OS::gettimeofday();
00074 }
00075
00076 ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
00077 const bool report = DCPS_debug_level > 0 && pending_messages();
00078 if (report) {
00079 if (pTimeout != 0) {
00080 ACE_DEBUG((LM_DEBUG,
00081 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ")
00082 ACE_TEXT("from source=%C will wait until %#T.\n"),
00083 msg_src_.c_str(), &pending_timeout));
00084 } else {
00085 ACE_DEBUG((LM_DEBUG,
00086 ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ")
00087 ACE_TEXT("from source=%C will wait with no timeout.\n")));
00088 }
00089 }
00090 while (true) {
00091 if (!pending_messages())
00092 break;
00093
00094 if (done_condition_.wait(pTimeout) == -1 && pending_messages()) {
00095 if (DCPS_debug_level) {
00096 ACE_DEBUG((LM_INFO,
00097 ACE_TEXT("(%P|%t) %T MessageTracker::")
00098 ACE_TEXT("wait_messages_pending (Redmine Issue# 1446) %p (caller: %C)\n"),
00099 ACE_TEXT("Timed out waiting for messages to be transported"),
00100 caller_message.c_str()));
00101 }
00102 break;
00103 }
00104 }
00105 if (report) {
00106 ACE_DEBUG((LM_DEBUG,
00107 "%T (%P|%t) MessageTracker::wait_messages_pending done\n"));
00108 }
00109 }
00110
00111 int
00112 MessageTracker::dropped_count()
00113 {
00114 return dropped_count_;
00115 }
00116
00117 OPENDDS_END_VERSIONED_NAMESPACE_DECL