00001
00002
00003
00004
00005
00006
00007
00008 #include "UdpTransport.h"
00009 #include "UdpInst_rch.h"
00010 #include "UdpInst.h"
00011 #include "UdpSendStrategy.h"
00012 #include "UdpReceiveStrategy.h"
00013
00014 #include "ace/CDR_Base.h"
00015 #include "ace/Log_Msg.h"
00016
00017 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00018 #include "dds/DCPS/transport/framework/PriorityKey.h"
00019 #include "dds/DCPS/transport/framework/TransportClient.h"
00020 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00021 #include "dds/DCPS/AssociationData.h"
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028 UdpTransport::UdpTransport(UdpInst& inst)
00029 : TransportImpl(inst)
00030 {
00031 if (!(configure_i(inst) && open())) {
00032 throw Transport::UnableToCreate();
00033 }
00034 }
00035
00036 UdpInst&
00037 UdpTransport::config() const
00038 {
00039 return static_cast<UdpInst&>(TransportImpl::config());
00040 }
00041
00042
00043 UdpDataLink_rch
00044 UdpTransport::make_datalink(const ACE_INET_Addr& remote_address,
00045 Priority priority, bool active)
00046 {
00047 TransportReactorTask_rch rtask (reactor_task());
00048 UdpDataLink_rch link(make_rch<UdpDataLink>(ref(*this), priority, rtask.in(), active));
00049
00050
00051
00052 if (link->open(remote_address)) {
00053 return link;
00054 }
00055
00056 ACE_ERROR((LM_ERROR,
00057 ACE_TEXT("(%P|%t) ERROR: ")
00058 ACE_TEXT("UdpTransport::make_datalink: ")
00059 ACE_TEXT("failed to open DataLink!\n")));
00060
00061 return UdpDataLink_rch();
00062 }
00063
00064 TransportImpl::AcceptConnectResult
00065 UdpTransport::connect_datalink(const RemoteTransport& remote,
00066 const ConnectionAttribs& attribs,
00067 const TransportClient_rch& )
00068 {
00069
00070 if (this->is_shut_down()) {
00071 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00072 }
00073 const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_);
00074 const bool active = true;
00075 const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, this->config().local_address(), active);
00076
00077 GuardType guard(client_links_lock_);
00078 if (this->is_shut_down()) {
00079 return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00080 }
00081
00082 const UdpDataLinkMap::iterator it(client_links_.find(key));
00083 if (it != client_links_.end()) {
00084 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n"));
00085 return AcceptConnectResult(UdpDataLink_rch(it->second));
00086 }
00087
00088
00089 UdpDataLink_rch link (make_datalink(remote_address,
00090 attribs.priority_,
00091 active));
00092
00093 if (!link.is_nil()) {
00094 client_links_.insert(UdpDataLinkMap::value_type(key, link));
00095 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n"));
00096 }
00097
00098 return AcceptConnectResult(link);
00099 }
00100
00101 TransportImpl::AcceptConnectResult
00102 UdpTransport::accept_datalink(const RemoteTransport& remote,
00103 const ConnectionAttribs& attribs,
00104 const TransportClient_rch& client)
00105 {
00106 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00107
00108 const PriorityKey key = blob_to_key(remote.blob_,
00109 attribs.priority_, config().local_address(), false );
00110 if (server_link_keys_.count(key)) {
00111 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n"));
00112 return AcceptConnectResult(UdpDataLink_rch(server_link_));
00113 }
00114
00115 else if (pending_server_link_keys_.count(key)) {
00116 pending_server_link_keys_.erase(key);
00117 server_link_keys_.insert(key);
00118 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n"));
00119 return AcceptConnectResult(UdpDataLink_rch(server_link_));
00120 } else {
00121 const DataLink::OnStartCallback callback(client, remote.repo_id_);
00122 pending_connections_[key].push_back(callback);
00123 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n"));
00124 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00125 }
00126 return AcceptConnectResult();
00127 }
00128
00129 void
00130 UdpTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00131 const RepoId& remote_id)
00132 {
00133 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
00134
00135
00136 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00137
00138 for (PendConnMap::iterator it = pending_connections_.begin();
00139 it != pending_connections_.end(); ++it) {
00140 for (size_t i = 0; i < it->second.size(); ++i) {
00141 if (it->second[i].first == client && it->second[i].second == remote_id) {
00142 it->second.erase(it->second.begin() + i);
00143 break;
00144 }
00145 }
00146 if (it->second.empty()) {
00147 pending_connections_.erase(it);
00148 return;
00149 }
00150 }
00151 }
00152
00153 bool
00154 UdpTransport::configure_i(UdpInst& config)
00155 {
00156 create_reactor_task();
00157
00158
00159 if (config.local_address() == ACE_INET_Addr () &&
00160 !TheServiceParticipant->default_address ().empty ()) {
00161
00162 config.local_address(0, TheServiceParticipant->default_address ().c_str ());
00163 }
00164
00165
00166
00167
00168 server_link_ = make_datalink(config.local_address(), 0 , false);
00169 return true;
00170 }
00171
00172 void
00173 UdpTransport::shutdown_i()
00174 {
00175
00176 GuardType guard(client_links_lock_);
00177 for (UdpDataLinkMap::iterator it(client_links_.begin());
00178 it != client_links_.end(); ++it) {
00179 it->second->transport_shutdown();
00180 }
00181 client_links_.clear();
00182
00183 if (server_link_) {
00184 server_link_->transport_shutdown();
00185 server_link_.reset();
00186 }
00187 }
00188
00189 bool
00190 UdpTransport::connection_info_i(TransportLocator& info) const
00191 {
00192 this->config().populate_locator(info);
00193 return true;
00194 }
00195
00196 ACE_INET_Addr
00197 UdpTransport::get_connection_addr(const TransportBLOB& data) const
00198 {
00199 ACE_INET_Addr local_address;
00200 NetworkAddress network_address;
00201
00202 size_t len = data.length();
00203 const char* buffer = reinterpret_cast<const char*>(data.get_buffer());
00204
00205 ACE_InputCDR cdr(buffer, len);
00206 if (cdr >> network_address) {
00207 network_address.to_addr(local_address);
00208 }
00209
00210 return local_address;
00211 }
00212
00213 void
00214 UdpTransport::release_datalink(DataLink* link)
00215 {
00216 GuardType guard(client_links_lock_);
00217 for (UdpDataLinkMap::iterator it(client_links_.begin());
00218 it != client_links_.end(); ++it) {
00219
00220
00221 if (link == static_cast<DataLink*>(it->second.in())) {
00222 link->stop();
00223 client_links_.erase(it);
00224 return;
00225 }
00226 }
00227 }
00228
00229 PriorityKey
00230 UdpTransport::blob_to_key(const TransportBLOB& remote,
00231 Priority priority,
00232 ACE_INET_Addr local_addr,
00233 bool active)
00234 {
00235 NetworkAddress network_order_address;
00236 ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length());
00237
00238 if (!(cdr >> network_order_address)) {
00239 ACE_ERROR((LM_ERROR,
00240 ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key")
00241 ACE_TEXT(" failed to de-serialize the NetworkAddress\n")));
00242 }
00243
00244 ACE_INET_Addr remote_address;
00245 network_order_address.to_addr(remote_address);
00246 const bool is_loopback = remote_address == local_addr;
00247
00248 return PriorityKey(priority, remote_address, is_loopback, active);
00249 }
00250
00251 void
00252 UdpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00253 const Message_Block_Ptr& data)
00254 {
00255 CORBA::ULong octet_size =
00256 static_cast<CORBA::ULong>(data->length() - sizeof(Priority));
00257 Priority priority;
00258 Serializer serializer(data.get());
00259 serializer >> priority;
00260 TransportBLOB blob(octet_size);
00261 blob.length(octet_size);
00262 serializer.read_octet_array(blob.get_buffer(), octet_size);
00263
00264
00265
00266
00267
00268
00269 const char ack_data = 23;
00270 if (server_link_->socket().send(&ack_data, 1, remote_address) <= 0) {
00271 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection failed to send ack\n"));
00272 }
00273
00274 const PriorityKey key = blob_to_key(blob, priority, config().local_address(), false );
00275
00276 ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00277
00278
00279 const PendConnMap::iterator pend = pending_connections_.find(key);
00280
00281 if (pend != pending_connections_.end()) {
00282
00283
00284
00285
00286 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n"));
00287
00288 const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_);
00289
00290
00291
00292
00293 server_link_keys_.insert(key);
00294
00295
00296
00297
00298
00299
00300
00301
00302 Callbacks tmp(pend->second);
00303 for (size_t i = 0; i < tmp.size(); ++i) {
00304 const PendConnMap::iterator pend = pending_connections_.find(key);
00305 if (pend != pending_connections_.end()) {
00306 const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00307 pend->second.end(),
00308 tmp.at(i));
00309 if (tmp_iter != pend->second.end()) {
00310 TransportClient_wrch pend_client = tmp.at(i).first;
00311 RepoId remote_repo = tmp.at(i).second;
00312 guard.release();
00313 TransportClient_rch client = pend_client.lock();
00314 if (client)
00315 client->use_datalink(remote_repo, link);
00316 guard.acquire();
00317 }
00318 }
00319 }
00320 } else {
00321
00322
00323
00324 VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n"));
00325
00326 pending_server_link_keys_.insert(key);
00327 }
00328 }
00329
00330 }
00331 }
00332
00333 OPENDDS_END_VERSIONED_NAMESPACE_DECL