#include <UdpTransport.h>
Inheritance diagram for OpenDDS::DCPS::UdpTransport:
Public Member Functions | |
UdpTransport (const TransportInst_rch &inst) | |
void | passive_connection (const ACE_INET_Addr &remote_address, ACE_Message_Block *data) |
Protected Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual void | stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id) |
virtual bool | configure_i (TransportInst *config) |
virtual void | shutdown_i () |
virtual bool | connection_info_i (TransportLocator &info) const |
ACE_INET_Addr | get_connection_addr (const TransportBLOB &data) const |
virtual void | release_datalink (DataLink *link) |
virtual std::string | transport_type () const |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
typedef std::vector< DataLink::OnStartCallback > | Callbacks |
Private Member Functions | |
UdpDataLink * | make_datalink (const ACE_INET_Addr &remote_address, Priority priority, bool active) |
PriorityKey | blob_to_key (const TransportBLOB &remote, Priority priority, ACE_INET_Addr local_addr, bool active) |
typedef | OPENDDS_MAP (PriorityKey, UdpDataLink_rch) UdpDataLinkMap |
Map of fully associated DataLinks for this transport. Protected. | |
typedef | OPENDDS_MAP (PriorityKey, Callbacks) PendConnMap |
Private Attributes | |
RcHandle< UdpInst > | config_i_ |
LockType | client_links_lock_ |
This lock is used to protect the client_links_ data member. | |
UdpDataLinkMap | client_links_ |
UdpDataLink_rch | server_link_ |
The single datalink for the passive side. No locking required. | |
ACE_Recursive_Thread_Mutex | connections_lock_ |
std::set< PriorityKey > | server_link_keys_ |
PendConnMap | pending_connections_ |
std::set< PriorityKey > | pending_server_link_keys_ |
Definition at line 25 of file UdpTransport.h.
typedef std::vector<DataLink::OnStartCallback> OpenDDS::DCPS::UdpTransport::Callbacks [private] |
Definition at line 90 of file UdpTransport.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::UdpTransport::ConditionType [private] |
Definition at line 66 of file UdpTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::UdpTransport::GuardType [private] |
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::UdpTransport::LockType [private] |
OpenDDS::DCPS::UdpTransport::UdpTransport | ( | const TransportInst_rch & | inst | ) | [explicit] |
Definition at line 26 of file UdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
00027 { 00028 if (!inst.is_nil()) { 00029 if (!configure(inst.in())) { 00030 throw Transport::UnableToCreate(); 00031 } 00032 } 00033 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [protected, virtual] |
Definition at line 114 of file UdpTransport.cpp.
References blob_to_key(), config_i_, connections_lock_, pending_connections_, pending_server_link_keys_, OpenDDS::DCPS::TransportClient::repo_id_, server_link_, server_link_keys_, and VDBG.
00117 { 00118 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_); 00119 //GuardType guard(connections_lock_); 00120 const PriorityKey key = blob_to_key(remote.blob_, 00121 attribs.priority_, config_i_->local_address(), false /* !active */); 00122 if (server_link_keys_.count(key)) { 00123 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n")); 00124 return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn()); 00125 } 00126 00127 else if (pending_server_link_keys_.count(key)) { 00128 pending_server_link_keys_.erase(key); 00129 server_link_keys_.insert(key); 00130 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n")); 00131 return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn()); 00132 } else { 00133 const DataLink::OnStartCallback callback(client, remote.repo_id_); 00134 pending_connections_[key].push_back(callback); 00135 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n")); 00136 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00137 } 00138 return AcceptConnectResult(); 00139 }
PriorityKey OpenDDS::DCPS::UdpTransport::blob_to_key | ( | const TransportBLOB & | remote, | |
Priority | priority, | |||
ACE_INET_Addr | local_addr, | |||
bool | active | |||
) | [private] |
Definition at line 251 of file UdpTransport.cpp.
References OpenDDS::DCPS::NetworkAddress::to_addr().
Referenced by accept_datalink(), connect_datalink(), and passive_connection().
00255 { 00256 NetworkAddress network_order_address; 00257 ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length()); 00258 00259 if ((cdr >> network_order_address) == 0) { 00260 ACE_ERROR((LM_ERROR, 00261 ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key") 00262 ACE_TEXT(" failed to de-serialize the NetworkAddress\n"))); 00263 } 00264 00265 ACE_INET_Addr remote_address; 00266 network_order_address.to_addr(remote_address); 00267 const bool is_loopback = remote_address == local_addr; 00268 00269 return PriorityKey(priority, remote_address, is_loopback, active); 00270 }
bool OpenDDS::DCPS::UdpTransport::configure_i | ( | TransportInst * | config | ) | [protected, virtual] |
Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 166 of file UdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::config(), config_i_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), make_datalink(), server_link_, and TheServiceParticipant.
00167 { 00168 config_i_ = UdpInst_rch(dynamic_cast<UdpInst*>(config), false); 00169 00170 if (config_i_ == 0) { 00171 ACE_ERROR_RETURN((LM_ERROR, 00172 ACE_TEXT("(%P|%t) ERROR: ") 00173 ACE_TEXT("UdpTransport::configure_i: ") 00174 ACE_TEXT("invalid configuration!\n")), 00175 false); 00176 } 00177 00178 create_reactor_task(); 00179 00180 // Override with DCPSDefaultAddress. 00181 if (this->config_i_->local_address() == ACE_INET_Addr () && 00182 !TheServiceParticipant->default_address ().empty ()) { 00183 this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ()); 00184 } 00185 00186 // Our "server side" data link is created here, similar to the acceptor_ 00187 // in the TcpTransport implementation. This establishes a socket as an 00188 // endpoint that we can advertise to peers via connection_info_i(). 00189 server_link_ = make_datalink(this->config_i_->local_address(), 0 /* priority */, false); 00190 return true; 00191 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::UdpTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [protected, virtual] |
Definition at line 77 of file UdpTransport.cpp.
References blob_to_key(), client_links_, client_links_lock_, config_i_, get_connection_addr(), make_datalink(), and VDBG.
00080 { 00081 UdpInst_rch tmp_config(this->config_i_.in(), false); 00082 if (this->is_shut_down() || this->config_i_.is_nil()) { 00083 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED); 00084 } 00085 const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_); 00086 const bool active = true; 00087 const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, tmp_config->local_address(), active); 00088 tmp_config = 0; //no longer need to hold on to local config 00089 GuardType guard(client_links_lock_); 00090 if (this->is_shut_down() || this->config_i_.is_nil()) { 00091 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED); 00092 } 00093 00094 const UdpDataLinkMap::iterator it(client_links_.find(key)); 00095 if (it != client_links_.end()) { 00096 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n")); 00097 return AcceptConnectResult(UdpDataLink_rch(it->second)._retn()); 00098 } 00099 00100 // Create new DataLink for logical connection: 00101 UdpDataLink_rch link = make_datalink(remote_address, 00102 attribs.priority_, 00103 active); 00104 00105 if (!link.is_nil()) { 00106 client_links_.insert(UdpDataLinkMap::value_type(key, link)); 00107 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n")); 00108 } 00109 00110 return AcceptConnectResult(link._retn()); 00111 }
bool OpenDDS::DCPS::UdpTransport::connection_info_i | ( | TransportLocator & | info | ) | const [protected, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 211 of file UdpTransport.cpp.
References config_i_.
00212 { 00213 this->config_i_->populate_locator(info); 00214 return true; 00215 }
ACE_INET_Addr OpenDDS::DCPS::UdpTransport::get_connection_addr | ( | const TransportBLOB & | data | ) | const [protected] |
Definition at line 218 of file UdpTransport.cpp.
References OpenDDS::DCPS::NetworkAddress::to_addr().
Referenced by connect_datalink().
00219 { 00220 ACE_INET_Addr local_address; 00221 NetworkAddress network_address; 00222 00223 size_t len = data.length(); 00224 const char* buffer = reinterpret_cast<const char*>(data.get_buffer()); 00225 00226 ACE_InputCDR cdr(buffer, len); 00227 cdr >> network_address; 00228 00229 network_address.to_addr(local_address); 00230 00231 return local_address; 00232 }
UdpDataLink * OpenDDS::DCPS::UdpTransport::make_datalink | ( | const ACE_INET_Addr & | remote_address, | |
Priority | priority, | |||
bool | active | |||
) | [private] |
Definition at line 36 of file UdpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), config_i_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), and OpenDDS::DCPS::TransportImpl::reactor_task().
Referenced by configure_i(), and connect_datalink().
00038 { 00039 UdpDataLink_rch link; 00040 ACE_NEW_RETURN(link, UdpDataLink(this, priority, active), 0); 00041 00042 if (link.is_nil()) { 00043 ACE_ERROR_RETURN((LM_ERROR, 00044 ACE_TEXT("(%P|%t) ERROR: ") 00045 ACE_TEXT("UdpTransport::make_datalink: ") 00046 ACE_TEXT("failed to create DataLink!\n")), 00047 0); 00048 } 00049 00050 // Configure link with transport configuration and reactor task: 00051 TransportReactorTask_rch rtask = reactor_task(); 00052 link->configure(config_i_.in(), rtask.in()); 00053 00054 // Assign send strategy: 00055 UdpSendStrategy* send_strategy; 00056 ACE_NEW_RETURN(send_strategy, UdpSendStrategy(link.in()), 0); 00057 link->send_strategy(send_strategy); 00058 00059 // Assign receive strategy: 00060 UdpReceiveStrategy* recv_strategy; 00061 ACE_NEW_RETURN(recv_strategy, UdpReceiveStrategy(link.in()), 0); 00062 link->receive_strategy(recv_strategy); 00063 00064 // Open logical connection: 00065 if (!link->open(remote_address)) { 00066 ACE_ERROR_RETURN((LM_ERROR, 00067 ACE_TEXT("(%P|%t) ERROR: ") 00068 ACE_TEXT("UdpTransport::make_datalink: ") 00069 ACE_TEXT("failed to open DataLink!\n")), 00070 0); 00071 } 00072 00073 return link._retn(); 00074 }
typedef OpenDDS::DCPS::UdpTransport::OPENDDS_MAP | ( | PriorityKey | , | |
Callbacks | ||||
) | [private] |
typedef OpenDDS::DCPS::UdpTransport::OPENDDS_MAP | ( | PriorityKey | , | |
UdpDataLink_rch | ||||
) | [private] |
Map of fully associated DataLinks for this transport. Protected.
void OpenDDS::DCPS::UdpTransport::passive_connection | ( | const ACE_INET_Addr & | remote_address, | |
ACE_Message_Block * | data | |||
) |
Definition at line 273 of file UdpTransport.cpp.
References blob_to_key(), config_i_, connections_lock_, OpenDDS::DCPS::find(), pending_connections_, pending_server_link_keys_, OpenDDS::DCPS::Serializer::read_octet_array(), server_link_, server_link_keys_, and VDBG.
00275 { 00276 CORBA::ULong octet_size = 00277 static_cast<CORBA::ULong>(data->length() - sizeof(Priority)); 00278 Priority priority; 00279 Serializer serializer(data); 00280 serializer >> priority; 00281 TransportBLOB blob(octet_size); 00282 blob.length(octet_size); 00283 serializer.read_octet_array(blob.get_buffer(), octet_size); 00284 00285 // Send an ack so that the active side can return from 00286 // connect_datalink_i(). This is just a single byte of 00287 // arbitrary data, the remote side is not yet using the 00288 // framework (TransportHeader, DataSampleHeader, 00289 // ReceiveStrategy). 00290 const char ack_data = 23; 00291 server_link_->socket().send(&ack_data, 1, remote_address); 00292 00293 const PriorityKey key = blob_to_key(blob, priority, config_i_->local_address(), false /* passive */); 00294 00295 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_); 00296 00297 //GuardType guard(connections_lock_); 00298 const PendConnMap::iterator pend = pending_connections_.find(key); 00299 00300 if (pend != pending_connections_.end()) { 00301 00302 //don't hold connections_lock_ while calling use_datalink 00303 //guard.release(); 00304 00305 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n")); 00306 00307 const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_); 00308 00309 //Insert key now to make sure when releasing guard to call use_datalink 00310 //if an accept_datalink obtains lock first it will see that it can proceed 00311 //with using the link and do its own use_datalink call. 00312 server_link_keys_.insert(key); 00313 00314 //create a copy of the size of callback vector so that if use_datalink_i -> stop_accepting_or_connecting 00315 //finds that callbacks vector is empty and deletes pending connection & its callback vector for loop can 00316 //still exit the loop without checking the size of invalid memory 00317 //size_t num_callbacks = pend->second.size(); 00318 00319 //Create a copy of the vector of callbacks to process, making sure that each is 00320 //still present in the actual pending_connections_ before calling use_datalink 00321 Callbacks tmp(pend->second); 00322 for (size_t i = 0; i < tmp.size(); ++i) { 00323 const PendConnMap::iterator pend = pending_connections_.find(key); 00324 if (pend != pending_connections_.end()) { 00325 const Callbacks::iterator tmp_iter = find(pend->second.begin(), 00326 pend->second.end(), 00327 tmp.at(i)); 00328 if (tmp_iter != pend->second.end()) { 00329 TransportClient* pend_client = tmp.at(i).first; 00330 RepoId remote_repo = tmp.at(i).second; 00331 guard.release(); 00332 pend_client->use_datalink(remote_repo, link); 00333 guard.acquire(); 00334 } 00335 } 00336 } 00337 } else { 00338 // still hold guard(connections_lock_) at this point so 00339 // pending_server_link_keys_ is protected for insert 00340 00341 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n")); 00342 // accept_datalink() will complete the connection. 00343 pending_server_link_keys_.insert(key); 00344 } 00345 }
void OpenDDS::DCPS::UdpTransport::release_datalink | ( | DataLink * | link | ) | [protected, virtual] |
The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 235 of file UdpTransport.cpp.
References client_links_, client_links_lock_, and OpenDDS::DCPS::DataLink::stop().
00236 { 00237 GuardType guard(client_links_lock_); 00238 for (UdpDataLinkMap::iterator it(client_links_.begin()); 00239 it != client_links_.end(); ++it) { 00240 // We are guaranteed to have exactly one matching DataLink 00241 // in the map; release any resources held and return. 00242 if (link == static_cast<DataLink*>(it->second.in())) { 00243 link->stop(); 00244 client_links_.erase(it); 00245 return; 00246 } 00247 } 00248 }
void OpenDDS::DCPS::UdpTransport::shutdown_i | ( | ) | [protected, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 194 of file UdpTransport.cpp.
References client_links_, client_links_lock_, config_i_, and server_link_.
00195 { 00196 // Shutdown reserved datalinks and release configuration: 00197 GuardType guard(client_links_lock_); 00198 for (UdpDataLinkMap::iterator it(client_links_.begin()); 00199 it != client_links_.end(); ++it) { 00200 it->second->transport_shutdown(); 00201 } 00202 client_links_.clear(); 00203 00204 server_link_->transport_shutdown(); 00205 server_link_ = 0; 00206 00207 config_i_ = 0; 00208 }
void OpenDDS::DCPS::UdpTransport::stop_accepting_or_connecting | ( | TransportClient * | client, | |
const RepoId & | remote_id | |||
) | [protected, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 142 of file UdpTransport.cpp.
References connections_lock_, pending_connections_, and VDBG.
00144 { 00145 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n")); 00146 00147 //GuardType guard(connections_lock_); 00148 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_); 00149 00150 for (PendConnMap::iterator it = pending_connections_.begin(); 00151 it != pending_connections_.end(); ++it) { 00152 for (size_t i = 0; i < it->second.size(); ++i) { 00153 if (it->second[i].first == client && it->second[i].second == remote_id) { 00154 it->second.erase(it->second.begin() + i); 00155 break; 00156 } 00157 } 00158 if (it->second.empty()) { 00159 pending_connections_.erase(it); 00160 return; 00161 } 00162 } 00163 }
virtual std::string OpenDDS::DCPS::UdpTransport::transport_type | ( | ) | const [inline, protected, virtual] |
UdpDataLinkMap OpenDDS::DCPS::UdpTransport::client_links_ [private] |
Definition at line 74 of file UdpTransport.h.
Referenced by connect_datalink(), release_datalink(), and shutdown_i().
This lock is used to protect the client_links_ data member.
Definition at line 69 of file UdpTransport.h.
Referenced by connect_datalink(), release_datalink(), and shutdown_i().
Definition at line 62 of file UdpTransport.h.
Referenced by accept_datalink(), configure_i(), connect_datalink(), connection_info_i(), make_datalink(), passive_connection(), and shutdown_i().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::UdpTransport::connections_lock_ [private] |
This protects the pending_connections_, pending_server_link_keys_, and server_link_keys_ data members.
Definition at line 82 of file UdpTransport.h.
Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().
PendConnMap OpenDDS::DCPS::UdpTransport::pending_connections_ [private] |
Locked by connections_lock_. Tracks expected connections that we have learned about in accept_datalink() but have not yet performed the handshake.
Definition at line 95 of file UdpTransport.h.
Referenced by accept_datalink(), passive_connection(), and stop_accepting_or_connecting().
std::set<PriorityKey> OpenDDS::DCPS::UdpTransport::pending_server_link_keys_ [private] |
Locked by connections_lock_. These are passive-side PriorityKeys that have finished handshaking, but have not been processed by accept_datalink()
Definition at line 100 of file UdpTransport.h.
Referenced by accept_datalink(), and passive_connection().
The single datalink for the passive side. No locking required.
Definition at line 77 of file UdpTransport.h.
Referenced by accept_datalink(), configure_i(), passive_connection(), and shutdown_i().
std::set<PriorityKey> OpenDDS::DCPS::UdpTransport::server_link_keys_ [private] |
Locked by connections_lock_. These are passive-side PriorityKeys that have been fully associated (processed by accept_datalink() and finished handshaking). They are ready for use and reuse via server_link_.
Definition at line 88 of file UdpTransport.h.
Referenced by accept_datalink(), and passive_connection().