TcpDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpDataLink.h"
00010 #include "TcpReceiveStrategy.h"
00011 #include "TcpInst.h"
00012 #include "TcpSendStrategy.h"
00013 #include "dds/DCPS/transport/framework/TransportControlElement.h"
00014 #include "dds/DCPS/transport/framework/EntryExit.h"
00015 #include "dds/DCPS/DataSampleHeader.h"
00016 #include "ace/Log_Msg.h"
00017 
00018 #if !defined (__ACE_INLINE__)
00019 #include "TcpDataLink.inl"
00020 #endif /* __ACE_INLINE__ */
00021 
00022 OpenDDS::DCPS::TcpDataLink::TcpDataLink(
00023   const ACE_INET_Addr& remote_address,
00024   OpenDDS::DCPS::TcpTransport*  transport_impl,
00025   Priority priority,
00026   bool        is_loopback,
00027   bool        is_active)
00028   : DataLink(transport_impl, priority, is_loopback, is_active),
00029     remote_address_(remote_address),
00030     graceful_disconnect_sent_(false),
00031     release_is_pending_(false)
00032 {
00033   DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6);
00034   transport_impl->_add_ref();
00035   this->transport_ = transport_impl;
00036 }
00037 
00038 OpenDDS::DCPS::TcpDataLink::~TcpDataLink()
00039 {
00040   DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6);
00041 }
00042 
00043 /// Called when the DataLink has been "stopped" for some reason.  It could
00044 /// be called from the DataLink::transport_shutdown() method (when the
00045 /// TransportImpl is handling a shutdown() call).  Or, it could be called
00046 /// from the DataLink::release_reservations() method, when it discovers that
00047 /// it has just released the last remaining reservations from the DataLink,
00048 /// and the DataLink is in the process of "releasing" itself.
00049 void
00050 OpenDDS::DCPS::TcpDataLink::stop_i()
00051 {
00052   DBG_ENTRY_LVL("TcpDataLink","stop_i",6);
00053 
00054   if (!this->connection_.is_nil()) {
00055     // Tell the connection object to disconnect.
00056     this->connection_->disconnect();
00057 
00058     // Drop our reference to the connection object.
00059     this->connection_ = 0;
00060   }
00061 }
00062 
00063 void
00064 OpenDDS::DCPS::TcpDataLink::pre_stop_i()
00065 {
00066   DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6);
00067 
00068   DataLink::pre_stop_i();
00069 
00070   TcpReceiveStrategy * rs
00071     = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00072 
00073   if (rs != NULL) {
00074     // If we received the GRACEFUL_DISCONNECT message from peer before we
00075     // initiate the disconnecting of the datalink, then we will not send
00076     // the GRACEFUL_DISCONNECT message to the peer.
00077     bool disconnected = rs->gracefully_disconnected();
00078 
00079     if (!this->connection_.is_nil() && !this->graceful_disconnect_sent_
00080         && !disconnected) {
00081       this->send_graceful_disconnect_message();
00082       this->graceful_disconnect_sent_ = true;
00083     }
00084   }
00085 
00086   if (!this->connection_.is_nil()) {
00087     this->connection_->shutdown();
00088   }
00089 }
00090 
00091 /// The TcpTransport calls this method when it has an established
00092 /// connection object for us.  This call puts this TcpDataLink into
00093 /// the "connected" state.
00094 int
00095 OpenDDS::DCPS::TcpDataLink::connect(
00096   const TcpConnection_rch& connection,
00097   const TransportSendStrategy_rch& send_strategy,
00098   const TransportStrategy_rch& receive_strategy)
00099 {
00100   DBG_ENTRY_LVL("TcpDataLink","connect",6);
00101 
00102   // Sanity check - cannot connect() if we are already connected.
00103   if (!this->connection_.is_nil()) {
00104     ACE_ERROR_RETURN((LM_ERROR,
00105                       "(%P|%t) ERROR: TcpDataLink already connected.\n"),
00106                      -1);
00107   }
00108 
00109   this->connection_ = connection;
00110 
00111   if (this->connection_->peer().enable(ACE_NONBLOCK) == -1) {
00112     ACE_ERROR_RETURN((LM_ERROR,
00113                       "(%P|%t) ERROR: TcpDataLink::connect failed to set "
00114                       "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1);
00115   }
00116 
00117   // Let connection know the datalink for callbacks upon reconnect failure.
00118   this->connection_->set_datalink(this);
00119 
00120   // And lastly, inform our base class (DataLink) that we are now "connected",
00121   // and it should start the strategy objects.
00122   if (this->start(send_strategy, receive_strategy) != 0) {
00123     // Our base (DataLink) class failed to start the strategy objects.
00124     // We need to "undo" some things here before we return -1 to indicate
00125     // that an error has taken place.
00126 
00127     // Drop our reference to the connection object.
00128     this->connection_ = 0;
00129 
00130     return -1;
00131   }
00132 
00133   return 0;
00134 }
00135 
00136 //Allows the passive side to detect that the active side is connecting again
00137 //prior to discovery identifying the released datalink from the active side.
00138 //The passive side still believes it has a connection to the remote, however,
00139 //the connect has created a new link/connection, thus the passive side can try
00140 //to reuse the existing structures but reset it to associate the datalink with
00141 //this new connection.
00142 int
00143 OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(const TcpConnection_rch& connection)
00144 {
00145   DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6);
00146 
00147   if (this->is_active_) {
00148     return -1;
00149   }
00150   //Need to check if connection is nil.  If connection is not nil, then connection
00151   //has previously gone through connection phase so this is a reuse of the connection
00152   //proceed to determine if we can reuse/reset existing mechanisms or need to start from
00153   //scratch.
00154   if (!this->connection_.is_nil()) {
00155     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - "
00156                            "trying to reuse existing connection\n"), 0);
00157     this->connection_->transfer(connection.in());
00158 
00159     //Connection already exists.
00160     TransportStrategy_rch brs;
00161     TransportSendStrategy_rch bss;
00162 
00163     if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00164       this->connection_ = 0;
00165       return -1;
00166     } else {
00167       brs = this->receive_strategy_;
00168       bss = this->send_strategy_;
00169 
00170       this->connection_ = connection;
00171 
00172       TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00173 
00174       TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00175 
00176       // Associate the new connection object with the receiving strategy and disassociate
00177       // the old connection object with the receiving strategy.
00178       int rs_result = rs->reset(this->connection_.in());
00179 
00180       // Associate the new connection object with the sending strategy and disassociate
00181       // the old connection object with the sending strategy.
00182       int ss_result = ss->reset(this->connection_.in(), true);
00183 
00184       if (rs_result == 0 && ss_result == 0) {
00185         return 0;
00186       }
00187     }
00188   }
00189   return -1;
00190 }
00191 
00192 /// Associate the new connection object with this datalink object.
00193 /// The states of the "old" connection object are copied to the new
00194 /// connection object and the "old" connection object is replaced by
00195 /// the new connection object.
00196 int
00197 OpenDDS::DCPS::TcpDataLink::reconnect(TcpConnection* connection)
00198 {
00199   DBG_ENTRY_LVL("TcpDataLink","reconnect",6);
00200 
00201   // Sanity check - the connection should exist already since we are reconnecting.
00202   if (this->connection_.is_nil()) {
00203     VDBG_LVL((LM_ERROR,
00204               "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
00205              , 1);
00206     return -1;
00207   }
00208 
00209   this->connection_->transfer(connection);
00210 
00211   bool released = false;
00212   TransportStrategy_rch brs;
00213   TransportSendStrategy_rch bss;
00214 
00215   {
00216     GuardType guard2(this->strategy_lock_);
00217 
00218     if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00219       released = true;
00220       this->connection_ = 0;
00221 
00222     } else {
00223       brs = this->receive_strategy_;
00224       bss = this->send_strategy_;
00225     }
00226   }
00227 
00228   TcpConnection_rch conn_rch(connection, false);
00229 
00230   if (released) {
00231     TcpDataLink_rch this_rch(this, false);
00232     return this->transport_->connect_tcp_datalink(this_rch, conn_rch);
00233   }
00234 
00235   this->connection_ = conn_rch._retn();
00236 
00237   TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00238 
00239   TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00240 
00241   // Associate the new connection object with the receiveing strategy and disassociate
00242   // the old connection object with the receiveing strategy.
00243   int rs_result = rs->reset(this->connection_.in());
00244 
00245   // Associate the new connection object with the sending strategy and disassociate
00246   // the old connection object with the sending strategy.
00247   int ss_result = ss->reset(this->connection_.in());
00248 
00249   if (rs_result == 0 && ss_result == 0) {
00250     return 0;
00251   }
00252 
00253   return -1;
00254 }
00255 
00256 void
00257 OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message()
00258 {
00259   DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6);
00260 
00261   // Will clear all queued messages but still let the disconnect message
00262   // sent.
00263   this->send_strategy_->terminate_send(true);
00264 
00265   DataSampleHeader header_data;
00266   // The message_id_ is the most important value for the DataSampleHeader.
00267   header_data.message_id_ = GRACEFUL_DISCONNECT;
00268 
00269   // Other data in the DataSampleHeader are not necessary set. The bogus values
00270   // can be used.
00271 
00272   //header_data.byte_order_
00273   //  = this->transport_->get_configuration()->swap_bytes() ? !TAO_ENCAP_BYTE_ORDER : TAO_ENCAP_BYTE_ORDER;
00274   //header_data.message_length_ = 0;
00275   //header_data.sequence_ = 0;
00276   //DDS::Time_t source_timestamp
00277   //  = OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00278   //header_data.source_timestamp_sec_ = source_timestamp.sec;
00279   //header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
00280   //header_data.coherency_group_ = 0;
00281   //header_data.publication_id_ = 0;
00282 
00283   // TODO:
00284   // It seems a bug in the transport implementation that the receiving side can
00285   // not receive the message when the message has no sample data and is sent
00286   // in a single packet.
00287 
00288   // To work arround this problem, I have to add bogus data to chain with the
00289   // DataSampleHeader to make the receiving work.
00290   ACE_Message_Block* message;
00291   size_t max_marshaled_size = header_data.max_marshaled_size();
00292   ACE_Message_Block* data = 0;
00293   ACE_NEW(data,
00294           ACE_Message_Block(20,
00295                             ACE_Message_Block::MB_DATA,
00296                             0, //cont
00297                             0, //data
00298                             0, //allocator_strategy
00299                             0, //locking_strategy
00300                             ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00301                             ACE_Time_Value::zero,
00302                             ACE_Time_Value::max_time,
00303                             0,
00304                             0));
00305   data->wr_ptr(20);
00306 
00307   header_data.message_length_ = static_cast<ACE_UINT32>(data->length());
00308 
00309   ACE_NEW(message,
00310           ACE_Message_Block(max_marshaled_size,
00311                             ACE_Message_Block::MB_DATA,
00312                             data, //cont
00313                             0, //data
00314                             0, //allocator_strategy
00315                             0, //locking_strategy
00316                             ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00317                             ACE_Time_Value::zero,
00318                             ACE_Time_Value::max_time,
00319                             0,
00320                             0));
00321 
00322   *message << header_data;
00323 
00324   TransportControlElement* send_element = 0;
00325 
00326   ACE_NEW(send_element, TransportControlElement(message));
00327 
00328   // give the message block ownership to TransportControlElement
00329   message->release();
00330 
00331   // I don't want to rebuild a connection in order to send
00332   // a graceful disconnect message.
00333   this->send_i(send_element, false);
00334 }
00335 
00336 void OpenDDS::DCPS::TcpDataLink::set_release_pending(bool flag)
00337 {
00338   this->release_is_pending_ = flag;
00339 }
00340 
00341 bool OpenDDS::DCPS::TcpDataLink::is_release_pending() const
00342 {
00343   return this->release_is_pending_.value();
00344 }

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7