#include <TcpDataLink.h>
Inheritance diagram for OpenDDS::DCPS::TcpDataLink:
Public Member Functions | |
TcpDataLink (const ACE_INET_Addr &remote_address, TcpTransport *transport_impl, Priority priority, bool is_loopback, bool is_active) | |
virtual | ~TcpDataLink () |
const ACE_INET_Addr & | remote_address () const |
Accessor for the remote address. | |
int | connect (const TcpConnection_rch &connection, const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy) |
int | reuse_existing_connection (const TcpConnection_rch &connection) |
int | reconnect (TcpConnection *connection) |
TcpConnection_rch | get_connection () |
TcpTransport_rch | get_transport_impl () |
virtual bool | issues_on_deleted_callback () const |
virtual void | pre_stop_i () |
void | set_release_pending (bool flag) |
Set release pending flag. | |
bool | is_release_pending () const |
Get release pending flag. | |
Protected Member Functions | |
virtual void | stop_i () |
Private Member Functions | |
void | send_graceful_disconnect_message () |
Private Attributes | |
ACE_INET_Addr | remote_address_ |
TcpConnection_rch | connection_ |
TcpTransport_rch | transport_ |
bool | graceful_disconnect_sent_ |
ACE_Atomic_Op< ACE_Thread_Mutex, bool > | release_is_pending_ |
Definition at line 22 of file TcpDataLink.h.
OpenDDS::DCPS::TcpDataLink::TcpDataLink | ( | const ACE_INET_Addr & | remote_address, | |
TcpTransport * | transport_impl, | |||
Priority | priority, | |||
bool | is_loopback, | |||
bool | is_active | |||
) |
Definition at line 22 of file TcpDataLink.cpp.
References OpenDDS::DCPS::RcObject< T >::_add_ref(), DBG_ENTRY_LVL, and transport_.
00028 : DataLink(transport_impl, priority, is_loopback, is_active), 00029 remote_address_(remote_address), 00030 graceful_disconnect_sent_(false), 00031 release_is_pending_(false) 00032 { 00033 DBG_ENTRY_LVL("TcpDataLink","TcpDataLink",6); 00034 transport_impl->_add_ref(); 00035 this->transport_ = transport_impl; 00036 }
OpenDDS::DCPS::TcpDataLink::~TcpDataLink | ( | ) | [virtual] |
Definition at line 38 of file TcpDataLink.cpp.
References DBG_ENTRY_LVL.
00039 { 00040 DBG_ENTRY_LVL("TcpDataLink","~TcpDataLink",6); 00041 }
int OpenDDS::DCPS::TcpDataLink::connect | ( | const TcpConnection_rch & | connection, | |
const TransportSendStrategy_rch & | send_strategy, | |||
const TransportStrategy_rch & | receive_strategy | |||
) |
Called when an established connection object is available for this TcpDataLink. Called by the TcpTransport's connect_datalink() method.
Definition at line 95 of file TcpDataLink.cpp.
References connection_, and DBG_ENTRY_LVL.
00099 { 00100 DBG_ENTRY_LVL("TcpDataLink","connect",6); 00101 00102 // Sanity check - cannot connect() if we are already connected. 00103 if (!this->connection_.is_nil()) { 00104 ACE_ERROR_RETURN((LM_ERROR, 00105 "(%P|%t) ERROR: TcpDataLink already connected.\n"), 00106 -1); 00107 } 00108 00109 this->connection_ = connection; 00110 00111 if (this->connection_->peer().enable(ACE_NONBLOCK) == -1) { 00112 ACE_ERROR_RETURN((LM_ERROR, 00113 "(%P|%t) ERROR: TcpDataLink::connect failed to set " 00114 "ACE_NONBLOCK %p\n", ACE_TEXT("enable")), -1); 00115 } 00116 00117 // Let connection know the datalink for callbacks upon reconnect failure. 00118 this->connection_->set_datalink(this); 00119 00120 // And lastly, inform our base class (DataLink) that we are now "connected", 00121 // and it should start the strategy objects. 00122 if (this->start(send_strategy, receive_strategy) != 0) { 00123 // Our base (DataLink) class failed to start the strategy objects. 00124 // We need to "undo" some things here before we return -1 to indicate 00125 // that an error has taken place. 00126 00127 // Drop our reference to the connection object. 00128 this->connection_ = 0; 00129 00130 return -1; 00131 } 00132 00133 return 0; 00134 }
ACE_INLINE OpenDDS::DCPS::TcpConnection_rch OpenDDS::DCPS::TcpDataLink::get_connection | ( | ) |
Definition at line 20 of file TcpDataLink.inl.
References connection_.
00021 { 00022 return this->connection_; 00023 }
ACE_INLINE OpenDDS::DCPS::TcpTransport_rch OpenDDS::DCPS::TcpDataLink::get_transport_impl | ( | ) |
Definition at line 26 of file TcpDataLink.inl.
References transport_.
00027 { 00028 return this->transport_; 00029 }
bool OpenDDS::DCPS::TcpDataLink::is_release_pending | ( | ) | const |
Get release pending flag.
Definition at line 341 of file TcpDataLink.cpp.
References release_is_pending_.
00342 { 00343 return this->release_is_pending_.value(); 00344 }
ACE_INLINE bool OpenDDS::DCPS::TcpDataLink::issues_on_deleted_callback | ( | ) | const [virtual] |
void OpenDDS::DCPS::TcpDataLink::pre_stop_i | ( | ) | [virtual] |
Called before the datalink is released, or before shutdown, to let the concrete DataLink do anything necessary.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 64 of file TcpDataLink.cpp.
References DBG_ENTRY_LVL, graceful_disconnect_sent_, OpenDDS::DCPS::TcpReceiveStrategy::gracefully_disconnected(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::pre_stop_i(), and OpenDDS::DCPS::DataLink::receive_strategy_.
00065 { 00066 DBG_ENTRY_LVL("TcpDataLink","pre_stop_i",6); 00067 00068 DataLink::pre_stop_i(); 00069 00070 TcpReceiveStrategy * rs 00071 = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in()); 00072 00073 if (rs != NULL) { 00074 // If we received the GRACEFUL_DISCONNECT message from peer before we 00075 // initiate the disconnecting of the datalink, then we will not send 00076 // the GRACEFUL_DISCONNECT message to the peer. 00077 bool disconnected = rs->gracefully_disconnected(); 00078 00079 if (!this->connection_.is_nil() && !this->graceful_disconnect_sent_ 00080 && !disconnected) { 00081 this->send_graceful_disconnect_message(); 00082 this->graceful_disconnect_sent_ = true; 00083 } 00084 } 00085 00086 if (!this->connection_.is_nil()) { 00087 this->connection_->shutdown(); 00088 } 00089 }
int OpenDDS::DCPS::TcpDataLink::reconnect | ( | TcpConnection * | connection | ) |
Associate the new connection object with this datalink object. The state of the "old" connection object is copied to the new connection object, and the new connection object then replaces the "old" one.
Definition at line 197 of file TcpDataLink.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), connection_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::TcpReceiveStrategy::reset(), OpenDDS::DCPS::DataLink::send_strategy_, transport_, and VDBG_LVL.
00198 { 00199 DBG_ENTRY_LVL("TcpDataLink","reconnect",6); 00200 00201 // Sanity check - the connection should exist already since we are reconnecting. 00202 if (this->connection_.is_nil()) { 00203 VDBG_LVL((LM_ERROR, 00204 "(%P|%t) ERROR: TcpDataLink::reconnect old connection is nil.\n") 00205 , 1); 00206 return -1; 00207 } 00208 00209 this->connection_->transfer(connection); 00210 00211 bool released = false; 00212 TransportStrategy_rch brs; 00213 TransportSendStrategy_rch bss; 00214 00215 { 00216 GuardType guard2(this->strategy_lock_); 00217 00218 if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) { 00219 released = true; 00220 this->connection_ = 0; 00221 00222 } else { 00223 brs = this->receive_strategy_; 00224 bss = this->send_strategy_; 00225 } 00226 } 00227 00228 TcpConnection_rch conn_rch(connection, false); 00229 00230 if (released) { 00231 TcpDataLink_rch this_rch(this, false); 00232 return this->transport_->connect_tcp_datalink(this_rch, conn_rch); 00233 } 00234 00235 this->connection_ = conn_rch._retn(); 00236 00237 TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in()); 00238 00239 TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in()); 00240 00241 // Associate the new connection object with the receiveing strategy and disassociate 00242 // the old connection object with the receiveing strategy. 00243 int rs_result = rs->reset(this->connection_.in()); 00244 00245 // Associate the new connection object with the sending strategy and disassociate 00246 // the old connection object with the sending strategy. 00247 int ss_result = ss->reset(this->connection_.in()); 00248 00249 if (rs_result == 0 && ss_result == 0) { 00250 return 0; 00251 } 00252 00253 return -1; 00254 }
ACE_INLINE const ACE_INET_Addr & OpenDDS::DCPS::TcpDataLink::remote_address | ( | ) | const |
Accessor for the remote address.
Definition at line 13 of file TcpDataLink.inl.
References DBG_ENTRY_LVL, and remote_address_.
Referenced by OpenDDS::DCPS::TcpTransport::release_datalink(), and OpenDDS::DCPS::TcpTransport::unbind_link().
00014 { 00015 DBG_ENTRY_LVL("TcpDataLink","remote_address",6); 00016 return this->remote_address_; 00017 }
int OpenDDS::DCPS::TcpDataLink::reuse_existing_connection | ( | const TcpConnection_rch & | connection | ) |
Definition at line 143 of file TcpDataLink.cpp.
References connection_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::TcpReceiveStrategy::reset(), OpenDDS::DCPS::DataLink::send_strategy_, and VDBG_LVL.
00144 { 00145 DBG_ENTRY_LVL("TcpDataLink","reuse_existing_connection",6); 00146 00147 if (this->is_active_) { 00148 return -1; 00149 } 00150 //Need to check if connection is nil. If connection is not nil, then connection 00151 //has previously gone through connection phase so this is a reuse of the connection 00152 //proceed to determine if we can reuse/reset existing mechanisms or need to start from 00153 //scratch. 00154 if (!this->connection_.is_nil()) { 00155 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpDataLink::reuse_existing_connection - " 00156 "trying to reuse existing connection\n"), 0); 00157 this->connection_->transfer(connection.in()); 00158 00159 //Connection already exists. 00160 TransportStrategy_rch brs; 00161 TransportSendStrategy_rch bss; 00162 00163 if (this->receive_strategy_.is_nil() && this->send_strategy_.is_nil()) { 00164 this->connection_ = 0; 00165 return -1; 00166 } else { 00167 brs = this->receive_strategy_; 00168 bss = this->send_strategy_; 00169 00170 this->connection_ = connection; 00171 00172 TcpReceiveStrategy* rs = static_cast<TcpReceiveStrategy*>(brs.in()); 00173 00174 TcpSendStrategy* ss = static_cast<TcpSendStrategy*>(bss.in()); 00175 00176 // Associate the new connection object with the receiving strategy and disassociate 00177 // the old connection object with the receiving strategy. 00178 int rs_result = rs->reset(this->connection_.in()); 00179 00180 // Associate the new connection object with the sending strategy and disassociate 00181 // the old connection object with the sending strategy. 00182 int ss_result = ss->reset(this->connection_.in(), true); 00183 00184 if (rs_result == 0 && ss_result == 0) { 00185 return 0; 00186 } 00187 } 00188 } 00189 return -1; 00190 }
void OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message | ( | ) | [private] |
Definition at line 257 of file TcpDataLink.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::GRACEFUL_DISCONNECT, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_marshaled_size(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataLink::send_i(), and OpenDDS::DCPS::DataLink::send_strategy_.
00258 { 00259 DBG_ENTRY_LVL("TcpDataLink","send_graceful_disconnect_message",6); 00260 00261 // Will clear all queued messages but still let the disconnect message 00262 // sent. 00263 this->send_strategy_->terminate_send(true); 00264 00265 DataSampleHeader header_data; 00266 // The message_id_ is the most important value for the DataSampleHeader. 00267 header_data.message_id_ = GRACEFUL_DISCONNECT; 00268 00269 // Other data in the DataSampleHeader are not necessary set. The bogus values 00270 // can be used. 00271 00272 //header_data.byte_order_ 00273 // = this->transport_->get_configuration()->swap_bytes() ? !TAO_ENCAP_BYTE_ORDER : TAO_ENCAP_BYTE_ORDER; 00274 //header_data.message_length_ = 0; 00275 //header_data.sequence_ = 0; 00276 //DDS::Time_t source_timestamp 00277 // = OpenDDS::DCPS::time_value_to_time (ACE_OS::gettimeofday ()); 00278 //header_data.source_timestamp_sec_ = source_timestamp.sec; 00279 //header_data.source_timestamp_nanosec_ = source_timestamp.nanosec; 00280 //header_data.coherency_group_ = 0; 00281 //header_data.publication_id_ = 0; 00282 00283 // TODO: 00284 // It seems a bug in the transport implementation that the receiving side can 00285 // not receive the message when the message has no sample data and is sent 00286 // in a single packet. 00287 00288 // To work arround this problem, I have to add bogus data to chain with the 00289 // DataSampleHeader to make the receiving work. 
00290 ACE_Message_Block* message; 00291 size_t max_marshaled_size = header_data.max_marshaled_size(); 00292 ACE_Message_Block* data = 0; 00293 ACE_NEW(data, 00294 ACE_Message_Block(20, 00295 ACE_Message_Block::MB_DATA, 00296 0, //cont 00297 0, //data 00298 0, //allocator_strategy 00299 0, //locking_strategy 00300 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00301 ACE_Time_Value::zero, 00302 ACE_Time_Value::max_time, 00303 0, 00304 0)); 00305 data->wr_ptr(20); 00306 00307 header_data.message_length_ = static_cast<ACE_UINT32>(data->length()); 00308 00309 ACE_NEW(message, 00310 ACE_Message_Block(max_marshaled_size, 00311 ACE_Message_Block::MB_DATA, 00312 data, //cont 00313 0, //data 00314 0, //allocator_strategy 00315 0, //locking_strategy 00316 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00317 ACE_Time_Value::zero, 00318 ACE_Time_Value::max_time, 00319 0, 00320 0)); 00321 00322 *message << header_data; 00323 00324 TransportControlElement* send_element = 0; 00325 00326 ACE_NEW(send_element, TransportControlElement(message)); 00327 00328 // give the message block ownership to TransportControlElement 00329 message->release(); 00330 00331 // I don't want to rebuild a connection in order to send 00332 // a graceful disconnect message. 00333 this->send_i(send_element, false); 00334 }
void OpenDDS::DCPS::TcpDataLink::set_release_pending | ( | bool | flag | ) |
Set release pending flag.
Definition at line 336 of file TcpDataLink.cpp.
References release_is_pending_.
00337 { 00338 this->release_is_pending_ = flag; 00339 }
void OpenDDS::DCPS::TcpDataLink::stop_i | ( | ) | [protected, virtual] |
Called when the DataLink is self-releasing because all of its reservations have been released, or when the TransportImpl is handling a shutdown() call.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 50 of file TcpDataLink.cpp.
References connection_, and DBG_ENTRY_LVL.
00051 { 00052 DBG_ENTRY_LVL("TcpDataLink","stop_i",6); 00053 00054 if (!this->connection_.is_nil()) { 00055 // Tell the connection object to disconnect. 00056 this->connection_->disconnect(); 00057 00058 // Drop our reference to the connection object. 00059 this->connection_ = 0; 00060 } 00061 }
TcpConnection_rch OpenDDS::DCPS::TcpDataLink::connection_ [private] |
Definition at line 69 of file TcpDataLink.h.
Referenced by connect(), get_connection(), reconnect(), reuse_existing_connection(), and stop_i().
bool OpenDDS::DCPS::TcpDataLink::graceful_disconnect_sent_ [private] |
ACE_Atomic_Op<ACE_Thread_Mutex, bool> OpenDDS::DCPS::TcpDataLink::release_is_pending_ [private] |
Definition at line 72 of file TcpDataLink.h.
Referenced by is_release_pending(), and set_release_pending().
ACE_INET_Addr OpenDDS::DCPS::TcpDataLink::remote_address_ [private] |
TcpTransport_rch OpenDDS::DCPS::TcpDataLink::transport_ [private] |
Definition at line 70 of file TcpDataLink.h.
Referenced by get_transport_impl(), reconnect(), and TcpDataLink().