OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::UdpReceiveStrategy Class Reference

#include <UdpReceiveStrategy.h>

Inheritance diagram for OpenDDS::DCPS::UdpReceiveStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::UdpReceiveStrategy:
Collaboration graph
[legend]

Public Member Functions

 UdpReceiveStrategy (UdpDataLink *link)
 
virtual ACE_HANDLE get_handle () const
 
virtual int handle_input (ACE_HANDLE fd)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
virtual ~TransportReceiveStrategy ()
 
int start ()
 
void stop ()
 
int handle_dds_input (ACE_HANDLE fd)
 
virtual void relink (bool do_suspend=true)
 
const TransportHeaderreceived_header () const
 
TransportHeaderreceived_header ()
 
const DataSampleHeaderreceived_sample_header () const
 
DataSampleHeaderreceived_sample_header ()
 
ACE_Message_Blockto_msgblock (const ReceivedDataSample &sample)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportStrategy
virtual ~TransportStrategy ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 

Protected Member Functions

virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
 Only our subclass knows how to do this. More...
 
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 Called when there is a ReceivedDataSample to be delivered. More...
 
virtual int start_i ()
 Let the subclass start. More...
 
virtual void stop_i ()
 Let the subclass stop. More...
 
virtual bool check_header (const TransportHeader &header)
 Check the transport header for suitability. More...
 
virtual bool check_header (const DataSampleHeader &header)
 Check the data sample header for suitability. More...
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
 TransportReceiveStrategy (const TransportInst_rch &config, size_t receive_buffers_count=RECEIVE_BUFFERS)
 
virtual void begin_transport_header_processing ()
 Begin Current Transport Header Processing. More...
 
virtual void end_transport_header_processing ()
 End Current Transport Header Processing. More...
 
virtual void finish_message ()
 
int skip_bad_pdus ()
 Ignore bad PDUs by skipping over them. More...
 
void reset ()
 
size_t pdu_remaining () const
 
size_t successor_index (size_t index) const
 Manage an index into the receive buffer array. More...
 
void update_buffer_index (bool &done)
 
 OPENDDS_VECTOR (ACE_Message_Block *) receive_buffers_
 Set of receive buffers in use. More...
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 

Private Types

typedef std::pair< TransportReassembly_rch, SequenceNumberReassemblyInfo
 

Private Member Functions

virtual bool reassemble (ReceivedDataSample &data)
 
typedef OPENDDS_MAP (ACE_INET_Addr, ReassemblyInfo) ReassemblyMap
 

Private Attributes

UdpDataLinklink_
 
SequenceNumber expected_
 
ACE_INET_Addr remote_address_
 
ReassemblyMap reassembly_
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Static Public Attributes inherited from OpenDDS::DCPS::TransportReceiveConstants
static const size_t RECEIVE_BUFFERS = DEFAULT_TRANSPORT_RECEIVE_BUFFERS
 
static const size_t BUFFER_LOW_WATER = 4096
 
static const size_t MESSAGE_BLOCKS = 1000
 
static const size_t DATA_BLOCKS = 100
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
bool gracefully_disconnected_
 Flag indicates if the GRACEFUL_DISCONNECT message is received. More...
 
size_t receive_sample_remaining_
 Bytes remaining in the current DataSample. More...
 
TransportHeader receive_transport_header_
 Current receive TransportHeader. More...
 
TransportMessageBlockAllocator mb_allocator_
 
TransportDataBlockAllocator db_allocator_
 
TransportDataAllocator data_allocator_
 
ACE_Lock_Adapter< ACE_SYNCH_MUTEXreceive_lock_
 Locking strategy for the allocators. More...
 
size_t buffer_index_
 Current receive buffer index in use. More...
 
DataSampleHeader data_sample_header_
 Current data sample header. More...
 
ACE_Message_Blockpayload_
 
bool good_pdu_
 
size_t pdu_remaining_
 Amount of the current PDU that has not been processed yet. More...
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 25 of file UdpReceiveStrategy.h.

Member Typedef Documentation

◆ ReassemblyInfo

Definition at line 63 of file UdpReceiveStrategy.h.

Constructor & Destructor Documentation

◆ UdpReceiveStrategy()

OpenDDS::DCPS::UdpReceiveStrategy::UdpReceiveStrategy ( UdpDataLink link)
explicit

Member Function Documentation

◆ check_header() [1/2]

bool OpenDDS::DCPS::UdpReceiveStrategy::check_header ( const TransportHeader header)
protectedvirtual

