Line data Source code
1 : /* 2 : * Distributed under the OpenDDS License. 3 : * See: http://www.opendds.org/license.html 4 : */ 5 : 6 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 7 : 8 : #include "MessageTracker.h" 9 : #include "Service_Participant.h" 10 : 11 : #include <ace/Synch.h> 12 : #include <ace/ACE.h> 13 : #include <ace/Guard_T.h> 14 : #include <ace/OS_NS_time.h> 15 : 16 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 17 : 18 : namespace OpenDDS { 19 : namespace DCPS { 20 : 21 3 : MessageTracker::MessageTracker(const OPENDDS_STRING& msg_src) 22 3 : : msg_src_(msg_src) 23 3 : , dropped_count_(0) 24 3 : , delivered_count_(0) 25 3 : , sent_count_(0) 26 3 : , done_condition_(lock_) 27 : { 28 3 : } 29 : 30 : bool 31 0 : MessageTracker::pending_messages() const 32 : { 33 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false); 34 0 : return pending_messages_i(); 35 0 : } 36 : 37 : bool 38 3 : MessageTracker::pending_messages_i() const 39 : { 40 3 : return sent_count_ > delivered_count_ + dropped_count_; 41 : } 42 : 43 : void 44 0 : MessageTracker::message_sent() 45 : { 46 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 47 0 : ++sent_count_; 48 0 : } 49 : 50 : void 51 0 : MessageTracker::message_delivered() 52 : { 53 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 54 0 : ++delivered_count_; 55 : 56 0 : if (!pending_messages_i()) { 57 0 : done_condition_.notify_all(); 58 : } 59 0 : } 60 : 61 : void 62 0 : MessageTracker::message_dropped() 63 : { 64 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_); 65 0 : ++dropped_count_; 66 : 67 0 : if (!pending_messages_i()) { 68 0 : done_condition_.notify_all(); 69 : } 70 0 : } 71 : 72 3 : void MessageTracker::wait_messages_pending(const char* caller) 73 : { 74 3 : const TimeDuration pending_timeout(TheServiceParticipant->pending_timeout()); 75 3 : wait_messages_pending(caller, pending_timeout.is_zero() ? 76 3 : MonotonicTimePoint() : MonotonicTimePoint::now() + pending_timeout); 77 3 : } 78 : 79 3 : void MessageTracker::wait_messages_pending(const char* caller, const MonotonicTimePoint& deadline) 80 : { 81 3 : const bool use_deadline = deadline.is_zero(); 82 3 : ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_); 83 3 : const bool report = DCPS_debug_level > 0 && pending_messages_i(); 84 3 : if (report) { 85 0 : if (use_deadline) { 86 0 : ACE_DEBUG((LM_DEBUG, 87 : ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ") 88 : ACE_TEXT("from source=%C will wait until %#T.\n"), 89 : msg_src_.c_str(), &deadline.value())); 90 : } else { 91 0 : ACE_DEBUG((LM_DEBUG, 92 : ACE_TEXT("(%P|%t) MessageTracker::wait_messages_pending ") 93 : ACE_TEXT("from source=%C will wait with no timeout.\n"), 94 : msg_src_.c_str())); 95 : } 96 : } 97 3 : bool loop = true; 98 3 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager(); 99 3 : while (loop && pending_messages_i()) { 100 0 : switch (done_condition_.wait_until(deadline, thread_status_manager)) { 101 0 : case CvStatus_Timeout: 102 0 : if (DCPS_debug_level && pending_messages_i()) { 103 0 : ACE_DEBUG((LM_DEBUG, 104 : "(%P|%t) MessageTracker::wait_messages_pending: " 105 : "Timed out waiting for messages to be transported (caller: %C)\n", 106 : caller)); 107 : } 108 0 : loop = false; 109 0 : break; 110 : 111 0 : case CvStatus_NoTimeout: 112 0 : break; 113 : 114 0 : case CvStatus_Error: 115 0 : if (DCPS_debug_level) { 116 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MessageTracker::wait_messages_pending: " 117 : "error in wait_until\n")); 118 : } 119 0 : loop = false; 120 0 : return; 121 : } 122 : } 123 3 : if (report) { 124 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) MessageTracker::wait_messages_pending %T done\n")); 125 : } 126 3 : } 127 : 128 : int 129 0 : MessageTracker::dropped_count() const 130 : { 131 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, 0); 132 0 : return dropped_count_; 133 0 : } 134 : 135 : } // namespace DCPS 136 : } // namespace OpenDDS 137 : 138 : OPENDDS_END_VERSIONED_NAMESPACE_DECL