RtpsUdpReceiveStrategy.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_RTPSUDPRECEIVESTRATEGY_H
00009 #define DCPS_RTPSUDPRECEIVESTRATEGY_H
00010 
00011 #include "Rtps_Udp_Export.h"
00012 #include "RtpsTransportHeader.h"
00013 #include "RtpsSampleHeader.h"
00014 
00015 #include "dds/DCPS/transport/framework/TransportReceiveStrategy_T.h"
00016 
00017 #include "dds/DCPS/RTPS/RtpsCoreC.h"
00018 
00019 #include "ace/Event_Handler.h"
00020 #include "ace/INET_Addr.h"
00021 
00022 #include <cstring>
00023 
00024 namespace OpenDDS {
00025 namespace DCPS {
00026 
00027 class RtpsUdpDataLink;
00028 class ReceivedDataSample;
00029 
00030 class OpenDDS_Rtps_Udp_Export RtpsUdpReceiveStrategy
00031   : public TransportReceiveStrategy<RtpsTransportHeader, RtpsSampleHeader>,
00032     public ACE_Event_Handler {
      // Receive strategy for the RTPS/UDP transport.  It specializes the
      // generic TransportReceiveStrategy template with the RTPS transport and
      // sample header types, and is also an ACE_Event_Handler.
      // NOTE(review): presumably it is registered with a reactor so that
      // handle_input() runs when the link's socket is readable -- confirm
      // against the implementation file.
00033 public:
        /// Construct a receive strategy for the data link "link".
        /// NOTE(review): the pointer is presumably kept in link_ without taking
        /// ownership -- confirm in the constructor definition.
00034   explicit RtpsUdpReceiveStrategy(RtpsUdpDataLink* link);
00035 
        /// Input callback taking the handle "fd".
        /// NOTE(review): looks like the ACE_Event_Handler::handle_input()
        /// override; the return-value convention is assumed to be ACE's --
        /// confirm.
00036   virtual int handle_input(ACE_HANDLE fd);
00037 
00038   /// For each "1" bit in the bitmap, change it to a "0" if there are
00039   /// fragments from publication "pub_id" for the sequence number represented
00040   /// by that position in the bitmap.
00041   /// Returns true if the bitmap was changed.
        /// "num_bits" is the number of bits in "bitmap" and "base" is a sequence
        /// number used to map bit positions to sequence numbers.
        /// NOTE(review): presumably bit 0 corresponds to "base" -- confirm.
00042   bool remove_frags_from_bitmap(CORBA::Long bitmap[], CORBA::ULong num_bits,
00043                                 const SequenceNumber& base,
00044                                 const RepoId& pub_id);
00045 
00046   /// Remove any saved fragments.  We do not expect to receive any more
00047   /// fragments with sequence numbers in "range" from publication "pub_id".
00048   void remove_fragments(const SequenceRange& range, const RepoId& pub_id);
00049 
        /// A sequence number paired with an RTPS fragment number set.
00050   typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> SeqFragPair;
        /// Collection of SeqFragPair entries; the optional output type of
        /// has_fragments().
00051   typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo;
00052 
        /// Query for fragments with sequence numbers in "range" from publication
        /// "pub_id".  "frag_info" is optional and defaults to a null pointer.
        /// NOTE(review): presumably returns true when any such fragments are
        /// held and, if "frag_info" is non-null, fills it with per-sequence
        /// fragment details -- confirm against the implementation.
00053   bool has_fragments(const SequenceRange& range, const RepoId& pub_id,
00054                      FragmentInfo* frag_info = 0);
00055 
00056   /// Prevent delivery of the currently in-progress data sample to the
00057   /// subscription sub_id.  Returns pointer to the in-progress data so
00058   /// it can be stored for later delivery.
00059   const ReceivedDataSample* withhold_data_from(const RepoId& sub_id);
        /// NOTE(review): by its name, the counterpart of withhold_data_from()
        /// for subscription "sub_id"; exact semantics are not visible in this
        /// header -- confirm against the implementation.
00060   void do_not_withhold_data_from(const RepoId& sub_id);
00061 
00062 private:
        // The virtual functions below are private customization points.
        // NOTE(review): presumably each overrides a hook declared by
        // TransportReceiveStrategy -- confirm against the base template.

        /// Receive into the "n" buffers described by "iov" using handle "fd".
        /// "remote_address" is taken by non-const reference.
        /// NOTE(review): presumably set to the sender's address, with the byte
        /// count (or a negative error value) returned -- confirm.
00063   virtual ssize_t receive_bytes(iovec iov[],
00064                                 int n,
00065                                 ACE_INET_Addr& remote_address,
00066                                 ACE_HANDLE fd);
00067 
        /// Handle one received sample together with the address it is
        /// associated with.
00068   virtual void deliver_sample(ReceivedDataSample& sample,
00069                               const ACE_INET_Addr& remote_address);
00070 
        /// Start/stop hooks.
        /// NOTE(review): the meaning of start_i()'s int result is not visible
        /// here -- confirm.
00071   virtual int start_i();
00072   virtual void stop_i();
00073 
        /// Header checks, one overload per header type.
        /// NOTE(review): presumably a false result rejects the message or
        /// sample -- confirm.
00074   virtual bool check_header(const RtpsTransportHeader& header);
00075 
00076   virtual bool check_header(const RtpsSampleHeader& header);
00077 
        /// Reassembly hook operating on "data" in place (non-const reference).
        /// NOTE(review): presumably returns true once "data" holds a complete
        /// sample -- confirm.
00078   virtual bool reassemble(ReceivedDataSample& data);
00079 
00080 
        // Data link passed to the constructor (raw pointer).
        // NOTE(review): ownership/lifetime is not expressed here -- confirm.
00081   RtpsUdpDataLink* link_;
00082   SequenceNumber last_received_;
00083 
        // State used by withhold_data_from()/do_not_withhold_data_from().
        // NOTE(review): recvd_sample_ is presumably the in-progress sample and
        // the two sets the readers excluded from / selected for delivery --
        // confirm.
00084   const ReceivedDataSample* recvd_sample_;
00085   RepoIdSet readers_withheld_, readers_selected_;
00086 
        // Fragment bookkeeping.
        // NOTE(review): presumably backs remove_fragments(), has_fragments()
        // and reassemble() -- confirm.
00087   SequenceRange frags_;
00088   TransportReassembly reassembly_;
00089 
        // Per-message receiver state.
        // NOTE(review): the member names appear to mirror the "Receiver" state
        // of the RTPS specification (source version/vendor/GUID prefix,
        // destination GUID prefix, reply locator lists, timestamp) -- confirm.
00090   struct MessageReceiver {
00091 
          /// Construct with the local GUID prefix.
          /// NOTE(review): presumably stored in local_ -- confirm.
00092     explicit MessageReceiver(const GuidPrefix_t& local);
00093 
          /// Re-initialize the state for a new message from "remote_address"
          /// with RTPS header "hdr".
00094     void reset(const ACE_INET_Addr& remote_address, const RTPS::Header& hdr);
00095 
          /// Process a submessage.  The first overload takes the generic
          /// Submessage type; the others take specific Info* submessage types.
          /// NOTE(review): the generic overload presumably dispatches to the
          /// specific ones -- confirm.
00096     void submsg(const RTPS::Submessage& s);
00097     void submsg(const RTPS::InfoDestinationSubmessage& id);
00098     void submsg(const RTPS::InfoReplySubmessage& ir);
00099     void submsg(const RTPS::InfoReplyIp4Submessage& iri4);
00100     void submsg(const RTPS::InfoTimestampSubmessage& it);
00101     void submsg(const RTPS::InfoSourceSubmessage& is);
00102 
          /// Const member taking "header" by non-const reference.
          /// NOTE(review): presumably copies receiver state (e.g. the
          /// timestamp) into "header" -- confirm.
00103     void fill_header(DataSampleHeader& header) const;
00104 
00105     GuidPrefix_t local_;
00106     RTPS::ProtocolVersion_t source_version_;
00107     RTPS::VendorId_t source_vendor_;
00108     GuidPrefix_t source_guid_prefix_;
00109     GuidPrefix_t dest_guid_prefix_;
00110     DCPS::LocatorSeq unicast_reply_locator_list_;
00111     DCPS::LocatorSeq multicast_reply_locator_list_;
          // NOTE(review): presumably timestamp_ is meaningful only while
          // have_timestamp_ is true -- confirm.
00112     bool have_timestamp_;
00113     RTPS::Time_t timestamp_;
00114   };
00115 
00116   MessageReceiver receiver_;
00117   ACE_INET_Addr remote_address_;
00118 };
00119 
00120 } // namespace DCPS
00121 } // namespace OpenDDS
00122 
00123 #endif  /* DCPS_RTPSUDPRECEIVESTRATEGY_H */

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7