00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpDataLink.h"
00010 #include "TcpReceiveStrategy.h"
00011 #include "TcpInst.h"
00012 #include "TcpSendStrategy.h"
00013 #include "dds/DCPS/transport/framework/TransportControlElement.h"
00014 #include "dds/DCPS/transport/framework/EntryExit.h"
00015 #include "dds/DCPS/DataSampleHeader.h"
00016 #include "ace/Log_Msg.h"
00017
00018 #if !defined (__ACE_INLINE__)
00019 #include "TcpDataLink.inl"
00020 #endif
00021
00022 OpenDDS::DCPS::TcpDataLink::TcpDataLink(
00023 const ACE_INET_Addr& remote_address,
00024 OpenDDS::DCPS::TcpTransport* transport_impl,
00025 Priority priority,
00026 bool is_loopback,
00027 bool is_active)
00028 : DataLink(transport_impl, priority, is_loopback, is_active),
00029 remote_address_(remote_address),
00030 graceful_disconnect_sent_(false),
00031 release_is_pending_(false)
00032 {
00033 DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6);
00034 transport_impl->_add_ref();
00035 this->transport_ = transport_impl;
00036 }
00037
00038 OpenDDS::DCPS::TcpDataLink::~TcpDataLink()
00039 {
00040 DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6);
00041 }
00042
00043
00044
00045
00046
00047
00048
00049 void
00050 OpenDDS::DCPS::TcpDataLink::stop_i()
00051 {
00052 DBG_ENTRY_LVL("TcpDataLink","stop_i",6);
00053
00054 if (!this->connection_.is_nil()) {
00055
00056 this->connection_->disconnect();
00057
00058
00059 this->connection_ = 0;
00060 }
00061 }
00062
00063 void
00064 OpenDDS::DCPS::TcpDataLink::pre_stop_i()
00065 {
00066 DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6);
00067
00068 DataLink::pre_stop_i();
00069
00070 TcpReceiveStrategy * rs
00071 = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00072
00073 if (rs != NULL) {
00074
00075
00076
00077 bool disconnected = rs->gracefully_disconnected();
00078
00079 if (!this->connection_.is_nil() && !this->graceful_disconnect_sent_
00080 && !disconnected) {
00081 this->send_graceful_disconnect_message();
00082 this->graceful_disconnect_sent_ = true;
00083 }
00084 }
00085
00086 if (!this->connection_.is_nil()) {
00087 this->connection_->shutdown();
00088 }
00089 }
00090
00091
00092
00093
00094 int
00095 OpenDDS::DCPS::TcpDataLink::connect(
00096 const TcpConnection_rch& connection,
00097 const TransportSendStrategy_rch& send_strategy,
00098 const TransportStrategy_rch& receive_strategy)
00099 {
00100 DBG_ENTRY_LVL("TcpDataLink","connect",6);
00101
00102
00103 if (!this->connection_.is_nil()) {
00104 ACE_ERROR_RETURN((LM_ERROR,
00105 "(%P|%t) ERROR: TcpDataLink already connected.\n"),
00106 -1);
00107 }
00108
00109 this->connection_ = connection;
00110
00111 if (this->connection_->peer().enable(ACE_NONBLOCK) == -1) {
00112 ACE_ERROR_RETURN((LM_ERROR,
00113 "(%P|%t) ERROR: TcpDataLink::connect failed to set "
00114 "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1);
00115 }
00116
00117
00118 this->connection_->set_datalink(this);
00119
00120
00121
00122 if (this->start(send_strategy, receive_strategy) != 0) {
00123
00124
00125
00126
00127
00128 this->connection_ = 0;
00129
00130 return -1;
00131 }
00132
00133 return 0;
00134 }
00135
00136
00137
00138
00139
00140
00141
00142 int
00143 OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(const TcpConnection_rch& connection)
00144 {
00145 DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6);
00146
00147 if (this->is_active_) {
00148 return -1;
00149 }
00150
00151
00152
00153
00154 if (!this->connection_.is_nil()) {
00155 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - "
00156 "trying to reuse existing connection\n"), 0);
00157 this->connection_->transfer(connection.in());
00158
00159
00160 TransportStrategy_rch brs;
00161 TransportSendStrategy_rch bss;
00162
00163 if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00164 this->connection_ = 0;
00165 return -1;
00166 } else {
00167 brs = this->receive_strategy_;
00168 bss = this->send_strategy_;
00169
00170 this->connection_ = connection;
00171
00172 TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00173
00174 TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00175
00176
00177
00178 int rs_result = rs->reset(this->connection_.in());
00179
00180
00181
00182 int ss_result = ss->reset(this->connection_.in(), true);
00183
00184 if (rs_result == 0 && ss_result == 0) {
00185 return 0;
00186 }
00187 }
00188 }
00189 return -1;
00190 }
00191
00192
00193
00194
00195
00196 int
00197 OpenDDS::DCPS::TcpDataLink::reconnect(TcpConnection* connection)
00198 {
00199 DBG_ENTRY_LVL("TcpDataLink","reconnect",6);
00200
00201
00202 if (this->connection_.is_nil()) {
00203 VDBG_LVL((LM_ERROR,
00204 "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
00205 , 1);
00206 return -1;
00207 }
00208
00209 this->connection_->transfer(connection);
00210
00211 bool released = false;
00212 TransportStrategy_rch brs;
00213 TransportSendStrategy_rch bss;
00214
00215 {
00216 GuardType guard2(this->strategy_lock_);
00217
00218 if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00219 released = true;
00220 this->connection_ = 0;
00221
00222 } else {
00223 brs = this->receive_strategy_;
00224 bss = this->send_strategy_;
00225 }
00226 }
00227
00228 TcpConnection_rch conn_rch(connection, false);
00229
00230 if (released) {
00231 TcpDataLink_rch this_rch(this, false);
00232 return this->transport_->connect_tcp_datalink(this_rch, conn_rch);
00233 }
00234
00235 this->connection_ = conn_rch._retn();
00236
00237 TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00238
00239 TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00240
00241
00242
00243 int rs_result = rs->reset(this->connection_.in());
00244
00245
00246
00247 int ss_result = ss->reset(this->connection_.in());
00248
00249 if (rs_result == 0 && ss_result == 0) {
00250 return 0;
00251 }
00252
00253 return -1;
00254 }
00255
00256 void
00257 OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message()
00258 {
00259 DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6);
00260
00261
00262
00263 this->send_strategy_->terminate_send(true);
00264
00265 DataSampleHeader header_data;
00266
00267 header_data.message_id_ = GRACEFUL_DISCONNECT;
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290 ACE_Message_Block* message;
00291 size_t max_marshaled_size = header_data.max_marshaled_size();
00292 ACE_Message_Block* data = 0;
00293 ACE_NEW(data,
00294 ACE_Message_Block(20,
00295 ACE_Message_Block::MB_DATA,
00296 0,
00297 0,
00298 0,
00299 0,
00300 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00301 ACE_Time_Value::zero,
00302 ACE_Time_Value::max_time,
00303 0,
00304 0));
00305 data->wr_ptr(20);
00306
00307 header_data.message_length_ = static_cast<ACE_UINT32>(data->length());
00308
00309 ACE_NEW(message,
00310 ACE_Message_Block(max_marshaled_size,
00311 ACE_Message_Block::MB_DATA,
00312 data,
00313 0,
00314 0,
00315 0,
00316 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00317 ACE_Time_Value::zero,
00318 ACE_Time_Value::max_time,
00319 0,
00320 0));
00321
00322 *message << header_data;
00323
00324 TransportControlElement* send_element = 0;
00325
00326 ACE_NEW(send_element, TransportControlElement(message));
00327
00328
00329 message->release();
00330
00331
00332
00333 this->send_i(send_element, false);
00334 }
00335
00336 void OpenDDS::DCPS::TcpDataLink::set_release_pending(bool flag)
00337 {
00338 this->release_is_pending_ = flag;
00339 }
00340
00341 bool OpenDDS::DCPS::TcpDataLink::is_release_pending() const
00342 {
00343 return this->release_is_pending_.value();
00344 }