Check the transport header for suitability.

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 116 of file UdpReceiveStrategy.cpp.

References ACE_TEXT(), expected_, OpenDDS::DCPS::SequenceNumber::getValue(), LM_WARNING, OpenDDS::DCPS::SequenceNumber::previous(), reassembly_, remote_address_, OpenDDS::DCPS::TransportHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), and VDBG_LVL.

117 {
119  if (!info.first) {
120  info.first = make_rch<TransportReassembly>();
121  }
122 
123  if (header.sequence_ != info.second &&
125  VDBG_LVL((LM_WARNING,
126  ACE_TEXT("(%P|%t) WARNING: UdpReceiveStrategy::check_header ")
127  ACE_TEXT("expected %q received %q\n"),
128  info.second.getValue(), header.sequence_.getValue()), 2);
129  FragmentRange range(info.second.getValue(), header.sequence_.previous().getValue());
130  info.first->data_unavailable(range);
131  }
132 
133  info.second = header.sequence_;
134  ++info.second;
135  return true;
136 }
std::pair< TransportReassembly_rch, SequenceNumber > ReassemblyInfo
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
std::pair< FragmentNumber, FragmentNumber > FragmentRange
ACE_TEXT("TCP_Factory")
#define VDBG_LVL(DBG_ARGS, LEVEL)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()

◆ check_header() [2/2]

virtual bool OpenDDS::DCPS::UdpReceiveStrategy::check_header ( const DataSampleHeader header)
inlineprotectedvirtual

Check the data sample header for suitability.

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 50 of file UdpReceiveStrategy.h.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header().

51  {
53  }
virtual bool check_header(const TH &header)
Check the transport header for suitability.
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8

◆ deliver_sample()

void OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample ( ReceivedDataSample sample,
const ACE_INET_Addr remote_address 
)
protectedvirtual

Called when there is a ReceivedDataSample to be delivered.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 56 of file UdpReceiveStrategy.cpp.

References OpenDDS::DCPS::UdpDataLink::control_received(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, link_, OpenDDS::DCPS::DataSampleHeader::message_id_, and OpenDDS::DCPS::TRANSPORT_CONTROL.

58 {
59  switch (sample.header_.message_id_) {
60 
61  case TRANSPORT_CONTROL:
62  this->link_->control_received(sample, remote_address);
63  break;
64 
65  default:
66  this->link_->data_received(sample);
67  break;
68  }
69 }
void control_received(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690

◆ get_handle()

ACE_HANDLE OpenDDS::DCPS::UdpReceiveStrategy::get_handle ( void  ) const
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 29 of file UdpReceiveStrategy.cpp.

References ACE_IPC_SAP::get_handle(), link_, socket(), and OpenDDS::DCPS::UdpDataLink::socket().

30 {
31  ACE_SOCK_Dgram& socket = this->link_->socket();
32  return socket.get_handle();
33 }
ACE_SOCK_Dgram & socket()
Definition: UdpDataLink.inl:40
ACE_HANDLE socket(int protocol_family, int type, int proto)
ACE_HANDLE get_handle(void) const

◆ handle_input()

int OpenDDS::DCPS::UdpReceiveStrategy::handle_input ( ACE_HANDLE  fd)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 36 of file UdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy<>::handle_dds_input(), and TheServiceParticipant.

37 {
38  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
39 
40  return this->handle_dds_input(fd);
41 }
#define TheServiceParticipant

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::UdpReceiveStrategy::OPENDDS_MAP ( ACE_INET_Addr  ,
ReassemblyInfo   
)
private

◆ reassemble()

bool OpenDDS::DCPS::UdpReceiveStrategy::reassemble ( ReceivedDataSample data)
privatevirtual

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 139 of file UdpReceiveStrategy.cpp.

References header, OPENDDS_END_VERSIONED_NAMESPACE_DECL, reassembly_, OpenDDS::DCPS::TransportReceiveStrategy<>::received_header(), and remote_address_.

140 {
142  if (!info.first) {
143  info.first = make_rch<TransportReassembly>();
144  }
145  const TransportHeader& header = received_header();
146  return info.first->reassemble(header.sequence_, header.first_fragment_, data);
147 }
std::pair< TransportReassembly_rch, SequenceNumber > ReassemblyInfo
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8

◆ receive_bytes()

ssize_t OpenDDS::DCPS::UdpReceiveStrategy::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr remote_address,
ACE_HANDLE  fd,
bool &  stop 
)
protectedvirtual

Only our subclass knows how to do this.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 44 of file UdpReceiveStrategy.cpp.

References link_, ACE_SOCK_Dgram::recv(), remote_address_, and OpenDDS::DCPS::UdpDataLink::socket().

49 {
50  const ssize_t ret = this->link_->socket().recv(iov, n, remote_address);
51  remote_address_ = remote_address;
52  return ret;
53 }
ACE_SOCK_Dgram & socket()
Definition: UdpDataLink.inl:40
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
int ssize_t

◆ start_i()

int OpenDDS::DCPS::UdpReceiveStrategy::start_i ( )
protectedvirtual

Let the subclass start.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 72 of file UdpReceiveStrategy.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, ACE_TEXT(), ACE_SOCK::get_local_addr(), OpenDDS::DCPS::UdpDataLink::get_reactor(), link_, LM_DEBUG, LM_ERROR, ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), OpenDDS::DCPS::UdpDataLink::socket(), and OpenDDS::DCPS::Transport_debug_level.

