TcpDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpDataLink.h"
00010 #include "TcpReceiveStrategy.h"
00011 #include "TcpInst.h"
00012 #include "TcpSendStrategy.h"
00013 #include "dds/DCPS/transport/framework/TransportControlElement.h"
00014 #include "dds/DCPS/transport/framework/EntryExit.h"
00015 #include "dds/DCPS/DataSampleHeader.h"
00016 #include "dds/DCPS/GuidConverter.h"
00017 #include "ace/Log_Msg.h"
00018 
00019 #if !defined (__ACE_INLINE__)
00020 #include "TcpDataLink.inl"
00021 #endif /* __ACE_INLINE__ */
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 OpenDDS::DCPS::TcpDataLink::TcpDataLink(
00026   const ACE_INET_Addr& remote_address,
00027   OpenDDS::DCPS::TcpTransport&  transport_impl,
00028   Priority priority,
00029   bool        is_loopback,
00030   bool        is_active)
00031   : DataLink(transport_impl, priority, is_loopback, is_active),
00032     remote_address_(remote_address),
00033     graceful_disconnect_sent_(false),
00034     release_is_pending_(false)
00035 {
00036   DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6);
00037 }
00038 
00039 OpenDDS::DCPS::TcpDataLink::~TcpDataLink()
00040 {
00041   DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6);
00042 }
00043 
00044 /// Called when the DataLink has been "stopped" for some reason.  It could
00045 /// be called from the DataLink::transport_shutdown() method (when the
00046 /// TransportImpl is handling a shutdown() call).  Or, it could be called
00047 /// from the DataLink::release_reservations() method, when it discovers that
00048 /// it has just released the last remaining reservations from the DataLink,
00049 /// and the DataLink is in the process of "releasing" itself.
00050 void
00051 OpenDDS::DCPS::TcpDataLink::stop_i()
00052 {
00053   DBG_ENTRY_LVL("TcpDataLink","stop_i",6);
00054 
00055   TcpConnection_rch connection(this->connection_.lock());
00056   if (connection) {
00057     // Tell the connection object to disconnect.
00058     connection->disconnect();
00059   }
00060 }
00061 
00062 void
00063 OpenDDS::DCPS::TcpDataLink::pre_stop_i()
00064 {
00065   DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6);
00066 
00067   DataLink::pre_stop_i();
00068 
00069   TcpReceiveStrategy_rch rs = this->receive_strategy();
00070 
00071   TcpConnection_rch connection(this->connection_.lock());
00072 
00073   if (rs) {
00074     // If we received the GRACEFUL_DISCONNECT message from peer before we
00075     // initiate the disconnecting of the datalink, then we will not send
00076     // the GRACEFUL_DISCONNECT message to the peer.
00077     bool disconnected = rs->gracefully_disconnected();
00078 
00079     if (connection && !this->graceful_disconnect_sent_
00080         && !disconnected && !this->impl().is_shut_down()) {
00081       this->send_graceful_disconnect_message();
00082       this->graceful_disconnect_sent_ = true;
00083     }
00084   }
00085 
00086   if (connection) {
00087     connection->shutdown();
00088   }
00089 }
00090 
00091 /// The TcpTransport calls this method when it has an established
00092 /// connection object for us.  This call puts this TcpDataLink into
00093 /// the "connected" state.
00094 int
00095 OpenDDS::DCPS::TcpDataLink::connect(
00096   const TcpConnection_rch& connection,
00097   const RcHandle<TcpSendStrategy>& send_strategy,
00098   const RcHandle<TcpReceiveStrategy>& receive_strategy)
00099 {
00100   DBG_ENTRY_LVL("TcpDataLink","connect",6);
00101 
00102   this->connection_ = connection;
00103 
00104   if (connection->peer().enable(ACE_NONBLOCK) == -1) {
00105     ACE_ERROR_RETURN((LM_ERROR,
00106                       "(%P|%t) ERROR: TcpDataLink::connect failed to set "
00107                       "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1);
00108   }
00109 
00110   // Let connection know the datalink for callbacks upon reconnect failure.
00111   connection->set_datalink(rchandle_from(this));
00112 
00113   // And lastly, inform our base class (DataLink) that we are now "connected",
00114   // and it should start the strategy objects.
00115   if (this->start(send_strategy, receive_strategy) != 0) {
00116     // Our base (DataLink) class failed to start the strategy objects.
00117     // We need to "undo" some things here before we return -1 to indicate
00118     // that an error has taken place.
00119 
00120     // Drop our reference to the connection object.
00121     this->connection_.reset();
00122 
00123     return -1;
00124   }
00125 
00126   return 0;
00127 }
00128 
00129 //Allows the passive side to detect that the active side is connecting again
00130 //prior to discovery identifying the released datalink from the active side.
00131 //The passive side still believes it has a connection to the remote, however,
00132 //the connect has created a new link/connection, thus the passive side can try
00133 //to reuse the existing structures but reset it to associate the datalink with
00134 //this new connection.
00135 int
00136 OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(const TcpConnection_rch& connection)
00137 {
00138   DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6);
00139 
00140   if (this->is_active_) {
00141     return -1;
00142   }
00143   //Need to check if connection is nil.  If connection is not nil, then connection
00144   //has previously gone through connection phase so this is a reuse of the connection
00145   //proceed to determine if we can reuse/reset existing mechanisms or need to start from
00146   //scratch.
00147 
00148   TcpConnection_rch old_connection(this->connection_.lock());
00149 
00150   if (old_connection) {
00151     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - "
00152                            "trying to reuse existing connection\n"), 0);
00153     old_connection->transfer(connection.in());
00154 
00155     //Connection already exists.
00156     TransportStrategy_rch brs;
00157     TransportSendStrategy_rch bss;
00158 
00159     if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00160       return -1;
00161     } else {
00162       brs = this->receive_strategy_;
00163       bss = this->send_strategy_;
00164 
00165       this->connection_ = connection;
00166 
00167       TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00168 
00169       TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00170 
00171       // Associate the new connection object with the receiving strategy and disassociate
00172       // the old connection object with the receiving strategy.
00173       int rs_result = rs->reset(0, connection.in());
00174 
00175       // Associate the new connection object with the sending strategy and disassociate
00176       // the old connection object with the sending strategy.
00177       int ss_result = ss->reset(true);
00178 
00179       if (rs_result == 0 && ss_result == 0) {
00180         return 0;
00181       }
00182     }
00183   }
00184   return -1;
00185 }
00186 
00187 /// Associate the new connection object with this datalink object.
00188 /// The states of the "old" connection object are copied to the new
00189 /// connection object and the "old" connection object is replaced by
00190 /// the new connection object.
00191 int
00192 OpenDDS::DCPS::TcpDataLink::reconnect(const TcpConnection_rch& connection)
00193 {
00194   DBG_ENTRY_LVL("TcpDataLink","reconnect",6);
00195 
00196   TcpConnection_rch existing_connection(this->connection_.lock());
00197   // Sanity check - the connection should exist already since we are reconnecting.
00198   if (!existing_connection) {
00199     VDBG_LVL((LM_ERROR,
00200               "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
00201              , 1);
00202     return -1;
00203   }
00204 
00205   existing_connection->transfer(connection.in());
00206 
00207   bool released = false;
00208   TransportStrategy_rch brs;
00209   TransportSendStrategy_rch bss;
00210 
00211   {
00212     GuardType guard2(this->strategy_lock_);
00213 
00214     if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00215       released = true;
00216 
00217     } else {
00218       brs = this->receive_strategy_;
00219       bss = this->send_strategy_;
00220     }
00221   }
00222 
00223   if (released) {
00224     return static_cast<TcpTransport&>(impl()).connect_tcp_datalink(*this, connection);
00225   }
00226 
00227   this->connection_ = connection;
00228 
00229   TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00230 
00231   TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00232 
00233   // Associate the new connection object with the receiveing strategy and disassociate
00234   // the old connection object with the receiveing strategy.
00235   int rs_result = rs->reset(existing_connection.in(), connection.in());
00236 
00237   // Associate the new connection object with the sending strategy and disassociate
00238   // the old connection object with the sending strategy.
00239   int ss_result = ss->reset();
00240 
00241   if (rs_result == 0 && ss_result == 0) {
00242     return 0;
00243   }
00244 
00245   return -1;
00246 }
00247 
00248 void
00249 OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message()
00250 {
00251   DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6);
00252 
00253   // Will clear all queued messages but still let the disconnect message
00254   // sent.
00255   this->send_strategy_->terminate_send(true);
00256 
00257   DataSampleHeader header_data;
00258   // The message_id_ is the most important value for the DataSampleHeader.
00259   header_data.message_id_ = GRACEFUL_DISCONNECT;
00260 
00261   // Other data in the DataSampleHeader are not necessary set. The bogus values
00262   // can be used.
00263 
00264   //header_data.byte_order_
00265   //  = this->transport_->config()->swap_bytes() ? !TAO_ENCAP_BYTE_ORDER : TAO_ENCAP_BYTE_ORDER;
00266   //header_data.message_length_ = 0;
00267   //header_data.sequence_ = 0;
00268   //DDS::Time_t source_timestamp
00269   //  = OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00270   //header_data.source_timestamp_sec_ = source_timestamp.sec;
00271   //header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
00272   //header_data.coherency_group_ = 0;
00273   //header_data.publication_id_ = 0;
00274 
00275   // TODO:
00276   // It seems a bug in the transport implementation that the receiving side can
00277   // not receive the message when the message has no sample data and is sent
00278   // in a single packet.
00279 
00280   // To work arround this problem, I have to add bogus data to chain with the
00281   // DataSampleHeader to make the receiving work.
00282   size_t max_marshaled_size = header_data.max_marshaled_size();
00283 
00284   Message_Block_Ptr data(
00285     new ACE_Message_Block(20,
00286                           ACE_Message_Block::MB_DATA,
00287                           0, //cont
00288                           0, //data
00289                           0, //allocator_strategy
00290                           0, //locking_strategy
00291                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00292                           ACE_Time_Value::zero,
00293                           ACE_Time_Value::max_time,
00294                           0,
00295                           0));
00296   data->wr_ptr(20);
00297 
00298   header_data.message_length_ = static_cast<ACE_UINT32>(data->length());
00299 
00300   Message_Block_Ptr message(
00301     new ACE_Message_Block(max_marshaled_size,
00302                           ACE_Message_Block::MB_DATA,
00303                           data.release(), //cont
00304                           0, //data
00305                           0, //allocator_strategy
00306                           0, //locking_strategy
00307                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00308                           ACE_Time_Value::zero,
00309                           ACE_Time_Value::max_time,
00310                           0,
00311                           0));
00312 
00313   *message << header_data;
00314 
00315   TransportControlElement* send_element = new TransportControlElement(move(message));
00316 
00317   // I don't want to rebuild a connection in order to send
00318   // a graceful disconnect message.
00319   this->send_i(send_element, false);
00320 }
00321 
00322 void
00323 OpenDDS::DCPS::TcpDataLink::set_release_pending(bool flag)
00324 {
00325   this->release_is_pending_ = flag;
00326 }
00327 
00328 bool
00329 OpenDDS::DCPS::TcpDataLink::is_release_pending() const
00330 {
00331   return this->release_is_pending_.value();
00332 }
00333 
00334 bool
00335 OpenDDS::DCPS::TcpDataLink::handle_send_request_ack(TransportQueueElement* element)
00336 {
00337   if (Transport_debug_level >= 1) {
00338     const GuidConverter converter(element->publication_id());
00339     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::handle_send_request_ack(%@) sequence number %q, publication_id=%C\n"),
00340       element, element->sequence().getValue(), OPENDDS_STRING(converter).c_str()));
00341   }
00342 
00343   ACE_Guard<ACE_SYNCH_MUTEX> guard(pending_request_acks_lock_);
00344   pending_request_acks_.push_back(element);
00345   return false;
00346 }
00347 
00348 
00349 void
00350 OpenDDS::DCPS::TcpDataLink::ack_received(const ReceivedDataSample& sample)
00351 {
00352   SequenceNumber sequence = sample.header_.sequence_;
00353 
00354   if (Transport_debug_level >= 1) {
00355     const GuidConverter converter(sample.header_.publication_id_);
00356     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received sequence number %q, publiction_id=%C\n"),
00357       sequence.getValue(), OPENDDS_STRING(converter).c_str()));
00358   }
00359 
00360   TransportQueueElement* elem=0;
00361   {
00362     // find the pending request with the same sequence number.
00363     ACE_Guard<ACE_SYNCH_MUTEX> guard(pending_request_acks_lock_);
00364     PendingRequestAcks::iterator it;
00365     for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it){
00366       if ((*it)->sequence() == sequence && (*it)->publication_id() == sample.header_.publication_id_) {
00367         elem = *it;
00368         pending_request_acks_.erase(it);
00369         break;
00370       }
00371     }
00372   }
00373 
00374   if (elem) {
00375     if (Transport_debug_level >= 1) {
00376       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() found matching element %@\n"),
00377         elem));
00378     }
00379     this->send_strategy_->deliver_ack_request(elem);
00380   }
00381   else {
00382     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TcpDataLink::ack_received() received unknown sequence number %q\n"),
00383       sequence.getValue()));
00384   }
00385 }
00386 
00387 void
00388 OpenDDS::DCPS::TcpDataLink::request_ack_received(const ReceivedDataSample& sample)
00389 {
00390   DataSampleHeader header_data;
00391   // The message_id_ is the most important value for the DataSampleHeader.
00392   header_data.message_id_ = SAMPLE_ACK;
00393 
00394   // Other data in the DataSampleHeader are not necessary set. The bogus values
00395   // can be used.
00396 
00397   header_data.byte_order_  = ACE_CDR_BYTE_ORDER;
00398   header_data.message_length_ = 0;
00399   header_data.sequence_ = sample.header_.sequence_;
00400   header_data.publication_id_ = sample.header_.publication_id_;
00401   header_data.publisher_id_ = sample.header_.publisher_id_;
00402 
00403   size_t max_marshaled_size = header_data.max_marshaled_size();
00404 
00405   Message_Block_Ptr message(
00406     new ACE_Message_Block(max_marshaled_size,
00407                           ACE_Message_Block::MB_DATA,
00408                           0, //cont
00409                           0, //data
00410                           0, //allocator_strategy
00411                           0, //locking_strategy
00412                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00413                           ACE_Time_Value::zero,
00414                           ACE_Time_Value::max_time,
00415                           0,
00416                           0));
00417 
00418   *message << header_data;
00419 
00420   TransportControlElement* send_element =  new TransportControlElement(move(message));
00421 
00422 
00423   // I don't want to rebuild a connection in order to send
00424   // a sample ack message
00425   this->send_i(send_element, false);
00426 }
00427 
00428 void
00429 OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks()
00430 {
00431   ACE_Guard<ACE_SYNCH_MUTEX> guard(pending_request_acks_lock_);
00432   PendingRequestAcks::iterator it;
00433   for (it = pending_request_acks_.begin(); it != pending_request_acks_.end(); ++it){
00434     (*it)->data_dropped(true);
00435   }
00436   pending_request_acks_.clear();
00437 }
00438 
00439 OpenDDS::DCPS::TcpSendStrategy_rch
00440 OpenDDS::DCPS::TcpDataLink::send_strategy()
00441 {
00442   return static_rchandle_cast<OpenDDS::DCPS::TcpSendStrategy>(send_strategy_);
00443 }
00444 
00445 OpenDDS::DCPS::TcpReceiveStrategy_rch
00446 OpenDDS::DCPS::TcpDataLink::receive_strategy()
00447 {
00448   return static_rchandle_cast<OpenDDS::DCPS::TcpReceiveStrategy>(receive_strategy_);
00449 }
00450 
00451 
00452 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1