OpenDDS::DCPS::TcpDataLink Class Reference

#include <TcpDataLink.h>

Inheritance diagram for OpenDDS::DCPS::TcpDataLink:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TcpDataLink:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TcpDataLink (const ACE_INET_Addr &remote_address, TcpTransport *transport_impl, Priority priority, bool is_loopback, bool is_active)
virtual ~TcpDataLink ()
const ACE_INET_Addr & remote_address () const
 Accessor for the remote address.
int connect (const TcpConnection_rch &connection, const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy)
int reuse_existing_connection (const TcpConnection_rch &connection)
int reconnect (TcpConnection *connection)
TcpConnection_rch get_connection ()
TcpTransport_rch get_transport_impl ()
virtual bool issues_on_deleted_callback () const
virtual void pre_stop_i ()
void set_release_pending (bool flag)
 Set release pending flag.
bool is_release_pending () const
 Get release pending flag.

Protected Member Functions

virtual void stop_i ()

Private Member Functions

void send_graceful_disconnect_message ()

Private Attributes

ACE_INET_Addr remote_address_
TcpConnection_rch connection_
TcpTransport_rch transport_
bool graceful_disconnect_sent_
ACE_Atomic_Op< ACE_Thread_Mutex,
bool > 
release_is_pending_

Detailed Description

Definition at line 22 of file TcpDataLink.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::TcpDataLink::TcpDataLink ( const ACE_INET_Addr &  remote_address,
TcpTransport transport_impl,
Priority  priority,
bool  is_loopback,
bool  is_active 
)

Definition at line 22 of file TcpDataLink.cpp.

References OpenDDS::DCPS::RcObject< T >::_add_ref(), DBG_ENTRY_LVL, and transport_.

00028   : DataLink(transport_impl, priority, is_loopback, is_active),
00029     remote_address_(remote_address),
00030     graceful_disconnect_sent_(false),
00031     release_is_pending_(false)
00032 {
00033   DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6);
00034   transport_impl->_add_ref();
00035   this->transport_ = transport_impl;
00036 }

OpenDDS::DCPS::TcpDataLink::~TcpDataLink (  )  [virtual]

Definition at line 38 of file TcpDataLink.cpp.

References DBG_ENTRY_LVL.

00039 {
00040   DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6);
00041 }


Member Function Documentation

int OpenDDS::DCPS::TcpDataLink::connect ( const TcpConnection_rch connection,
const TransportSendStrategy_rch send_strategy,
const TransportStrategy_rch receive_strategy 
)

Called when an established connection object is available for this TcpDataLink. Called by the TcpTransport's connect_datalink() method.

Definition at line 95 of file TcpDataLink.cpp.

References connection_, and DBG_ENTRY_LVL.