73 {
75  if (reactor == 0) {
76  ACE_ERROR_RETURN((LM_ERROR,
77  ACE_TEXT("(%P|%t) ERROR: ")
78  ACE_TEXT("UdpReceiveStrategy::start_i: ")
79  ACE_TEXT("NULL reactor reference!\n")),
80  -1);
81  }
82 
83  if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
84  ACE_ERROR_RETURN((LM_ERROR,
85  ACE_TEXT("(%P|%t) ERROR: ")
86  ACE_TEXT("UdpReceiveStrategy::start_i: ")
87  ACE_TEXT("failed to register handler for DataLink!\n")),
88  -1);
89  }
90 
91  if (Transport_debug_level > 5) {
92  ACE_INET_Addr addr;
93  link_->socket().get_local_addr(addr);
94  ACE_DEBUG((LM_DEBUG, "(%P|%t) UdpReceiveStrategy::start_i: listening on %C\n",
95  LogAddr(addr).c_str()));
96  }
97  return 0;
98 }
#define ACE_DEBUG(X)
ACE_SOCK_Dgram & socket()
Definition: UdpDataLink.inl:40
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int get_local_addr(ACE_Addr &) const
virtual ACE_Reactor * reactor(void) const
ACE_TEXT("TCP_Factory")
ACE_Reactor * get_reactor()
Definition: UdpDataLink.inl:27
#define ACE_ERROR_RETURN(X, Y)

◆ stop_i()

void OpenDDS::DCPS::UdpReceiveStrategy::stop_i ( )
protectedvirtual

Let the subclass stop.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 101 of file UdpReceiveStrategy.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::UdpDataLink::get_reactor(), link_, LM_ERROR, ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, and ACE_Reactor::remove_handler().

102 {
103  ACE_Reactor* reactor = this->link_->get_reactor();
104  if (reactor == 0) {
105  ACE_ERROR((LM_ERROR,
106  ACE_TEXT("(%P|%t) ERROR: ")
107  ACE_TEXT("UdpReceiveStrategy::stop_i: ")
108  ACE_TEXT("NULL reactor reference!\n")));
109  return;
110  }
111 
113 }
#define ACE_ERROR(X)
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
virtual ACE_Reactor * reactor(void) const
ACE_TEXT("TCP_Factory")
ACE_Reactor * get_reactor()
Definition: UdpDataLink.inl:27

Member Data Documentation

◆ expected_

SequenceNumber OpenDDS::DCPS::UdpReceiveStrategy::expected_
private

Definition at line 60 of file UdpReceiveStrategy.h.

Referenced by check_header().

◆ link_

UdpDataLink* OpenDDS::DCPS::UdpReceiveStrategy::link_
private

Definition at line 59 of file UdpReceiveStrategy.h.

Referenced by deliver_sample(), get_handle(), receive_bytes(), start_i(), and stop_i().

◆ reassembly_

ReassemblyMap OpenDDS::DCPS::UdpReceiveStrategy::reassembly_
private

Definition at line 65 of file UdpReceiveStrategy.h.

Referenced by check_header(), and reassemble().

◆ remote_address_

ACE_INET_Addr OpenDDS::DCPS::UdpReceiveStrategy::remote_address_
private

Definition at line 61 of file UdpReceiveStrategy.h.

Referenced by check_header(), reassemble(), and receive_bytes().


The documentation for this class was generated from the following files: