00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_RTPSUDPRECEIVESTRATEGY_H
00009 #define DCPS_RTPSUDPRECEIVESTRATEGY_H
00010
00011 #include "Rtps_Udp_Export.h"
00012 #include "RtpsTransportHeader.h"
00013 #include "RtpsSampleHeader.h"
00014
00015 #include "dds/DCPS/transport/framework/TransportReceiveStrategy_T.h"
00016
00017 #include "dds/DCPS/RTPS/RtpsCoreC.h"
00018
00019 #include "ace/Event_Handler.h"
00020 #include "ace/INET_Addr.h"
00021
00022 #include <cstring>
00023
00024 namespace OpenDDS {
00025 namespace DCPS {
00026
00027 class RtpsUdpDataLink;
00028 class ReceivedDataSample;
00029 /// Receive strategy for the RTPS/UDP transport: a TransportReceiveStrategy specialized on RtpsTransportHeader/RtpsSampleHeader that is also an ACE_Event_Handler.
00030 class OpenDDS_Rtps_Udp_Export RtpsUdpReceiveStrategy
00031 : public TransportReceiveStrategy<RtpsTransportHeader, RtpsSampleHeader>,
00032 public ACE_Event_Handler {
00033 public:
00034 explicit RtpsUdpReceiveStrategy(RtpsUdpDataLink* link); // explicit: no implicit conversion from RtpsUdpDataLink*
00035 /// Event-handler input callback for handle fd (handle_input is the ACE_Event_Handler read-ready hook; the body is not in this header).
00036 virtual int handle_input(ACE_HANDLE fd);
00037
00038 /// Operates on a caller-supplied bitmap (array of CORBA::Long holding num_bits bits),
00039 /// a starting sequence number (base) and a publication id (pub_id).
00040 /// NOTE(review): judging by the name, bits whose sequence number already has stored fragments
00041 /// from pub_id are cleared, and the bool result reports whether the bitmap changed -- confirm in the .cpp.
00042 bool remove_frags_from_bitmap(CORBA::Long bitmap[], CORBA::ULong num_bits,
00043 const SequenceNumber& base,
00044 const RepoId& pub_id);
00045
00046 /// Takes a sequence range and a publication id.
00047 /// NOTE(review): presumably discards saved fragments from pub_id whose sequence numbers fall in range -- confirm in the .cpp.
00048 void remove_fragments(const SequenceRange& range, const RepoId& pub_id);
00049 /// SeqFragPair: a sequence number paired with an RTPS fragment-number set; FragmentInfo: an OPENDDS_VECTOR of such pairs.
00050 typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> SeqFragPair;
00051 typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo;
00052 /// frag_info is optional (defaults to 0, a null pointer). NOTE(review): presumably reports whether fragments are held for range/pub_id and fills *frag_info when given -- confirm.
00053 bool has_fragments(const SequenceRange& range, const RepoId& pub_id,
00054 FragmentInfo* frag_info = 0);
00055
00056 /// withhold_data_from() takes a subscription id and returns a pointer to a const ReceivedDataSample.
00057 /// NOTE(review): presumably blocks delivery of the in-progress sample to sub_id and returns that sample (see recvd_sample_) -- confirm.
00058 /// do_not_withhold_data_from() is the companion call; the exact interplay (see readers_withheld_ / readers_selected_) lives in the .cpp.
00059 const ReceivedDataSample* withhold_data_from(const RepoId& sub_id);
00060 void do_not_withhold_data_from(const RepoId& sub_id);
00061
00062 private: // NOTE(review): the virtual members below are presumably hooks overriding TransportReceiveStrategy<> -- confirm against TransportReceiveStrategy_T.h
00063 virtual ssize_t receive_bytes(iovec iov[],
00064 int n,
00065 ACE_INET_Addr& remote_address,
00066 ACE_HANDLE fd);
00067
00068 virtual void deliver_sample(ReceivedDataSample& sample,
00069 const ACE_INET_Addr& remote_address);
00070 /// Start/stop hooks; start_i() returns an int status (NOTE(review): presumably 0 on success, ACE style -- confirm).
00071 virtual int start_i();
00072 virtual void stop_i();
00073 /// Two check_header() overloads: one takes the transport-level header, the other a sample header.
00074 virtual bool check_header(const RtpsTransportHeader& header);
00075
00076 virtual bool check_header(const RtpsSampleHeader& header);
00077 /// NOTE(review): presumably invoked for fragmented samples; true would mean data is ready for delivery -- confirm.
00078 virtual bool reassemble(ReceivedDataSample& data);
00079
00080 // NOTE(review): link_ is a raw pointer; ownership/lifetime of the data link is not visible in this header.
00081 RtpsUdpDataLink* link_;
00082 SequenceNumber last_received_;
00083 // Bookkeeping related to withhold_data_from()/do_not_withhold_data_from() (presumably the in-progress sample and the affected reader ids -- confirm).
00084 const ReceivedDataSample* recvd_sample_;
00085 RepoIdSet readers_withheld_, readers_selected_;
00086 // Fragment tracking: a sequence range plus a TransportReassembly member (usage is in the .cpp).
00087 SequenceRange frags_;
00088 TransportReassembly reassembly_;
00089 // Per-message receiver state. NOTE(review): the field names mirror the Receiver state described in the DDSI-RTPS specification -- confirm the section for the spec version in use.
00090 struct MessageReceiver {
00091 // local: a GUID prefix; the struct also holds a local_ member of the same type.
00092 explicit MessageReceiver(const GuidPrefix_t& local);
00093 // Takes the sender address and the RTPS message header (presumably re-initializes the state below for each message -- confirm).
00094 void reset(const ACE_INET_Addr& remote_address, const RTPS::Header& hdr);
00095 // One overload per RTPS Info* submessage type, plus a generic RTPS::Submessage overload (presumably the dispatcher -- confirm).
00096 void submsg(const RTPS::Submessage& s);
00097 void submsg(const RTPS::InfoDestinationSubmessage& id);
00098 void submsg(const RTPS::InfoReplySubmessage& ir);
00099 void submsg(const RTPS::InfoReplyIp4Submessage& iri4);
00100 void submsg(const RTPS::InfoTimestampSubmessage& it);
00101 void submsg(const RTPS::InfoSourceSubmessage& is);
00102 // const member: does not modify the receiver; header is taken by non-const reference so it can be modified.
00103 void fill_header(DataSampleHeader& header) const;
00104
00105 GuidPrefix_t local_;
00106 RTPS::ProtocolVersion_t source_version_;
00107 RTPS::VendorId_t source_vendor_;
00108 GuidPrefix_t source_guid_prefix_;
00109 GuidPrefix_t dest_guid_prefix_;
00110 DCPS::LocatorSeq unicast_reply_locator_list_;
00111 DCPS::LocatorSeq multicast_reply_locator_list_;
00112 bool have_timestamp_;
00113 RTPS::Time_t timestamp_;
00114 };
00115 // The MessageReceiver instance, and an ACE_INET_Addr member (presumably the sender of the datagram being processed -- confirm).
00116 MessageReceiver receiver_;
00117 ACE_INET_Addr remote_address_;
00118 };
00119
00120 }
00121 }
00122
00123 #endif