00099 {
00100   DBG_ENTRY_LVL("TcpDataLink","connect",6);
00101 
00102   // Sanity check - cannot connect() if we are already connected.
00103   if (!this->connection_.is_nil()) {
00104     ACE_ERROR_RETURN((LM_ERROR,
00105                       "(%P|%t) ERROR: TcpDataLink already connected.\n"),
00106                      -1);
00107   }
00108 
00109   this->connection_ = connection;
00110 
00111   if (this->connection_->peer().enable(ACE_NONBLOCK) == -1) {
00112     ACE_ERROR_RETURN((LM_ERROR,
00113                       "(%P|%t) ERROR: TcpDataLink::connect failed to set "
00114                       "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1);
00115   }
00116 
00117   // Let connection know the datalink for callbacks upon reconnect failure.
00118   this->connection_->set_datalink(this);
00119 
00120   // And lastly, inform our base class (DataLink) that we are now "connected",
00121   // and it should start the strategy objects.
00122   if (this->start(send_strategy, receive_strategy) != 0) {
00123     // Our base (DataLink) class failed to start the strategy objects.
00124     // We need to "undo" some things here before we return -1 to indicate
00125     // that an error has taken place.
00126 
00127     // Drop our reference to the connection object.
00128     this->connection_ = 0;
00129 
00130     return -1;
00131   }
00132 
00133   return 0;
00134 }

ACE_INLINE OpenDDS::DCPS::TcpConnection_rch OpenDDS::DCPS::TcpDataLink::get_connection (  ) 

Definition at line 20 of file TcpDataLink.inl.

References connection_.

00021 {
00022   return this->connection_;
00023 }

ACE_INLINE OpenDDS::DCPS::TcpTransport_rch OpenDDS::DCPS::TcpDataLink::get_transport_impl (  ) 

Definition at line 26 of file TcpDataLink.inl.

References transport_.

00027 {
00028   return this->transport_;
00029 }

bool OpenDDS::DCPS::TcpDataLink::is_release_pending (  )  const

Get release pending flag.

Definition at line 341 of file TcpDataLink.cpp.

References release_is_pending_.

00342 {
00343   return this->release_is_pending_.value();
00344 }

ACE_INLINE bool OpenDDS::DCPS::TcpDataLink::issues_on_deleted_callback (  )  const [virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 33 of file TcpDataLink.inl.

00034 {
00035   return true;
00036 }

void OpenDDS::DCPS::TcpDataLink::pre_stop_i (  )  [virtual]

Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 64 of file TcpDataLink.cpp.

References DBG_ENTRY_LVL, graceful_disconnect_sent_, OpenDDS::DCPS::TcpReceiveStrategy::gracefully_disconnected(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::pre_stop_i(), and OpenDDS::DCPS::DataLink::receive_strategy_.

00065 {
00066   DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6);
00067 
00068   DataLink::pre_stop_i();
00069 
00070   TcpReceiveStrategy * rs
00071     = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00072 
00073   if (rs != NULL) {
00074     // If we received the GRACEFUL_DISCONNECT message from peer before we
00075     // initiate the disconnecting of the datalink, then we will not send
00076     // the GRACEFUL_DISCONNECT message to the peer.
00077     bool disconnected = rs->gracefully_disconnected();
00078 
00079     if (!this->connection_.is_nil() && !this->graceful_disconnect_sent_
00080         && !disconnected) {
00081       this->send_graceful_disconnect_message();
00082       this->graceful_disconnect_sent_ = true;
00083     }
00084   }
00085 
00086   if (!this->connection_.is_nil()) {
00087     this->connection_->shutdown();
00088   }
00089 }

int OpenDDS::DCPS::TcpDataLink::reconnect ( TcpConnection connection  ) 

Associate the new connection object with this datalink object. The states of the "old" connection object are copied to the new connection object and the "old" connection object is replaced by the new connection object.

Definition at line 197 of file TcpDataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), connection_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::TcpReceiveStrategy::reset(), OpenDDS::DCPS::DataLink::send_strategy_, transport_, and VDBG_LVL.

00198 {
00199   DBG_ENTRY_LVL("TcpDataLink","reconnect",6);
00200 
00201   // Sanity check - the connection should exist already since we are reconnecting.
00202   if (this->connection_.is_nil()) {
00203     VDBG_LVL((LM_ERROR,
00204               "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n")
00205              , 1);
00206     return -1;
00207   }
00208 
00209   this->connection_->transfer(connection);
00210 
00211   bool released = false;
00212   TransportStrategy_rch brs;
00213   TransportSendStrategy_rch bss;
00214 
00215   {
00216     GuardType guard2(this->strategy_lock_);
00217 
00218     if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00219       released = true;
00220       this->connection_ = 0;
00221 
00222     } else {
00223       brs = this->receive_strategy_;
00224       bss = this->send_strategy_;
00225     }
00226   }
00227 
00228   TcpConnection_rch conn_rch(connection, false);
00229 
00230   if (released) {
00231     TcpDataLink_rch this_rch(this, false);
00232     return this->transport_->connect_tcp_datalink(this_rch, conn_rch);
00233   }
00234 
00235   this->connection_ = conn_rch._retn();
00236 
00237   TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00238 
00239   TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00240 
00241   // Associate the new connection object with the receiveing strategy and disassociate
00242   // the old connection object with the receiveing strategy.
00243   int rs_result = rs->reset(this->connection_.in());
00244 
00245   // Associate the new connection object with the sending strategy and disassociate
00246   // the old connection object with the sending strategy.
00247   int ss_result = ss->reset(this->connection_.in());
00248 
00249   if (rs_result == 0 && ss_result == 0) {
00250     return 0;
00251   }
00252 
00253   return -1;
00254 }

ACE_INLINE const ACE_INET_Addr & OpenDDS::DCPS::TcpDataLink::remote_address (  )  const

Accessor for the remote address.

Definition at line 13 of file TcpDataLink.inl.

References DBG_ENTRY_LVL, and remote_address_.

Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().

00014 {
00015   DBG_ENTRY_LVL("TcpDataLink","remote_address",6);
00016   return this->remote_address_;
00017 }

int OpenDDS::DCPS::TcpDataLink::reuse_existing_connection ( const TcpConnection_rch connection  ) 

Definition at line 143 of file TcpDataLink.cpp.

References connection_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::TcpReceiveStrategy::reset(), OpenDDS::DCPS::DataLink::send_strategy_, and VDBG_LVL.

00144 {
00145   DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6);
00146 
00147   if (this->is_active_) {
00148     return -1;
00149   }
00150   //Need to check if connection is nil.  If connection is not nil, then connection
00151   //has previously gone through connection phase so this is a reuse of the connection
00152   //proceed to determine if we can reuse/reset existing mechanisms or need to start from
00153   //scratch.
00154   if (!this->connection_.is_nil()) {
00155     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - "
00156                            "trying to reuse existing connection\n"), 0);
00157     this->connection_->transfer(connection.in());
00158 
00159     //Connection already exists.
00160     TransportStrategy_rch brs;
00161     TransportSendStrategy_rch bss;
00162 
00163     if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) {
00164       this->connection_ = 0;
00165       return -1;
00166     } else {
00167       brs = this->receive_strategy_;
00168       bss = this->send_strategy_;
00169 
00170       this->connection_ = connection;
00171 
00172       TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in());
00173 
00174       TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in());
00175 
00176       // Associate the new connection object with the receiving strategy and disassociate
00177       // the old connection object with the receiving strategy.
00178       int rs_result = rs->reset(this->connection_.in());
00179 
00180       // Associate the new connection object with the sending strategy and disassociate
00181       // the old connection object with the sending strategy.
00182       int ss_result = ss->reset(this->connection_.in(), true);
00183 
00184       if (rs_result == 0 && ss_result == 0) {
00185         return 0;
00186       }
00187     }
00188   }
00189   return -1;
00190 }

void OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message (  )  [private]

Definition at line 257 of file TcpDataLink.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_marshaled_size(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataLink::send_i(), and OpenDDS::DCPS::DataLink::send_strategy_.

00258 {
00259   DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6);
00260 
00261   // Will clear all queued messages but still let the disconnect message
00262   // sent.
00263   this->send_strategy_->terminate_send(true);
00264 
00265   DataSampleHeader header_data;
00266   // The message_id_ is the most important value for the DataSampleHeader.
00267   header_data.message_id_ = GRACEFUL_DISCONNECT;
00268 
00269   // Other data in the DataSampleHeader are not necessary set. The bogus values
00270   // can be used.
00271 
00272   //header_data.byte_order_
00273   //  = this->transport_->get_configuration()->swap_bytes() ? !TAO_ENCAP_BYTE_ORDER : TAO_ENCAP_BYTE_ORDER;
00274   //header_data.message_length_ = 0;
00275   //header_data.sequence_ = 0;
00276   //DDS::Time_t source_timestamp
00277   //  = OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ());
00278   //header_data.source_timestamp_sec_ = source_timestamp.sec;
00279   //header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
00280   //header_data.coherency_group_ = 0;
00281   //header_data.publication_id_ = 0;
00282 
00283   // TODO:
00284   // It seems a bug in the transport implementation that the receiving side can
00285   // not receive the message when the message has no sample data and is sent
00286   // in a single packet.
00287 
00288   // To work arround this problem, I have to add bogus data to chain with the
00289   // DataSampleHeader to make the receiving work.
00290   ACE_Message_Block* message;
00291   size_t max_marshaled_size = header_data.max_marshaled_size();
00292   ACE_Message_Block* data = 0;
00293   ACE_NEW(data,
00294           ACE_Message_Block(20,
00295                             ACE_Message_Block::MB_DATA,
00296                             0, //cont
00297                             0, //data
00298                             0, //allocator_strategy
00299                             0, //locking_strategy
00300                             ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00301                             ACE_Time_Value::zero,
00302                             ACE_Time_Value::max_time,
00303                             0,
00304                             0));
00305   data->wr_ptr(20);
00306 
00307   header_data.message_length_ = static_cast<ACE_UINT32>(data->length());
00308 
00309   ACE_NEW(message,
00310           ACE_Message_Block(max_marshaled_size,
00311                             ACE_Message_Block::MB_DATA,
00312                             data, //cont
00313                             0, //data
00314                             0, //allocator_strategy
00315                             0, //locking_strategy
00316                             ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00317                             ACE_Time_Value::zero,
00318                             ACE_Time_Value::max_time,
00319                             0,
00320                             0));
00321 
00322   *message << header_data;
00323 
00324   TransportControlElement* send_element = 0;
00325 
00326   ACE_NEW(send_element, TransportControlElement(message));
00327 
00328   // give the message block ownership to TransportControlElement
00329   message->release();
00330 
00331   // I don't want to rebuild a connection in order to send
00332   // a graceful disconnect message.
00333   this->send_i(send_element, false);
00334 }

