MulticastSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "MulticastSendStrategy.h"
00009 #include "MulticastDataLink.h"
00010 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00011 #include "ace/Proactor.h"
00012 
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014 
00015 namespace OpenDDS {
00016 namespace DCPS {
00017 
00018 MulticastSendStrategy::MulticastSendStrategy(MulticastDataLink* link)
00019   : TransportSendStrategy(0, link->impl(),
00020                           0,  // synch_resource
00021                           link->transport_priority(),
00022                           make_rch<NullSynchStrategy>()),
00023     link_(link)
00024 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00025   , async_init_(false)
00026 #endif
00027 {
00028   // Multicast will send a SYN (TRANSPORT_CONTROL) before any reservations
00029   // are made on the DataLink, if the link is "release" it will be dropped.
00030   this->link_released(false);
00031 }
00032 
00033 void
00034 MulticastSendStrategy::prepare_header_i()
00035 {
00036   // Tag outgoing packets with our peer ID:
00037   this->header_.source_ = this->link_->local_peer();
00038 }
00039 
00040 ssize_t
00041 MulticastSendStrategy::send_bytes_i(const iovec iov[], int n)
00042 {
00043   return (this->link_->config().async_send() ? async_send(iov, n) : sync_send(iov, n));
00044 }
00045 
00046 ssize_t
00047 MulticastSendStrategy::sync_send(const iovec iov[], int n)
00048 {
00049   ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00050 
00051   const ssize_t result = socket.send(iov, n);
00052 
00053   if (result == -1 && errno == ENOBUFS) {
00054     // Make the framework think this was a successful send to avoid
00055     // putting the send strategy in suspended mode.  If reliability
00056     // is enabled, the data may be resent later in response to a NAK.
00057     ssize_t b = 0;
00058     for (int i = 0; i < n; ++i) b += iov[i].iov_len;
00059     return b;
00060   }
00061 
00062   return result;
00063 }
00064 
00065 ssize_t
00066 MulticastSendStrategy::async_send(const iovec iov[], int n)
00067 {
00068 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00069   if (!async_init_) {
00070     if (-1 == async_writer_.open(*this, link_->socket().get_handle(), 0 /*completion_key*/,
00071                                  link_->get_proactor())) {
00072         return -1;
00073     }
00074     async_init_ = true;
00075   }
00076 
00077   ACE_Message_Block* mb = 0;
00078   size_t total_length = 0;
00079 
00080   for (int i = n - 1; i >= 0; --i) {
00081     ACE_Message_Block* next =
00082       new ACE_Message_Block(static_cast<const char*>(iov[i].iov_base),
00083                             iov[i].iov_len);
00084     next->wr_ptr(iov[i].iov_len);
00085     total_length += iov[i].iov_len;
00086     next->cont(mb);
00087     mb = next;
00088   }
00089 
00090   size_t bytes_sent = 0;
00091   ssize_t result = async_writer_.send(mb, bytes_sent, 0 /*flags*/,
00092                                       this->link_->config().group_address_);
00093 
00094   if (result < 0) {
00095     if (mb) mb->release();
00096     return result;
00097   }
00098 
00099   // framework needs to think we sent the entire datagram
00100   return total_length;
00101 #else
00102   ACE_UNUSED_ARG(iov);
00103   ACE_UNUSED_ARG(n);
00104   return -1;
00105 #endif
00106 }
00107 
00108 void
00109 MulticastSendStrategy::stop_i()
00110 {
00111 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00112   if (async_init_) {
00113     async_writer_.cancel();
00114   }
00115 #endif
00116 }
00117 
00118 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00119 void
00120 MulticastSendStrategy::handle_write_dgram(const ACE_Asynch_Write_Dgram::Result& res)
00121 {
00122   if (!res.success()) {
00123     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastSendStrategy::handle_write_dgram: %d\n", res.error()));
00124   }
00125   res.message_block()->release();
00126 }
00127 #endif
00128 
00129 
00130 } // namespace DCPS
00131 } // namespace OpenDDS
00132 
00133 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1