00001
00002
00003
00004
00005
00006
00007
00008 #ifndef RTPS_BASEMESSAGEUTILS_H
00009 #define RTPS_BASEMESSAGEUTILS_H
00010
00011 #include "RtpsCoreTypeSupportImpl.h"
00012 #include "dds/DCPS/Serializer.h"
00013 #include "dds/DdsDcpsInfoUtilsC.h"
00014 #include "dds/DdsDcpsInfoUtilsTypeSupportImpl.h"
00015 #include "md5.h"
00016 #include "ace/INET_Addr.h"
00017 #include "ace/Message_Block.h"
00018
00019 #include <cstring>
00020
00021 namespace OpenDDS {
00022 namespace RTPS {
00023 using DCPS::GuidPrefix_t;
00024 using DCPS::GUID_t;
00025 using DCPS::EntityId_t;
00026
00027 template <typename T>
00028 void marshal_key_hash(const T& msg, KeyHash_t& hash) {
00029 using OpenDDS::DCPS::Serializer;
00030
00031 OpenDDS::DCPS::KeyOnly<const T> ko(msg);
00032
00033 static const size_t HASH_LIMIT = 16;
00034 std::memset(hash.value, 0, HASH_LIMIT);
00035
00036
00037
00038 #if defined ACE_LITTLE_ENDIAN
00039 static const bool swap_bytes = true;
00040 #else
00041 static const bool swap_bytes = false;
00042 #endif
00043
00044 if (gen_is_bounded_size(ko) &&
00045 gen_max_marshaled_size(ko, true ) <= HASH_LIMIT) {
00046
00047
00048 ACE_Message_Block mb(HASH_LIMIT);
00049 Serializer out_serializer(&mb, swap_bytes, Serializer::ALIGN_INITIALIZE);
00050 out_serializer << ko;
00051 std::memcpy(hash.value, mb.rd_ptr(), mb.length());
00052
00053 } else {
00054
00055 size_t size = 0, padding = 0;
00056 gen_find_size(ko, size, padding);
00057 ACE_Message_Block mb(size + padding);
00058 Serializer out_serializer(&mb, swap_bytes, Serializer::ALIGN_INITIALIZE);
00059 out_serializer << ko;
00060
00061 MD5_CTX ctx;
00062 MD5_Init(&ctx);
00063 MD5_Update(&ctx, mb.rd_ptr(), static_cast<unsigned long>(mb.length()));
00064 MD5_Final(hash.value, &ctx);
00065 }
00066 }
00067
00068 inline void assign(GuidPrefix_t& dest, const GuidPrefix_t& src)
00069 {
00070 std::memcpy(&dest[0], &src[0], sizeof(GuidPrefix_t));
00071 }
00072
00073 inline void assign(OpenDDS::DCPS::OctetArray16& dest,
00074 const OpenDDS::DCPS::OctetArray16& src)
00075 {
00076 std::memcpy(&dest[0], &src[0], sizeof(OpenDDS::DCPS::OctetArray16));
00077 }
00078
00079 inline void assign(OpenDDS::DCPS::OctetArray16& dest,
00080 const ACE_CDR::ULong& ipv4addr_be)
00081 {
00082 std::memset(&dest[0], 0, 12);
00083 dest[12] = ipv4addr_be >> 24;
00084 dest[13] = ipv4addr_be >> 16;
00085 dest[14] = ipv4addr_be >> 8;
00086 dest[15] = ipv4addr_be;
00087 }
00088
00089 inline void assign(DCPS::EntityKey_t& lhs, unsigned int rhs)
00090 {
00091 lhs[0] = static_cast<CORBA::Octet>(rhs);
00092 lhs[1] = static_cast<CORBA::Octet>(rhs >> 8);
00093 lhs[2] = static_cast<CORBA::Octet>(rhs >> 16);
00094 }
00095
00096
00097 inline void
00098 address_to_bytes(OpenDDS::DCPS::OctetArray16& dest, const ACE_INET_Addr& addr)
00099 {
00100 const void* raw = addr.get_addr();
00101 #ifdef ACE_HAS_IPV6
00102 if (addr.get_type() == AF_INET6) {
00103 const sockaddr_in6* in = static_cast<const sockaddr_in6*>(raw);
00104 std::memcpy(&dest[0], &in->sin6_addr, 16);
00105 } else {
00106 #else
00107 {
00108 #endif
00109 const sockaddr_in* in = static_cast<const sockaddr_in*>(raw);
00110 std::memset(&dest[0], 0, 12);
00111 std::memcpy(&dest[12], &in->sin_addr, 4);
00112 }
00113 }
00114
00115 inline int
00116 address_to_kind(const ACE_INET_Addr& addr)
00117 {
00118 #ifdef ACE_HAS_IPV6
00119 return addr.get_type() == AF_INET6 ? LOCATOR_KIND_UDPv6 : LOCATOR_KIND_UDPv4;
00120 #else
00121 ACE_UNUSED_ARG(addr);
00122 return LOCATOR_KIND_UDPv4;
00123 #endif
00124 }
00125
00126 inline int
00127 locator_to_address(ACE_INET_Addr& dest,
00128 const OpenDDS::DCPS::Locator_t& locator,
00129 bool map )
00130 {
00131 switch (locator.kind) {
00132 #ifdef ACE_HAS_IPV6
00133 case LOCATOR_KIND_UDPv6:
00134 dest.set_type(AF_INET6);
00135 if (dest.set_address(reinterpret_cast<const char*>(locator.address),
00136 16, 0 ) == -1) {
00137 return -1;
00138 }
00139 dest.set_port_number(locator.port);
00140 return 0;
00141 break;
00142 #endif
00143 case LOCATOR_KIND_UDPv4:
00144 #if !defined (ACE_HAS_IPV6) || !defined (IPV6_V6ONLY)
00145 ACE_UNUSED_ARG(map);
00146 #endif
00147 dest.set_type(AF_INET);
00148 if (dest.set_address(reinterpret_cast<const char*>(locator.address)
00149 + 12, 4, 0
00150 #if defined (ACE_HAS_IPV6) && defined (IPV6_V6ONLY)
00151 , map ? 1 : 0
00152 #endif
00153 ) == -1) {
00154 return -1;
00155 }
00156 dest.set_port_number(locator.port);
00157 return 0;
00158 break;
00159 default:
00160 return -1;
00161 }
00162
00163 return -1;
00164 }
00165
00166 inline DDS::ReturnCode_t
00167 blob_to_locators(
00168 const OpenDDS::DCPS::TransportBLOB& blob,
00169 OpenDDS::DCPS::LocatorSeq& locators,
00170 bool& requires_inline_qos)
00171 {
00172 ACE_Data_Block db(blob.length(), ACE_Message_Block::MB_DATA,
00173 reinterpret_cast<const char*>(blob.get_buffer()),
00174 0 , 0 , ACE_Message_Block::DONT_DELETE, 0 );
00175 ACE_Message_Block mb(&db, ACE_Message_Block::DONT_DELETE, 0 );
00176 mb.wr_ptr(mb.space());
00177
00178 using OpenDDS::DCPS::Serializer;
00179 Serializer ser(&mb, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00180 if (!(ser >> locators)) {
00181 ACE_ERROR_RETURN((LM_ERROR,
00182 ACE_TEXT("(%P|%t) blob_to_locators: ")
00183 ACE_TEXT("Failed to deserialize blob's locators\n")),
00184 DDS::RETCODE_ERROR);
00185 }
00186 if (!(ser >> ACE_InputCDR::to_boolean(requires_inline_qos))) {
00187 ACE_ERROR_RETURN((LM_ERROR,
00188 ACE_TEXT("(%P|%t) blob_to_locators: ")
00189 ACE_TEXT("Failed to deserialize blob's inline QoS flag\n")),
00190 DDS::RETCODE_ERROR);
00191 }
00192 return DDS::RETCODE_OK;
00193 }
00194
00195 template <typename T>
00196 void
00197 message_block_to_sequence(const ACE_Message_Block& mb_locator, T& out)
00198 {
00199 out.length (CORBA::ULong(mb_locator.length()));
00200 std::memcpy (out.get_buffer(), mb_locator.rd_ptr(), mb_locator.length());
00201 }
00202
00203 inline void
00204 locators_to_blob(const OpenDDS::DCPS::LocatorSeq& locators, DCPS::TransportBLOB& blob)
00205 {
00206 using OpenDDS::DCPS::Serializer;
00207 size_t size_locator = 0, padding_locator = 0;
00208 DCPS::gen_find_size(locators, size_locator, padding_locator);
00209 ACE_Message_Block mb_locator(size_locator + padding_locator + 1);
00210 Serializer ser_loc(&mb_locator, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00211 ser_loc << locators;
00212
00213
00214
00215 ser_loc << ACE_OutputCDR::from_boolean(false);
00216 message_block_to_sequence(mb_locator, blob);
00217 }
00218
00219 }
00220 }
00221
00222 #endif