OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::TcpReceiveStrategy Class Reference

#include <TcpReceiveStrategy.h>

Inheritance diagram for OpenDDS::DCPS::TcpReceiveStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TcpReceiveStrategy:
Collaboration graph
[legend]

Public Member Functions

 TcpReceiveStrategy (TcpDataLink &link, const ReactorTask_rch &task)
 
virtual ~TcpReceiveStrategy ()
 
int reset (TcpConnection *old_connection, TcpConnection *new_connection)
 
ACE_Reactor * get_reactor ()
 
bool gracefully_disconnected ()
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
virtual ~TransportReceiveStrategy ()
 
int start ()
 
void stop ()
 
int handle_dds_input (ACE_HANDLE fd)
 
const TransportHeader & received_header () const
 
TransportHeader & received_header ()
 
const DataSampleHeader & received_sample_header () const
 
DataSampleHeader & received_sample_header ()
 
ACE_Message_Block * to_msgblock (const ReceivedDataSample &sample)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportStrategy
virtual ~TransportStrategy ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 

Protected Member Functions

virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
 Only our subclass knows how to do this. More...
 
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 Called when there is a ReceivedDataSample to be delivered. More...
 
virtual int start_i ()
 Let the subclass start. More...
 
virtual void stop_i ()
 Let the subclass stop. More...
 
virtual void relink (bool do_suspend=true)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
 TransportReceiveStrategy (const TransportInst_rch &config, size_t receive_buffers_count=RECEIVE_BUFFERS)
 
virtual bool check_header (const TransportHeader &header)
 Check the transport header for suitability. More...
 
virtual bool check_header (const DataSampleHeader &header)
 Check the data sample header for suitability. More...
 
virtual void begin_transport_header_processing ()
 Begin Current Transport Header Processing. More...
 
virtual void end_transport_header_processing ()
 End Current Transport Header Processing. More...
 
virtual void finish_message ()
 
int skip_bad_pdus ()
 Ignore bad PDUs by skipping over them. More...
 
void reset ()
 
size_t pdu_remaining () const
 
size_t successor_index (size_t index) const
 Manage an index into the receive buffer array. More...
 
void update_buffer_index (bool &done)
 
virtual bool reassemble (ReceivedDataSample &data)
 
 OPENDDS_VECTOR (ACE_Message_Block *) receive_buffers_
 Set of receive buffers in use. More...
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 

Private Attributes

TcpDataLink & link_
 
ReactorTask_rch reactor_task_
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Static Public Attributes inherited from OpenDDS::DCPS::TransportReceiveConstants
static const size_t RECEIVE_BUFFERS = DEFAULT_TRANSPORT_RECEIVE_BUFFERS
 
static const size_t BUFFER_LOW_WATER = 4096
 
static const size_t MESSAGE_BLOCKS = 1000
 
static const size_t DATA_BLOCKS = 100
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
bool gracefully_disconnected_
 Flag indicates if the GRACEFUL_DISCONNECT message is received. More...
 
size_t receive_sample_remaining_
 Bytes remaining in the current DataSample. More...
 
TransportHeader receive_transport_header_
 Current receive TransportHeader. More...
 
TransportMessageBlockAllocator mb_allocator_
 
TransportDataBlockAllocator db_allocator_
 
TransportDataAllocator data_allocator_
 
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
 Locking strategy for the allocators. More...
 
size_t buffer_index_
 Current receive buffer index in use. More...
 
DataSampleHeader data_sample_header_
 Current data sample header. More...
 
ACE_Message_Block * payload_
 
bool good_pdu_
 
size_t pdu_remaining_
 Amount of the current PDU that has not been processed yet. More...
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 24 of file TcpReceiveStrategy.h.

Constructor & Destructor Documentation

◆ TcpReceiveStrategy()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL OpenDDS::DCPS::TcpReceiveStrategy::TcpReceiveStrategy ( TcpDataLink & link,
const ReactorTask_rch & task 
)

Definition at line 23 of file TcpReceiveStrategy.cpp.

References DBG_ENTRY_LVL.

25  : TransportReceiveStrategy<>(link.impl()->config())
26  , link_(link)
27  , reactor_task_(task)
28 {
29  DBG_ENTRY_LVL("TcpReceiveStrategy","TcpReceiveStrategy",6);
30 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ ~TcpReceiveStrategy()

OpenDDS::DCPS::TcpReceiveStrategy::~TcpReceiveStrategy ( )
virtual

Definition at line 32 of file TcpReceiveStrategy.cpp.

References DBG_ENTRY_LVL.

33 {
34  DBG_ENTRY_LVL("TcpReceiveStrategy","~TcpReceiveStrategy",6);
35 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ deliver_sample()

void OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample ( ReceivedDataSample & sample,
const ACE_INET_Addr & remote_address 
)
protected virtual

Called when there is a ReceivedDataSample to be delivered.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 58 of file TcpReceiveStrategy.cpp.

References OpenDDS::DCPS::TcpDataLink::ack_received(), OpenDDS::DCPS::DataLink::data_received(), DBG_ENTRY_LVL, OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::DCPS::TransportReceiveStrategy<>::gracefully_disconnected_, OpenDDS::DCPS::ReceivedDataSample::header_, link_, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::REQUEST_ACK, OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::SAMPLE_ACK, and VDBG.

Referenced by receive_bytes().

59 {
60  DBG_ENTRY_LVL("TcpReceiveStrategy","deliver_sample",6);
61 
62  if (sample.header_.message_id_ == GRACEFUL_DISCONNECT) {
63  VDBG((LM_DEBUG, "(%P|%t) DBG: received GRACEFUL_DISCONNECT\n"));
64  this->gracefully_disconnected_ = true;
65  }
66  else if (sample.header_.message_id_ == REQUEST_ACK) {
67  VDBG((LM_DEBUG, "(%P|%t) DBG: received REQUEST_ACK\n"));
68  link_.request_ack_received(sample);
69  }
70  else if (sample.header_.message_id_ == SAMPLE_ACK) {
71  VDBG((LM_DEBUG, "(%P|%t) DBG: received SAMPLE_ACK\n"));
72  link_.ack_received(sample);
73  }
74  else {
75  link_.data_received(sample);
76  }
77 }
void request_ack_received(const ReceivedDataSample &sample)
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
#define VDBG(DBG_ARGS)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void ack_received(const ReceivedDataSample &sample)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690

◆ get_reactor()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE ACE_Reactor * OpenDDS::DCPS::TcpReceiveStrategy::get_reactor ( void  )

Definition at line 14 of file TcpReceiveStrategy.inl.

References ACE_INLINE, DBG_ENTRY_LVL, OpenDDS::DCPS::ReactorTask::get_reactor(), and reactor_task_.

15 {
16  DBG_ENTRY_LVL("TcpReceiveStrategy","get_reactor",6);
17  return this->reactor_task_->get_reactor();
18 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ gracefully_disconnected()

ACE_INLINE bool OpenDDS::DCPS::TcpReceiveStrategy::gracefully_disconnected ( )

Definition at line 21 of file TcpReceiveStrategy.inl.

References OpenDDS::DCPS::TransportReceiveStrategy<>::gracefully_disconnected_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

22 {
23  return this->gracefully_disconnected_;
24 }
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.

◆ receive_bytes()

ssize_t OpenDDS::DCPS::TcpReceiveStrategy::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr & remote_address,
ACE_HANDLE  fd,
bool &  stop 
)
protected virtual

Only our subclass knows how to do this.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 38 of file TcpReceiveStrategy.cpp.

References DBG_ENTRY_LVL, deliver_sample(), OpenDDS::DCPS::TcpDataLink::get_connection(), and link_.

44 {
45  DBG_ENTRY_LVL("TcpReceiveStrategy", "receive_bytes", 6);
46 
47  // We don't do anything to the remote_address for the Tcp case.
48  TcpConnection_rch connection = link_.get_connection();
49  if (!connection) {
50  return 0;
51  }
52 
53  return connection->peer().recvv(iov, n);
54 }
RcHandle< TcpConnection > TcpConnection_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22

◆ relink()

void OpenDDS::DCPS::TcpReceiveStrategy::relink ( bool  do_suspend = true)
protected virtual

The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 151 of file TcpReceiveStrategy.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::get_connection(), link_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

152 {
153  DBG_ENTRY_LVL("TcpReceiveStrategy","relink",6);
154  TcpConnection_rch connection = link_.get_connection();
155  if (connection)
156  connection->relink_from_recv(do_suspend);
157 }
RcHandle< TcpConnection > TcpConnection_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22

◆ reset()

int OpenDDS::DCPS::TcpReceiveStrategy::reset ( TcpConnection * old_connection,
TcpConnection * new_connection 
)

Definition at line 114 of file TcpReceiveStrategy.cpp.

References ACE_ERROR_RETURN, DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks(), OpenDDS::DCPS::ReactorTask::get_reactor(), link_, LM_ERROR, reactor_task_, ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), and ACE_Reactor::remove_handler().

Referenced by OpenDDS::DCPS::TcpDataLink::reuse_existing_connection().

115 {
116  DBG_ENTRY_LVL("TcpReceiveStrategy","reset",6);
117  // Unregister the old handle
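118  if (old_connection) {
119  this->reactor_task_->get_reactor()->remove_handler
120  (old_connection,
121  ACE_Event_Handler::READ_MASK |
122  ACE_Event_Handler::DONT_CALL);
123  }
124 
125  link_.drop_pending_request_acks();
126 
127  // Give the reactor its own "copy" of the reference to the connection object.
128 
129  if (this->reactor_task_->get_reactor()->register_handler
130  (new_connection,
131  ACE_Event_Handler::READ_MASK) == -1) {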
132  // Take back the "copy" we made.
133  ACE_ERROR_RETURN((LM_ERROR,
134  "(%P|%t) ERROR: TcpReceiveStrategy::reset TcpConnection can't register with "
135  "reactor\n"),
136  -1);
137  }
138 
139  return 0;
140 }
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define ACE_ERROR_RETURN(X, Y)
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ start_i()

int OpenDDS::DCPS::TcpReceiveStrategy::start_i ( )
protected virtual

Let the subclass start.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 80 of file TcpReceiveStrategy.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TcpDataLink::get_connection(), OpenDDS::DCPS::ReactorTask::get_reactor(), OpenDDS::DCPS::RcHandle< T >::in(), link_, LM_DEBUG, LM_ERROR, reactor_task_, ACE_Event_Handler::READ_MASK, and ACE_Reactor::register_handler().

81 {
82  DBG_ENTRY_LVL("TcpReceiveStrategy","start_i",6);
83 
84  TcpConnection_rch connection = link_.get_connection();
85 
86  if (DCPS_debug_level > 9) {
87  std::stringstream buffer;
88  buffer << link_;
89  ACE_DEBUG((LM_DEBUG,
90  ACE_TEXT("(%P|%t) TcpReceiveStrategy::start_i() - ")
91  ACE_TEXT("link:\n%C connected to %C ")
92  ACE_TEXT("registering with reactor to receive.\n"),
93  buffer.str().c_str(),
94  LogAddr(connection->get_remote_address()).c_str()));
95  }
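96 
97  if (this->reactor_task_->get_reactor()->register_handler
98  (connection.in(),
99  ACE_Event_Handler::READ_MASK) == -1) {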
100  // Take back the "copy" we made.
101  ACE_ERROR_RETURN((LM_ERROR,
102  "(%P|%t) ERROR: TcpReceiveStrategy::start_i TcpConnection can't register with "
103  "reactor %@ %p\n", connection.in(), ACE_TEXT("register_handler")),
104  -1);
105  }
106 
107  return 0;
108 }
#define ACE_DEBUG(X)
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
RcHandle< TcpConnection > TcpConnection_rch
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define ACE_ERROR_RETURN(X, Y)
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ stop_i()

void OpenDDS::DCPS::TcpReceiveStrategy::stop_i ( )
protected virtual

Let the subclass stop.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 143 of file TcpReceiveStrategy.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks(), and link_.

144 {
145  DBG_ENTRY_LVL("TcpReceiveStrategy","stop_i",6);
146 
147  link_.drop_pending_request_acks();
148 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Data Documentation

◆ link_

TcpDataLink& OpenDDS::DCPS::TcpReceiveStrategy::link_
private

Definition at line 61 of file TcpReceiveStrategy.h.

Referenced by deliver_sample(), receive_bytes(), relink(), reset(), start_i(), and stop_i().

◆ reactor_task_

ReactorTask_rch OpenDDS::DCPS::TcpReceiveStrategy::reactor_task_
private

Definition at line 62 of file TcpReceiveStrategy.h.

Referenced by get_reactor(), reset(), and start_i().


The documentation for this class was generated from the following files: