#include <TcpReceiveStrategy.h>
Public Member Functions | |
TcpReceiveStrategy (TcpDataLink &link, const TransportReactorTask_rch &task) | |
virtual | ~TcpReceiveStrategy () |
int | reset (TcpConnection *old_connection, TcpConnection *new_connection) |
ACE_Reactor * | get_reactor () |
bool | gracefully_disconnected () |
Protected Member Functions | |
virtual ssize_t | receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd) |
Only our subclass knows how to do this. | |
virtual void | deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address) |
Called when there is a ReceivedDataSample to be delivered. | |
virtual int | start_i () |
Let the subclass start. | |
virtual void | stop_i () |
Let the subclass stop. | |
virtual void | relink (bool do_suspend=true) |
Private Attributes | |
TcpDataLink & | link_ |
TransportReactorTask_rch | reactor_task_ |
Definition at line 24 of file TcpReceiveStrategy.h.
OpenDDS::DCPS::TcpReceiveStrategy::TcpReceiveStrategy (TcpDataLink & link, const TransportReactorTask_rch & task)
Definition at line 23 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL.
00026 : link_(link) 00027 , reactor_task_(task) 00028 { 00029 DBG_ENTRY_LVL("TcpReceiveStrategy","TcpReceiveStrategy",6); 00030 }
OpenDDS::DCPS::TcpReceiveStrategy::~TcpReceiveStrategy () [virtual]
Definition at line 32 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL.
00033 { 00034 DBG_ENTRY_LVL("TcpReceiveStrategy","~TcpReceiveStrategy",6); 00035 }
void OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample (ReceivedDataSample & sample, const ACE_INET_Addr & remote_address) [protected, virtual]
Called when there is a ReceivedDataSample to be delivered.
Implements OpenDDS::DCPS::TransportReceiveStrategy<>.
Definition at line 57 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::DCPS::ReceivedDataSample::header_, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::REQUEST_ACK, OpenDDS::DCPS::SAMPLE_ACK, and VDBG.
00058 { 00059 DBG_ENTRY_LVL("TcpReceiveStrategy","deliver_sample",6); 00060 00061 if (sample.header_.message_id_ == GRACEFUL_DISCONNECT) { 00062 VDBG((LM_DEBUG, "(%P|%t) DBG: received GRACEFUL_DISCONNECT \n")); 00063 this->gracefully_disconnected_ = true; 00064 } 00065 else if (sample.header_.message_id_ == REQUEST_ACK) { 00066 VDBG((LM_DEBUG, "(%P|%t) DBG: received REQUEST_ACK \n")); 00067 link_.request_ack_received(sample); 00068 } 00069 else if (sample.header_.message_id_ == SAMPLE_ACK) { 00070 VDBG((LM_DEBUG, "(%P|%t) DBG: received SAMPLE_ACK \n")); 00071 link_.ack_received(sample); 00072 } 00073 else { 00074 link_.data_received(sample); 00075 } 00076 }
ACE_INLINE ACE_Reactor * OpenDDS::DCPS::TcpReceiveStrategy::get_reactor (void)
Definition at line 14 of file TcpReceiveStrategy.inl.
References DBG_ENTRY_LVL, and reactor_task_.
00015 { 00016 DBG_ENTRY_LVL("TcpReceiveStrategy","get_reactor",6); 00017 return this->reactor_task_->get_reactor(); 00018 }
ACE_INLINE bool OpenDDS::DCPS::TcpReceiveStrategy::gracefully_disconnected ()
Definition at line 21 of file TcpReceiveStrategy.inl.
References OpenDDS::DCPS::TransportReceiveStrategy<>::gracefully_disconnected_.
00022 { 00023 return this->gracefully_disconnected_; 00024 }
ssize_t OpenDDS::DCPS::TcpReceiveStrategy::receive_bytes (iovec iov[], int n, ACE_INET_Addr & remote_address, ACE_HANDLE fd) [protected, virtual]
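Receives incoming bytes into the supplied iovec array from the link's current TCP connection, returning 0 when the link has no connection; remote_address is not modified for TCP. (Inherited note: only the subclass knows how to do this.)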
Implements OpenDDS::DCPS::TransportReceiveStrategy<>.
Definition at line 38 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::get_connection(), and link_.
00043 { 00044 DBG_ENTRY_LVL("TcpReceiveStrategy", "receive_bytes", 6); 00045 00046 // We don't do anything to the remote_address for the Tcp case. 00047 TcpConnection_rch connection = link_.get_connection(); 00048 if (!connection) { 00049 return 0; 00050 } 00051 00052 return connection->peer().recvv(iov, n); 00053 }
void OpenDDS::DCPS::TcpReceiveStrategy::relink (bool do_suspend = true) [protected, virtual]
The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.
Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy<>.
Definition at line 151 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::get_connection(), and link_.
00152 { 00153 DBG_ENTRY_LVL("TcpReceiveStrategy","relink",6); 00154 TcpConnection_rch connection = link_.get_connection(); 00155 if (connection) 00156 connection->relink_from_recv(do_suspend); 00157 }
int OpenDDS::DCPS::TcpReceiveStrategy::reset (TcpConnection * old_connection, TcpConnection * new_connection)
Definition at line 114 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks(), link_, LM_ERROR, reactor_task_, and ACE_Event_Handler::READ_MASK.
Referenced by OpenDDS::DCPS::TcpDataLink::reconnect(), and OpenDDS::DCPS::TcpDataLink::reuse_existing_connection().
00115 { 00116 DBG_ENTRY_LVL("TcpReceiveStrategy","reset",6); 00117 // Unregister the old handle 00118 if (old_connection) { 00119 this->reactor_task_->get_reactor()->remove_handler 00120 (old_connection, 00121 ACE_Event_Handler::READ_MASK | 00122 ACE_Event_Handler::DONT_CALL); 00123 } 00124 00125 link_.drop_pending_request_acks(); 00126 00127 // Give the reactor its own "copy" of the reference to the connection object. 00128 00129 if (this->reactor_task_->get_reactor()->register_handler 00130 (new_connection, 00131 ACE_Event_Handler::READ_MASK) == -1) { 00132 // Take back the "copy" we made. 00133 ACE_ERROR_RETURN((LM_ERROR, 00134 "(%P|%t) ERROR: TcpReceiveStrategy::reset TcpConnection can't register with " 00135 "reactor\n"), 00136 -1); 00137 } 00138 00139 return 0; 00140 }
int OpenDDS::DCPS::TcpReceiveStrategy::start_i () [protected, virtual]
Let the subclass start.
Implements OpenDDS::DCPS::TransportReceiveStrategy<>.
Definition at line 79 of file TcpReceiveStrategy.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TcpDataLink::get_connection(), OpenDDS::DCPS::RcHandle< T >::in(), link_, LM_DEBUG, LM_ERROR, reactor_task_, and ACE_Event_Handler::READ_MASK.
00080 { 00081 DBG_ENTRY_LVL("TcpReceiveStrategy","start_i",6); 00082 00083 TcpConnection_rch connection = link_.get_connection(); 00084 00085 if (DCPS_debug_level > 9) { 00086 std::stringstream buffer; 00087 buffer << link_; 00088 ACE_DEBUG((LM_DEBUG, 00089 ACE_TEXT("(%P|%t) TcpReceiveStrategy::start_i() - ") 00090 ACE_TEXT("link:\n%C connected to %C:%d ") 00091 ACE_TEXT("registering with reactor to receive.\n"), 00092 buffer.str().c_str(), 00093 connection->get_remote_address().get_host_name(), 00094 connection->get_remote_address().get_port_number())); 00095 } 00096 00097 if (this->reactor_task_->get_reactor()->register_handler 00098 (connection.in(), 00099 ACE_Event_Handler::READ_MASK) == -1) { 00100 // Take back the "copy" we made. 00101 ACE_ERROR_RETURN((LM_ERROR, 00102 "(%P|%t) ERROR: TcpReceiveStrategy::start_i TcpConnection can't register with " 00103 "reactor %@ %p\n", connection.in(), ACE_TEXT("register_handler")), 00104 -1); 00105 } 00106 00107 return 0; 00108 }
void OpenDDS::DCPS::TcpReceiveStrategy::stop_i () [protected, virtual]
Let the subclass stop.
Implements OpenDDS::DCPS::TransportReceiveStrategy<>.
Definition at line 143 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks(), and link_.
00144 { 00145 DBG_ENTRY_LVL("TcpReceiveStrategy","stop_i",6); 00146 00147 link_.drop_pending_request_acks(); 00148 }
TcpDataLink & OpenDDS::DCPS::TcpReceiveStrategy::link_ [private]
Definition at line 60 of file TcpReceiveStrategy.h.
Referenced by receive_bytes(), relink(), reset(), start_i(), and stop_i().
TransportReactorTask_rch OpenDDS::DCPS::TcpReceiveStrategy::reactor_task_ [private]
Definition at line 61 of file TcpReceiveStrategy.h.
Referenced by get_reactor(), reset(), and start_i().