#include <RtpsUdpSendStrategy.h>
Inheritance diagram for OpenDDS::DCPS::RtpsUdpSendStrategy:
Public Member Functions | |
RtpsUdpSendStrategy (RtpsUdpDataLink *link) | |
virtual void | stop_i () |
Let the subclass stop. | |
OverrideToken | override_destinations (const ACE_INET_Addr &destination) |
OverrideToken | override_destinations (const OPENDDS_SET(ACE_INET_Addr)&destinations) |
void | send_rtps_control (ACE_Message_Block &submessages, const ACE_INET_Addr &destination) |
void | send_rtps_control (ACE_Message_Block &submessages, const OPENDDS_SET(ACE_INET_Addr)&destinations) |
Protected Member Functions | |
virtual ssize_t | send_bytes_i (const iovec iov[], int n) |
virtual size_t | max_message_size () const |
virtual void | add_delayed_notification (TransportQueueElement *element) |
virtual RemoveResult | do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria) |
Implement framework chain visitations to remove a sample. | |
Private Member Functions | |
void | marshal_transport_header (ACE_Message_Block *mb) |
ssize_t | send_multi_i (const iovec iov[], int n, const OPENDDS_SET(ACE_INET_Addr)&addrs) |
ssize_t | send_single_i (const iovec iov[], int n, const ACE_INET_Addr &addr) |
const | OPENDDS_SET (ACE_INET_Addr)*override_dest_ |
Private Attributes | |
RtpsUdpDataLink * | link_ |
const ACE_INET_Addr * | override_single_dest_ |
OpenDDS::RTPS::Header | rtps_header_ |
char | rtps_header_data_ [RTPS::RTPSHDR_SZ] |
ACE_Data_Block | rtps_header_db_ |
ACE_Message_Block | rtps_header_mb_ |
Friends | |
struct | OverrideToken |
Classes | |
struct | OverrideToken |
Definition at line 24 of file RtpsUdpSendStrategy.h.
OpenDDS::DCPS::RtpsUdpSendStrategy::RtpsUdpSendStrategy | ( | RtpsUdpDataLink * | link | ) | [explicit] |
Definition at line 26 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::RTPS::Header::guidPrefix, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, rtps_header_, rtps_header_mb_, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, and OpenDDS::RTPS::Header::version.
00027 : TransportSendStrategy(0, TransportInst_rch(link->config(), false), 00028 0, // synch_resource 00029 link->transport_priority(), 00030 new NullSynchStrategy), 00031 link_(link), 00032 override_dest_(0), 00033 override_single_dest_(0), 00034 rtps_header_db_(RTPS::RTPSHDR_SZ, ACE_Message_Block::MB_DATA, 00035 rtps_header_data_, 0, 0, ACE_Message_Block::DONT_DELETE, 0), 00036 rtps_header_mb_(&rtps_header_db_, ACE_Message_Block::DONT_DELETE) 00037 { 00038 rtps_header_.prefix[0] = 'R'; 00039 rtps_header_.prefix[1] = 'T'; 00040 rtps_header_.prefix[2] = 'P'; 00041 rtps_header_.prefix[3] = 'S'; 00042 rtps_header_.version = OpenDDS::RTPS::PROTOCOLVERSION; 00043 rtps_header_.vendorId = OpenDDS::RTPS::VENDORID_OPENDDS; 00044 std::memcpy(rtps_header_.guidPrefix, link->local_prefix(), 00045 sizeof(GuidPrefix_t)); 00046 Serializer writer(&rtps_header_mb_); 00047 // byte order doesn't matter for the RTPS Header 00048 writer << rtps_header_; 00049 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification | ( | TransportQueueElement * | element | ) | [protected, virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 200 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendStrategy::add_delayed_notification(), OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification(), and link_.
00201 { 00202 if (!link_->add_delayed_notification(element)) { 00203 TransportSendStrategy::add_delayed_notification(element); 00204 } 00205 }
RemoveResult OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample | ( | const RepoId & | pub_id, | |
const TransportQueueElement::MatchCriteria & | criteria | |||
) | [protected, virtual] |
Implement framework chain visitations to remove a sample.
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 208 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendStrategy::do_remove_sample(), OpenDDS::DCPS::RtpsUdpDataLink::do_remove_sample(), and link_.
00210 { 00211 link_->do_remove_sample(pub_id, criteria); 00212 return TransportSendStrategy::do_remove_sample(pub_id, criteria); 00213 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::marshal_transport_header | ( | ACE_Message_Block * | mb | ) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 112 of file RtpsUdpSendStrategy.cpp.
References rtps_header_data_, OpenDDS::RTPS::RTPSHDR_SZ, and OpenDDS::DCPS::Serializer::write_octet_array().
00113 { 00114 Serializer writer(mb); // byte order doesn't matter for the RTPS Header 00115 writer.write_octet_array(reinterpret_cast<ACE_CDR::Octet*>(rtps_header_data_), 00116 RTPS::RTPSHDR_SZ); 00117 }
virtual size_t OpenDDS::DCPS::RtpsUdpSendStrategy::max_message_size | ( | ) | const [inline, protected, virtual] |
The maximum size of a message allowed by the this TransportImpl, or 0 if there is no such limit. This is expected to be a constant, for example UDP/IPv4 can send messages of up to 65466 bytes. The transport framework will use the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly will be transparent to the user.
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 50 of file RtpsUdpSendStrategy.h.
00051 { 00052 return UDP_MAX_MESSAGE_SIZE; 00053 }
const OpenDDS::DCPS::RtpsUdpSendStrategy::OPENDDS_SET | ( | ACE_INET_Addr | ) | [private] |
Referenced by send_bytes_i(), and send_multi_i().
RtpsUdpSendStrategy::OverrideToken OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations | ( | const OPENDDS_SET(ACE_INET_Addr)& | destinations | ) |
Definition at line 99 of file RtpsUdpSendStrategy.cpp.
References OverrideToken.
00100 { 00101 override_dest_ = &dest; 00102 return OverrideToken(this); 00103 }
RtpsUdpSendStrategy::OverrideToken OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations | ( | const ACE_INET_Addr & | destination | ) |
Definition at line 92 of file RtpsUdpSendStrategy.cpp.
References override_single_dest_, and OverrideToken.
00093 { 00094 override_single_dest_ = &destination; 00095 return OverrideToken(this); 00096 }
ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i | ( | const iovec | iov[], | |
int | n | |||
) | [protected, virtual] |
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 52 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element(), OpenDDS::DCPS::RtpsUdpDataLink::get_locator(), OpenDDS::DCPS::RtpsUdpDataLink::get_locators(), OpenDDS::DCPS::GUID_UNKNOWN, link_, OPENDDS_SET(), override_single_dest_, OpenDDS::DCPS::TransportQueueElement::publication_id(), send_multi_i(), send_single_i(), and OpenDDS::DCPS::TransportQueueElement::subscription_id().
00053 { 00054 if (override_single_dest_) { 00055 return send_single_i(iov, n, *override_single_dest_); 00056 } 00057 00058 if (override_dest_) { 00059 return send_multi_i(iov, n, *override_dest_); 00060 } 00061 00062 // determine destination address(es) from TransportQueueElement in progress 00063 TransportQueueElement* elem = current_packet_first_element(); 00064 if (!elem) { 00065 errno = ENOTCONN; 00066 return -1; 00067 } 00068 00069 const RepoId remote_id = elem->subscription_id(); 00070 OPENDDS_SET(ACE_INET_Addr) addrs; 00071 00072 if (remote_id != GUID_UNKNOWN) { 00073 const ACE_INET_Addr remote = link_->get_locator(remote_id); 00074 if (remote != ACE_INET_Addr()) { 00075 addrs.insert(remote); 00076 } 00077 } 00078 00079 if (addrs.empty()) { 00080 link_->get_locators(elem->publication_id(), addrs); 00081 } 00082 00083 if (addrs.empty()) { 00084 errno = ENOTCONN; 00085 return -1; 00086 } 00087 00088 return send_multi_i(iov, n, addrs); 00089 }
ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_multi_i | ( | const iovec | iov[], | |
int | n, | |||
const OPENDDS_SET(ACE_INET_Addr)& | addrs | |||
) | [private] |
Definition at line 154 of file RtpsUdpSendStrategy.cpp.
References OPENDDS_SET(), and send_single_i().
Referenced by send_bytes_i(), and send_rtps_control().
00156 { 00157 ssize_t result = -1; 00158 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t; 00159 for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) { 00160 const ssize_t result_per_dest = send_single_i(iov, n, *iter); 00161 if (result_per_dest >= 0) { 00162 result = result_per_dest; 00163 } 00164 } 00165 return result; 00166 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control | ( | ACE_Message_Block & | submessages, | |
const OPENDDS_SET(ACE_INET_Addr)& | destinations | |||
) |
Definition at line 137 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), rtps_header_mb_, and send_multi_i().
00139 { 00140 rtps_header_mb_.cont(&submessages); 00141 00142 iovec iov[MAX_SEND_BLOCKS]; 00143 const int num_blocks = mb_to_iov(rtps_header_mb_, iov); 00144 const ssize_t result = send_multi_i(iov, num_blocks, addrs); 00145 if (result < 0) { 00146 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - " 00147 "failed to send RTPS control message\n")); 00148 } 00149 00150 rtps_header_mb_.cont(0); 00151 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control | ( | ACE_Message_Block & | submessages, | |
const ACE_INET_Addr & | destination | |||
) |
Definition at line 120 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), rtps_header_mb_, and send_single_i().
00122 { 00123 rtps_header_mb_.cont(&submessages); 00124 00125 iovec iov[MAX_SEND_BLOCKS]; 00126 const int num_blocks = mb_to_iov(rtps_header_mb_, iov); 00127 const ssize_t result = send_single_i(iov, num_blocks, addr); 00128 if (result < 0) { 00129 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - " 00130 "failed to send RTPS control message\n")); 00131 } 00132 00133 rtps_header_mb_.cont(0); 00134 }
ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i | ( | const iovec | iov[], | |
int | n, | |||
const ACE_INET_Addr & | addr | |||
) | [private] |
Definition at line 169 of file RtpsUdpSendStrategy.cpp.
References link_, OpenDDS::DCPS::TransportSendStrategy::UDP_MAX_MESSAGE_SIZE, and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().
Referenced by send_bytes_i(), send_multi_i(), and send_rtps_control().
00171 { 00172 #ifdef ACE_LACKS_SENDMSG 00173 char buffer[UDP_MAX_MESSAGE_SIZE]; 00174 char *iter = buffer; 00175 for (int i = 0; i < n; ++i) { 00176 if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) { 00177 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - " 00178 "message too large at index %d size %d\n", i, iov[i].iov_len)); 00179 return -1; 00180 } 00181 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len); 00182 iter += iov[i].iov_len; 00183 } 00184 const ssize_t result = link_->unicast_socket().send(buffer, iter - buffer, addr); 00185 #else 00186 const ssize_t result = link_->unicast_socket().send(iov, n, addr); 00187 #endif 00188 if (result < 0) { 00189 ACE_TCHAR addr_buff[256] = {}; 00190 int err = errno; 00191 addr.addr_to_string(addr_buff, 256, 0); 00192 errno = err; 00193 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - " 00194 "destination %s failed %p\n", addr_buff, ACE_TEXT("send"))); 00195 } 00196 return result; 00197 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::stop_i | ( | ) | [virtual] |
Let the subclass stop.
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 216 of file RtpsUdpSendStrategy.cpp.
friend struct OverrideToken [friend] |
Definition at line 65 of file RtpsUdpSendStrategy.h.
Referenced by add_delayed_notification(), do_remove_sample(), send_bytes_i(), and send_single_i().
const ACE_INET_Addr* OpenDDS::DCPS::RtpsUdpSendStrategy::override_single_dest_ [private] |
Definition at line 67 of file RtpsUdpSendStrategy.h.
Referenced by override_destinations(), send_bytes_i(), and OpenDDS::DCPS::RtpsUdpSendStrategy::OverrideToken::~OverrideToken().
ACE_Data_Block OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_db_ [private] |
Definition at line 71 of file RtpsUdpSendStrategy.h.
ACE_Message_Block OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_mb_ [private] |
Definition at line 72 of file RtpsUdpSendStrategy.h.
Referenced by RtpsUdpSendStrategy(), and send_rtps_control().