00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpReceiveStrategy.h"
00010 #include "TcpSendStrategy.h"
00011 #include "TcpTransport.h"
00012 #include "TcpDataLink.h"
00013 #include "TcpConnection.h"
00014
00015 #include <sstream>
00016
00017 #if !defined (__ACE_INLINE__)
00018 #include "TcpReceiveStrategy.inl"
00019 #endif
00020
00021 OpenDDS::DCPS::TcpReceiveStrategy::TcpReceiveStrategy(
00022 const TcpDataLink_rch& link,
00023 const TcpConnection_rch& connection,
00024 const TransportReactorTask_rch& task)
00025 : link_(link)
00026 , connection_(connection)
00027 , reactor_task_(task)
00028 {
00029 DBG_ENTRY_LVL("TcpReceiveStrategy","TcpReceiveStrategy",6);
00030 }
00031
00032 OpenDDS::DCPS::TcpReceiveStrategy::~TcpReceiveStrategy()
00033 {
00034 DBG_ENTRY_LVL("TcpReceiveStrategy","~TcpReceiveStrategy",6);
00035 }
00036
00037 ssize_t
00038 OpenDDS::DCPS::TcpReceiveStrategy::receive_bytes(
00039 iovec iov[],
00040 int n,
00041 ACE_INET_Addr& ,
00042 ACE_HANDLE )
00043 {
00044 DBG_ENTRY_LVL("TcpReceiveStrategy", "receive_bytes", 6);
00045
00046
00047
00048 TcpConnection_rch connection = this->connection_;
00049
00050 if (connection.is_nil()) {
00051 return 0;
00052 }
00053
00054 return connection->peer().recvv(iov, n);
00055 }
00056
00057 void
00058 OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample
00059 (ReceivedDataSample& sample, const ACE_INET_Addr&)
00060 {
00061 DBG_ENTRY_LVL("TcpReceiveStrategy","deliver_sample",6);
00062 if (sample.header_.message_id_ == GRACEFUL_DISCONNECT) {
00063 VDBG((LM_DEBUG, "(%P|%t) DBG: received GRACEFUL_DISCONNECT \n"));
00064 this->gracefully_disconnected_ = true;
00065
00066 } else {
00067 this->link_->data_received(sample);
00068 }
00069 }
00070
00071 int
00072 OpenDDS::DCPS::TcpReceiveStrategy::start_i()
00073 {
00074 DBG_ENTRY_LVL("TcpReceiveStrategy","start_i",6);
00075
00076
00077
00078
00079
00080 this->connection_->set_receive_strategy(this);
00081
00082
00083 this->connection_->_add_ref();
00084
00085 if (DCPS_debug_level > 9) {
00086 std::stringstream buffer;
00087 buffer << *this->link_.in();
00088 ACE_DEBUG((LM_DEBUG,
00089 ACE_TEXT("(%P|%t) TcpReceiveStrategy::start_i() - ")
00090 ACE_TEXT("link:\n%C connected to %C:%d ")
00091 ACE_TEXT("registering with reactor to receive.\n"),
00092 buffer.str().c_str(),
00093 this->connection_->get_remote_address().get_host_name(),
00094 this->connection_->get_remote_address().get_port_number()));
00095 }
00096
00097 if (this->reactor_task_->get_reactor()->register_handler
00098 (this->connection_.in(),
00099 ACE_Event_Handler::READ_MASK) == -1) {
00100
00101 this->connection_->_remove_ref();
00102 ACE_ERROR_RETURN((LM_ERROR,
00103 "(%P|%t) ERROR: TcpReceiveStrategy::start_i TcpConnection can't register with "
00104 "reactor %@ %p\n", this->connection_.in(), ACE_TEXT("register_handler")),
00105 -1);
00106 }
00107
00108 return 0;
00109 }
00110
00111
00112
00113
00114 int
00115 OpenDDS::DCPS::TcpReceiveStrategy::reset(TcpConnection* connection)
00116 {
00117 DBG_ENTRY_LVL("TcpReceiveStrategy","reset",6);
00118
00119
00120
00121 if (this->connection_.is_nil()) {
00122 ACE_ERROR_RETURN((LM_ERROR,
00123 "(%P|%t) ERROR: TcpReceiveStrategy::reset previous connection "
00124 "should not be nil.\n"),
00125 -1);
00126 }
00127
00128 if (this->connection_.in() == connection) {
00129 ACE_ERROR_RETURN((LM_ERROR,
00130 "(%P|%t) ERROR: TcpReceiveStrategy::reset should not be called"
00131 " to replace the same connection.\n"),
00132 -1);
00133 }
00134
00135
00136 this->reactor_task_->get_reactor()->remove_handler
00137 (this->connection_.in(),
00138 ACE_Event_Handler::READ_MASK |
00139 ACE_Event_Handler::DONT_CALL);
00140
00141
00142 this->connection_->_remove_ref();
00143
00144
00145
00146 this->connection_->remove_receive_strategy();
00147
00148
00149 connection->_add_ref();
00150 this->connection_ = connection;
00151
00152
00153
00154
00155
00156 this->connection_->set_receive_strategy(this);
00157
00158
00159 this->connection_->_add_ref();
00160
00161 if (this->reactor_task_->get_reactor()->register_handler
00162 (this->connection_.in(),
00163 ACE_Event_Handler::READ_MASK) == -1) {
00164
00165 this->connection_->_remove_ref();
00166 ACE_ERROR_RETURN((LM_ERROR,
00167 "(%P|%t) ERROR: TcpReceiveStrategy::reset TcpConnection can't register with "
00168 "reactor\n"),
00169 -1);
00170 }
00171
00172 return 0;
00173 }
00174
00175 void
00176 OpenDDS::DCPS::TcpReceiveStrategy::stop_i()
00177 {
00178 DBG_ENTRY_LVL("TcpReceiveStrategy","stop_i",6);
00179
00180 this->reactor_task_->get_reactor()->remove_handler
00181 (this->connection_.in(),
00182 ACE_Event_Handler::READ_MASK |
00183 ACE_Event_Handler::DONT_CALL);
00184
00185
00186 this->connection_->_remove_ref();
00187
00188
00189
00190 this->connection_->remove_receive_strategy();
00191
00192 this->connection_ = 0;
00193 }
00194
00195 void
00196 OpenDDS::DCPS::TcpReceiveStrategy::relink(bool do_suspend)
00197 {
00198 DBG_ENTRY_LVL("TcpReceiveStrategy","relink",6);
00199
00200 if (!this->connection_.is_nil())
00201 this->connection_->relink_from_recv(do_suspend);
00202 }