TcpSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpSendStrategy.h"
00011 #include "TcpTransport.h"
00012 #include "TcpInst.h"
00013 #include "TcpSynchResource.h"
00014 #include "TcpDataLink.h"
00015 #include "dds/DCPS/transport/framework/ThreadSynch.h"
00016 #include "dds/DCPS/transport/framework/ScheduleOutputHandler.h"
00017 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00018 #include "dds/DCPS/transport/framework/ReactorSynchStrategy.h"
00019 
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 OpenDDS::DCPS::TcpSendStrategy::TcpSendStrategy(
00023   std::size_t id,
00024   TcpDataLink& link,
00025   TcpSynchResource* synch_resource,
00026   const TransportReactorTask_rch& task,
00027   Priority priority)
00028   : TransportSendStrategy(id, link.impl(),
00029                           synch_resource, priority,
00030                           make_rch<ReactorSynchStrategy>(this,task->get_reactor()))
00031   , link_(link)
00032   , reactor_task_(task)
00033 {
00034   DBG_ENTRY_LVL("TcpSendStrategy","TcpSendStrategy",6);
00035 
00036 }
00037 
00038 OpenDDS::DCPS::TcpSendStrategy::~TcpSendStrategy()
00039 {
00040   DBG_ENTRY_LVL("TcpSendStrategy","~TcpSendStrategy",6);
00041 }
00042 
00043 void
00044 OpenDDS::DCPS::TcpSendStrategy::schedule_output()
00045 {
00046   DBG_ENTRY_LVL("TcpSendStrategy","schedule_output",6);
00047 
00048   // Notify the reactor to adjust its processing policy according to mode_.
00049   synch()->work_available();
00050 
00051   if (DCPS_debug_level > 4) {
00052     const char* action = "";
00053     if( mode() == MODE_DIRECT) {
00054       action = "canceling";
00055     } else if( (mode() == MODE_QUEUE)
00056             || (mode() == MODE_SUSPEND)) {
00057       action = "starting";
00058     }
00059     ACE_DEBUG((LM_DEBUG,
00060                ACE_TEXT("(%P|%t) TcpSendStrategy::schedule_output() [%d] - ")
00061                ACE_TEXT("%C data queueing for handle %d.\n"),
00062                id(),action,get_handle()));
00063   }
00064 }
00065 
00066 int
00067 OpenDDS::DCPS::TcpSendStrategy::reset(bool reset_mode)
00068 {
00069   DBG_ENTRY_LVL("TcpSendStrategy","reset",6);
00070   //For the case of a send_strategy being reused for a new connection (not reconnect)
00071   //need to reset the state
00072   if (reset_mode) {
00073     //Need to make sure that the send mode is set back to MODE_DIRECT in case
00074     //it was terminated prior to being reused/reset.
00075     this->clear(MODE_DIRECT);
00076     //reset graceful_disconnecting_ to initial state
00077     this->set_graceful_disconnecting(false);
00078   }
00079   return 0;
00080 }
00081 
00082 ssize_t
00083 OpenDDS::DCPS::TcpSendStrategy::send_bytes(const iovec iov[], int n, int& bp)
00084 {
00085   DBG_ENTRY_LVL("TcpSendStrategy","send_bytes",6);
00086   return this->non_blocking_send(iov, n, bp);
00087 }
00088 
00089 ACE_HANDLE
00090 OpenDDS::DCPS::TcpSendStrategy::get_handle()
00091 {
00092   TcpConnection_rch connection = link_.get_connection();
00093 
00094   if (!connection)
00095     return ACE_INVALID_HANDLE;
00096 
00097   return connection->peer().get_handle();
00098 }
00099 
00100 ssize_t
00101 OpenDDS::DCPS::TcpSendStrategy::send_bytes_i(const iovec iov[], int n)
00102 {
00103   TcpConnection_rch connection = link_.get_connection();
00104 
00105   if (!connection)
00106     return -1;
00107   ssize_t result = connection->peer().sendv(iov, n);
00108   if (DCPS_debug_level > 4)
00109     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpSendStrategy::send_bytes_i sent %d bytes \n", result));
00110 
00111   return result;
00112 }
00113 
00114 void
00115 OpenDDS::DCPS::TcpSendStrategy::relink(bool do_suspend)
00116 {
00117   DBG_ENTRY_LVL("TcpSendStrategy","relink",6);
00118   TcpConnection_rch connection = link_.get_connection();
00119 
00120   if (connection) {
00121     connection->relink_from_send(do_suspend);
00122   }
00123 }
00124 
00125 void
00126 OpenDDS::DCPS::TcpSendStrategy::stop_i()
00127 {
00128   DBG_ENTRY_LVL("TcpSendStrategy","stop_i",6);
00129 }
00130 
00131 void
00132 OpenDDS::DCPS::TcpSendStrategy::add_delayed_notification(TransportQueueElement* element)
00133 {
00134   if (!element->is_request_ack()) {
00135     // only add the notification when we are not sending REQUEST_ACK message
00136     TransportSendStrategy::add_delayed_notification(element);
00137   }
00138 }
00139 
00140 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1