#include <RtpsUdpSendStrategy.h>
Definition at line 32 of file RtpsUdpSendStrategy.h.
OpenDDS::DCPS::RtpsUdpSendStrategy::RtpsUdpSendStrategy | ( | RtpsUdpDataLink * | link, | |
const GuidPrefix_t & | local_prefix | |||
) |
Definition at line 31 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::RTPS::Header::guidPrefix, OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, rtps_header_, rtps_header_mb_, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, and OpenDDS::RTPS::Header::version.
00033 : TransportSendStrategy(0, link->impl(), 00034 0, // synch_resource 00035 link->transport_priority(), 00036 make_rch<NullSynchStrategy>()), 00037 link_(link), 00038 override_dest_(0), 00039 override_single_dest_(0), 00040 rtps_header_db_(RTPS::RTPSHDR_SZ, ACE_Message_Block::MB_DATA, 00041 rtps_header_data_, 0, 0, ACE_Message_Block::DONT_DELETE, 0), 00042 rtps_header_mb_(&rtps_header_db_, ACE_Message_Block::DONT_DELETE) 00043 { 00044 rtps_header_.prefix[0] = 'R'; 00045 rtps_header_.prefix[1] = 'T'; 00046 rtps_header_.prefix[2] = 'P'; 00047 rtps_header_.prefix[3] = 'S'; 00048 rtps_header_.version = OpenDDS::RTPS::PROTOCOLVERSION; 00049 rtps_header_.vendorId = OpenDDS::RTPS::VENDORID_OPENDDS; 00050 std::memcpy(rtps_header_.guidPrefix, local_prefix, 00051 sizeof(GuidPrefix_t)); 00052 Serializer writer(&rtps_header_mb_); 00053 // byte order doesn't matter for the RTPS Header 00054 writer << rtps_header_; 00055 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification | ( | TransportQueueElement * | element | ) | [protected, virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 246 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification(), and link_.
00247 { 00248 if (!link_->add_delayed_notification(element)) { 00249 TransportSendStrategy::add_delayed_notification(element); 00250 } 00251 }
RemoveResult OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample | ( | const RepoId & | pub_id, | |
const TransportQueueElement::MatchCriteria & | criteria, | |||
void * | context | |||
) | [protected, virtual] |
Implement framework chain visitations to remove a sample.
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 254 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::do_remove_sample(), and link_.
00257 { 00258 ACE_Guard<ACE_Thread_Mutex>* guard = 00259 static_cast<ACE_Guard<ACE_Thread_Mutex>*>(context); 00260 link_->do_remove_sample(pub_id, criteria, *guard); 00261 return TransportSendStrategy::do_remove_sample(pub_id, criteria, 0); 00262 }
bool OpenDDS::DCPS::RtpsUdpSendStrategy::marshal_transport_header | ( | ACE_Message_Block * | mb | ) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 143 of file RtpsUdpSendStrategy.cpp.
References rtps_header_data_, OpenDDS::RTPS::RTPSHDR_SZ, and OpenDDS::DCPS::Serializer::write_octet_array().
00144 { 00145 Serializer writer(mb); // byte order doesn't matter for the RTPS Header 00146 return writer.write_octet_array(reinterpret_cast<ACE_CDR::Octet*>(rtps_header_data_), 00147 RTPS::RTPSHDR_SZ); 00148 }
virtual size_t OpenDDS::DCPS::RtpsUdpSendStrategy::max_message_size | ( | void | ) | const [inline, protected, virtual] |
The maximum size of a message allowed by this TransportImpl, or 0 if there is no such limit. This is expected to be a constant; for example, UDP/IPv4 can send messages of up to 65466 bytes. The transport framework will use the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly will be transparent to the user.
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 65 of file RtpsUdpSendStrategy.h.
00066 { 00067 return UDP_MAX_MESSAGE_SIZE; 00068 }
const OpenDDS::DCPS::RtpsUdpSendStrategy::OPENDDS_SET | ( | ACE_INET_Addr | ) | [override, private] |
Referenced by send_bytes_i_helper(), and send_multi_i().
RtpsUdpSendStrategy::OverrideToken OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations | ( | const OPENDDS_SET(ACE_INET_Addr)& | destinations | ) |
Definition at line 130 of file RtpsUdpSendStrategy.cpp.
References OverrideToken.
00131 { 00132 override_dest_ = &dest; 00133 return OverrideToken(this); 00134 }
RtpsUdpSendStrategy::OverrideToken OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations | ( | const ACE_INET_Addr & | destination | ) |
Definition at line 123 of file RtpsUdpSendStrategy.cpp.
References override_single_dest_, and OverrideToken.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::send_directed_nack_replies(), OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies(), and OpenDDS::DCPS::RtpsUdpDataLink::send_nackfrag_replies().
00124 { 00125 override_single_dest_ = &destination; 00126 return OverrideToken(this); 00127 }
ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i | ( | const iovec | iov[], | |
int | n | |||
) | [protected, virtual] |
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 64 of file RtpsUdpSendStrategy.cpp.
References iovec::iov_len, and send_bytes_i_helper().
00065 { 00066 ssize_t result = send_bytes_i_helper(iov, n); 00067 00068 if (result == -1 && shouldWarn(errno)) { 00069 // Make the framework think this was a successful send to avoid 00070 // putting the send strategy in suspended mode. If reliability 00071 // is enabled, the data may be resent later. 00072 ssize_t b = 0; 00073 for (int i = 0; i < n; ++i) { 00074 b += iov[i].iov_len; 00075 } 00076 result = b; 00077 } 00078 00079 return result; 00080 }
ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper | ( | const iovec | iov[], | |
int | n | |||
) | [protected] |
Definition at line 83 of file RtpsUdpSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element(), OpenDDS::DCPS::RtpsUdpDataLink::get_locator(), OpenDDS::DCPS::RtpsUdpDataLink::get_locators(), OpenDDS::DCPS::GUID_UNKNOWN, link_, OPENDDS_SET(), override_single_dest_, OpenDDS::DCPS::TransportQueueElement::publication_id(), send_multi_i(), send_single_i(), and OpenDDS::DCPS::TransportQueueElement::subscription_id().
Referenced by send_bytes_i().
00084 { 00085 if (override_single_dest_) { 00086 return send_single_i(iov, n, *override_single_dest_); 00087 } 00088 00089 if (override_dest_) { 00090 return send_multi_i(iov, n, *override_dest_); 00091 } 00092 00093 // determine destination address(es) from TransportQueueElement in progress 00094 TransportQueueElement* elem = current_packet_first_element(); 00095 if (!elem) { 00096 errno = ENOTCONN; 00097 return -1; 00098 } 00099 00100 const RepoId remote_id = elem->subscription_id(); 00101 OPENDDS_SET(ACE_INET_Addr) addrs; 00102 00103 if (remote_id != GUID_UNKNOWN) { 00104 const ACE_INET_Addr remote = link_->get_locator(remote_id); 00105 if (remote != ACE_INET_Addr()) { 00106 addrs.insert(remote); 00107 } 00108 } 00109 00110 if (addrs.empty()) { 00111 link_->get_locators(elem->publication_id(), addrs); 00112 } 00113 00114 if (addrs.empty()) { 00115 errno = ENOTCONN; 00116 return -1; 00117 } 00118 00119 return send_multi_i(iov, n, addrs); 00120 }
ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_multi_i | ( | const iovec | iov[], | |
int | n, | |||
const OPENDDS_SET(ACE_INET_Addr)& | addrs | |||
) | [private] |
Definition at line 199 of file RtpsUdpSendStrategy.cpp.
References OPENDDS_SET(), and send_single_i().
Referenced by send_bytes_i_helper(), and send_rtps_control().
00201 { 00202 ssize_t result = -1; 00203 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t; 00204 for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) { 00205 const ssize_t result_per_dest = send_single_i(iov, n, *iter); 00206 if (result_per_dest >= 0) { 00207 result = result_per_dest; 00208 } 00209 } 00210 return result; 00211 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control | ( | ACE_Message_Block & | submessages, | |
const OPENDDS_SET(ACE_INET_Addr)& | destinations | |||
) |
Definition at line 175 of file RtpsUdpSendStrategy.cpp.
References ACE_Message_Block::cont(), LM_ERROR, LM_WARNING, OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), rtps_header_mb_, and send_multi_i().
00177 { 00178 rtps_header_mb_.cont(&submessages); 00179 #if defined(OPENDDS_SECURITY) 00180 Message_Block_Ptr alternate(pre_send_packet(&rtps_header_mb_)); 00181 ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_; 00182 #else 00183 ACE_Message_Block& use_mb = rtps_header_mb_; 00184 #endif 00185 00186 iovec iov[MAX_SEND_BLOCKS]; 00187 const int num_blocks = mb_to_iov(use_mb, iov); 00188 const ssize_t result = send_multi_i(iov, num_blocks, addrs); 00189 if (result < 0) { 00190 const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR; 00191 ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - " 00192 "failed to send RTPS control message\n")); 00193 } 00194 00195 rtps_header_mb_.cont(0); 00196 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control | ( | ACE_Message_Block & | submessages, | |
const ACE_INET_Addr & | destination | |||
) |
Definition at line 151 of file RtpsUdpSendStrategy.cpp.
References ACE_Message_Block::cont(), LM_ERROR, LM_WARNING, OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), rtps_header_mb_, and send_single_i().
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::durability_resend(), OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks(), OpenDDS::DCPS::RtpsUdpDataLink::send_directed_heartbeats(), OpenDDS::DCPS::RtpsUdpDataLink::send_directed_nack_replies(), OpenDDS::DCPS::RtpsUdpDataLink::send_durability_gaps(), OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeat_replies(), OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats(), OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual(), and OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies().
00153 { 00154 rtps_header_mb_.cont(&submessages); 00155 #if defined(OPENDDS_SECURITY) 00156 Message_Block_Ptr alternate(pre_send_packet(&rtps_header_mb_)); 00157 ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_; 00158 #else 00159 ACE_Message_Block& use_mb = rtps_header_mb_; 00160 #endif 00161 00162 iovec iov[MAX_SEND_BLOCKS]; 00163 const int num_blocks = mb_to_iov(use_mb, iov); 00164 const ssize_t result = send_single_i(iov, num_blocks, addr); 00165 if (result < 0) { 00166 const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR; 00167 ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - " 00168 "failed to send RTPS control message\n")); 00169 } 00170 00171 rtps_header_mb_.cont(0); 00172 }
ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i | ( | const iovec | iov[], | |
int | n, | |||
const ACE_INET_Addr & | addr | |||
) | [private] |
Definition at line 214 of file RtpsUdpSendStrategy.cpp.
References ACE_TEXT(), ACE_INET_Addr::addr_to_string(), iovec::iov_len, link_, LM_ERROR, LM_WARNING, ACE_SOCK_Dgram::send(), OpenDDS::DCPS::TransportSendStrategy::UDP_MAX_MESSAGE_SIZE, and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().
Referenced by send_bytes_i_helper(), send_multi_i(), and send_rtps_control().
00216 { 00217 #ifdef ACE_LACKS_SENDMSG 00218 char buffer[UDP_MAX_MESSAGE_SIZE]; 00219 char *iter = buffer; 00220 for (int i = 0; i < n; ++i) { 00221 if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) { 00222 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - " 00223 "message too large at index %d size %d\n", i, iov[i].iov_len)); 00224 return -1; 00225 } 00226 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len); 00227 iter += iov[i].iov_len; 00228 } 00229 const ssize_t result = link_->unicast_socket().send(buffer, iter - buffer, addr); 00230 #else 00231 const ssize_t result = link_->unicast_socket().send(iov, n, addr); 00232 #endif 00233 if (result < 0) { 00234 ACE_TCHAR addr_buff[256] = {}; 00235 int err = errno; 00236 addr.addr_to_string(addr_buff, 256, 0); 00237 errno = err; 00238 const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR; 00239 ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - " 00240 "destination %s failed %p\n", addr_buff, ACE_TEXT("send"))); 00241 } 00242 return result; 00243 }
void OpenDDS::DCPS::RtpsUdpSendStrategy::stop_i | ( | ) | [virtual] |
Let the subclass stop.
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 643 of file RtpsUdpSendStrategy.cpp.
friend struct OverrideToken [friend] |
Definition at line 45 of file RtpsUdpSendStrategy.h.
Referenced by override_destinations().
OpenDDS::DCPS::RtpsUdpSendStrategy::link_
Definition at line 108 of file RtpsUdpSendStrategy.h.
Referenced by add_delayed_notification(), do_remove_sample(), send_bytes_i_helper(), and send_single_i().
const ACE_INET_Addr* OpenDDS::DCPS::RtpsUdpSendStrategy::override_single_dest_ [private] |
Definition at line 110 of file RtpsUdpSendStrategy.h.
Referenced by override_destinations(), send_bytes_i_helper(), and OpenDDS::DCPS::RtpsUdpSendStrategy::OverrideToken::~OverrideToken().
OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_
Definition at line 112 of file RtpsUdpSendStrategy.h.
Referenced by RtpsUdpSendStrategy().
OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_data_
Definition at line 113 of file RtpsUdpSendStrategy.h.
Referenced by marshal_transport_header().
OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_db_
Definition at line 114 of file RtpsUdpSendStrategy.h.
OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_mb_
Definition at line 115 of file RtpsUdpSendStrategy.h.
Referenced by RtpsUdpSendStrategy(), and send_rtps_control().