#include <UdpTransport.h>
Public Member Functions | |
UdpTransport (UdpInst &inst) | |
void | passive_connection (const ACE_INET_Addr &remote_address, const Message_Block_Ptr &data) |
UdpInst & | config () const |
Protected Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client) |
virtual void | stop_accepting_or_connecting (const TransportClient_wrch &client, const RepoId &remote_id) |
bool | configure_i (UdpInst &config) |
virtual void | shutdown_i () |
virtual bool | connection_info_i (TransportLocator &info) const |
ACE_INET_Addr | get_connection_addr (const TransportBLOB &data) const |
virtual void | release_datalink (DataLink *link) |
virtual std::string | transport_type () const |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
typedef std::vector < DataLink::OnStartCallback > | Callbacks |
Private Member Functions | |
UdpDataLink_rch | make_datalink (const ACE_INET_Addr &remote_address, Priority priority, bool active) |
PriorityKey | blob_to_key (const TransportBLOB &remote, Priority priority, ACE_INET_Addr local_addr, bool active) |
typedef | OPENDDS_MAP (PriorityKey, UdpDataLink_rch) UdpDataLinkMap |
Map of fully associated DataLinks for this transport. The client_links_ instance of this map is guarded by client_links_lock_. | |
typedef | OPENDDS_MAP (PriorityKey, Callbacks) PendConnMap |
Private Attributes | |
LockType | client_links_lock_ |
This lock is used to protect the client_links_ data member. | |
UdpDataLinkMap | client_links_ |
UdpDataLink_rch | server_link_ |
The single datalink for the passive side. No locking required. | |
ACE_Recursive_Thread_Mutex | connections_lock_ |
std::set< PriorityKey > | server_link_keys_ |
PendConnMap | pending_connections_ |
std::set< PriorityKey > | pending_server_link_keys_ |
Definition at line 28 of file UdpTransport.h.
typedef std::vector<DataLink::OnStartCallback> OpenDDS::DCPS::UdpTransport::Callbacks [private] |
Definition at line 91 of file UdpTransport.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::UdpTransport::ConditionType [private] |
Definition at line 67 of file UdpTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::UdpTransport::GuardType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 66 of file UdpTransport.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::UdpTransport::LockType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 65 of file UdpTransport.h.
OpenDDS::DCPS::UdpTransport::UdpTransport | ( | UdpInst & | inst | ) | [explicit] |
Definition at line 28 of file UdpTransport.cpp.
References configure_i(), and OpenDDS::DCPS::TransportImpl::open().
00029 : TransportImpl(inst) 00030 { 00031 if (!(configure_i(inst) && open())) { 00032 throw Transport::UnableToCreate(); 00033 } 00034 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [protected, virtual] |
accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 102 of file UdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), config(), connections_lock_, LM_DEBUG, pending_connections_, pending_server_link_keys_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, server_link_, server_link_keys_, and VDBG.
00105 { 00106 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_); 00107 //GuardType guard(connections_lock_); 00108 const PriorityKey key = blob_to_key(remote.blob_, 00109 attribs.priority_, config().local_address(), false /* !active */); 00110 if (server_link_keys_.count(key)) { 00111 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n")); 00112 return AcceptConnectResult(UdpDataLink_rch(server_link_)); 00113 } 00114 00115 else if (pending_server_link_keys_.count(key)) { 00116 pending_server_link_keys_.erase(key); 00117 server_link_keys_.insert(key); 00118 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n")); 00119 return AcceptConnectResult(UdpDataLink_rch(server_link_)); 00120 } else { 00121 const DataLink::OnStartCallback callback(client, remote.repo_id_); 00122 pending_connections_[key].push_back(callback); 00123 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n")); 00124 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00125 } 00126 return AcceptConnectResult(); 00127 }
PriorityKey OpenDDS::DCPS::UdpTransport::blob_to_key | ( | const TransportBLOB & | remote, | |
Priority | priority, | |||
ACE_INET_Addr | local_addr, | |||
bool | active | |||
) | [private] |
Definition at line 230 of file UdpTransport.cpp.
References ACE_TEXT(), LM_ERROR, and OpenDDS::DCPS::NetworkAddress::to_addr().
Referenced by accept_datalink(), connect_datalink(), and passive_connection().
00234 { 00235 NetworkAddress network_order_address; 00236 ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length()); 00237 00238 if (!(cdr >> network_order_address)) { 00239 ACE_ERROR((LM_ERROR, 00240 ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key") 00241 ACE_TEXT(" failed to de-serialize the NetworkAddress\n"))); 00242 } 00243 00244 ACE_INET_Addr remote_address; 00245 network_order_address.to_addr(remote_address); 00246 const bool is_loopback = remote_address == local_addr; 00247 00248 return PriorityKey(priority, remote_address, is_loopback, active); 00249 }
UdpInst & OpenDDS::DCPS::UdpTransport::config | ( | ) | const |
Expose the configuration information so others can see what we can do.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 37 of file UdpTransport.cpp.
Referenced by accept_datalink(), connection_info_i(), and passive_connection().
00038 { 00039 return static_cast<UdpInst&>(TransportImpl::config()); 00040 }
bool OpenDDS::DCPS::UdpTransport::configure_i | ( | UdpInst & | config | ) | [protected] |
Definition at line 154 of file UdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::create_reactor_task(), OpenDDS::DCPS::UdpInst::local_address(), make_datalink(), server_link_, and TheServiceParticipant.
Referenced by UdpTransport().
00155 { 00156 create_reactor_task(); 00157 00158 // Override with DCPSDefaultAddress. 00159 if (config.local_address() == ACE_INET_Addr () && 00160 !TheServiceParticipant->default_address ().empty ()) { 00161 00162 config.local_address(0, TheServiceParticipant->default_address ().c_str ()); 00163 } 00164 00165 // Our "server side" data link is created here, similar to the acceptor_ 00166 // in the TcpTransport implementation. This establishes a socket as an 00167 // endpoint that we can advertise to peers via connection_info_i(). 00168 server_link_ = make_datalink(config.local_address(), 0 /* priority */, false); 00169 return true; 00170 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [protected, virtual] |
connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 65 of file UdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_FAILED, OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), client_links_, client_links_lock_, get_connection_addr(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::TransportImpl::is_shut_down(), LM_DEBUG, make_datalink(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, and VDBG.
00068 { 00069 00070 if (this->is_shut_down()) { 00071 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED); 00072 } 00073 const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_); 00074 const bool active = true; 00075 const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, this->config().local_address(), active); 00076 00077 GuardType guard(client_links_lock_); 00078 if (this->is_shut_down()) { 00079 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED); 00080 } 00081 00082 const UdpDataLinkMap::iterator it(client_links_.find(key)); 00083 if (it != client_links_.end()) { 00084 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n")); 00085 return AcceptConnectResult(UdpDataLink_rch(it->second)); 00086 } 00087 00088 // Create new DataLink for logical connection: 00089 UdpDataLink_rch link (make_datalink(remote_address, 00090 attribs.priority_, 00091 active)); 00092 00093 if (!link.is_nil()) { 00094 client_links_.insert(UdpDataLinkMap::value_type(key, link)); 00095 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n")); 00096 } 00097 00098 return AcceptConnectResult(link); 00099 }
bool OpenDDS::DCPS::UdpTransport::connection_info_i | ( | TransportLocator & | info | ) | const [protected, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 190 of file UdpTransport.cpp.
References config(), and OpenDDS::DCPS::UdpInst::populate_locator().
00191 { 00192 this->config().populate_locator(info); 00193 return true; 00194 }
ACE_INET_Addr OpenDDS::DCPS::UdpTransport::get_connection_addr | ( | const TransportBLOB & | data | ) | const [protected] |
Definition at line 197 of file UdpTransport.cpp.
References len, and OpenDDS::DCPS::NetworkAddress::to_addr().
Referenced by connect_datalink().
00198 { 00199 ACE_INET_Addr local_address; 00200 NetworkAddress network_address; 00201 00202 size_t len = data.length(); 00203 const char* buffer = reinterpret_cast<const char*>(data.get_buffer()); 00204 00205 ACE_InputCDR cdr(buffer, len); 00206 if (cdr >> network_address) { 00207 network_address.to_addr(local_address); 00208 } 00209 00210 return local_address; 00211 }
UdpDataLink_rch OpenDDS::DCPS::UdpTransport::make_datalink | ( | const ACE_INET_Addr & | remote_address, | |
Priority | priority, | |||
bool | active | |||
) | [private] |
Definition at line 44 of file UdpTransport.cpp.
References ACE_TEXT(), OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, OpenDDS::DCPS::TransportImpl::reactor_task(), and OpenDDS::DCPS::ref().
Referenced by configure_i(), and connect_datalink().
00046 { 00047 TransportReactorTask_rch rtask (reactor_task()); 00048 UdpDataLink_rch link(make_rch<UdpDataLink>(ref(*this), priority, rtask.in(), active)); 00049 // Configure link with transport configuration and reactor task: 00050 00051 // Open logical connection: 00052 if (link->open(remote_address)) { 00053 return link; 00054 } 00055 00056 ACE_ERROR((LM_ERROR, 00057 ACE_TEXT("(%P|%t) ERROR: ") 00058 ACE_TEXT("UdpTransport::make_datalink: ") 00059 ACE_TEXT("failed to open DataLink!\n"))); 00060 00061 return UdpDataLink_rch(); 00062 }
typedef OpenDDS::DCPS::UdpTransport::OPENDDS_MAP | ( | PriorityKey | , | |
Callbacks | ||||
) | [private] |
typedef OpenDDS::DCPS::UdpTransport::OPENDDS_MAP | ( | PriorityKey | , | |
UdpDataLink_rch | ||||
) | [private] |
Map of fully associated DataLinks for this transport. The client_links_ instance of this map is guarded by client_links_lock_.
void OpenDDS::DCPS::UdpTransport::passive_connection | ( | const ACE_INET_Addr & | remote_address, | |
const Message_Block_Ptr & | data | |||
) |
Definition at line 252 of file UdpTransport.cpp.
References ACE_Guard< ACE_LOCK >::acquire(), blob_to_key(), config(), connections_lock_, OpenDDS::DCPS::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), pending_connections_, pending_server_link_keys_, ACE_Guard< ACE_LOCK >::release(), server_link_, server_link_keys_, OpenDDS::DCPS::static_rchandle_cast(), and VDBG.
00254 { 00255 CORBA::ULong octet_size = 00256 static_cast<CORBA::ULong>(data->length() - sizeof(Priority)); 00257 Priority priority; 00258 Serializer serializer(data.get()); 00259 serializer >> priority; 00260 TransportBLOB blob(octet_size); 00261 blob.length(octet_size); 00262 serializer.read_octet_array(blob.get_buffer(), octet_size); 00263 00264 // Send an ack so that the active side can return from 00265 // connect_datalink_i(). This is just a single byte of 00266 // arbitrary data, the remote side is not yet using the 00267 // framework (TransportHeader, DataSampleHeader, 00268 // ReceiveStrategy). 00269 const char ack_data = 23; 00270 if (server_link_->socket().send(&ack_data, 1, remote_address) <= 0) { 00271 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection failed to send ack\n")); 00272 } 00273 00274 const PriorityKey key = blob_to_key(blob, priority, config().local_address(), false /* passive */); 00275 00276 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_); 00277 00278 //GuardType guard(connections_lock_); 00279 const PendConnMap::iterator pend = pending_connections_.find(key); 00280 00281 if (pend != pending_connections_.end()) { 00282 00283 //don't hold connections_lock_ while calling use_datalink 00284 //guard.release(); 00285 00286 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n")); 00287 00288 const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_); 00289 00290 //Insert key now to make sure when releasing guard to call use_datalink 00291 //if an accept_datalink obtains lock first it will see that it can proceed 00292 //with using the link and do its own use_datalink call. 
00293 server_link_keys_.insert(key); 00294 00295 //create a copy of the size of callback vector so that if use_datalink_i -> stop_accepting_or_connecting 00296 //finds that callbacks vector is empty and deletes pending connection & its callback vector for loop can 00297 //still exit the loop without checking the size of invalid memory 00298 //size_t num_callbacks = pend->second.size(); 00299 00300 //Create a copy of the vector of callbacks to process, making sure that each is 00301 //still present in the actual pending_connections_ before calling use_datalink 00302 Callbacks tmp(pend->second); 00303 for (size_t i = 0; i < tmp.size(); ++i) { 00304 const PendConnMap::iterator pend = pending_connections_.find(key); 00305 if (pend != pending_connections_.end()) { 00306 const Callbacks::iterator tmp_iter = find(pend->second.begin(), 00307 pend->second.end(), 00308 tmp.at(i)); 00309 if (tmp_iter != pend->second.end()) { 00310 TransportClient_wrch pend_client = tmp.at(i).first; 00311 RepoId remote_repo = tmp.at(i).second; 00312 guard.release(); 00313 TransportClient_rch client = pend_client.lock(); 00314 if (client) 00315 client->use_datalink(remote_repo, link); 00316 guard.acquire(); 00317 } 00318 } 00319 } 00320 } else { 00321 // still hold guard(connections_lock_) at this point so 00322 // pending_server_link_keys_ is protected for insert 00323 00324 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n")); 00325 // accept_datalink() will complete the connection. 00326 pending_server_link_keys_.insert(key); 00327 } 00328 }
void OpenDDS::DCPS::UdpTransport::release_datalink | ( | DataLink * | link | ) | [protected, virtual] |
Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 214 of file UdpTransport.cpp.
References client_links_, client_links_lock_, and OpenDDS::DCPS::DataLink::stop().
00215 { 00216 GuardType guard(client_links_lock_); 00217 for (UdpDataLinkMap::iterator it(client_links_.begin()); 00218 it != client_links_.end(); ++it) { 00219 // We are guaranteed to have exactly one matching DataLink 00220 // in the map; release any resources held and return. 00221 if (link == static_cast<DataLink*>(it->second.in())) { 00222 link->stop(); 00223 client_links_.erase(it); 00224 return; 00225 } 00226 } 00227 }
void OpenDDS::DCPS::UdpTransport::shutdown_i | ( | ) | [protected, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 173 of file UdpTransport.cpp.
References client_links_, client_links_lock_, OpenDDS::DCPS::RcHandle< T >::reset(), and server_link_.
00174 { 00175 // Shutdown reserved datalinks and release configuration: 00176 GuardType guard(client_links_lock_); 00177 for (UdpDataLinkMap::iterator it(client_links_.begin()); 00178 it != client_links_.end(); ++it) { 00179 it->second->transport_shutdown(); 00180 } 00181 client_links_.clear(); 00182 00183 if (server_link_) { 00184 server_link_->transport_shutdown(); 00185 server_link_.reset(); 00186 } 00187 }
void OpenDDS::DCPS::UdpTransport::stop_accepting_or_connecting | ( | const TransportClient_wrch & | client, | |
const RepoId & | remote_id | |||
) | [protected, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 130 of file UdpTransport.cpp.
References connections_lock_, LM_DEBUG, pending_connections_, and VDBG.
00132 { 00133 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n")); 00134 00135 //GuardType guard(connections_lock_); 00136 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_); 00137 00138 for (PendConnMap::iterator it = pending_connections_.begin(); 00139 it != pending_connections_.end(); ++it) { 00140 for (size_t i = 0; i < it->second.size(); ++i) { 00141 if (it->second[i].first == client && it->second[i].second == remote_id) { 00142 it->second.erase(it->second.begin() + i); 00143 break; 00144 } 00145 } 00146 if (it->second.empty()) { 00147 pending_connections_.erase(it); 00148 return; 00149 } 00150 } 00151 }
virtual std::string OpenDDS::DCPS::UdpTransport::transport_type | ( | ) | const [inline, protected, virtual] |
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 56 of file UdpTransport.h.
UdpDataLinkMap OpenDDS::DCPS::UdpTransport::client_links_ [private] |
Definition at line 75 of file UdpTransport.h.
Referenced by connect_datalink(), release_datalink(), and shutdown_i().
LockType OpenDDS::DCPS::UdpTransport::client_links_lock_ [private] |
This lock is used to protect the client_links_ data member.
Definition at line 70 of file UdpTransport.h.
Referenced by connect_datalink(), release_datalink(), and shutdown_i().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::UdpTransport::connections_lock_ [private] |
This lock protects the pending_connections_, pending_server_link_keys_, and server_link_keys_ data members.
Definition at line 83 of file UdpTransport.h.
Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().
PendConnMap OpenDDS::DCPS::UdpTransport::pending_connections_ [private] |
Locked by connections_lock_. Tracks expected connections that we have learned about in accept_datalink() but have not yet performed the handshake.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 96 of file UdpTransport.h.
Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().
std::set<PriorityKey> OpenDDS::DCPS::UdpTransport::pending_server_link_keys_ [private] |
Locked by connections_lock_. These are passive-side PriorityKeys that have finished handshaking, but have not yet been processed by accept_datalink().
Definition at line 101 of file UdpTransport.h.
Referenced by accept_datalink(), and passive_connection().
UdpDataLink_rch OpenDDS::DCPS::UdpTransport::server_link_ [private] |
The single datalink for the passive side. No locking required.
Definition at line 78 of file UdpTransport.h.
Referenced by accept_datalink(), configure_i(), passive_connection(), and shutdown_i().
std::set<PriorityKey> OpenDDS::DCPS::UdpTransport::server_link_keys_ [private] |
Locked by connections_lock_. These are passive-side PriorityKeys that have been fully associated (processed by accept_datalink() and finished handshaking). They are ready for use and reuse via server_link_.
Definition at line 89 of file UdpTransport.h.
Referenced by accept_datalink(), and passive_connection().