OpenDDS  Snapshot(2023/04/28-20:55)
MulticastSendStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
9 #include "MulticastDataLink.h"
11 #include "ace/Proactor.h"
12 
14 
15 namespace OpenDDS {
16 namespace DCPS {
17 
19  : TransportSendStrategy(0, link->impl(),
20  0, // synch_resource
21  link->transport_priority(),
23  link_(link)
24 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
25  , async_init_(false)
26 #endif
27 {
28  // Multicast will send a SYN (TRANSPORT_CONTROL) before any reservations
29  // are made on the DataLink; if the link is marked "released", it will be dropped.
30  this->link_released(false);
31 }
32 
33 void
35 {
36  // Tag outgoing packets with our peer ID:
38 }
39 
// Dispatches an outgoing scatter/gather buffer to one of the two send paths.
//
// iov - array of buffers to send
// n   - number of entries in iov
//
// Returns whatever the selected path returns: async_send() to
// cfg->group_address_ when cfg is non-null and cfg->async_send() is true,
// otherwise sync_send().
//
// NOTE(review): the declaration of `cfg` (listing line 43) is absent from this
// extract; presumably it is the transport's configuration object - confirm
// against the original file.
40 ssize_t
41 MulticastSendStrategy::send_bytes_i(const iovec iov[], int n)
42 {
44  return (cfg && cfg->async_send()) ? async_send(iov, n, cfg->group_address_) : sync_send(iov, n);
45 }
46 
// Sends the scatter/gather buffers with a single blocking socket.send() call.
//
// iov - array of buffers to send
// n   - number of entries in iov
//
// Returns the result of socket.send(), except that a failure with
// errno == ENOBUFS is reported as the sum of all iov lengths (i.e. as if the
// whole datagram had been sent).
//
// NOTE(review): the declaration of `socket` (listing line 50) is absent from
// this extract; presumably it refers to the data link's multicast socket -
// confirm against the original file.
47 ssize_t
48 MulticastSendStrategy::sync_send(const iovec iov[], int n)
49 {
51 
52  const ssize_t result = socket.send(iov, n);
53 
54  if (result == -1 && errno == ENOBUFS) {
55  // Make the framework think this was a successful send to avoid
56  // putting the send strategy in suspended mode. If reliability
57  // is enabled, the data may be resent later in response to a NAK.
58  ssize_t b = 0;
59  for (int i = 0; i < n; ++i) b += iov[i].iov_len;
60  return b;
61  }
62 
63  return result;
64 }
65 
66 ssize_t
67 MulticastSendStrategy::async_send(const iovec iov[], int n, const ACE_INET_Addr& group_address)
68 {
69 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
70  if (!async_init_) {
71  if (-1 == async_writer_.open(*this, link_->socket().get_handle(), 0 /*completion_key*/,
72  link_->get_proactor())) {
73  return -1;
74  }
75  async_init_ = true;
76  }
77 
78  ACE_Message_Block* mb = 0;
79  size_t total_length = 0;
80 
81  for (int i = n - 1; i >= 0; --i) {
82  ACE_Message_Block* next =
83  new ACE_Message_Block(static_cast<const char*>(iov[i].iov_base),
84  iov[i].iov_len);
85  next->wr_ptr(iov[i].iov_len);
86  total_length += iov[i].iov_len;
87  next->cont(mb);
88  mb = next;
89  }
90 
91  size_t bytes_sent = 0;
92  ssize_t result = async_writer_.send(mb, bytes_sent, 0 /*flags*/, group_address);
93 
94  if (result < 0) {
95  if (mb) mb->release();
96  return result;
97  }
98 
99  // framework needs to think we sent the entire datagram
100  return total_length;
101 #else
102  ACE_UNUSED_ARG(iov);
103  ACE_UNUSED_ARG(n);
104  return -1;
105 #endif
106 }
107 
108 void
110 {
111 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
112  if (async_init_) {
113  async_writer_.cancel();
114  }
115 #endif
116 }
117 
118 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
119 void
120 MulticastSendStrategy::handle_write_dgram(const ACE_Asynch_Write_Dgram::Result& res)
121 {
122  if (!res.success()) {
123  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastSendStrategy::handle_write_dgram: %d\n", res.error()));
124  }
125  res.message_block()->release();
126 }
127 #endif
128 
129 
130 } // namespace DCPS
131 } // namespace OpenDDS
132 
#define ACE_ERROR(X)
ACE_Message_Block * message_block(void) const
virtual void prepare_header_i()
Specific implementation processing of prepared packet header.
if(!(yy_init))
int success(void) const
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
ACE_SOCK_Dgram_Mcast & socket()
MulticastSendStrategy(MulticastDataLink *link)
ssize_t send(const void *buf, size_t n, int flags=0) const
int ssize_t
ssize_t sync_send(const iovec iov[], int n)
ACE_HANDLE socket(int protocol_family, int type, int proto)
unsigned long error(void) const
virtual ACE_Message_Block * release(void)
ACE_Message_Block * cont(void) const
char * wr_ptr(void) const
ACE_HANDLE get_handle(void) const
virtual void stop_i()
Let the subclass stop.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ssize_t async_send(const iovec iov[], int n, const ACE_INET_Addr &addr)
virtual ssize_t send_bytes_i(const iovec iov[], int n)
#define ENOBUFS
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28