00001
00002
00003
00004
00005
00006
00007
00008 #include "UdpTransport.h"
00009 #include "UdpInst_rch.h"
00010 #include "UdpInst.h"
00011 #include "UdpSendStrategy.h"
00012 #include "UdpReceiveStrategy.h"
00013
00014 #include "ace/CDR_Base.h"
00015 #include "ace/Log_Msg.h"
00016
00017 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00018 #include "dds/DCPS/transport/framework/PriorityKey.h"
00019 #include "dds/DCPS/transport/framework/TransportClient.h"
00020 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00021 #include "dds/DCPS/AssociationData.h"
00022
00023 namespace OpenDDS {
00024 namespace DCPS {
00025
00026 UdpTransport::UdpTransport(const TransportInst_rch& inst)
00027 {
00028 if (!inst.is_nil()) {
00029 if (!configure(inst.in())) {
00030 throw Transport::UnableToCreate();
00031 }
00032 }
00033 }
00034
00035 UdpDataLink*
00036 UdpTransport::make_datalink(const ACE_INET_Addr& remote_address,
00037 Priority priority, bool active)
00038 {
00039 UdpDataLink_rch link;
00040 ACE_NEW_RETURN(link, UdpDataLink(this, priority, active), 0);
00041
00042 if (link.is_nil()) {
00043 ACE_ERROR_RETURN((LM_ERROR,
00044 ACE_TEXT("(%P|%t) ERROR: ")
00045 ACE_TEXT("UdpTransport::make_datalink: ")
00046 ACE_TEXT("failed to create DataLink!\n")),
00047 0);
00048 }
00049
00050
00051 TransportReactorTask_rch rtask = reactor_task();
00052 link->configure(config_i_.in(), rtask.in());
00053
00054
00055 UdpSendStrategy* send_strategy;
00056 ACE_NEW_RETURN(send_strategy, UdpSendStrategy(link.in()), 0);
00057 link->send_strategy(send_strategy);
00058
00059
00060 UdpReceiveStrategy* recv_strategy;
00061 ACE_NEW_RETURN(recv_strategy, UdpReceiveStrategy(link.in()), 0);
00062 link->receive_strategy(recv_strategy);
00063
00064
00065 if (!link->open(remote_address)) {
00066 ACE_ERROR_RETURN((LM_ERROR,
00067 ACE_TEXT("(%P|%t) ERROR: ")
00068 ACE_TEXT("UdpTransport::make_datalink: ")
00069 ACE_TEXT("failed to open DataLink!\n")),
00070 0);
00071 }
00072
00073 return link._retn();
00074 }
00075
00076 TransportImpl::AcceptConnectResult
00077 UdpTransport::connect_datalink(const RemoteTransport& remote,
00078 const ConnectionAttribs& attribs,
00079 TransportClient* )
00080 {
00081 UdpInst_rch tmp_config(this->config_i_.in(), false);
00082 if (this->is_shut_down() || this->config_i_.is_nil()) {
00083 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00084 }
00085 const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_);
00086 const bool active = true;
00087 const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, tmp_config->local_address(), active);
00088 tmp_config = 0;
00089 GuardType guard(client_links_lock_);
00090 if (this->is_shut_down() || this->config_i_.is_nil()) {
00091 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00092 }
00093
00094 const UdpDataLinkMap::iterator it(client_links_.find(key));
00095 if (it != client_links_.end()) {
00096 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n"));
00097 return AcceptConnectResult(UdpDataLink_rch(it->second)._retn());
00098 }
00099
00100
00101 UdpDataLink_rch link = make_datalink(remote_address,
00102 attribs.priority_,
00103 active);
00104
00105 if (!link.is_nil()) {
00106 client_links_.insert(UdpDataLinkMap::value_type(key, link));
00107 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n"));
00108 }
00109
00110 return AcceptConnectResult(link._retn());
00111 }
00112
00113 TransportImpl::AcceptConnectResult
00114 UdpTransport::accept_datalink(const RemoteTransport& remote,
00115 const ConnectionAttribs& attribs,
00116 TransportClient* client)
00117 {
00118 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00119
00120 const PriorityKey key = blob_to_key(remote.blob_,
00121 attribs.priority_, config_i_->local_address(), false );
00122 if (server_link_keys_.count(key)) {
00123 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n"));
00124 return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn());
00125 }
00126
00127 else if (pending_server_link_keys_.count(key)) {
00128 pending_server_link_keys_.erase(key);
00129 server_link_keys_.insert(key);
00130 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n"));
00131 return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn());
00132 } else {
00133 const DataLink::OnStartCallback callback(client, remote.repo_id_);
00134 pending_connections_[key].push_back(callback);
00135 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n"));
00136 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00137 }
00138 return AcceptConnectResult();
00139 }
00140
00141 void
00142 UdpTransport::stop_accepting_or_connecting(TransportClient* client,
00143 const RepoId& remote_id)
00144 {
00145 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
00146
00147
00148 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00149
00150 for (PendConnMap::iterator it = pending_connections_.begin();
00151 it != pending_connections_.end(); ++it) {
00152 for (size_t i = 0; i < it->second.size(); ++i) {
00153 if (it->second[i].first == client && it->second[i].second == remote_id) {
00154 it->second.erase(it->second.begin() + i);
00155 break;
00156 }
00157 }
00158 if (it->second.empty()) {
00159 pending_connections_.erase(it);
00160 return;
00161 }
00162 }
00163 }
00164
00165 bool
00166 UdpTransport::configure_i(TransportInst* config)
00167 {
00168 config_i_ = UdpInst_rch(dynamic_cast<UdpInst*>(config), false);
00169
00170 if (config_i_ == 0) {
00171 ACE_ERROR_RETURN((LM_ERROR,
00172 ACE_TEXT("(%P|%t) ERROR: ")
00173 ACE_TEXT("UdpTransport::configure_i: ")
00174 ACE_TEXT("invalid configuration!\n")),
00175 false);
00176 }
00177
00178 create_reactor_task();
00179
00180
00181 if (this->config_i_->local_address() == ACE_INET_Addr () &&
00182 !TheServiceParticipant->default_address ().empty ()) {
00183 this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00184 }
00185
00186
00187
00188
00189 server_link_ = make_datalink(this->config_i_->local_address(), 0 , false);
00190 return true;
00191 }
00192
00193 void
00194 UdpTransport::shutdown_i()
00195 {
00196
00197 GuardType guard(client_links_lock_);
00198 for (UdpDataLinkMap::iterator it(client_links_.begin());
00199 it != client_links_.end(); ++it) {
00200 it->second->transport_shutdown();
00201 }
00202 client_links_.clear();
00203
00204 server_link_->transport_shutdown();
00205 server_link_ = 0;
00206
00207 config_i_ = 0;
00208 }
00209
00210 bool
00211 UdpTransport::connection_info_i(TransportLocator& info) const
00212 {
00213 this->config_i_->populate_locator(info);
00214 return true;
00215 }
00216
00217 ACE_INET_Addr
00218 UdpTransport::get_connection_addr(const TransportBLOB& data) const
00219 {
00220 ACE_INET_Addr local_address;
00221 NetworkAddress network_address;
00222
00223 size_t len = data.length();
00224 const char* buffer = reinterpret_cast<const char*>(data.get_buffer());
00225
00226 ACE_InputCDR cdr(buffer, len);
00227 cdr >> network_address;
00228
00229 network_address.to_addr(local_address);
00230
00231 return local_address;
00232 }
00233
00234 void
00235 UdpTransport::release_datalink(DataLink* link)
00236 {
00237 GuardType guard(client_links_lock_);
00238 for (UdpDataLinkMap::iterator it(client_links_.begin());
00239 it != client_links_.end(); ++it) {
00240
00241
00242 if (link == static_cast<DataLink*>(it->second.in())) {
00243 link->stop();
00244 client_links_.erase(it);
00245 return;
00246 }
00247 }
00248 }
00249
00250 PriorityKey
00251 UdpTransport::blob_to_key(const TransportBLOB& remote,
00252 Priority priority,
00253 ACE_INET_Addr local_addr,
00254 bool active)
00255 {
00256 NetworkAddress network_order_address;
00257 ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length());
00258
00259 if ((cdr >> network_order_address) == 0) {
00260 ACE_ERROR((LM_ERROR,
00261 ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key")
00262 ACE_TEXT(" failed to de-serialize the NetworkAddress\n")));
00263 }
00264
00265 ACE_INET_Addr remote_address;
00266 network_order_address.to_addr(remote_address);
00267 const bool is_loopback = remote_address == local_addr;
00268
00269 return PriorityKey(priority, remote_address, is_loopback, active);
00270 }
00271
00272 void
00273 UdpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00274 ACE_Message_Block* data)
00275 {
00276 CORBA::ULong octet_size =
00277 static_cast<CORBA::ULong>(data->length() - sizeof(Priority));
00278 Priority priority;
00279 Serializer serializer(data);
00280 serializer >> priority;
00281 TransportBLOB blob(octet_size);
00282 blob.length(octet_size);
00283 serializer.read_octet_array(blob.get_buffer(), octet_size);
00284
00285
00286
00287
00288
00289
00290 const char ack_data = 23;
00291 server_link_->socket().send(&ack_data, 1, remote_address);
00292
00293 const PriorityKey key = blob_to_key(blob, priority, config_i_->local_address(), false );
00294
00295 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00296
00297
00298 const PendConnMap::iterator pend = pending_connections_.find(key);
00299
00300 if (pend != pending_connections_.end()) {
00301
00302
00303
00304
00305 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n"));
00306
00307 const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_);
00308
00309
00310
00311
00312 server_link_keys_.insert(key);
00313
00314
00315
00316
00317
00318
00319
00320
00321 Callbacks tmp(pend->second);
00322 for (size_t i = 0; i < tmp.size(); ++i) {
00323 const PendConnMap::iterator pend = pending_connections_.find(key);
00324 if (pend != pending_connections_.end()) {
00325 const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00326 pend->second.end(),
00327 tmp.at(i));
00328 if (tmp_iter != pend->second.end()) {
00329 TransportClient* pend_client = tmp.at(i).first;
00330 RepoId remote_repo = tmp.at(i).second;
00331 guard.release();
00332 pend_client->use_datalink(remote_repo, link);
00333 guard.acquire();
00334 }
00335 }
00336 }
00337 } else {
00338
00339
00340
00341 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n"));
00342
00343 pending_server_link_keys_.insert(key);
00344 }
00345 }
00346
00347 }
00348 }