MulticastSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "MulticastSendStrategy.h"
00009 #include "MulticastDataLink.h"
00010 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00011 #include "ace/Proactor.h"
00012 
00013 namespace OpenDDS {
00014 namespace DCPS {
00015 
00016 MulticastSendStrategy::MulticastSendStrategy(MulticastDataLink* link)
00017   : TransportSendStrategy(0, TransportInst_rch(link->config(), false),
00018                           0,  // synch_resource
00019                           link->transport_priority(),
00020                           new NullSynchStrategy),
00021     link_(link)
00022 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00023   , async_init_(false)
00024 #endif
00025 {
00026   // Multicast will send a SYN (TRANSPORT_CONTROL) before any reservations
00027   // are made on the DataLink, if the link is "release" it will be dropped.
00028   this->link_released(false);
00029 }
00030 
00031 void
00032 MulticastSendStrategy::prepare_header_i()
00033 {
00034   // Tag outgoing packets with our peer ID:
00035   this->header_.source_ = this->link_->local_peer();
00036 }
00037 
00038 ssize_t
00039 MulticastSendStrategy::send_bytes_i(const iovec iov[], int n)
00040 {
00041   return (this->link_->config()->async_send() ? async_send(iov, n) : sync_send(iov, n));
00042 }
00043 
00044 ssize_t
00045 MulticastSendStrategy::sync_send(const iovec iov[], int n)
00046 {
00047   ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00048 
00049   const ssize_t result = socket.send(iov, n);
00050 
00051   if (result == -1 && errno == ENOBUFS) {
00052     // Make the framework think this was a successful send to avoid
00053     // putting the send strategy in suspended mode.  If reliability
00054     // is enabled, the data may be resent later in response to a NAK.
00055     ssize_t b = 0;
00056     for (int i = 0; i < n; ++i) b += iov[i].iov_len;
00057     return b;
00058   }
00059 
00060   return result;
00061 }
00062 
00063 ssize_t
00064 MulticastSendStrategy::async_send(const iovec iov[], int n)
00065 {
00066 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00067   if (!async_init_) {
00068     if (-1 == async_writer_.open(*this, link_->socket().get_handle(), 0 /*completion_key*/,
00069                                  link_->get_proactor())) {
00070         return -1;
00071     }
00072     async_init_ = true;
00073   }
00074 
00075   ACE_Message_Block* mb = 0;
00076   size_t total_length = 0;
00077 
00078   for (int i = n - 1; i >= 0; --i) {
00079     ACE_Message_Block* next =
00080       new ACE_Message_Block(static_cast<const char*>(iov[i].iov_base),
00081                             iov[i].iov_len);
00082     next->wr_ptr(iov[i].iov_len);
00083     total_length += iov[i].iov_len;
00084     next->cont(mb);
00085     mb = next;
00086   }
00087 
00088   size_t bytes_sent = 0;
00089   ssize_t result = async_writer_.send(mb, bytes_sent, 0 /*flags*/,
00090                                       this->link_->config()->group_address_);
00091 
00092   if (result < 0) {
00093     mb->release();
00094     return result;
00095   }
00096 
00097   // framework needs to think we sent the entire datagram
00098   return total_length;
00099 #else
00100   ACE_UNUSED_ARG(iov);
00101   ACE_UNUSED_ARG(n);
00102   return -1;
00103 #endif
00104 }
00105 
00106 void
00107 MulticastSendStrategy::stop_i()
00108 {
00109 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00110   if (async_init_) {
00111     async_writer_.cancel();
00112   }
00113 #endif
00114 }
00115 
00116 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00117 void
00118 MulticastSendStrategy::handle_write_dgram(const ACE_Asynch_Write_Dgram::Result& res)
00119 {
00120   if (!res.success()) {
00121     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastSendStrategy::handle_write_dgram: %d\n", res.error()));
00122   }
00123   res.message_block()->release();
00124 }
00125 #endif
00126 
00127 
00128 } // namespace DCPS
00129 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7