OpenDDS::DCPS::MessageTracker Class Reference

#include <MessageTracker.h>

List of all members.

Public Member Functions

 MessageTracker (const OPENDDS_STRING &msg_src)
void message_sent ()
void message_delivered ()
void message_dropped ()
bool pending_messages ()
void wait_messages_pending (OPENDDS_STRING &caller_message)
int dropped_count ()

Static Public Member Functions

static ACE_TCHAR * timestamp (const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t date_and_timelen)

Private Attributes

const OPENDDS_STRING msg_src_
int dropped_count_
int delivered_count_
int sent_count_
ACE_Thread_Mutex lock_
ACE_Condition_Thread_Mutex done_condition_
 Condition variable signaled when all messages have been transported.


Detailed Description

A simple message tracker used to wait until all messages have been accounted for before continuing processing.

Definition at line 24 of file MessageTracker.h.


Constructor & Destructor Documentation

MessageTracker::MessageTracker ( const OPENDDS_STRING &  msg_src  ) 

Definition at line 13 of file MessageTracker.cpp.

00014 : msg_src_(msg_src)
00015 , dropped_count_(0)
00016 , delivered_count_(0)
00017 , sent_count_(0)
00018 , done_condition_(lock_)
00019 {
00020 }


Member Function Documentation

int MessageTracker::dropped_count (  ) 

Return the number of messages that have been dropped. For testing.

Definition at line 143 of file MessageTracker.cpp.

References dropped_count_.

00144 {
00145   return dropped_count_;
00146 }

void MessageTracker::message_delivered (  ) 

Indicate that a message has been delivered by the transport layer.

Definition at line 39 of file MessageTracker.cpp.

References delivered_count_, done_condition_, lock_, and pending_messages().

Referenced by OpenDDS::DCPS::SendResponseListener::control_delivered(), OpenDDS::DCPS::DataWriterImpl::control_delivered(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), and OpenDDS::DCPS::SendResponseListener::data_delivered().

00040 {
00041   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00042   ++delivered_count_;
00043 
00044   if (!pending_messages())
00045     done_condition_.broadcast();
00046 }

void MessageTracker::message_dropped (  ) 

Indicate that a message has been dropped by the transport layer.

Definition at line 49 of file MessageTracker.cpp.

References done_condition_, dropped_count_, lock_, and pending_messages().

Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), OpenDDS::DCPS::SendResponseListener::control_dropped(), OpenDDS::DCPS::DataWriterImpl::control_dropped(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), OpenDDS::DCPS::SendResponseListener::data_dropped(), and OpenDDS::DCPS::DataWriterImpl::send_control().

00050 {
00051   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00052   ++dropped_count_;
00053 
00054   if (!pending_messages())
00055     done_condition_.broadcast();
00056 }

void MessageTracker::message_sent (  ) 

Indicate that a message has been sent to the transport layer.

Definition at line 32 of file MessageTracker.cpp.

References lock_, and sent_count_.

Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i(), OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control(), OpenDDS::DCPS::DataWriterImpl::send_control(), and OpenDDS::DCPS::SendResponseListener::track_message().

00033 {
00034   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00035   ++sent_count_;
00036 }

bool MessageTracker::pending_messages (  ) 

Answer whether there are any messages that have not been accounted for, i.e. more messages have been sent than have been delivered or dropped.

Definition at line 23 of file MessageTracker.cpp.

References delivered_count_, dropped_count_, and sent_count_.

Referenced by message_delivered(), message_dropped(), OpenDDS::DCPS::DataWriterImpl::pending_control(), and wait_messages_pending().

00024 {
00025   if (sent_count_ > delivered_count_ + dropped_count_) {
00026     return true;
00027   }
00028   return false;
00029 }

ACE_TCHAR * MessageTracker::timestamp ( const ACE_Time_Value &  time_value,
ACE_TCHAR  date_and_time[],
size_t  date_and_timelen 
) [static]

Provide a timestamp for the passed-in time. DEPRECATED: remove and replace with ACE::timestamp once TAO 1.6a is no longer supported.

Definition at line 107 of file MessageTracker.cpp.

Referenced by wait_messages_pending(), and OpenDDS::DCPS::WriteDataContainer::wait_pending().

