TcpSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpSendStrategy.h"
00011 #include "TcpTransport.h"
00012 #include "TcpInst.h"
00013 #include "TcpSynchResource.h"
00014 #include "TcpDataLink.h"
00015 #include "dds/DCPS/transport/framework/ThreadSynch.h"
00016 #include "dds/DCPS/transport/framework/ScheduleOutputHandler.h"
00017 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00018 #include "dds/DCPS/transport/framework/ReactorSynchStrategy.h"
00019 
00020 OpenDDS::DCPS::TcpSendStrategy::TcpSendStrategy(
00021   std::size_t id,
00022   const TcpDataLink_rch& link,
00023   const TcpInst_rch& config,
00024   const TcpConnection_rch& connection,
00025   TcpSynchResource* synch_resource,
00026   const TransportReactorTask_rch& task,
00027   Priority priority)
00028   : TransportSendStrategy(id, static_rchandle_cast<TransportInst>(config),
00029                           synch_resource, priority,
00030                           new ReactorSynchStrategy(
00031                                 this,
00032                                 task->get_reactor()))
00033   , connection_(connection)
00034   , link_(link)
00035   , reactor_task_(task)
00036 {
00037   DBG_ENTRY_LVL("TcpSendStrategy","TcpSendStrategy",6);
00038 
00039   connection->set_send_strategy(this);
00040 }
00041 
00042 OpenDDS::DCPS::TcpSendStrategy::~TcpSendStrategy()
00043 {
00044   DBG_ENTRY_LVL("TcpSendStrategy","~TcpSendStrategy",6);
00045 }
00046 
00047 void
00048 OpenDDS::DCPS::TcpSendStrategy::schedule_output()
00049 {
00050   DBG_ENTRY_LVL("TcpSendStrategy","schedule_output",6);
00051 
00052   // Notify the reactor to adjust its processing policy according to mode_.
00053   synch()->work_available();
00054 
00055   if (DCPS_debug_level > 4) {
00056     const char* action = "";
00057     if( mode() == MODE_DIRECT) {
00058       action = "canceling";
00059     } else if( (mode() == MODE_QUEUE)
00060             || (mode() == MODE_SUSPEND)) {
00061       action = "starting";
00062     }
00063     ACE_DEBUG((LM_DEBUG,
00064                ACE_TEXT("(%P|%t) TcpSendStrategy::schedule_output() [%d] - ")
00065                ACE_TEXT("%C data queueing for handle %d.\n"),
00066                id(),action,get_handle()));
00067   }
00068 }
00069 
00070 int
00071 OpenDDS::DCPS::TcpSendStrategy::reset(TcpConnection* connection, bool reset_mode)
00072 {
00073   DBG_ENTRY_LVL("TcpSendStrategy","reset",6);
00074 
00075   // Sanity check - this connection is passed in from the constructor and
00076   // it should not be nil.
00077   if (this->connection_.is_nil()) {
00078     ACE_ERROR_RETURN((LM_ERROR,
00079                       "(%P|%t) ERROR: TcpSendStrategy::reset  previous connection "
00080                       "should not be nil.\n"),
00081                      -1);
00082   }
00083 
00084   if (this->connection_.in() == connection) {
00085     ACE_ERROR_RETURN((LM_ERROR,
00086                       "(%P|%t) ERROR: TcpSendStrategy::reset should not be called"
00087                       " to replace the same connection.\n"),
00088                      -1);
00089   }
00090 
00091   // This will cause the connection_ object to drop its reference to this
00092   // TransportSendStrategy object.
00093   this->connection_->remove_send_strategy();
00094 
00095   // Replace with a new connection.
00096   connection->_add_ref();
00097   this->connection_ = connection;
00098 
00099   // Tell the TcpConnection that we are the object that it should
00100   // call when it receives a handle_input() "event", and we will carry
00101   // it out.  The TcpConnection object will make a "copy" of the
00102   // reference (to this object) that we pass-in here.
00103   this->connection_->set_send_strategy(this);
00104 
00105   //For the case of a send_strategy being reused for a new connection (not reconnect)
00106   //need to reset the state
00107   if (reset_mode) {
00108     //Need to make sure that the send mode is set back to MODE_DIRECT in case
00109     //it was terminated prior to being reused/reset.
00110     this->clear(MODE_DIRECT);
00111     //reset graceful_disconnecting_ to initial state
00112     this->set_graceful_disconnecting(false);
00113   }
00114   return 0;
00115 }
00116 
00117 ssize_t
00118 OpenDDS::DCPS::TcpSendStrategy::send_bytes(const iovec iov[], int n, int& bp)
00119 {
00120   DBG_ENTRY_LVL("TcpSendStrategy","send_bytes",6);
00121   return this->non_blocking_send(iov, n, bp);
00122 }
00123 
00124 ACE_HANDLE
00125 OpenDDS::DCPS::TcpSendStrategy::get_handle()
00126 {
00127   TcpConnection_rch connection = this->connection_;
00128 
00129   if (connection.is_nil())
00130     return ACE_INVALID_HANDLE;
00131 
00132   return connection->peer().get_handle();
00133 }
00134 
00135 ssize_t
00136 OpenDDS::DCPS::TcpSendStrategy::send_bytes_i(const iovec iov[], int n)
00137 {
00138   TcpConnection_rch connection = this->connection_;
00139 
00140   if (connection.is_nil())
00141     return -1;
00142   ssize_t result = connection->peer().sendv(iov, n);
00143   if (DCPS_debug_level > 4)
00144     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpSendStrategy::send_bytes_i sent %d bytes \n", result));
00145 
00146   return result;
00147 }
00148 
00149 void
00150 OpenDDS::DCPS::TcpSendStrategy::relink(bool do_suspend)
00151 {
00152   DBG_ENTRY_LVL("TcpSendStrategy","relink",6);
00153 
00154   if (!this->connection_.is_nil()) {
00155     this->connection_->relink_from_send(do_suspend);
00156   }
00157 }
00158 
00159 void
00160 OpenDDS::DCPS::TcpSendStrategy::stop_i()
00161 {
00162   DBG_ENTRY_LVL("TcpSendStrategy","stop_i",6);
00163 
00164   // This will cause the connection_ object to drop its reference to this
00165   // TransportSendStrategy object.
00166   this->connection_->remove_send_strategy();
00167 
00168   // Take back the "copy" of connection object given. (see constructor).
00169   this->connection_ = 0;
00170 }

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7