00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpSendStrategy.h"
00011 #include "TcpTransport.h"
00012 #include "TcpInst.h"
00013 #include "TcpSynchResource.h"
00014 #include "TcpDataLink.h"
00015 #include "dds/DCPS/transport/framework/ThreadSynch.h"
00016 #include "dds/DCPS/transport/framework/ScheduleOutputHandler.h"
00017 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00018 #include "dds/DCPS/transport/framework/ReactorSynchStrategy.h"
00019
00020 OpenDDS::DCPS::TcpSendStrategy::TcpSendStrategy(
00021 std::size_t id,
00022 const TcpDataLink_rch& link,
00023 const TcpInst_rch& config,
00024 const TcpConnection_rch& connection,
00025 TcpSynchResource* synch_resource,
00026 const TransportReactorTask_rch& task,
00027 Priority priority)
00028 : TransportSendStrategy(id, static_rchandle_cast<TransportInst>(config),
00029 synch_resource, priority,
00030 new ReactorSynchStrategy(
00031 this,
00032 task->get_reactor()))
00033 , connection_(connection)
00034 , link_(link)
00035 , reactor_task_(task)
00036 {
00037 DBG_ENTRY_LVL("TcpSendStrategy","TcpSendStrategy",6);
00038
00039 connection->set_send_strategy(this);
00040 }
00041
00042 OpenDDS::DCPS::TcpSendStrategy::~TcpSendStrategy()
00043 {
00044 DBG_ENTRY_LVL("TcpSendStrategy","~TcpSendStrategy",6);
00045 }
00046
00047 void
00048 OpenDDS::DCPS::TcpSendStrategy::schedule_output()
00049 {
00050 DBG_ENTRY_LVL("TcpSendStrategy","schedule_output",6);
00051
00052
00053 synch()->work_available();
00054
00055 if (DCPS_debug_level > 4) {
00056 const char* action = "";
00057 if( mode() == MODE_DIRECT) {
00058 action = "canceling";
00059 } else if( (mode() == MODE_QUEUE)
00060 || (mode() == MODE_SUSPEND)) {
00061 action = "starting";
00062 }
00063 ACE_DEBUG((LM_DEBUG,
00064 ACE_TEXT("(%P|%t) TcpSendStrategy::schedule_output() [%d] - ")
00065 ACE_TEXT("%C data queueing for handle %d.\n"),
00066 id(),action,get_handle()));
00067 }
00068 }
00069
00070 int
00071 OpenDDS::DCPS::TcpSendStrategy::reset(TcpConnection* connection, bool reset_mode)
00072 {
00073 DBG_ENTRY_LVL("TcpSendStrategy","reset",6);
00074
00075
00076
00077 if (this->connection_.is_nil()) {
00078 ACE_ERROR_RETURN((LM_ERROR,
00079 "(%P|%t) ERROR: TcpSendStrategy::reset previous connection "
00080 "should not be nil.\n"),
00081 -1);
00082 }
00083
00084 if (this->connection_.in() == connection) {
00085 ACE_ERROR_RETURN((LM_ERROR,
00086 "(%P|%t) ERROR: TcpSendStrategy::reset should not be called"
00087 " to replace the same connection.\n"),
00088 -1);
00089 }
00090
00091
00092
00093 this->connection_->remove_send_strategy();
00094
00095
00096 connection->_add_ref();
00097 this->connection_ = connection;
00098
00099
00100
00101
00102
00103 this->connection_->set_send_strategy(this);
00104
00105
00106
00107 if (reset_mode) {
00108
00109
00110 this->clear(MODE_DIRECT);
00111
00112 this->set_graceful_disconnecting(false);
00113 }
00114 return 0;
00115 }
00116
00117 ssize_t
00118 OpenDDS::DCPS::TcpSendStrategy::send_bytes(const iovec iov[], int n, int& bp)
00119 {
00120 DBG_ENTRY_LVL("TcpSendStrategy","send_bytes",6);
00121 return this->non_blocking_send(iov, n, bp);
00122 }
00123
00124 ACE_HANDLE
00125 OpenDDS::DCPS::TcpSendStrategy::get_handle()
00126 {
00127 TcpConnection_rch connection = this->connection_;
00128
00129 if (connection.is_nil())
00130 return ACE_INVALID_HANDLE;
00131
00132 return connection->peer().get_handle();
00133 }
00134
00135 ssize_t
00136 OpenDDS::DCPS::TcpSendStrategy::send_bytes_i(const iovec iov[], int n)
00137 {
00138 TcpConnection_rch connection = this->connection_;
00139
00140 if (connection.is_nil())
00141 return -1;
00142 ssize_t result = connection->peer().sendv(iov, n);
00143 if (DCPS_debug_level > 4)
00144 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpSendStrategy::send_bytes_i sent %d bytes \n", result));
00145
00146 return result;
00147 }
00148
00149 void
00150 OpenDDS::DCPS::TcpSendStrategy::relink(bool do_suspend)
00151 {
00152 DBG_ENTRY_LVL("TcpSendStrategy","relink",6);
00153
00154 if (!this->connection_.is_nil()) {
00155 this->connection_->relink_from_send(do_suspend);
00156 }
00157 }
00158
00159 void
00160 OpenDDS::DCPS::TcpSendStrategy::stop_i()
00161 {
00162 DBG_ENTRY_LVL("TcpSendStrategy","stop_i",6);
00163
00164
00165
00166 this->connection_->remove_send_strategy();
00167
00168
00169 this->connection_ = 0;
00170 }