#include <TcpReceiveStrategy.h>
Inheritance diagram for OpenDDS::DCPS::TcpReceiveStrategy:
Public Member Functions | |
TcpReceiveStrategy (const TcpDataLink_rch &link, const TcpConnection_rch &connection, const TransportReactorTask_rch &task) | |
virtual | ~TcpReceiveStrategy () |
int | reset (TcpConnection *connection) |
ACE_Reactor * | get_reactor () |
bool | gracefully_disconnected () |
Protected Member Functions | |
virtual ssize_t | receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd) |
Only our subclass knows how to do this. | |
virtual void | deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address) |
Called when there is a ReceivedDataSample to be delivered. | |
virtual int | start_i () |
Let the subclass start. | |
virtual void | stop_i () |
Let the subclass stop. | |
virtual void | relink (bool do_suspend=true) |
Private Attributes | |
TcpDataLink_rch | link_ |
TcpConnection_rch | connection_ |
TransportReactorTask_rch | reactor_task_ |
Definition at line 21 of file TcpReceiveStrategy.h.
OpenDDS::DCPS::TcpReceiveStrategy::TcpReceiveStrategy | ( | const TcpDataLink_rch & | link, | |
const TcpConnection_rch & | connection, | |||
const TransportReactorTask_rch & | task | |||
) |
Definition at line 21 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL.
00025 : link_(link) 00026 , connection_(connection) 00027 , reactor_task_(task) 00028 { 00029 DBG_ENTRY_LVL("TcpReceiveStrategy","TcpReceiveStrategy",6); 00030 }
OpenDDS::DCPS::TcpReceiveStrategy::~TcpReceiveStrategy | ( | ) | [virtual] |
Definition at line 32 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL.
00033 { 00034 DBG_ENTRY_LVL("TcpReceiveStrategy","~TcpReceiveStrategy",6); 00035 }
void OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample | ( | ReceivedDataSample & | sample, | |
const ACE_INET_Addr & | remote_address | |||
) | [protected, virtual] |
Called when there is a ReceivedDataSample to be delivered.
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 59 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::message_id_, and VDBG.
00060 { 00061 DBG_ENTRY_LVL("TcpReceiveStrategy","deliver_sample",6); 00062 if (sample.header_.message_id_ == GRACEFUL_DISCONNECT) { 00063 VDBG((LM_DEBUG, "(%P|%t) DBG: received GRACEFUL_DISCONNECT \n")); 00064 this->gracefully_disconnected_ = true; 00065 00066 } else { 00067 this->link_->data_received(sample); 00068 } 00069 }
ACE_INLINE ACE_Reactor * OpenDDS::DCPS::TcpReceiveStrategy::get_reactor | ( | ) |
Definition at line 12 of file TcpReceiveStrategy.inl.
References DBG_ENTRY_LVL, and reactor_task_.
Referenced by OpenDDS::DCPS::TcpConnection::transfer().
00013 { 00014 DBG_ENTRY_LVL("TcpReceiveStrategy","get_reactor",6); 00015 return this->reactor_task_->get_reactor(); 00016 }
ACE_INLINE bool OpenDDS::DCPS::TcpReceiveStrategy::gracefully_disconnected | ( | ) |
Definition at line 19 of file TcpReceiveStrategy.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_.
Referenced by OpenDDS::DCPS::TcpDataLink::pre_stop_i().
00020 { 00021 return this->gracefully_disconnected_; 00022 }
ssize_t OpenDDS::DCPS::TcpReceiveStrategy::receive_bytes | ( | iovec | iov[], | |
int | n, | |||
ACE_INET_Addr & | remote_address, | |||
ACE_HANDLE | fd | |||
) | [protected, virtual] |
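Implements the base-class receive hook for TCP: reads incoming bytes from the connection's peer stream into the supplied iovec array (n entries) and returns the result of that read. The remote_address argument is left untouched in the TCP case, and 0 is returned if there is no connection.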
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 38 of file TcpReceiveStrategy.cpp.
References connection_, DBG_ENTRY_LVL, and OpenDDS::DCPS::RcHandle< T >::is_nil().
00043 { 00044 DBG_ENTRY_LVL("TcpReceiveStrategy", "receive_bytes", 6); 00045 00046 // We don't do anything to the remote_address for the Tcp case. 00047 00048 TcpConnection_rch connection = this->connection_; 00049 00050 if (connection.is_nil()) { 00051 return 0; 00052 } 00053 00054 return connection->peer().recvv(iov, n); 00055 }
void OpenDDS::DCPS::TcpReceiveStrategy::relink | ( | bool | do_suspend = true |
) | [protected, virtual] |
The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.
Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 196 of file TcpReceiveStrategy.cpp.
References DBG_ENTRY_LVL.
00197 { 00198 DBG_ENTRY_LVL("TcpReceiveStrategy","relink",6); 00199 00200 if (!this->connection_.is_nil()) 00201 this->connection_->relink_from_recv(do_suspend); 00202 }
int OpenDDS::DCPS::TcpReceiveStrategy::reset | ( | TcpConnection * | connection | ) |
Definition at line 115 of file TcpReceiveStrategy.cpp.
References OpenDDS::DCPS::RcObject< T >::_add_ref(), connection_, DBG_ENTRY_LVL, and reactor_task_.
Referenced by OpenDDS::DCPS::TcpDataLink::reconnect(), and OpenDDS::DCPS::TcpDataLink::reuse_existing_connection().
00116 { 00117 DBG_ENTRY_LVL("TcpReceiveStrategy","reset",6); 00118 00119 // Sanity check - this connection is passed in from the constructor and 00120 // it should not be nil. 00121 if (this->connection_.is_nil()) { 00122 ACE_ERROR_RETURN((LM_ERROR, 00123 "(%P|%t) ERROR: TcpReceiveStrategy::reset previous connection " 00124 "should not be nil.\n"), 00125 -1); 00126 } 00127 00128 if (this->connection_.in() == connection) { 00129 ACE_ERROR_RETURN((LM_ERROR, 00130 "(%P|%t) ERROR: TcpReceiveStrategy::reset should not be called" 00131 " to replace the same connection.\n"), 00132 -1); 00133 } 00134 00135 // Unregister the old handle 00136 this->reactor_task_->get_reactor()->remove_handler 00137 (this->connection_.in(), 00138 ACE_Event_Handler::READ_MASK | 00139 ACE_Event_Handler::DONT_CALL); 00140 00141 // Take back the "copy" we made (see start_i() implementation). 00142 this->connection_->_remove_ref(); 00143 00144 // This will cause the connection_ object to drop its reference to this 00145 // TransportReceiveStrategy object. 00146 this->connection_->remove_receive_strategy(); 00147 00148 // Replace with a new connection. 00149 connection->_add_ref(); 00150 this->connection_ = connection; 00151 00152 // Tell the TcpConnection that we are the object that it should 00153 // call when it receives a handle_input() "event", and we will carry 00154 // it out. The TcpConnection object will make a "copy" of the 00155 // reference (to this object) that we pass-in here. 00156 this->connection_->set_receive_strategy(this); 00157 00158 // Give the reactor its own "copy" of the reference to the connection object. 00159 this->connection_->_add_ref(); 00160 00161 if (this->reactor_task_->get_reactor()->register_handler 00162 (this->connection_.in(), 00163 ACE_Event_Handler::READ_MASK) == -1) { 00164 // Take back the "copy" we made. 
00165 this->connection_->_remove_ref(); 00166 ACE_ERROR_RETURN((LM_ERROR, 00167 "(%P|%t) ERROR: TcpReceiveStrategy::reset TcpConnection can't register with " 00168 "reactor\n"), 00169 -1); 00170 } 00171 00172 return 0; 00173 }
int OpenDDS::DCPS::TcpReceiveStrategy::start_i | ( | ) | [protected, virtual] |
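Subclass start hook: sets this object as the connection's receive strategy and registers the connection with the reactor for read events. Returns 0 on success, or -1 if the reactor registration fails.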
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 72 of file TcpReceiveStrategy.cpp.
References connection_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), and link_.
00073 { 00074 DBG_ENTRY_LVL("TcpReceiveStrategy","start_i",6); 00075 00076 // Tell the TcpConnection that we are the object that it should 00077 // call when it receives a handle_input() "event", and we will carry 00078 // it out. The TcpConnection object will make a "copy" of the 00079 // reference (to this object) that we pass-in here. 00080 this->connection_->set_receive_strategy(this); 00081 00082 // Give the reactor its own "copy" of the reference to the connection object. 00083 this->connection_->_add_ref(); 00084 00085 if (DCPS_debug_level > 9) { 00086 std::stringstream buffer; 00087 buffer << *this->link_.in(); 00088 ACE_DEBUG((LM_DEBUG, 00089 ACE_TEXT("(%P|%t) TcpReceiveStrategy::start_i() - ") 00090 ACE_TEXT("link:\n%C connected to %C:%d ") 00091 ACE_TEXT("registering with reactor to receive.\n"), 00092 buffer.str().c_str(), 00093 this->connection_->get_remote_address().get_host_name(), 00094 this->connection_->get_remote_address().get_port_number())); 00095 } 00096 00097 if (this->reactor_task_->get_reactor()->register_handler 00098 (this->connection_.in(), 00099 ACE_Event_Handler::READ_MASK) == -1) { 00100 // Take back the "copy" we made. 00101 this->connection_->_remove_ref(); 00102 ACE_ERROR_RETURN((LM_ERROR, 00103 "(%P|%t) ERROR: TcpReceiveStrategy::start_i TcpConnection can't register with " 00104 "reactor %@ %p\n", this->connection_.in(), ACE_TEXT("register_handler")), 00105 -1); 00106 } 00107 00108 return 0; 00109 }
void OpenDDS::DCPS::TcpReceiveStrategy::stop_i | ( | ) | [protected, virtual] |
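Subclass stop hook: removes the connection's handler from the reactor, takes back the extra reference added in start_i(), tells the connection to drop its receive strategy, and then clears connection_.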
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 176 of file TcpReceiveStrategy.cpp.
References connection_, DBG_ENTRY_LVL, and reactor_task_.
00177 { 00178 DBG_ENTRY_LVL("TcpReceiveStrategy","stop_i",6); 00179 00180 this->reactor_task_->get_reactor()->remove_handler 00181 (this->connection_.in(), 00182 ACE_Event_Handler::READ_MASK | 00183 ACE_Event_Handler::DONT_CALL); 00184 00185 // Take back the "copy" we made (see start_i() implementation). 00186 this->connection_->_remove_ref(); 00187 00188 // This will cause the connection_ object to drop its reference to this 00189 // TransportReceiveStrategy object. 00190 this->connection_->remove_receive_strategy(); 00191 00192 this->connection_ = 0; 00193 }
TcpConnection_rch OpenDDS::DCPS::TcpReceiveStrategy::connection_ [private]
Definition at line 56 of file TcpReceiveStrategy.h.
Referenced by receive_bytes(), reset(), start_i(), and stop_i().
TransportReactorTask_rch OpenDDS::DCPS::TcpReceiveStrategy::reactor_task_ [private]
Definition at line 57 of file TcpReceiveStrategy.h.
Referenced by get_reactor(), reset(), and stop_i().