OpenDDS  Snapshot(2023/04/28-20:55)
ThreadPerConnectionSendTask.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_THREADPERCONNECTIONSENDTASK_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_THREADPERCONNECTIONSENDTASK_H
10 
11 #include "BasicQueue_T.h"
12 #include "TransportDefs.h"
13 
14 #include <dds/DCPS/dcps_export.h>
17 
18 #include <ace/Synch_Traits.h>
19 #include <ace/Task.h>
20 
21 #if !defined (ACE_LACKS_PRAGMA_ONCE)
22 # pragma once
23 #endif /* ACE_LACKS_PRAGMA_ONCE */
24 
26 
27 namespace OpenDDS {
28 namespace DCPS {
29 
30 class DataLink;
31 class TransportQueueElement;
32 class DataSampleElement;
33 class TransportSendElement;
34 
35 
42 };
43 
47 };
48 
49 /**
50  * @class ThreadPerConnectionSendTask
51  *
52  * @brief Executes requests to send a sample or control message.
53  *
54  * This task implements the request execute method which handles each step
55  * of sending a sample or control message.
56  */
58 public:
60 
61  virtual ~ThreadPerConnectionSendTask();
62 
63  /// Put the request to the request queue.
64  /// Returns 0 if successful, -1 otherwise (it has been "rejected" or this
65  /// task has been shut down).
66  int add_request(SendStrategyOpType op, TransportQueueElement* element = 0);
67 
68  /// Activate the worker threads
69  virtual int open(void* = 0);
70 
71  /// The "mainline" executed by the worker thread.
72  virtual int svc();
73 
74  /// Called when the thread exits.
75  virtual int close(u_long flag = 0);
76 
77  /// Remove sample from the thread per connection queue.
78  RemoveResult remove_sample(const DataSampleElement* element);
79 
80 private:
81 
82  /// Handle the request.
83  virtual void execute(SendRequest& req);
84 
88 
90 
91  /// Lock to protect the "state" (all of the data members) of this object.
92  LockType lock_;
93 
94  /// The request queue.
95  QueueType queue_;
96 
97  /// Condition used to signal the worker threads that they may be able to
98  /// find a request in the queue_ that needs to be executed.
99  /// This condition will be signal()'ed each time a request is
100  /// added to the queue_, and also when this task is shut down.
101  ConditionVariableType work_available_;
102 
103  /// Flag used to initiate a shutdown request to all worker threads.
105 
106  /// Flag used to avoid multiple open() calls.
107  bool opened_;
108 
109  /// The id of the thread created by this task.
111 
112  /// The datalink to send the samples or control messages.
114 };
115 
116 } // namespace DCPS
117 } // namespace OpenDDS
118 
120 
121 #endif /* OPENDDS_DCPS_TRANSPORT_FRAMEWORK_THREADPERCONNECTIONSENDTASK_H */
#define ACE_SYNCH_MUTEX
bool opened_
Flag used to avoid multiple open() calls.
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
ACE_thread_t thr_id_
The id of the thread created by this task.
int close(ACE_HANDLE handle)
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
ACE_HANDLE open(const char *filename, int mode, mode_t perms=ACE_DEFAULT_OPEN_PERMS, LPSECURITY_ATTRIBUTES sa=0)
DWORD ACE_thread_t
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
Execute the requests of sending a sample or control message.
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
DataLink * link_
The datalink to send the samples or control messages.
Base wrapper class around a data/control sample to be sent.