BaseMessageUtils.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef RTPS_BASEMESSAGEUTILS_H
00009 #define RTPS_BASEMESSAGEUTILS_H
00010 
00011 #include "RtpsCoreTypeSupportImpl.h"
00012 #include "dds/DCPS/Serializer.h"
00013 #include "dds/DdsDcpsInfoUtilsC.h"
00014 #include "dds/DdsDcpsInfoUtilsTypeSupportImpl.h"
00015 #include "md5.h"
00016 #include "ace/INET_Addr.h"
00017 #include "ace/Message_Block.h"
00018 
00019 #include <cstring>
00020 
00021 namespace OpenDDS {
00022 namespace RTPS {
00023   using DCPS::GuidPrefix_t;
00024   using DCPS::GUID_t;
00025   using DCPS::EntityId_t;
00026 
00027 template <typename T>
00028 void marshal_key_hash(const T& msg, KeyHash_t& hash) {
00029   using OpenDDS::DCPS::Serializer;
00030 
00031   OpenDDS::DCPS::KeyOnly<const T> ko(msg);
00032 
00033   static const size_t HASH_LIMIT = 16;
00034   std::memset(hash.value, 0, HASH_LIMIT);
00035 
00036   // Key Hash must use big endian ordering.
00037   // Native==Little endian means we need to swap
00038 #if defined ACE_LITTLE_ENDIAN
00039   static const bool swap_bytes = true;
00040 #else
00041   static const bool swap_bytes = false;
00042 #endif
00043 
00044   if (gen_is_bounded_size(ko) &&
00045       gen_max_marshaled_size(ko, true /*align*/) <= HASH_LIMIT) {
00046     // If it is bounded and can always fit in 16 bytes, we will use the
00047     // marshaled key
00048     ACE_Message_Block mb(HASH_LIMIT);
00049     Serializer out_serializer(&mb, swap_bytes, Serializer::ALIGN_INITIALIZE);
00050     out_serializer << ko;
00051     std::memcpy(hash.value, mb.rd_ptr(), mb.length());
00052 
00053   } else {
00054     // We will use the hash of the marshaled key
00055     size_t size = 0, padding = 0;
00056     gen_find_size(ko, size, padding);
00057     ACE_Message_Block mb(size + padding);
00058     Serializer out_serializer(&mb, swap_bytes, Serializer::ALIGN_INITIALIZE);
00059     out_serializer << ko;
00060 
00061     MD5_CTX ctx;
00062     MD5_Init(&ctx);
00063     MD5_Update(&ctx, mb.rd_ptr(), static_cast<unsigned long>(mb.length()));
00064     MD5_Final(hash.value, &ctx);
00065   }
00066 }
00067 
00068 inline void assign(GuidPrefix_t& dest, const GuidPrefix_t& src)
00069 {
00070   std::memcpy(&dest[0], &src[0], sizeof(GuidPrefix_t));
00071 }
00072 
00073 inline void assign(OpenDDS::DCPS::OctetArray16& dest,
00074                    const OpenDDS::DCPS::OctetArray16& src)
00075 {
00076   std::memcpy(&dest[0], &src[0], sizeof(OpenDDS::DCPS::OctetArray16));
00077 }
00078 
00079 inline void assign(OpenDDS::DCPS::OctetArray16& dest,
00080                    const ACE_CDR::ULong& ipv4addr_be)
00081 {
00082   std::memset(&dest[0], 0, 12);
00083   dest[12] = ipv4addr_be >> 24;
00084   dest[13] = ipv4addr_be >> 16;
00085   dest[14] = ipv4addr_be >> 8;
00086   dest[15] = ipv4addr_be;
00087 }
00088 
00089 inline void assign(DCPS::EntityKey_t& lhs, unsigned int rhs)
00090 {
00091   lhs[0] = static_cast<CORBA::Octet>(rhs);
00092   lhs[1] = static_cast<CORBA::Octet>(rhs >> 8);
00093   lhs[2] = static_cast<CORBA::Octet>(rhs >> 16);
00094 }
00095 
00096 
00097 inline void
00098 address_to_bytes(OpenDDS::DCPS::OctetArray16& dest, const ACE_INET_Addr& addr)
00099 {
00100   const void* raw = addr.get_addr();
00101 #ifdef ACE_HAS_IPV6
00102   if (addr.get_type() == AF_INET6) {
00103     const sockaddr_in6* in = static_cast<const sockaddr_in6*>(raw);
00104     std::memcpy(&dest[0], &in->sin6_addr, 16);
00105   } else {
00106 #else
00107   {
00108 #endif
00109     const sockaddr_in* in = static_cast<const sockaddr_in*>(raw);
00110     std::memset(&dest[0], 0, 12);
00111     std::memcpy(&dest[12], &in->sin_addr, 4);
00112   }
00113 }
00114 
00115 inline int
00116 address_to_kind(const ACE_INET_Addr& addr)
00117 {
00118 #ifdef ACE_HAS_IPV6
00119   return addr.get_type() == AF_INET6 ? LOCATOR_KIND_UDPv6 : LOCATOR_KIND_UDPv4;
00120 #else
00121   ACE_UNUSED_ARG(addr);
00122   return LOCATOR_KIND_UDPv4;
00123 #endif
00124 }
00125 
00126 inline int
00127 locator_to_address(ACE_INET_Addr& dest,
00128                    const OpenDDS::DCPS::Locator_t& locator,
00129                    bool map /*map IPV4 to IPV6 addr*/)
00130 {
00131   switch (locator.kind) {
00132 #ifdef ACE_HAS_IPV6
00133   case LOCATOR_KIND_UDPv6:
00134     dest.set_type(AF_INET6);
00135     if (dest.set_address(reinterpret_cast<const char*>(locator.address),
00136                          16, 0 /*encode*/) == -1) {
00137       return -1;
00138     }
00139     dest.set_port_number(locator.port);
00140     return 0;
00141     break;
00142 #endif
00143   case LOCATOR_KIND_UDPv4:
00144 #if !defined (ACE_HAS_IPV6) || !defined (IPV6_V6ONLY)
00145     ACE_UNUSED_ARG(map);
00146 #endif
00147     dest.set_type(AF_INET);
00148     if (dest.set_address(reinterpret_cast<const char*>(locator.address)
00149                          + 12, 4, 0 /*network order*/
00150 #if defined (ACE_HAS_IPV6) && defined (IPV6_V6ONLY)
00151                          , map ? 1 : 0 /*map IPV4 to IPV6 addr*/
00152 #endif
00153                          ) == -1) {
00154       return -1;
00155     }
00156     dest.set_port_number(locator.port);
00157     return 0;
00158     break;
00159   default:
00160     return -1;  // Unknown kind
00161   }
00162 
00163   return -1;
00164 }
00165 
00166 inline DDS::ReturnCode_t
00167 blob_to_locators(
00168     const OpenDDS::DCPS::TransportBLOB& blob,
00169     OpenDDS::DCPS::LocatorSeq& locators,
00170     bool& requires_inline_qos)
00171 {
00172   ACE_Data_Block db(blob.length(), ACE_Message_Block::MB_DATA,
00173       reinterpret_cast<const char*>(blob.get_buffer()),
00174       0 /*alloc*/, 0 /*lock*/, ACE_Message_Block::DONT_DELETE, 0 /*db_alloc*/);
00175   ACE_Message_Block mb(&db, ACE_Message_Block::DONT_DELETE, 0 /*mb_alloc*/);
00176   mb.wr_ptr(mb.space());
00177 
00178   using OpenDDS::DCPS::Serializer;
00179   Serializer ser(&mb, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00180   if (!(ser >> locators)) {
00181     ACE_ERROR_RETURN((LM_ERROR,
00182                       ACE_TEXT("(%P|%t) blob_to_locators: ")
00183                       ACE_TEXT("Failed to deserialize blob's locators\n")),
00184                       DDS::RETCODE_ERROR);
00185   }
00186   if (!(ser >> ACE_InputCDR::to_boolean(requires_inline_qos))) {
00187     ACE_ERROR_RETURN((LM_ERROR,
00188                       ACE_TEXT("(%P|%t) blob_to_locators: ")
00189                       ACE_TEXT("Failed to deserialize blob's inline QoS flag\n")),
00190                       DDS::RETCODE_ERROR);
00191   }
00192   return DDS::RETCODE_OK;
00193 }
00194 
00195 template <typename T>
00196 void
00197   message_block_to_sequence(const ACE_Message_Block& mb_locator, T& out)
00198 {
00199   out.length (CORBA::ULong(mb_locator.length()));
00200   std::memcpy (out.get_buffer(), mb_locator.rd_ptr(), mb_locator.length());
00201 }
00202 
00203 inline void
00204 locators_to_blob(const OpenDDS::DCPS::LocatorSeq& locators, DCPS::TransportBLOB& blob)
00205 {
00206   using OpenDDS::DCPS::Serializer;
00207   size_t size_locator = 0, padding_locator = 0;
00208   DCPS::gen_find_size(locators, size_locator, padding_locator);
00209   ACE_Message_Block mb_locator(size_locator + padding_locator + 1);
00210   Serializer ser_loc(&mb_locator, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00211   ser_loc << locators;
00212   // Add a bool for 'requires inline qos', see Sedp::set_inline_qos():
00213   // if the bool is no longer the last octet of the sequence then that function
00214   // must be changed as well.
00215   ser_loc << ACE_OutputCDR::from_boolean(false);
00216   message_block_to_sequence(mb_locator, blob);
00217 }
00218 
00219 }
00220 }
00221 
00222 #endif /* RTPS_BASEMESSAGEUTILS_H */

Generated on Fri Feb 12 20:05:18 2016 for OpenDDS by  doxygen 1.4.7