00001
00002
00003
00004
00005
00006
00007
#include "Tcp_pch.h"
#include "TcpDataLink.h"
#include "TcpReceiveStrategy.h"
#include "TcpInst.h"
#include "TcpSendStrategy.h"
#include "dds/DCPS/transport/framework/TransportControlElement.h"
#include "dds/DCPS/transport/framework/EntryExit.h"
#include "dds/DCPS/DataSampleHeader.h"
#include "dds/DCPS/GuidConverter.h"
#include "ace/Log_Msg.h"

#include <cstring>
00018
00019 #if !defined (__ACE_INLINE__)
00020 #include "TcpDataLink.inl"
00021 #endif
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 OpenDDS::DCPS::TcpDataLink::TcpDataLink(
00026 const ACE_INET_Addr& remote_address,
00027 OpenDDS::DCPS::TcpTransport& transport_impl,
00028 Priority priority,
00029 bool is_loopback,
00030 bool is_active)
00031 : DataLink(transport_impl, priority, is_loopback, is_active),
00032 remote_address_(remote_address),
00033 graceful_disconnect_sent_(false),
00034 release_is_pending_(false)
00035 {
00036 DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6);
00037 }
00038
00039 OpenDDS::DCPS::TcpDataLink::~TcpDataLink()
00040 {
00041 DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6);
00042 }
00043
00044
00045
00046
00047
00048
00049
00050 void
00051 OpenDDS::DCPS::TcpDataLink::stop_i()
00052 {
00053 DBG_ENTRY_LVL("TcpDataLink","stop_i",6);
00054
00055 TcpConnection_rch connection(this->connection_.lock());
00056 if (connection) {
00057
00058 connection->disconnect();
00059 }
00060 }
00061
00062 void
00063 OpenDDS::DCPS::TcpDataLink::pre_stop_i()
00064 {
00065 DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6);
00066
00067 DataLink::pre_stop_i();
00068
00069 TcpReceiveStrategy_rch rs = this->receive_strategy();
00070
00071 TcpConnection_rch connection(this->connection_.lock());
00072
00073 if (rs) {
00074
00075
00076
00077 bool disconnected = rs->gracefully_disconnected();
00078
00079 if (connection && !this->graceful_disconnect_sent_
00080 && !disconnected && !this->impl().is_shut_down()) {
00081 this->send_graceful_disconnect_message();
00082 this->graceful_disconnect_sent_ = true;
00083 }
00084 }
00085
00086 if (connection) {
00087 connection->shutdown();
00088 }
00089 }
00090
00091
00092
00093
00094 int
00095 OpenDDS::DCPS::TcpDataLink::connect(
00096 const TcpConnection_rch& connection,
00097 const RcHandle<TcpSendStrategy>& send_strategy,
00098 const RcHandle<TcpReceiveStrategy>& receive_strategy)
00099 {
00100 DBG_ENTRY_LVL("TcpDataLink","connect",6);
00101
00102 this->connection_ = connection;
00103
00104 if (connection->peer().enable(ACE_NONBLOCK) == -1) {
00105 ACE_ERROR_RETURN((LM_ERROR,
00106 "(%P|%t) ERROR: TcpDataLink::connect failed to set "
00107 "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1);
00108 }
00109
00110
00111 connection->set_datalink(rchandle_from(this));
00112
00113
00114
00115 if (this->start(send_strategy, receive_strategy) != 0) {
00116
00117
00118
00119
00120
00121 this->connection_.reset();
00122
00123 return -1;
00124 }
00125
00126 return 0;
00127 }
00128
00129
00130
00131
00132
00133
00134
00135 int
00136 OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(const TcpConnection_rch& connection)
00137 {
00138 DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6);
00139
00140 if (this->is_active_) {
00141 return -1;
00142 }
00143
00144
00145
00146
00147
00148 TcpConnection_rch old_connection(this->connection_.lock());
00149
00150 if (old_connection) {
00151 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - "
00152 "trying to reuse existing connection\n"), 0);
00153 old_connection->transfer(connection.in());
00154
00155
00156 TransportStrategy_rch brs;
00157 TransportSendStrategy_rch bss;
00158
00159 if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00160 return -1;
00161 } else {
00162 brs = this->receive_strategy_;
00163 bss = this->send_strategy_;
00164
00165 this->connection_ = connection;
00166
00167 TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00168
00169 TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00170
00171
00172
00173 int rs_result = rs->reset(0, connection.in());
00174
00175
00176
00177 int ss_result = ss->reset(true);
00178
00179 if (rs_result == 0 && ss_result == 0) {
00180 return 0;
00181 }
00182 }
00183 }
00184 return -1;
00185 }
00186
00187
00188
00189
00190
00191 int
00192 OpenDDS::DCPS::TcpDataLink::reconnect(const TcpConnection_rch& connection)
00193 {
00194 DBG_ENTRY_LVL("TcpDataLink","reconnect",6);
00195
00196 TcpConnection_rch existing_connection(this->connection_.lock());
00197
00198 if (!existing_connection) {
00199 VDBG_LVL((LM_ERROR,
00200 "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
00201 , 1);
00202 return -1;
00203 }
00204
00205 existing_connection->transfer(connection.in());
00206
00207 bool released = false;
00208 TransportStrategy_rch brs;
00209 TransportSendStrategy_rch bss;
00210
00211 {
00212 GuardType guard2(this->strategy_lock_);
00213
00214 if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00215 released = true;
00216
00217 } else {
00218 brs = this->receive_strategy_;
00219 bss = this->send_strategy_;
00220 }
00221 }
00222
00223 if (released) {
00224 return static_cast<TcpTransport&>(impl()).connect_tcp_datalink(*this, connection);
00225 }
00226
00227 this->connection_ = connection;
00228
00229 TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00230
00231 TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00232
00233
00234
00235 int rs_result = rs->reset(existing_connection.in(), connection.in());
00236
00237
00238
00239 int ss_result = ss->reset();
00240
00241 if (rs_result == 0 && ss_result == 0) {
00242 return 0;
00243 }
00244
00245 return -1;
00246 }
00247
00248 void
00249 OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message()
00250 {
00251 DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6);
00252
00253
00254
00255 this->send_strategy_->terminate_send(true);
00256
00257 DataSampleHeader header_data;
00258
00259 header_data.message_id_ = GRACEFUL_DISCONNECT;
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282 size_t max_marshaled_size = header_data.max_marshaled_size();
00283
00284 Message_Block_Ptr data(
00285 new ACE_Message_Block(20,
00286 ACE_Message_Block::MB_DATA,
00287 0,
00288 0,
00289 0,
00290 0,
00291 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00292 ACE_Time_Value::zero,
00293 ACE_Time_Value::max_time,
00294 0,
00295 0));
00296 data->wr_ptr(20);
00297
00298 header_data.message_length_ = static_cast<ACE_UINT32>(data->length());
00299
00300 Message_Block_Ptr message(
00301 new ACE_Message_Block(max_marshaled_size,
00302 ACE_Message_Block::MB_DATA,
00303 data.release(),
00304 0,
00305 0,
00306 0,
00307 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00308 ACE_Time_Value::zero,
00309 ACE_Time_Value::max_time,
00310 0,
00311 0));
00312
00313 *message << header_data;
00314
00315 TransportControlElement* send_element = new TransportControlElement(move(message));
00316
00317
00318
00319 this->send_i(send_element, false);
00320 }
00321
00322 void
00323 OpenDDS::DCPS::TcpDataLink::set_release_pending(bool flag)
00324 {
00325 this->release_is_pending_ = flag;
00326 }
00327
00328 bool
00329 OpenDDS::DCPS::TcpDataLink::is_release_pending() const
00330 {
00331 return this->release_is_pending_.value();
00332 }
00333
00334 bool
00335 OpenDDS::DCPS::TcpDataLink::handle_send_request_ack(TransportQueueElement* element)
00336 {
00337 if (Transport_debug_level >= 1) {
00338 const GuidConverter converter(element->publication_id());
00339 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::handle_send_request_ack(%@) sequence number %q, publication_id=%C\n"),
00340 element, element->sequence().getValue(), OPENDDS_STRING(converter).c_str()));
00341 }
00342
00343 ACE_Guard<ACE_SYNCH_MUTEX> guard(pending_request_acks_lock_);
00344 pending_request_acks_.push_back(element);
00345 return false;
00346 }
00347
00348
00349 void
00350 OpenDDS::DCPS::TcpDataLink::ack_received(const ReceivedDataSample& sample)
00351 {
00352 SequenceNumber sequence = sample.header_.sequence_;
00353
00354 if (Transport_debug_level >= 1) {
00355 const GuidConverter converter(sample.header_.publication_id_);
00356 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received sequence number %q, publiction_id=%C\n"),
00357 sequence.getValue(), OPENDDS_STRING(converter).c_str()));
00358 }
00359
00360 TransportQueueElement* elem=0;
00361 {
00362
00363 ACE_Guard<ACE_SYNCH_MUTEX> guard(pending_request_acks_lock_);
00364 PendingRequestAcks::iterator it;
00365 for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it){
00366 if ((*it)->sequence() == sequence && (*it)->publication_id() == sample.header_.publication_id_) {
00367 elem = *it;
00368 pending_request_acks_.erase(it);
00369 break;
00370 }
00371 }
00372 }
00373
00374 if (elem) {
00375 if (Transport_debug_level >= 1) {
00376 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() found matching element %@\n"),
00377 elem));
00378 }
00379 this->send_strategy_->deliver_ack_request(elem);
00380 }
00381 else {
00382 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received unknown sequence number %q\n"),
00383 sequence.getValue()));
00384 }
00385 }
00386
00387 void
00388 OpenDDS::DCPS::TcpDataLink::request_ack_received(const ReceivedDataSample& sample)
00389 {
00390 DataSampleHeader header_data;
00391
00392 header_data.message_id_ = SAMPLE_ACK;
00393
00394
00395
00396
00397 header_data.byte_order_ = ACE_CDR_BYTE_ORDER;
00398 header_data.message_length_ = 0;
00399 header_data.sequence_ = sample.header_.sequence_;
00400 header_data.publication_id_ = sample.header_.publication_id_;
00401 header_data.publisher_id_ = sample.header_.publisher_id_;
00402
00403 size_t max_marshaled_size = header_data.max_marshaled_size();
00404
00405 Message_Block_Ptr message(
00406 new ACE_Message_Block(max_marshaled_size,
00407 ACE_Message_Block::MB_DATA,
00408 0,
00409 0,
00410 0,
00411 0,
00412 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00413 ACE_Time_Value::zero,
00414 ACE_Time_Value::max_time,
00415 0,
00416 0));
00417
00418 *message << header_data;
00419
00420 TransportControlElement* send_element = new TransportControlElement(move(message));
00421
00422
00423
00424
00425 this->send_i(send_element, false);
00426 }
00427
00428 void
00429 OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks()
00430 {
00431 ACE_Guard<ACE_SYNCH_MUTEX> guard(pending_request_acks_lock_);
00432 PendingRequestAcks::iterator it;
00433 for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it){
00434 (*it)->data_dropped(true);
00435 }
00436 pending_request_acks_.clear();
00437 }
00438
00439 OpenDDS::DCPS::TcpSendStrategy_rch
00440 OpenDDS::DCPS::TcpDataLink::send_strategy()
00441 {
00442 return static_rchandle_cast<OpenDDS::DCPS::TcpSendStrategy>(send_strategy_);
00443 }
00444
00445 OpenDDS::DCPS::TcpReceiveStrategy_rch
00446 OpenDDS::DCPS::TcpDataLink::receive_strategy()
00447 {
00448 return static_rchandle_cast<OpenDDS::DCPS::TcpReceiveStrategy>(receive_strategy_);
00449 }
00450
00451
00452 OPENDDS_END_VERSIONED_NAMESPACE_DECL