00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpSendStrategy.h"
00009 #include "RtpsUdpDataLink.h"
00010 #include "RtpsUdpInst.h"
00011
00012 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00013 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00014 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00015
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00018
00019 #include "dds/DCPS/Serializer.h"
00020
00021 #include <cstring>
00022
00023 namespace OpenDDS {
00024 namespace DCPS {
00025
00026 RtpsUdpSendStrategy::RtpsUdpSendStrategy(RtpsUdpDataLink* link)
00027 : TransportSendStrategy(0, TransportInst_rch(link->config(), false),
00028 0,
00029 link->transport_priority(),
00030 new NullSynchStrategy),
00031 link_(link),
00032 override_dest_(0),
00033 override_single_dest_(0),
00034 rtps_header_db_(RTPS::RTPSHDR_SZ, ACE_Message_Block::MB_DATA,
00035 rtps_header_data_, 0, 0, ACE_Message_Block::DONT_DELETE, 0),
00036 rtps_header_mb_(&rtps_header_db_, ACE_Message_Block::DONT_DELETE)
00037 {
00038 rtps_header_.prefix[0] = 'R';
00039 rtps_header_.prefix[1] = 'T';
00040 rtps_header_.prefix[2] = 'P';
00041 rtps_header_.prefix[3] = 'S';
00042 rtps_header_.version = OpenDDS::RTPS::PROTOCOLVERSION;
00043 rtps_header_.vendorId = OpenDDS::RTPS::VENDORID_OPENDDS;
00044 std::memcpy(rtps_header_.guidPrefix, link->local_prefix(),
00045 sizeof(GuidPrefix_t));
00046 Serializer writer(&rtps_header_mb_);
00047
00048 writer << rtps_header_;
00049 }
00050
00051 ssize_t
00052 RtpsUdpSendStrategy::send_bytes_i(const iovec iov[], int n)
00053 {
00054 if (override_single_dest_) {
00055 return send_single_i(iov, n, *override_single_dest_);
00056 }
00057
00058 if (override_dest_) {
00059 return send_multi_i(iov, n, *override_dest_);
00060 }
00061
00062
00063 TransportQueueElement* elem = current_packet_first_element();
00064 if (!elem) {
00065 errno = ENOTCONN;
00066 return -1;
00067 }
00068
00069 const RepoId remote_id = elem->subscription_id();
00070 OPENDDS_SET(ACE_INET_Addr) addrs;
00071
00072 if (remote_id != GUID_UNKNOWN) {
00073 const ACE_INET_Addr remote = link_->get_locator(remote_id);
00074 if (remote != ACE_INET_Addr()) {
00075 addrs.insert(remote);
00076 }
00077 }
00078
00079 if (addrs.empty()) {
00080 link_->get_locators(elem->publication_id(), addrs);
00081 }
00082
00083 if (addrs.empty()) {
00084 errno = ENOTCONN;
00085 return -1;
00086 }
00087
00088 return send_multi_i(iov, n, addrs);
00089 }
00090
00091 RtpsUdpSendStrategy::OverrideToken
00092 RtpsUdpSendStrategy::override_destinations(const ACE_INET_Addr& destination)
00093 {
00094 override_single_dest_ = &destination;
00095 return OverrideToken(this);
00096 }
00097
00098 RtpsUdpSendStrategy::OverrideToken
00099 RtpsUdpSendStrategy::override_destinations(const OPENDDS_SET(ACE_INET_Addr)& dest)
00100 {
00101 override_dest_ = &dest;
00102 return OverrideToken(this);
00103 }
00104
00105 RtpsUdpSendStrategy::OverrideToken::~OverrideToken()
00106 {
00107 outer_->override_single_dest_ = 0;
00108 outer_->override_dest_ = 0;
00109 }
00110
00111 void
00112 RtpsUdpSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
00113 {
00114 Serializer writer(mb);
00115 writer.write_octet_array(reinterpret_cast<ACE_CDR::Octet*>(rtps_header_data_),
00116 RTPS::RTPSHDR_SZ);
00117 }
00118
00119 void
00120 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00121 const ACE_INET_Addr& addr)
00122 {
00123 rtps_header_mb_.cont(&submessages);
00124
00125 iovec iov[MAX_SEND_BLOCKS];
00126 const int num_blocks = mb_to_iov(rtps_header_mb_, iov);
00127 const ssize_t result = send_single_i(iov, num_blocks, addr);
00128 if (result < 0) {
00129 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00130 "failed to send RTPS control message\n"));
00131 }
00132
00133 rtps_header_mb_.cont(0);
00134 }
00135
00136 void
00137 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00138 const OPENDDS_SET(ACE_INET_Addr)& addrs)
00139 {
00140 rtps_header_mb_.cont(&submessages);
00141
00142 iovec iov[MAX_SEND_BLOCKS];
00143 const int num_blocks = mb_to_iov(rtps_header_mb_, iov);
00144 const ssize_t result = send_multi_i(iov, num_blocks, addrs);
00145 if (result < 0) {
00146 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00147 "failed to send RTPS control message\n"));
00148 }
00149
00150 rtps_header_mb_.cont(0);
00151 }
00152
00153 ssize_t
00154 RtpsUdpSendStrategy::send_multi_i(const iovec iov[], int n,
00155 const OPENDDS_SET(ACE_INET_Addr)& addrs)
00156 {
00157 ssize_t result = -1;
00158 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00159 for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) {
00160 const ssize_t result_per_dest = send_single_i(iov, n, *iter);
00161 if (result_per_dest >= 0) {
00162 result = result_per_dest;
00163 }
00164 }
00165 return result;
00166 }
00167
00168 ssize_t
00169 RtpsUdpSendStrategy::send_single_i(const iovec iov[], int n,
00170 const ACE_INET_Addr& addr)
00171 {
00172 #ifdef ACE_LACKS_SENDMSG
00173 char buffer[UDP_MAX_MESSAGE_SIZE];
00174 char *iter = buffer;
00175 for (int i = 0; i < n; ++i) {
00176 if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) {
00177 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00178 "message too large at index %d size %d\n", i, iov[i].iov_len));
00179 return -1;
00180 }
00181 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00182 iter += iov[i].iov_len;
00183 }
00184 const ssize_t result = link_->unicast_socket().send(buffer, iter - buffer, addr);
00185 #else
00186 const ssize_t result = link_->unicast_socket().send(iov, n, addr);
00187 #endif
00188 if (result < 0) {
00189 ACE_TCHAR addr_buff[256] = {};
00190 int err = errno;
00191 addr.addr_to_string(addr_buff, 256, 0);
00192 errno = err;
00193 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00194 "destination %s failed %p\n", addr_buff, ACE_TEXT("send")));
00195 }
00196 return result;
00197 }
00198
00199 void
00200 RtpsUdpSendStrategy::add_delayed_notification(TransportQueueElement* element)
00201 {
00202 if (!link_->add_delayed_notification(element)) {
00203 TransportSendStrategy::add_delayed_notification(element);
00204 }
00205 }
00206
00207 RemoveResult
00208 RtpsUdpSendStrategy::do_remove_sample(const RepoId& pub_id,
00209 const TransportQueueElement::MatchCriteria& criteria)
00210 {
00211 link_->do_remove_sample(pub_id, criteria);
00212 return TransportSendStrategy::do_remove_sample(pub_id, criteria);
00213 }
00214
00215 void
00216 RtpsUdpSendStrategy::stop_i()
00217 {
00218 }
00219
00220 }
00221 }