OpenDDS  Snapshot(2023/04/28-20:55)
RtpsUdpReceiveStrategy.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPRECEIVESTRATEGY_H
9 #define OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPRECEIVESTRATEGY_H
10 
11 #include "Rtps_Udp_Export.h"
12 #include "RtpsTransportHeader.h"
13 #include "RtpsSampleHeader.h"
14 
16 
17 #include "dds/DCPS/RTPS/RtpsCoreC.h"
18 #include "dds/DCPS/RTPS/ICE/Ice.h"
21 
22 #include "ace/SOCK_Dgram.h"
23 
24 #include <cstring>
25 
27 
28 namespace OpenDDS {
29 
30 namespace ICE {
31  class Endpoint;
32 }
33 
34 namespace DCPS {
35 
36 class RtpsUdpTransport;
37 class RtpsUdpDataLink;
38 class ReceivedDataSample;
39 
41  : public TransportReceiveStrategy<RtpsTransportHeader, RtpsSampleHeader>,
42  public virtual RcEventHandler
43 {
44 public:
45  static const size_t BUFFER_COUNT = 1u;
46 
48  const GuidPrefix_t& local_prefix,
49  ThreadStatusManager& thread_status_manager);
50 
51  virtual int handle_input(ACE_HANDLE fd);
52 
53  /// For each "1" bit in the bitmap, change it to a "0" if there are
54  /// fragments from publication "pub_id" for the sequence number represented
55  /// by that position in the bitmap.
56  /// Returns true if the bitmap was changed.
57  bool remove_frags_from_bitmap(CORBA::Long bitmap[], CORBA::ULong num_bits,
58  const SequenceNumber& base,
59  const GUID_t& pub_id, ACE_CDR::ULong& samples_requested);
60 
61  /// Remove any saved fragments. We do not expect to receive any more
62  /// fragments with sequence numbers in "range" from publication "pub_id".
63  void remove_fragments(const SequenceRange& range, const GUID_t& pub_id);
64 
65  typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> SeqFragPair;
66  typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo;
67 
68  void clear_completed_fragments(const GUID_t& pub_id);
69  bool has_fragments(const SequenceRange& range, const GUID_t& pub_id, FragmentInfo* frag_info = 0);
70 
71  /// Prevent delivery of the currently in-progress data sample to the
72  /// subscription sub_id. Returns pointer to the in-progress data so
73  /// it can be stored for later delivery.
74  const ReceivedDataSample* withhold_data_from(const GUID_t& sub_id);
75  void do_not_withhold_data_from(const GUID_t& sub_id);
76 
77  static ssize_t receive_bytes_helper(iovec iov[],
78  int n,
79  const ACE_SOCK_Dgram& socket,
80  ACE_INET_Addr& remote_address,
81 #ifdef OPENDDS_SECURITY
84 #endif
85  RtpsUdpTransport& tport,
86  bool& stop);
87 
88  virtual void begin_transport_header_processing();
89  virtual void end_transport_header_processing();
90 
91 private:
92  bool getDirectedWriteReaders(RepoIdSet& directedWriteReaders, const RTPS::DataSubmessage& ds) const;
93 
94  const ACE_SOCK_Dgram& choose_recv_socket(ACE_HANDLE fd) const;
95 
96  virtual ssize_t receive_bytes(iovec iov[],
97  int n,
98  ACE_INET_Addr& remote_address,
99  ACE_HANDLE fd,
100  bool& stop);
101 
102  virtual void deliver_sample(ReceivedDataSample& sample,
103  const ACE_INET_Addr& remote_address);
104 
105  void deliver_sample_i(ReceivedDataSample& sample,
106  const RTPS::Submessage& submessage,
107  const NetworkAddress& remote_addr);
108 
109  virtual int start_i();
110  virtual void stop_i();
111 
112  virtual bool check_header(const RtpsTransportHeader& header);
113 
114  virtual bool check_header(const RtpsSampleHeader& header);
115 
116  virtual bool reassemble(ReceivedDataSample& data);
117  virtual bool reassemble_i(ReceivedDataSample& data, RtpsSampleHeader& rsh);
118 
119 #ifdef OPENDDS_SECURITY
120  bool sec_submsg_to_octets(DDS::OctetSeq& encoded,
121  const RTPS::Submessage& postfix);
122 
123  void deliver_from_secure(const RTPS::Submessage& submessage,
124  const NetworkAddress& remote_addr);
125 
126  bool decode_payload(ReceivedDataSample& sample,
127  const RTPS::DataSubmessage& submessage);
128 #endif
129 
130  bool check_encoded(const EntityId_t& sender);
131 
133 
136 
138  RepoIdSet readers_withheld_, readers_selected_;
139 
140  ACE_UINT16 fragment_size_;
142  ACE_UINT32 total_frags_;
144 
146 
147  explicit MessageReceiver(const GuidPrefix_t& local);
148 
149  void reset(const ACE_INET_Addr& remote_address, const RTPS::Header& hdr);
150 
151  void submsg(const RTPS::Submessage& s);
152  void submsg(const RTPS::InfoDestinationSubmessage& id);
153  void submsg(const RTPS::InfoReplySubmessage& ir);
154  void submsg(const RTPS::InfoReplyIp4Submessage& iri4);
155  void submsg(const RTPS::InfoTimestampSubmessage& it);
156  void submsg(const RTPS::InfoSourceSubmessage& is);
157 
158  void fill_header(DataSampleHeader& header) const;
159 
165  bool directed_;
170  };
171 
176 
177 #ifdef OPENDDS_SECURITY
179  OPENDDS_VECTOR(RTPS::Submessage) secure_submessages_;
181  bool encoded_rtps_, encoded_submsg_;
182 #endif
183 };
184 
185 } // namespace DCPS
186 } // namespace OpenDDS
187 
189 
190 #endif /* OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPRECEIVESTRATEGY_H */
ACE_CDR::Long Long
Adapt the TransportReceiveStrategy for RTPS's "transport" (message) Header.
TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader > BaseReceiveStrategy
GuidSet RepoIdSet
Definition: GuidUtils.h:113
sequence< Locator_t > LocatorSeq
int ssize_t
ACE_CDR::ULong ULong
std::pair< FragmentNumber, FragmentNumber > FragmentRange
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
ACE_UINT32 ULong
std::pair< SequenceNumber, RTPS::FragmentNumberSet > SeqFragPair
std::pair< SequenceNumber, SequenceNumber > SequenceRange
Sequence number abstraction. Only allows positive 64 bit values.
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
sequence<<%SCOPED%><%TYPE%><%SEQ%> local interface<%TYPE%> out string encoded
Definition: IDLTemplate.txt:4
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
typedef OPENDDS_VECTOR(ACE_INET_Addr) AddressListType
#define OpenDDS_Rtps_Udp_Export