OpenDDS  Snapshot(2023/04/28-20:55)
MessageTracker.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_MESSAGETRACKER_H
9 #define OPENDDS_DCPS_MESSAGETRACKER_H
10 
11 #include "dcps_export.h"
12 #include "PoolAllocator.h"
13 #include "TimeTypes.h"
14 #include "ConditionVariable.h"
15 
16 #include <ace/Thread_Mutex.h>
17 
19 
20 namespace OpenDDS {
21 namespace DCPS {
22 
23  /**
24  * A simple message tracker to use to wait until all messages have been
25  * accounted for before continuing processing.
26  */
28  public:
29  MessageTracker(const OPENDDS_STRING& msg_src);
30 
31  /**
32  * Indicate that a message has been sent to the transport layer.
33  */
34  void message_sent();
35 
36  /**
37  * Indicate that a message has been delivered by the transport layer.
38  */
39  void message_delivered();
40 
41  /**
42  * Indicate that a message has been dropped by the transport layer.
43  */
44  void message_dropped();
45 
46  /**
47  * Answer if there are any messages that have not been accounted for.
48  */
49  bool pending_messages() const;
50 
51  /**
52  * Block until all messages have been accounted for or times out based
53  * on PendingTimeout.
54  */
55  void wait_messages_pending(const char* caller);
56 
57  /**
58  * Block until all messages have been accounted for or the deadline supplied
59  * has passed. Blocks indefinitely if deadline is zero.
60  */
61  void wait_messages_pending(const char* caller, const MonotonicTimePoint& deadline);
62 
63  /**
64  * For testing.
65  */
66  int dropped_count() const;
67 
68  private:
69  bool pending_messages_i() const;
70 
71  const OPENDDS_STRING msg_src_; // Source of tracked messages
73  int delivered_count_; // Messages transmitted by transport layer
74  int sent_count_; // Messages sent to transport layer
75 
77 
78  /// All messages have been transported condition variable.
80  ConditionVariableType done_condition_;
81  };
82 
83 } // namespace DCPS
84 } // namespace OpenDDS
85 
87 
88 #endif
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
ConditionVariableType done_condition_
const OPENDDS_STRING msg_src_
#define OPENDDS_STRING
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
ConditionVariable< ACE_Thread_Mutex > ConditionVariableType
All messages have been transported condition variable.