00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastSendStrategy.h"
00009 #include "MulticastDataLink.h"
00010 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00011 #include "ace/Proactor.h"
00012
00013 namespace OpenDDS {
00014 namespace DCPS {
00015
00016 MulticastSendStrategy::MulticastSendStrategy(MulticastDataLink* link)
00017 : TransportSendStrategy(0, TransportInst_rch(link->config(), false),
00018 0,
00019 link->transport_priority(),
00020 new NullSynchStrategy),
00021 link_(link)
00022 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00023 , async_init_(false)
00024 #endif
00025 {
00026
00027
00028 this->link_released(false);
00029 }
00030
00031 void
00032 MulticastSendStrategy::prepare_header_i()
00033 {
00034
00035 this->header_.source_ = this->link_->local_peer();
00036 }
00037
00038 ssize_t
00039 MulticastSendStrategy::send_bytes_i(const iovec iov[], int n)
00040 {
00041 return (this->link_->config()->async_send() ? async_send(iov, n) : sync_send(iov, n));
00042 }
00043
00044 ssize_t
00045 MulticastSendStrategy::sync_send(const iovec iov[], int n)
00046 {
00047 ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00048
00049 const ssize_t result = socket.send(iov, n);
00050
00051 if (result == -1 && errno == ENOBUFS) {
00052
00053
00054
00055 ssize_t b = 0;
00056 for (int i = 0; i < n; ++i) b += iov[i].iov_len;
00057 return b;
00058 }
00059
00060 return result;
00061 }
00062
00063 ssize_t
00064 MulticastSendStrategy::async_send(const iovec iov[], int n)
00065 {
00066 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00067 if (!async_init_) {
00068 if (-1 == async_writer_.open(*this, link_->socket().get_handle(), 0 ,
00069 link_->get_proactor())) {
00070 return -1;
00071 }
00072 async_init_ = true;
00073 }
00074
00075 ACE_Message_Block* mb = 0;
00076 size_t total_length = 0;
00077
00078 for (int i = n - 1; i >= 0; --i) {
00079 ACE_Message_Block* next =
00080 new ACE_Message_Block(static_cast<const char*>(iov[i].iov_base),
00081 iov[i].iov_len);
00082 next->wr_ptr(iov[i].iov_len);
00083 total_length += iov[i].iov_len;
00084 next->cont(mb);
00085 mb = next;
00086 }
00087
00088 size_t bytes_sent = 0;
00089 ssize_t result = async_writer_.send(mb, bytes_sent, 0 ,
00090 this->link_->config()->group_address_);
00091
00092 if (result < 0) {
00093 mb->release();
00094 return result;
00095 }
00096
00097
00098 return total_length;
00099 #else
00100 ACE_UNUSED_ARG(iov);
00101 ACE_UNUSED_ARG(n);
00102 return -1;
00103 #endif
00104 }
00105
00106 void
00107 MulticastSendStrategy::stop_i()
00108 {
00109 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00110 if (async_init_) {
00111 async_writer_.cancel();
00112 }
00113 #endif
00114 }
00115
#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
/// Completion callback for an asynchronous datagram write.
///
/// Logs the error code if the write did not succeed and then releases the
/// message block chain attached to the result.
///
/// @param res  Result object describing the completed write.
void
MulticastSendStrategy::handle_write_dgram(const ACE_Asynch_Write_Dgram::Result& res)
{
  if (!res.success()) {
    // error() yields an unsigned long while the "%d" conversion consumes
    // an int, so convert explicitly to keep the variadic argument in step
    // with the format string.
    ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastSendStrategy::handle_write_dgram: %d\n", static_cast<int>(res.error())));
  }

  // Release the chain regardless of the outcome; tolerate a result that
  // carries no message block.
  ACE_Message_Block* const block = res.message_block();
  if (block != 0) {
    block->release();
  }
}
#endif
00126
00127
00128 }
00129 }