void OpenDDS::DCPS::TcpDataLink::set_release_pending ( bool  flag  ) 

Set release pending flag.

Definition at line 336 of file TcpDataLink.cpp.

References release_is_pending_.

00337 {
00338   this->release_is_pending_ = flag;
00339 }

void OpenDDS::DCPS::TcpDataLink::stop_i (  )  [protected, virtual]

Called when the DataLink is self-releasing because all of its reservations have been released, or when the TransportImpl is handling a shutdown() call.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 50 of file TcpDataLink.cpp.

References connection_, and DBG_ENTRY_LVL.

00051 {
00052   DBG_ENTRY_LVL("TcpDataLink","stop_i",6);
00053 
00054   if (!this->connection_.is_nil()) {
00055     // Tell the connection object to disconnect.
00056     this->connection_->disconnect();
00057 
00058     // Drop our reference to the connection object.
00059     this->connection_ = 0;
00060   }
00061 }


Member Data Documentation

TcpConnection_rch OpenDDS::DCPS::TcpDataLink::connection_ [private]

Definition at line 69 of file TcpDataLink.h.

Referenced by connect(), get_connection(), reconnect(), reuse_existing_connection(), and stop_i().

bool OpenDDS::DCPS::TcpDataLink::graceful_disconnect_sent_ [private]

Definition at line 71 of file TcpDataLink.h.

Referenced by pre_stop_i().

ACE_Atomic_Op<ACE_Thread_Mutex, bool> OpenDDS::DCPS::TcpDataLink::release_is_pending_ [private]

Definition at line 72 of file TcpDataLink.h.

Referenced by is_release_pending(), and set_release_pending().

ACE_INET_Addr OpenDDS::DCPS::TcpDataLink::remote_address_ [private]

Definition at line 68 of file TcpDataLink.h.

Referenced by remote_address().

TcpTransport_rch OpenDDS::DCPS::TcpDataLink::transport_ [private]

Definition at line 70 of file TcpDataLink.h.

Referenced by get_transport_impl(), reconnect(), and TcpDataLink().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:38 2016 for OpenDDS by  doxygen 1.4.7