OpenDDS  Snapshot(2023/04/28-20:55)
TransportSendStrategy.inl
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "ThreadSynch.h"
9 
11 
12 namespace OpenDDS {
13 namespace DCPS {
14 
17 {
18  DBG_ENTRY_LVL("TransportSendStrategy","mode",6);
19 
20  return mode_;
21 }
22 
25 {
26  DBG_ENTRY_LVL("TransportSendStrategy","synch",6);
27 
28  return synch_.get();
29 }
30 
33 {
34  header_.source_ = source;
35 }
36 
39 {
40  DBG_ENTRY_LVL("TransportSendStrategy","send_start",6);
41 
42  GuardType guard(this->lock_);
43 
44  if (!this->link_released_)
45  ++this->start_counter_;
46 }
47 
50 {
51  DBG_ENTRY_LVL("TransportSendStrategy","link_released",6);
52 
53  GuardType guard(this->lock_);
54  this->link_released_ = flag;
55 }
56 
59 {
60  DBG_ENTRY_LVL("TransportSendStrategy","relink",6);
61  // The subsclass needs implement this function for re-establishing
62  // the link upon send failure.
63 }
64 
67 {
68  DBG_ENTRY_LVL("TransportSendStrategy","suspend_send",6);
69  GuardType guard(this->lock_);
70 
71  if (this->mode_ != MODE_TERMINATED && this->mode_ != MODE_SUSPEND) {
72  this->mode_before_suspend_ = this->mode_;
73  this->mode_ = MODE_SUSPEND;
74  }
75 }
76 
79 {
80  DBG_ENTRY_LVL("TransportSendStrategy","resume_send",6);
81  GuardType guard(this->lock_);
82 
83  // If this send strategy is reused when the connection is reestablished, then
84  // we need re-initialize the mode_ and mode_before_suspend_.
85  if (this->mode_ == MODE_TERMINATED) {
86  this->header_.length_ = 0;
87  this->pkt_chain_ = 0;
88  this->header_complete_ = false;
89  this->start_counter_ = 0;
90  this->mode_ = MODE_DIRECT;
92  this->delayed_delivered_notification_queue_.clear();
93 
94  } else if (this->mode_ == MODE_SUSPEND) {
95  this->header_.length_ = 0;
96  this->pkt_chain_ = 0;
97  QueueType elems;
98  elems.swap(this->elems_);
99  this->mode_ = this->mode_before_suspend_;
100  this->header_complete_ = false;
102  if (this->queue_.size() > 0) {
103  this->mode_ = MODE_QUEUE;
104  this->synch_->work_available();
105  }
106 
107  } else {
108  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::resume_send The suspend or terminate"
109  " is not called previously.\n"));
110  }
111 }
112 
115 {
116  static const char* SendModeStr[] = { "MODE_NOT_SET",
117  "MODE_DIRECT",
118  "MODE_QUEUE",
119  "MODE_SUSPEND",
120  "MODE_TERMINATED",
121  "UNKNOWN"
122  };
123 
124  return SendModeStr[mode];
125 }
126 
129 {
130  return this->mode_ == MODE_DIRECT;
131 }
132 
134 ssize_t TransportSendStrategy::send_bytes(const iovec iov[], int n, int& /*bp*/)
135 {
136  return send_bytes_i(iov, n);
137 }
138 
141 {
142  return ACE_INVALID_HANDLE;
143 }
144 
145 
148 {
149  return 0;
150 }
151 
154 {
155  return this->elems_.peek();
156 }
157 
158 } // namespace DCPS
159 } // namespace OpenDDS
160 
#define ACE_ERROR(X)
virtual void relink(bool do_suspend=true)
SendMode mode() const
Access the current sending mode.
void swap(BasicQueue &other)
Definition: BasicQueue_T.h:122
int ssize_t
virtual ssize_t send_bytes_i(const iovec iov[], int n)=0
Atomic< SendMode > mode_
This mode determines how send() calls will be handled.
TransportHeader header_
Current transport packet header.
size_t size() const
Accessor for the current number of elements in the queue.
Definition: BasicQueue_T.h:65
unique_ptr< ThreadSynch > synch_
The thread synch object.
TransportQueueElement * current_packet_first_element() const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual ssize_t send_bytes(const iovec iov[], int n, int &bp)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
long long ACE_INT64
#define ACE_INLINE
static const char * mode_as_str(SendMode mode)
Helper function to debugging.
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Base wrapper class around a data/control sample to be sent.