RtpsUdpReceiveStrategy.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_RTPSUDPRECEIVESTRATEGY_H
00009 #define DCPS_RTPSUDPRECEIVESTRATEGY_H
00010 
00011 #include "Rtps_Udp_Export.h"
00012 #include "RtpsTransportHeader.h"
00013 #include "RtpsSampleHeader.h"
00014 
00015 #include "dds/DCPS/transport/framework/TransportReceiveStrategy_T.h"
00016 
00017 #include "dds/DCPS/RTPS/RtpsCoreC.h"
00018 #include "dds/DCPS/RcEventHandler.h"
00019 
00020 #include "ace/INET_Addr.h"
00021 
00022 #include <cstring>
00023 
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 class RtpsUdpDataLink;
00030 class ReceivedDataSample;
00031 
00032 class OpenDDS_Rtps_Udp_Export RtpsUdpReceiveStrategy
00033   : public TransportReceiveStrategy<RtpsTransportHeader, RtpsSampleHeader>,
00034     public RcEventHandler
00035 {
      // Receive-side strategy class for the RTPS/UDP transport.  It combines the
      // TransportReceiveStrategy template (instantiated with the RTPS transport
      // and sample header types) with RcEventHandler, and adds fragment
      // bookkeeping plus the ability to withhold the in-progress sample from
      // selected subscriptions.
      // NOTE(review): only declarations are visible in this header; statements
      // below marked "presumably" must be confirmed against the implementation.
00036 public:
      /// Construct a receive strategy.
      /// @param link          the RTPS/UDP data link; presumably stored in link_
      ///                      (a raw pointer, ownership not expressed here --
      ///                      confirm the lifetime contract with the caller).
      /// @param local_prefix  GUID prefix of the local side; presumably forwarded
      ///                      to the MessageReceiver member (receiver_) -- confirm.
00037   explicit RtpsUdpReceiveStrategy(RtpsUdpDataLink* link, const GuidPrefix_t& local_prefix);
00038 
      /// Event-handler input callback for the handle "fd".
      /// NOTE(review): presumably invoked by the reactor when a datagram is
      /// readable; the meaning of the int result is not visible here -- confirm.
00039   virtual int handle_input(ACE_HANDLE fd);
00040 
00041   /// For each "1" bit in the bitmap, change it to a "0" if there are
00042   /// fragments from publication "pub_id" for the sequence number represented
00043   /// by that position in the bitmap.
00044   /// Returns true if the bitmap was changed.
00045   bool remove_frags_from_bitmap(CORBA::Long bitmap[], CORBA::ULong num_bits,
00046                                 const SequenceNumber& base,
00047                                 const RepoId& pub_id);
00048 
00049   /// Remove any saved fragments.  We do not expect to receive any more
00050   /// fragments with sequence numbers in "range" from publication "pub_id".
00051   void remove_fragments(const SequenceRange& range, const RepoId& pub_id);
00052 
      /// A sequence number paired with an RTPS fragment-number set.
      /// NOTE(review): std::pair is declared in <utility>, which this header
      /// does not include directly; it currently relies on a transitive include.
00053   typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> SeqFragPair;
      /// Collection of SeqFragPair entries, used as the optional output of
      /// has_fragments().
00054   typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo;
00055 
      /// Query fragments for sequence numbers in "range" from publication
      /// "pub_id".  "frag_info" is optional and defaults to a null pointer.
      /// NOTE(review): presumably returns true when fragments are held for the
      /// range and, if "frag_info" is non-null, fills it with per-sequence
      /// fragment details -- confirm the exact contents in the implementation.
00056   bool has_fragments(const SequenceRange& range, const RepoId& pub_id,
00057                      FragmentInfo* frag_info = 0);
00058 
00059   /// Prevent delivery of the currently in-progress data sample to the
00060   /// subscription sub_id.  Returns pointer to the in-progress data so
00061   /// it can be stored for later delivery.
00062   const ReceivedDataSample* withhold_data_from(const RepoId& sub_id);
      /// Counterpart to withhold_data_from() for subscription sub_id.
      /// NOTE(review): presumably marks sub_id as explicitly selected for
      /// delivery of the in-progress sample (see readers_selected_) -- confirm.
00063   void do_not_withhold_data_from(const RepoId& sub_id);
00064 
00065 private:
      // The virtual functions below are presumably overrides of hooks declared
      // by TransportReceiveStrategy (the header uses no "override" keyword, so
      // this cannot be verified from here -- confirm against the base class).

      /// Read incoming bytes from "fd" into the "n" buffers described by "iov",
      /// reporting the sender through "remote_address".
      /// NOTE(review): the meaning of the ssize_t result (byte count / error
      /// value) is not visible here -- confirm.
00066   virtual ssize_t receive_bytes(iovec iov[],
00067                                 int n,
00068                                 ACE_INET_Addr& remote_address,
00069                                 ACE_HANDLE fd);
00070 
      /// Deliver a received sample that arrived from "remote_address".
00071   virtual void deliver_sample(ReceivedDataSample& sample,
00072                               const ACE_INET_Addr& remote_address);
00073 
      /// Non-virtual delivery helper that takes the RTPS submessage associated
      /// with the sample instead of the remote address.
00074   void deliver_sample_i(ReceivedDataSample& sample,
00075                         const RTPS::Submessage& submessage);
00076 
      /// Start/stop hooks for this strategy.
00077   virtual int start_i();
00078   virtual void stop_i();
00079 
      /// Validate a transport-level (RTPS message) header.
00080   virtual bool check_header(const RtpsTransportHeader& header);
00081 
      /// Validate a sample-level (RTPS submessage) header.
00082   virtual bool check_header(const RtpsSampleHeader& header);
00083 
      /// Reassembly hook for fragmented data.
      /// NOTE(review): presumably returns true once "data" holds a complete
      /// sample -- confirm.
00084   virtual bool reassemble(ReceivedDataSample& data);
00085 
      // The following helpers exist only in builds that define OPENDDS_SECURITY.
00086 #if defined(OPENDDS_SECURITY)
      /// Produce an octet-sequence form of secured submessage content into
      /// "encoded", given the closing "postfix" submessage.
      /// NOTE(review): presumably serializes secure_prefix_ and
      /// secure_submessages_ together with "postfix" -- confirm.
00087   void sec_submsg_to_octets(DDS::OctetSeq& encoded,
00088                             const RTPS::Submessage& postfix);
00089 
      /// Deliver a submessage obtained from a secured message.
00090   void deliver_from_secure(const RTPS::Submessage& submessage);
00091 
      /// Decode the payload of "sample" using information from "submessage".
      /// NOTE(review): presumably returns false when decoding fails -- confirm.
00092   bool decode_payload(ReceivedDataSample& sample,
00093                       const RTPS::DataSubmessage& submessage);
00094 #endif
00095 
      // Data link passed to the constructor (raw pointer; ownership is not
      // expressed by the type).
00096   RtpsUdpDataLink* link_;
      // NOTE(review): presumably the sequence number of the most recently
      // received sample -- confirm.
00097   SequenceNumber last_received_;
00098 
      // Sample currently in progress (pointer to const; see withhold_data_from())
      // and the reader sets associated with withholding/selecting delivery.
00099   const ReceivedDataSample* recvd_sample_;
00100   RepoIdSet readers_withheld_, readers_selected_;
00101 
      // Fragment range and reassembly state used for fragmented samples.
00102   SequenceRange frags_;
00103   TransportReassembly reassembly_;
00104 
      // Per-message receiver state updated as submessages are processed.
      // NOTE(review): the member names mirror the "Receiver" state described by
      // the RTPS specification (source/destination GUID prefix, reply locator
      // lists, timestamp) -- confirm against the spec and the implementation.
00105   struct MessageReceiver {
00106 
      /// @param local  GUID prefix of the local side (see local_).
00107     explicit MessageReceiver(const GuidPrefix_t& local);
00108 
      /// Re-initialize the state for a new message with header "hdr" received
      /// from "remote_address".
00109     void reset(const ACE_INET_Addr& remote_address, const RTPS::Header& hdr);
00110 
      /// Process a submessage.  The generic overload takes any RTPS::Submessage;
      /// the remaining overloads take the specific Info* submessage types.
      /// NOTE(review): presumably the generic overload dispatches to the
      /// type-specific ones -- confirm.
00111     void submsg(const RTPS::Submessage& s);
00112     void submsg(const RTPS::InfoDestinationSubmessage& id);
00113     void submsg(const RTPS::InfoReplySubmessage& ir);
00114     void submsg(const RTPS::InfoReplyIp4Submessage& iri4);
00115     void submsg(const RTPS::InfoTimestampSubmessage& it);
00116     void submsg(const RTPS::InfoSourceSubmessage& is);
00117 
      /// Populate "header" from the current receiver state (const: does not
      /// modify this object).
00118     void fill_header(DataSampleHeader& header) const;
00119 
      // Local GUID prefix, followed by the state of the message being processed.
00120     GuidPrefix_t local_;
00121     RTPS::ProtocolVersion_t source_version_;
00122     RTPS::VendorId_t source_vendor_;
00123     GuidPrefix_t source_guid_prefix_;
00124     GuidPrefix_t dest_guid_prefix_;
00125     DCPS::LocatorSeq unicast_reply_locator_list_;
00126     DCPS::LocatorSeq multicast_reply_locator_list_;
      // timestamp_ is presumably meaningful only while have_timestamp_ is true
      // -- confirm.
00127     bool have_timestamp_;
00128     RTPS::Time_t timestamp_;
00129   };
00130 
      // Receiver state instance and a remote address.
      // NOTE(review): remote_address_ is presumably the sender of the datagram
      // currently being processed -- confirm.
00131   MessageReceiver receiver_;
00132   ACE_INET_Addr remote_address_;
00133 
      // State kept only in builds that define OPENDDS_SECURITY.
      // NOTE(review): presumably buffers the secure prefix, the submessages that
      // follow it, and the associated sample until the postfix arrives -- confirm.
00134 #if defined(OPENDDS_SECURITY)
00135   RTPS::SecuritySubmessage secure_prefix_;
00136   OPENDDS_VECTOR(RTPS::Submessage) secure_submessages_;
00137   ReceivedDataSample secure_sample_;
00138 #endif
00139 };
00140 
00141 } // namespace DCPS
00142 } // namespace OpenDDS
00143 
00144 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00145 
00146 #endif  /* DCPS_RTPSUDPRECEIVESTRATEGY_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1