MulticastSendStrategy.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastSendStrategy.h"
00009 #include "MulticastDataLink.h"
00010 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00011 #include "ace/Proactor.h"
00012
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 MulticastSendStrategy::MulticastSendStrategy(MulticastDataLink* link)
00019 : TransportSendStrategy(0, link->impl(),
00020 0,
00021 link->transport_priority(),
00022 make_rch<NullSynchStrategy>()),
00023 link_(link)
00024 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00025 , async_init_(false)
00026 #endif
00027 {
00028
00029
00030 this->link_released(false);
00031 }
00032
00033 void
00034 MulticastSendStrategy::prepare_header_i()
00035 {
00036
00037 this->header_.source_ = this->link_->local_peer();
00038 }
00039
00040 ssize_t
00041 MulticastSendStrategy::send_bytes_i(const iovec iov[], int n)
00042 {
00043 return (this->link_->config().async_send() ? async_send(iov, n) : sync_send(iov, n));
00044 }
00045
00046 ssize_t
00047 MulticastSendStrategy::sync_send(const iovec iov[], int n)
00048 {
00049 ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00050
00051 const ssize_t result = socket.send(iov, n);
00052
00053 if (result == -1 && errno == ENOBUFS) {
00054
00055
00056
00057 ssize_t b = 0;
00058 for (int i = 0; i < n; ++i) b += iov[i].iov_len;
00059 return b;
00060 }
00061
00062 return result;
00063 }
00064
00065 ssize_t
00066 MulticastSendStrategy::async_send(const iovec iov[], int n)
00067 {
00068 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00069 if (!async_init_) {
00070 if (-1 == async_writer_.open(*this, link_->socket().get_handle(), 0 ,
00071 link_->get_proactor())) {
00072 return -1;
00073 }
00074 async_init_ = true;
00075 }
00076
00077 ACE_Message_Block* mb = 0;
00078 size_t total_length = 0;
00079
00080 for (int i = n - 1; i >= 0; --i) {
00081 ACE_Message_Block* next =
00082 new ACE_Message_Block(static_cast<const char*>(iov[i].iov_base),
00083 iov[i].iov_len);
00084 next->wr_ptr(iov[i].iov_len);
00085 total_length += iov[i].iov_len;
00086 next->cont(mb);
00087 mb = next;
00088 }
00089
00090 size_t bytes_sent = 0;
00091 ssize_t result = async_writer_.send(mb, bytes_sent, 0 ,
00092 this->link_->config().group_address_);
00093
00094 if (result < 0) {
00095 if (mb) mb->release();
00096 return result;
00097 }
00098
00099
00100 return total_length;
00101 #else
00102 ACE_UNUSED_ARG(iov);
00103 ACE_UNUSED_ARG(n);
00104 return -1;
00105 #endif
00106 }
00107
00108 void
00109 MulticastSendStrategy::stop_i()
00110 {
00111 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00112 if (async_init_) {
00113 async_writer_.cancel();
00114 }
00115 #endif
00116 }
00117
00118 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00119 void
00120 MulticastSendStrategy::handle_write_dgram(const ACE_Asynch_Write_Dgram::Result& res)
00121 {
00122 if (!res.success()) {
00123 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastSendStrategy::handle_write_dgram: %d\n", res.error()));
00124 }
00125 res.message_block()->release();
00126 }
00127 #endif
00128
00129
00130 }
00131 }
00132
00133 OPENDDS_END_VERSIONED_NAMESPACE_DECL