TcpReceiveStrategy.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpReceiveStrategy.h"
00010 #include "TcpSendStrategy.h"
00011 #include "TcpTransport.h"
00012 #include "TcpDataLink.h"
00013 #include "TcpConnection.h"
00014
00015 #include <sstream>
00016
00017 #if !defined (__ACE_INLINE__)
00018 #include "TcpReceiveStrategy.inl"
00019 #endif
00020
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 OpenDDS::DCPS::TcpReceiveStrategy::TcpReceiveStrategy(
00024 TcpDataLink& link,
00025 const TransportReactorTask_rch& task)
00026 : link_(link)
00027 , reactor_task_(task)
00028 {
00029 DBG_ENTRY_LVL("TcpReceiveStrategy","TcpReceiveStrategy",6);
00030 }
00031
00032 OpenDDS::DCPS::TcpReceiveStrategy::~TcpReceiveStrategy()
00033 {
00034 DBG_ENTRY_LVL("TcpReceiveStrategy","~TcpReceiveStrategy",6);
00035 }
00036
00037 ssize_t
00038 OpenDDS::DCPS::TcpReceiveStrategy::receive_bytes(
00039 iovec iov[],
00040 int n,
00041 ACE_INET_Addr& ,
00042 ACE_HANDLE )
00043 {
00044 DBG_ENTRY_LVL("TcpReceiveStrategy", "receive_bytes", 6);
00045
00046
00047 TcpConnection_rch connection = link_.get_connection();
00048 if (!connection) {
00049 return 0;
00050 }
00051
00052 return connection->peer().recvv(iov, n);
00053 }
00054
00055 void
00056 OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample
00057 (ReceivedDataSample& sample, const ACE_INET_Addr&)
00058 {
00059 DBG_ENTRY_LVL("TcpReceiveStrategy","deliver_sample",6);
00060
00061 if (sample.header_.message_id_ == GRACEFUL_DISCONNECT) {
00062 VDBG((LM_DEBUG, "(%P|%t) DBG: received GRACEFUL_DISCONNECT \n"));
00063 this->gracefully_disconnected_ = true;
00064 }
00065 else if (sample.header_.message_id_ == REQUEST_ACK) {
00066 VDBG((LM_DEBUG, "(%P|%t) DBG: received REQUEST_ACK \n"));
00067 link_.request_ack_received(sample);
00068 }
00069 else if (sample.header_.message_id_ == SAMPLE_ACK) {
00070 VDBG((LM_DEBUG, "(%P|%t) DBG: received SAMPLE_ACK \n"));
00071 link_.ack_received(sample);
00072 }
00073 else {
00074 link_.data_received(sample);
00075 }
00076 }
00077
00078 int
00079 OpenDDS::DCPS::TcpReceiveStrategy::start_i()
00080 {
00081 DBG_ENTRY_LVL("TcpReceiveStrategy","start_i",6);
00082
00083 TcpConnection_rch connection = link_.get_connection();
00084
00085 if (DCPS_debug_level > 9) {
00086 std::stringstream buffer;
00087 buffer << link_;
00088 ACE_DEBUG((LM_DEBUG,
00089 ACE_TEXT("(%P|%t) TcpReceiveStrategy::start_i() - ")
00090 ACE_TEXT("link:\n%C connected to %C:%d ")
00091 ACE_TEXT("registering with reactor to receive.\n"),
00092 buffer.str().c_str(),
00093 connection->get_remote_address().get_host_name(),
00094 connection->get_remote_address().get_port_number()));
00095 }
00096
00097 if (this->reactor_task_->get_reactor()->register_handler
00098 (connection.in(),
00099 ACE_Event_Handler::READ_MASK) == -1) {
00100
00101 ACE_ERROR_RETURN((LM_ERROR,
00102 "(%P|%t) ERROR: TcpReceiveStrategy::start_i TcpConnection can't register with "
00103 "reactor %@ %p\n", connection.in(), ACE_TEXT("register_handler")),
00104 -1);
00105 }
00106
00107 return 0;
00108 }
00109
00110
00111
00112
00113 int
00114 OpenDDS::DCPS::TcpReceiveStrategy::reset(TcpConnection* old_connection, TcpConnection* new_connection)
00115 {
00116 DBG_ENTRY_LVL("TcpReceiveStrategy","reset",6);
00117
00118 if (old_connection) {
00119 this->reactor_task_->get_reactor()->remove_handler
00120 (old_connection,
00121 ACE_Event_Handler::READ_MASK |
00122 ACE_Event_Handler::DONT_CALL);
00123 }
00124
00125 link_.drop_pending_request_acks();
00126
00127
00128
00129 if (this->reactor_task_->get_reactor()->register_handler
00130 (new_connection,
00131 ACE_Event_Handler::READ_MASK) == -1) {
00132
00133 ACE_ERROR_RETURN((LM_ERROR,
00134 "(%P|%t) ERROR: TcpReceiveStrategy::reset TcpConnection can't register with "
00135 "reactor\n"),
00136 -1);
00137 }
00138
00139 return 0;
00140 }
00141
00142 void
00143 OpenDDS::DCPS::TcpReceiveStrategy::stop_i()
00144 {
00145 DBG_ENTRY_LVL("TcpReceiveStrategy","stop_i",6);
00146
00147 link_.drop_pending_request_acks();
00148 }
00149
00150 void
00151 OpenDDS::DCPS::TcpReceiveStrategy::relink(bool do_suspend)
00152 {
00153 DBG_ENTRY_LVL("TcpReceiveStrategy","relink",6);
00154 TcpConnection_rch connection = link_.get_connection();
00155 if (connection)
00156 connection->relink_from_recv(do_suspend);
00157 }
00158
00159 OPENDDS_END_VERSIONED_NAMESPACE_DECL