RtpsUdpSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsUdpSendStrategy.h"
00009 #include "RtpsUdpDataLink.h"
00010 #include "RtpsUdpInst.h"
00011 
00012 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00013 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00014 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00015 
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00018 
00019 #include "dds/DCPS/Serializer.h"
00020 
00021 #include <cstring>
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 RtpsUdpSendStrategy::RtpsUdpSendStrategy(RtpsUdpDataLink* link)
00027   : TransportSendStrategy(0, TransportInst_rch(link->config(), false),
00028                           0,  // synch_resource
00029                           link->transport_priority(),
00030                           new NullSynchStrategy),
00031     link_(link),
00032     override_dest_(0),
00033     override_single_dest_(0),
00034     rtps_header_db_(RTPS::RTPSHDR_SZ, ACE_Message_Block::MB_DATA,
00035                     rtps_header_data_, 0, 0, ACE_Message_Block::DONT_DELETE, 0),
00036     rtps_header_mb_(&rtps_header_db_, ACE_Message_Block::DONT_DELETE)
00037 {
00038   rtps_header_.prefix[0] = 'R';
00039   rtps_header_.prefix[1] = 'T';
00040   rtps_header_.prefix[2] = 'P';
00041   rtps_header_.prefix[3] = 'S';
00042   rtps_header_.version = OpenDDS::RTPS::PROTOCOLVERSION;
00043   rtps_header_.vendorId = OpenDDS::RTPS::VENDORID_OPENDDS;
00044   std::memcpy(rtps_header_.guidPrefix, link->local_prefix(),
00045               sizeof(GuidPrefix_t));
00046   Serializer writer(&rtps_header_mb_);
00047   // byte order doesn't matter for the RTPS Header
00048   writer << rtps_header_;
00049 }
00050 
00051 ssize_t
00052 RtpsUdpSendStrategy::send_bytes_i(const iovec iov[], int n)
00053 {
00054   if (override_single_dest_) {
00055     return send_single_i(iov, n, *override_single_dest_);
00056   }
00057 
00058   if (override_dest_) {
00059     return send_multi_i(iov, n, *override_dest_);
00060   }
00061 
00062   // determine destination address(es) from TransportQueueElement in progress
00063   TransportQueueElement* elem = current_packet_first_element();
00064   if (!elem) {
00065     errno = ENOTCONN;
00066     return -1;
00067   }
00068 
00069   const RepoId remote_id = elem->subscription_id();
00070   OPENDDS_SET(ACE_INET_Addr) addrs;
00071 
00072   if (remote_id != GUID_UNKNOWN) {
00073     const ACE_INET_Addr remote = link_->get_locator(remote_id);
00074     if (remote != ACE_INET_Addr()) {
00075       addrs.insert(remote);
00076     }
00077   }
00078 
00079   if (addrs.empty()) {
00080     link_->get_locators(elem->publication_id(), addrs);
00081   }
00082 
00083   if (addrs.empty()) {
00084     errno = ENOTCONN;
00085     return -1;
00086   }
00087 
00088   return send_multi_i(iov, n, addrs);
00089 }
00090 
00091 RtpsUdpSendStrategy::OverrideToken
00092 RtpsUdpSendStrategy::override_destinations(const ACE_INET_Addr& destination)
00093 {
00094   override_single_dest_ = &destination;
00095   return OverrideToken(this);
00096 }
00097 
00098 RtpsUdpSendStrategy::OverrideToken
00099 RtpsUdpSendStrategy::override_destinations(const OPENDDS_SET(ACE_INET_Addr)& dest)
00100 {
00101   override_dest_ = &dest;
00102   return OverrideToken(this);
00103 }
00104 
00105 RtpsUdpSendStrategy::OverrideToken::~OverrideToken()
00106 {
00107   outer_->override_single_dest_ = 0;
00108   outer_->override_dest_ = 0;
00109 }
00110 
00111 void
00112 RtpsUdpSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
00113 {
00114   Serializer writer(mb); // byte order doesn't matter for the RTPS Header
00115   writer.write_octet_array(reinterpret_cast<ACE_CDR::Octet*>(rtps_header_data_),
00116     RTPS::RTPSHDR_SZ);
00117 }
00118 
00119 void
00120 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00121                                        const ACE_INET_Addr& addr)
00122 {
00123   rtps_header_mb_.cont(&submessages);
00124 
00125   iovec iov[MAX_SEND_BLOCKS];
00126   const int num_blocks = mb_to_iov(rtps_header_mb_, iov);
00127   const ssize_t result = send_single_i(iov, num_blocks, addr);
00128   if (result < 0) {
00129     ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00130       "failed to send RTPS control message\n"));
00131   }
00132 
00133   rtps_header_mb_.cont(0);
00134 }
00135 
00136 void
00137 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00138                                        const OPENDDS_SET(ACE_INET_Addr)& addrs)
00139 {
00140   rtps_header_mb_.cont(&submessages);
00141 
00142   iovec iov[MAX_SEND_BLOCKS];
00143   const int num_blocks = mb_to_iov(rtps_header_mb_, iov);
00144   const ssize_t result = send_multi_i(iov, num_blocks, addrs);
00145   if (result < 0) {
00146     ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00147       "failed to send RTPS control message\n"));
00148   }
00149 
00150   rtps_header_mb_.cont(0);
00151 }
00152 
00153 ssize_t
00154 RtpsUdpSendStrategy::send_multi_i(const iovec iov[], int n,
00155                                   const OPENDDS_SET(ACE_INET_Addr)& addrs)
00156 {
00157   ssize_t result = -1;
00158   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00159   for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) {
00160     const ssize_t result_per_dest = send_single_i(iov, n, *iter);
00161     if (result_per_dest >= 0) {
00162       result = result_per_dest;
00163     }
00164   }
00165   return result;
00166 }
00167 
00168 ssize_t
00169 RtpsUdpSendStrategy::send_single_i(const iovec iov[], int n,
00170                                    const ACE_INET_Addr& addr)
00171 {
00172 #ifdef ACE_LACKS_SENDMSG
00173   char buffer[UDP_MAX_MESSAGE_SIZE];
00174   char *iter = buffer;
00175   for (int i = 0; i < n; ++i) {
00176     if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) {
00177       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00178                  "message too large at index %d size %d\n", i, iov[i].iov_len));
00179       return -1;
00180     }
00181     std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00182     iter += iov[i].iov_len;
00183   }
00184   const ssize_t result = link_->unicast_socket().send(buffer, iter - buffer, addr);
00185 #else
00186   const ssize_t result = link_->unicast_socket().send(iov, n, addr);
00187 #endif
00188   if (result < 0) {
00189     ACE_TCHAR addr_buff[256] = {};
00190     int err = errno;
00191     addr.addr_to_string(addr_buff, 256, 0);
00192     errno = err;
00193     ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00194       "destination %s failed %p\n", addr_buff, ACE_TEXT("send")));
00195   }
00196   return result;
00197 }
00198 
00199 void
00200 RtpsUdpSendStrategy::add_delayed_notification(TransportQueueElement* element)
00201 {
00202   if (!link_->add_delayed_notification(element)) {
00203     TransportSendStrategy::add_delayed_notification(element);
00204   }
00205 }
00206 
00207 RemoveResult
00208 RtpsUdpSendStrategy::do_remove_sample(const RepoId& pub_id,
00209   const TransportQueueElement::MatchCriteria& criteria)
00210 {
00211   link_->do_remove_sample(pub_id, criteria);
00212   return TransportSendStrategy::do_remove_sample(pub_id, criteria);
00213 }
00214 
00215 void
00216 RtpsUdpSendStrategy::stop_i()
00217 {
00218 }
00219 
00220 } // namespace DCPS
00221 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7