00110 {
00111   //ACE_TRACE ("ACE::timestamp");
00112 
00113   // This magic number is from the formatting statement
00114   // farther down this routine.
00115   if (date_and_timelen < 27)
00116     {
00117       errno = EINVAL;
00118       return 0;
00119     }
00120 
00121   ACE_Time_Value cur_time =
00122     (time_value == ACE_Time_Value::zero) ?
00123         ACE_Time_Value (ACE_OS::gettimeofday ()) : time_value;
00124   time_t secs = cur_time.sec ();
00125   struct tm tms;
00126   ACE_OS::localtime_r (&secs, &tms);
00127   ACE_OS::snprintf (date_and_time,
00128                     date_and_timelen,
00129                     ACE_TEXT ("%4.4d-%2.2d-%2.2d %2.2d:%2.2d:%2.2d.%06ld"),
00130                     tms.tm_year + 1900,
00131                     tms.tm_mon + 1,
00132                     tms.tm_mday,
00133                     tms.tm_hour,
00134                     tms.tm_min,
00135                     tms.tm_sec,
00136                     static_cast<long> (cur_time.usec()));
00137   date_and_time[date_and_timelen - 1] = '\0';
00138   return &date_and_time[11];
00139 }

void MessageTracker::wait_messages_pending ( OPENDDS_STRING &  caller_message  ) 

Block until all messages have been accounted for, or until the configured pending timeout (if any) expires.

Definition at line 59 of file MessageTracker.cpp.

References OpenDDS::DCPS::DCPS_debug_level, done_condition_, msg_src_, pending_messages(), TheServiceParticipant, and timestamp().

Referenced by OpenDDS::DCPS::DataWriterImpl::wait_control_pending(), and OpenDDS::DCPS::SendResponseListener::~SendResponseListener().

00060 {
00061   ACE_Time_Value pending_timeout =
00062     TheServiceParticipant->pending_timeout();
00063 
00064   ACE_Time_Value* pTimeout = 0;
00065 
00066   if (pending_timeout != ACE_Time_Value::zero) {
00067     pTimeout = &pending_timeout;
00068     pending_timeout += ACE_OS::gettimeofday();
00069   }
00070 
00071   ACE_GUARD(ACE_Thread_Mutex, guard, this->lock_);
00072   const bool report = DCPS_debug_level > 0 && pending_messages();
00073   if (report) {
00074     ACE_TCHAR date_time[50];
00075     ACE_TCHAR* const time =
00076       MessageTracker::timestamp(pending_timeout,
00077                                 date_time,
00078                                 50);
00079     ACE_DEBUG((LM_DEBUG,
00080                ACE_TEXT("%T (%P|%t) MessageTracker::wait_messages_pending ")
00081                ACE_TEXT("from source=%C will wait until %s.\n"),
00082                msg_src_.c_str(),
00083                (pTimeout == 0 ? ACE_TEXT("(no timeout)") : time)));
00084   }
00085   while (true) {
00086     if (!pending_messages())
00087       break;
00088 
00089     if (done_condition_.wait(pTimeout) == -1 && pending_messages()) {
00090       if (DCPS_debug_level) {
00091         ACE_DEBUG((LM_INFO,
00092                    ACE_TEXT("(%P|%t) %T MessageTracker::")
00093                    ACE_TEXT("wait_messages_pending (Redmine Issue# 1446) %p (caller: %s)\n"),
00094                    ACE_TEXT("Timed out waiting for messages to be transported"),
00095                    caller_message.c_str()));
00096       }
00097       break;
00098     }
00099   }
00100   if (report) {
00101     ACE_DEBUG((LM_DEBUG,
00102                "%T (%P|%t) MessageTracker::wait_messages_pending done\n"));
00103   }
00104 }


Member Data Documentation

int OpenDDS::DCPS::MessageTracker::delivered_count_ [private]

Definition at line 72 of file MessageTracker.h.

Referenced by message_delivered(), and pending_messages().

ACE_Condition_Thread_Mutex OpenDDS::DCPS::MessageTracker::done_condition_ [private]

Condition variable signaled when all messages have been transported.

Definition at line 78 of file MessageTracker.h.

Referenced by message_delivered(), message_dropped(), and wait_messages_pending().

int OpenDDS::DCPS::MessageTracker::dropped_count_ [private]

Definition at line 71 of file MessageTracker.h.

Referenced by dropped_count(), message_dropped(), and pending_messages().

ACE_Thread_Mutex OpenDDS::DCPS::MessageTracker::lock_ [private]

Definition at line 75 of file MessageTracker.h.

Referenced by message_delivered(), message_dropped(), message_sent(), and wait_messages_pending().

const OPENDDS_STRING OpenDDS::DCPS::MessageTracker::msg_src_ [private]

Definition at line 70 of file MessageTracker.h.

Referenced by wait_messages_pending().

int OpenDDS::DCPS::MessageTracker::sent_count_ [private]

Definition at line 73 of file MessageTracker.h.

Referenced by message_sent(), and pending_messages().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:19 2016 for OpenDDS by  doxygen 1.4.7