UdpTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UdpTransport.h"
00009 #include "UdpInst_rch.h"
00010 #include "UdpInst.h"
00011 #include "UdpSendStrategy.h"
00012 #include "UdpReceiveStrategy.h"
00013 
00014 #include "ace/CDR_Base.h"
00015 #include "ace/Log_Msg.h"
00016 
00017 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00018 #include "dds/DCPS/transport/framework/PriorityKey.h"
00019 #include "dds/DCPS/transport/framework/TransportClient.h"
00020 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00021 #include "dds/DCPS/AssociationData.h"
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 UdpTransport::UdpTransport(const TransportInst_rch& inst)
00027 {
00028   if (!inst.is_nil()) {
00029     if (!configure(inst.in())) {
00030       throw Transport::UnableToCreate();
00031     }
00032   }
00033 }
00034 
00035 UdpDataLink*
00036 UdpTransport::make_datalink(const ACE_INET_Addr& remote_address,
00037                             Priority priority, bool active)
00038 {
00039   UdpDataLink_rch link;
00040   ACE_NEW_RETURN(link, UdpDataLink(this, priority, active), 0);
00041 
00042   if (link.is_nil()) {
00043     ACE_ERROR_RETURN((LM_ERROR,
00044                       ACE_TEXT("(%P|%t) ERROR: ")
00045                       ACE_TEXT("UdpTransport::make_datalink: ")
00046                       ACE_TEXT("failed to create DataLink!\n")),
00047                      0);
00048   }
00049 
00050   // Configure link with transport configuration and reactor task:
00051   TransportReactorTask_rch rtask = reactor_task();
00052   link->configure(config_i_.in(), rtask.in());
00053 
00054   // Assign send strategy:
00055   UdpSendStrategy* send_strategy;
00056   ACE_NEW_RETURN(send_strategy, UdpSendStrategy(link.in()), 0);
00057   link->send_strategy(send_strategy);
00058 
00059   // Assign receive strategy:
00060   UdpReceiveStrategy* recv_strategy;
00061   ACE_NEW_RETURN(recv_strategy, UdpReceiveStrategy(link.in()), 0);
00062   link->receive_strategy(recv_strategy);
00063 
00064   // Open logical connection:
00065   if (!link->open(remote_address)) {
00066     ACE_ERROR_RETURN((LM_ERROR,
00067                       ACE_TEXT("(%P|%t) ERROR: ")
00068                       ACE_TEXT("UdpTransport::make_datalink: ")
00069                       ACE_TEXT("failed to open DataLink!\n")),
00070                      0);
00071   }
00072 
00073   return link._retn();
00074 }
00075 
00076 TransportImpl::AcceptConnectResult
00077 UdpTransport::connect_datalink(const RemoteTransport& remote,
00078                                const ConnectionAttribs& attribs,
00079                                TransportClient* )
00080 {
00081   UdpInst_rch tmp_config(this->config_i_.in(), false);
00082   if (this->is_shut_down() || this->config_i_.is_nil()) {
00083     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00084   }
00085   const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_);
00086   const bool active = true;
00087   const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, tmp_config->local_address(), active);
00088   tmp_config = 0; //no longer need to hold on to local config
00089   GuardType guard(client_links_lock_);
00090   if (this->is_shut_down() || this->config_i_.is_nil()) {
00091     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00092   }
00093 
00094   const UdpDataLinkMap::iterator it(client_links_.find(key));
00095   if (it != client_links_.end()) {
00096     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n"));
00097     return AcceptConnectResult(UdpDataLink_rch(it->second)._retn());
00098   }
00099 
00100   // Create new DataLink for logical connection:
00101   UdpDataLink_rch link = make_datalink(remote_address,
00102                                        attribs.priority_,
00103                                        active);
00104 
00105   if (!link.is_nil()) {
00106     client_links_.insert(UdpDataLinkMap::value_type(key, link));
00107     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n"));
00108   }
00109 
00110   return AcceptConnectResult(link._retn());
00111 }
00112 
00113 TransportImpl::AcceptConnectResult
00114 UdpTransport::accept_datalink(const RemoteTransport& remote,
00115                               const ConnectionAttribs& attribs,
00116                               TransportClient* client)
00117 {
00118   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00119   //GuardType guard(connections_lock_);
00120   const PriorityKey key = blob_to_key(remote.blob_,
00121                                       attribs.priority_, config_i_->local_address(), false /* !active */);
00122   if (server_link_keys_.count(key)) {
00123     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n"));
00124     return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn());
00125   }
00126 
00127   else if (pending_server_link_keys_.count(key)) {
00128     pending_server_link_keys_.erase(key);
00129     server_link_keys_.insert(key);
00130     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n"));
00131     return AcceptConnectResult(UdpDataLink_rch(server_link_)._retn());
00132   } else {
00133     const DataLink::OnStartCallback callback(client, remote.repo_id_);
00134     pending_connections_[key].push_back(callback);
00135     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n"));
00136     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00137   }
00138      return AcceptConnectResult();
00139 }
00140 
00141 void
00142 UdpTransport::stop_accepting_or_connecting(TransportClient* client,
00143                                            const RepoId& remote_id)
00144 {
00145   VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
00146 
00147   //GuardType guard(connections_lock_);
00148   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00149 
00150   for (PendConnMap::iterator it = pending_connections_.begin();
00151        it != pending_connections_.end(); ++it) {
00152     for (size_t i = 0; i < it->second.size(); ++i) {
00153       if (it->second[i].first == client && it->second[i].second == remote_id) {
00154         it->second.erase(it->second.begin() + i);
00155         break;
00156       }
00157     }
00158     if (it->second.empty()) {
00159       pending_connections_.erase(it);
00160       return;
00161     }
00162   }
00163 }
00164 
00165 bool
00166 UdpTransport::configure_i(TransportInst* config)
00167 {
00168   config_i_ = UdpInst_rch(dynamic_cast<UdpInst*>(config), false);
00169 
00170   if (config_i_ == 0) {
00171     ACE_ERROR_RETURN((LM_ERROR,
00172                       ACE_TEXT("(%P|%t) ERROR: ")
00173                       ACE_TEXT("UdpTransport::configure_i: ")
00174                       ACE_TEXT("invalid configuration!\n")),
00175                      false);
00176   }
00177 
00178   create_reactor_task();
00179 
00180   // Override with DCPSDefaultAddress.
00181   if (this->config_i_->local_address() == ACE_INET_Addr () &&
00182       !TheServiceParticipant->default_address ().empty ()) {
00183     this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00184   }
00185 
00186   // Our "server side" data link is created here, similar to the acceptor_
00187   // in the TcpTransport implementation.  This establishes a socket as an
00188   // endpoint that we can advertise to peers via connection_info_i().
00189   server_link_ = make_datalink(this->config_i_->local_address(), 0 /* priority */, false);
00190   return true;
00191 }
00192 
00193 void
00194 UdpTransport::shutdown_i()
00195 {
00196   // Shutdown reserved datalinks and release configuration:
00197   GuardType guard(client_links_lock_);
00198   for (UdpDataLinkMap::iterator it(client_links_.begin());
00199        it != client_links_.end(); ++it) {
00200     it->second->transport_shutdown();
00201   }
00202   client_links_.clear();
00203 
00204   server_link_->transport_shutdown();
00205   server_link_ = 0;
00206 
00207   config_i_ = 0;
00208 }
00209 
00210 bool
00211 UdpTransport::connection_info_i(TransportLocator& info) const
00212 {
00213   this->config_i_->populate_locator(info);
00214   return true;
00215 }
00216 
00217 ACE_INET_Addr
00218 UdpTransport::get_connection_addr(const TransportBLOB& data) const
00219 {
00220   ACE_INET_Addr local_address;
00221   NetworkAddress network_address;
00222 
00223   size_t len = data.length();
00224   const char* buffer = reinterpret_cast<const char*>(data.get_buffer());
00225 
00226   ACE_InputCDR cdr(buffer, len);
00227   cdr >> network_address;
00228 
00229   network_address.to_addr(local_address);
00230 
00231   return local_address;
00232 }
00233 
00234 void
00235 UdpTransport::release_datalink(DataLink* link)
00236 {
00237   GuardType guard(client_links_lock_);
00238   for (UdpDataLinkMap::iterator it(client_links_.begin());
00239        it != client_links_.end(); ++it) {
00240     // We are guaranteed to have exactly one matching DataLink
00241     // in the map; release any resources held and return.
00242     if (link == static_cast<DataLink*>(it->second.in())) {
00243       link->stop();
00244       client_links_.erase(it);
00245       return;
00246     }
00247   }
00248 }
00249 
00250 PriorityKey
00251 UdpTransport::blob_to_key(const TransportBLOB& remote,
00252                           Priority priority,
00253                           ACE_INET_Addr local_addr,
00254                           bool active)
00255 {
00256   NetworkAddress network_order_address;
00257   ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length());
00258 
00259   if ((cdr >> network_order_address) == 0) {
00260     ACE_ERROR((LM_ERROR,
00261                ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key")
00262                ACE_TEXT(" failed to de-serialize the NetworkAddress\n")));
00263   }
00264 
00265   ACE_INET_Addr remote_address;
00266   network_order_address.to_addr(remote_address);
00267   const bool is_loopback = remote_address == local_addr;
00268 
00269   return PriorityKey(priority, remote_address, is_loopback, active);
00270 }
00271 
00272 void
00273 UdpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00274                                  ACE_Message_Block* data)
00275 {
00276   CORBA::ULong octet_size =
00277     static_cast<CORBA::ULong>(data->length() - sizeof(Priority));
00278   Priority priority;
00279   Serializer serializer(data);
00280   serializer >> priority;
00281   TransportBLOB blob(octet_size);
00282   blob.length(octet_size);
00283   serializer.read_octet_array(blob.get_buffer(), octet_size);
00284 
00285   // Send an ack so that the active side can return from
00286   // connect_datalink_i().  This is just a single byte of
00287   // arbitrary data, the remote side is not yet using the
00288   // framework (TransportHeader, DataSampleHeader,
00289   // ReceiveStrategy).
00290   const char ack_data = 23;
00291   server_link_->socket().send(&ack_data, 1, remote_address);
00292 
00293   const PriorityKey key = blob_to_key(blob, priority, config_i_->local_address(), false /* passive */);
00294 
00295   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00296 
00297   //GuardType guard(connections_lock_);
00298   const PendConnMap::iterator pend = pending_connections_.find(key);
00299 
00300   if (pend != pending_connections_.end()) {
00301 
00302      //don't hold connections_lock_ while calling use_datalink
00303      //guard.release();
00304 
00305     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n"));
00306 
00307     const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_);
00308 
00309     //Insert key now to make sure when releasing guard to call use_datalink
00310     //if an accept_datalink obtains lock first it will see that it can proceed
00311     //with using the link and do its own use_datalink call.
00312     server_link_keys_.insert(key);
00313 
00314     //create a copy of the size of callback vector so that if use_datalink_i -> stop_accepting_or_connecting
00315     //finds that callbacks vector is empty and deletes pending connection & its callback vector for loop can
00316     //still exit the loop without checking the size of invalid memory
00317     //size_t num_callbacks = pend->second.size();
00318 
00319     //Create a copy of the vector of callbacks to process, making sure that each is
00320     //still present in the actual pending_connections_ before calling use_datalink
00321     Callbacks tmp(pend->second);
00322     for (size_t i = 0; i < tmp.size(); ++i) {
00323       const PendConnMap::iterator pend = pending_connections_.find(key);
00324       if (pend != pending_connections_.end()) {
00325         const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00326                                                   pend->second.end(),
00327                                                   tmp.at(i));
00328         if (tmp_iter != pend->second.end()) {
00329           TransportClient* pend_client = tmp.at(i).first;
00330           RepoId remote_repo = tmp.at(i).second;
00331           guard.release();
00332           pend_client->use_datalink(remote_repo, link);
00333           guard.acquire();
00334         }
00335       }
00336     }
00337   } else {
00338     // still hold guard(connections_lock_) at this point so
00339     // pending_server_link_keys_ is protected for insert
00340 
00341     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n"));
00342     // accept_datalink() will complete the connection.
00343     pending_server_link_keys_.insert(key);
00344   }
00345 }
00346 
00347 } // namespace DCPS
00348 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7