TcpSendStrategy.cpp
Go to the documentation of this file.
00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpSendStrategy.h"
00011 #include "TcpTransport.h"
00012 #include "TcpInst.h"
00013 #include "TcpSynchResource.h"
00014 #include "TcpDataLink.h"
00015 #include "dds/DCPS/transport/framework/ThreadSynch.h"
00016 #include "dds/DCPS/transport/framework/ScheduleOutputHandler.h"
00017 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00018 #include "dds/DCPS/transport/framework/ReactorSynchStrategy.h"
00019
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 OpenDDS::DCPS::TcpSendStrategy::TcpSendStrategy(
00023 std::size_t id,
00024 TcpDataLink& link,
00025 TcpSynchResource* synch_resource,
00026 const TransportReactorTask_rch& task,
00027 Priority priority)
00028 : TransportSendStrategy(id, link.impl(),
00029 synch_resource, priority,
00030 make_rch<ReactorSynchStrategy>(this,task->get_reactor()))
00031 , link_(link)
00032 , reactor_task_(task)
00033 {
00034 DBG_ENTRY_LVL("TcpSendStrategy","TcpSendStrategy",6);
00035
00036 }
00037
00038 OpenDDS::DCPS::TcpSendStrategy::~TcpSendStrategy()
00039 {
00040 DBG_ENTRY_LVL("TcpSendStrategy","~TcpSendStrategy",6);
00041 }
00042
00043 void
00044 OpenDDS::DCPS::TcpSendStrategy::schedule_output()
00045 {
00046 DBG_ENTRY_LVL("TcpSendStrategy","schedule_output",6);
00047
00048
00049 synch()->work_available();
00050
00051 if (DCPS_debug_level > 4) {
00052 const char* action = "";
00053 if( mode() == MODE_DIRECT) {
00054 action = "canceling";
00055 } else if( (mode() == MODE_QUEUE)
00056 || (mode() == MODE_SUSPEND)) {
00057 action = "starting";
00058 }
00059 ACE_DEBUG((LM_DEBUG,
00060 ACE_TEXT("(%P|%t) TcpSendStrategy::schedule_output() [%d] - ")
00061 ACE_TEXT("%C data queueing for handle %d.\n"),
00062 id(),action,get_handle()));
00063 }
00064 }
00065
00066 int
00067 OpenDDS::DCPS::TcpSendStrategy::reset(bool reset_mode)
00068 {
00069 DBG_ENTRY_LVL("TcpSendStrategy","reset",6);
00070
00071
00072 if (reset_mode) {
00073
00074
00075 this->clear(MODE_DIRECT);
00076
00077 this->set_graceful_disconnecting(false);
00078 }
00079 return 0;
00080 }
00081
00082 ssize_t
00083 OpenDDS::DCPS::TcpSendStrategy::send_bytes(const iovec iov[], int n, int& bp)
00084 {
00085 DBG_ENTRY_LVL("TcpSendStrategy","send_bytes",6);
00086 return this->non_blocking_send(iov, n, bp);
00087 }
00088
00089 ACE_HANDLE
00090 OpenDDS::DCPS::TcpSendStrategy::get_handle()
00091 {
00092 TcpConnection_rch connection = link_.get_connection();
00093
00094 if (!connection)
00095 return ACE_INVALID_HANDLE;
00096
00097 return connection->peer().get_handle();
00098 }
00099
00100 ssize_t
00101 OpenDDS::DCPS::TcpSendStrategy::send_bytes_i(const iovec iov[], int n)
00102 {
00103 TcpConnection_rch connection = link_.get_connection();
00104
00105 if (!connection)
00106 return -1;
00107 ssize_t result = connection->peer().sendv(iov, n);
00108 if (DCPS_debug_level > 4)
00109 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpSendStrategy::send_bytes_i sent %d bytes \n", result));
00110
00111 return result;
00112 }
00113
00114 void
00115 OpenDDS::DCPS::TcpSendStrategy::relink(bool do_suspend)
00116 {
00117 DBG_ENTRY_LVL("TcpSendStrategy","relink",6);
00118 TcpConnection_rch connection = link_.get_connection();
00119
00120 if (connection) {
00121 connection->relink_from_send(do_suspend);
00122 }
00123 }
00124
00125 void
00126 OpenDDS::DCPS::TcpSendStrategy::stop_i()
00127 {
00128 DBG_ENTRY_LVL("TcpSendStrategy","stop_i",6);
00129 }
00130
00131 void
00132 OpenDDS::DCPS::TcpSendStrategy::add_delayed_notification(TransportQueueElement* element)
00133 {
00134 if (!element->is_request_ack()) {
00135
00136 TransportSendStrategy::add_delayed_notification(element);
00137 }
00138 }
00139
00140 OPENDDS_END_VERSIONED_NAMESPACE_DECL