RtpsUdpReceiveStrategy.h
Go to the documentation of this file.
00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_RTPSUDPRECEIVESTRATEGY_H
00009 #define DCPS_RTPSUDPRECEIVESTRATEGY_H
00010
00011 #include "Rtps_Udp_Export.h"
00012 #include "RtpsTransportHeader.h"
00013 #include "RtpsSampleHeader.h"
00014
00015 #include "dds/DCPS/transport/framework/TransportReceiveStrategy_T.h"
00016
00017 #include "dds/DCPS/RTPS/RtpsCoreC.h"
00018 #include "dds/DCPS/RcEventHandler.h"
00019
00020 #include "ace/INET_Addr.h"
00021
00022 #include <cstring>
00023
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025
00026 namespace OpenDDS {
00027 namespace DCPS {
00028
00029 class RtpsUdpDataLink;
00030 class ReceivedDataSample;
00031
// Receive strategy for the RTPS/UDP transport.
//
// Specializes the generic TransportReceiveStrategy template with the RTPS
// transport header and RTPS sample header types, and additionally derives
// from RcEventHandler. The class is exported from the library through the
// OpenDDS_Rtps_Udp_Export macro.
//
// NOTE(review): only declarations are visible in this header; the behavioral
// notes below that are marked "presumably" are inferred from names and
// signatures and should be confirmed against the implementation file.
00032 class OpenDDS_Rtps_Udp_Export RtpsUdpReceiveStrategy
00033 : public TransportReceiveStrategy<RtpsTransportHeader, RtpsSampleHeader>,
00034 public RcEventHandler
00035 {
00036 public:
// @param link          data link this strategy is associated with (kept as a
//                      raw pointer in link_; ownership/lifetime is not
//                      visible here).
// @param local_prefix  GUID prefix of the local participant; presumably
//                      forwarded to the MessageReceiver member -- confirm.
00037 explicit RtpsUdpReceiveStrategy(RtpsUdpDataLink* link, const GuidPrefix_t& local_prefix);
00038
// Input-ready callback taking the handle that became readable.
// NOTE(review): presumably the ACE event-handler hook invoked by the reactor;
// the meaning of the int return value is defined by the base class -- confirm.
00039 virtual int handle_input(ACE_HANDLE fd);
00040
00041
00042
00043
00044
// Fragment bookkeeping against a sequence-number bitmap.
// @param bitmap    array of 32-bit words holding the bitmap (modifiable).
// @param num_bits  number of valid bits in bitmap.
// @param base      sequence number corresponding to the first bit.
// @param pub_id    publication (writer) the fragments belong to.
// @return NOTE(review): presumably true when the bitmap was modified, i.e.
//         bits were cleared for sequence numbers that already have stored
//         fragments from pub_id -- confirm against the .cpp.
00045 bool remove_frags_from_bitmap(CORBA::Long bitmap[], CORBA::ULong num_bits,
00046 const SequenceNumber& base,
00047 const RepoId& pub_id);
00048
00049
00050
// NOTE(review): presumably discards any stored fragments from pub_id whose
// sequence numbers fall inside range -- confirm against the .cpp.
00051 void remove_fragments(const SequenceRange& range, const RepoId& pub_id);
00052
// One sequence number paired with an RTPS fragment-number set.
00053 typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> SeqFragPair;
// Collection of SeqFragPair entries; used as the optional result of
// has_fragments().
00054 typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo;
00055
// Queries whether fragments exist for pub_id within range.
// @param frag_info  optional; defaults to 0 (null) so callers may omit it.
//                   NOTE(review): presumably filled with per-sequence
//                   fragment details when non-null -- confirm.
00056 bool has_fragments(const SequenceRange& range, const RepoId& pub_id,
00057 FragmentInfo* frag_info = 0);
00058
00059
00060
00061
// NOTE(review): presumably marks subscription sub_id as excluded from
// delivery of the sample currently being received and returns a pointer to
// that sample (see recvd_sample_ / readers_withheld_) -- confirm, including
// whether the result can be null and how long the pointer stays valid.
00062 const ReceivedDataSample* withhold_data_from(const RepoId& sub_id);
// Counterpart of withhold_data_from() for sub_id.
// NOTE(review): presumably records sub_id in readers_selected_ -- confirm.
00063 void do_not_withhold_data_from(const RepoId& sub_id);
00064
00065 private:
// The virtual functions below are presumably overrides of hooks declared by
// the base classes (no `override` specifier is used in this file) -- confirm.

// Receives data into the iov array of n buffers from handle fd;
// remote_address is a non-const reference, presumably set to the sender.
00066 virtual ssize_t receive_bytes(iovec iov[],
00067 int n,
00068 ACE_INET_Addr& remote_address,
00069 ACE_HANDLE fd);
00070
// Delivery entry point for a received sample together with the address it
// came from.
00071 virtual void deliver_sample(ReceivedDataSample& sample,
00072 const ACE_INET_Addr& remote_address);
00073
// Non-virtual delivery helper taking the already-parsed RTPS submessage that
// accompanies the sample.
00074 void deliver_sample_i(ReceivedDataSample& sample,
00075 const RTPS::Submessage& submessage);
00076
// Start/stop hooks for the strategy.
00077 virtual int start_i();
00078 virtual void stop_i();
00079
// Validation hook for a transport-level (RTPS message) header.
00080 virtual bool check_header(const RtpsTransportHeader& header);
00081
// Validation hook for a sample-level (RTPS submessage) header.
00082 virtual bool check_header(const RtpsSampleHeader& header);
00083
// Reassembly hook for fragmented data; data is passed by non-const
// reference, so it may be modified in place.
00084 virtual bool reassemble(ReceivedDataSample& data);
00085
// The following helpers exist only in builds with OPENDDS_SECURITY defined.
00086 #if defined(OPENDDS_SECURITY)
// Writes an octet-sequence form of secured submessage data into encoded;
// postfix is the trailing submessage.
// NOTE(review): exact contents of encoded are not visible here -- confirm.
00087 void sec_submsg_to_octets(DDS::OctetSeq& encoded,
00088 const RTPS::Submessage& postfix);
00089
// Handles delivery for a submessage obtained from a secured exchange.
00090 void deliver_from_secure(const RTPS::Submessage& submessage);
00091
// Decodes the payload of sample for the given DATA submessage.
// NOTE(review): presumably returns false on decode failure -- confirm.
00092 bool decode_payload(ReceivedDataSample& sample,
00093 const RTPS::DataSubmessage& submessage);
00094 #endif
00095
// Associated data link (raw pointer).
00096 RtpsUdpDataLink* link_;
// NOTE(review): presumably the sequence number of the most recent sample.
00097 SequenceNumber last_received_;
00098
// Pointer to a received sample plus two reader-id sets; presumably the
// per-sample state behind withhold_data_from()/do_not_withhold_data_from().
00099 const ReceivedDataSample* recvd_sample_;
00100 RepoIdSet readers_withheld_, readers_selected_;
00101
// Fragment range and reassembly state used for fragmented samples.
00102 SequenceRange frags_;
00103 TransportReassembly reassembly_;
00104
// Per-message receiver state.
// NOTE(review): the field names appear to mirror the "Message Receiver"
// state described by the RTPS specification -- confirm.
00105 struct MessageReceiver {
00106
// @param local  GUID prefix of the local participant.
00107 explicit MessageReceiver(const GuidPrefix_t& local);
00108
// Re-initializes the state for a new message from remote_address with RTPS
// header hdr.
00109 void reset(const ACE_INET_Addr& remote_address, const RTPS::Header& hdr);
00110
// submsg() overloads: one taking the generic Submessage type, and one per
// Info* submessage kind (InfoDestination, InfoReply, InfoReplyIp4,
// InfoTimestamp, InfoSource).
// NOTE(review): presumably the generic overload dispatches to the specific
// ones, which update the fields below -- confirm.
00111 void submsg(const RTPS::Submessage& s);
00112 void submsg(const RTPS::InfoDestinationSubmessage& id);
00113 void submsg(const RTPS::InfoReplySubmessage& ir);
00114 void submsg(const RTPS::InfoReplyIp4Submessage& iri4);
00115 void submsg(const RTPS::InfoTimestampSubmessage& it);
00116 void submsg(const RTPS::InfoSourceSubmessage& is);
00117
// Populates header from this object's state; const, so the receiver itself
// is not modified.
00118 void fill_header(DataSampleHeader& header) const;
00119
// Local participant's GUID prefix.
00120 GuidPrefix_t local_;
// Protocol version and vendor id of the message source.
00121 RTPS::ProtocolVersion_t source_version_;
00122 RTPS::VendorId_t source_vendor_;
// GUID prefixes of the message source and destination.
00123 GuidPrefix_t source_guid_prefix_;
00124 GuidPrefix_t dest_guid_prefix_;
// Reply locator lists (unicast and multicast).
00125 DCPS::LocatorSeq unicast_reply_locator_list_;
00126 DCPS::LocatorSeq multicast_reply_locator_list_;
// Flag paired with timestamp_; presumably true only when timestamp_ holds a
// valid value.
00127 bool have_timestamp_;
00128 RTPS::Time_t timestamp_;
00129 };
00130
// Receiver state instance and a stored remote address.
00131 MessageReceiver receiver_;
00132 ACE_INET_Addr remote_address_;
00133
// State kept only in OPENDDS_SECURITY builds: a security prefix submessage,
// a vector of submessages, and a sample.
// NOTE(review): presumably buffered between a secure prefix and its postfix.
00134 #if defined(OPENDDS_SECURITY)
00135 RTPS::SecuritySubmessage secure_prefix_;
00136 OPENDDS_VECTOR(RTPS::Submessage) secure_submessages_;
00137 ReceivedDataSample secure_sample_;
00138 #endif
00139 };
00140
00141 }
00142 }
00143
00144 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00145
00146 #endif