UdpTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UdpTransport.h"
00009 #include "UdpInst_rch.h"
00010 #include "UdpInst.h"
00011 #include "UdpSendStrategy.h"
00012 #include "UdpReceiveStrategy.h"
00013 
00014 #include "ace/CDR_Base.h"
00015 #include "ace/Log_Msg.h"
00016 
00017 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00018 #include "dds/DCPS/transport/framework/PriorityKey.h"
00019 #include "dds/DCPS/transport/framework/TransportClient.h"
00020 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00021 #include "dds/DCPS/AssociationData.h"
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 UdpTransport::UdpTransport(UdpInst& inst)
00029   : TransportImpl(inst)
00030 {
00031   if (!(configure_i(inst) && open())) {
00032     throw Transport::UnableToCreate();
00033   }
00034 }
00035 
00036 UdpInst&
00037 UdpTransport::config() const
00038 {
00039   return static_cast<UdpInst&>(TransportImpl::config());
00040 }
00041 
00042 
00043 UdpDataLink_rch
00044 UdpTransport::make_datalink(const ACE_INET_Addr& remote_address,
00045                             Priority priority, bool active)
00046 {
00047   TransportReactorTask_rch rtask (reactor_task());
00048   UdpDataLink_rch link(make_rch<UdpDataLink>(ref(*this), priority, rtask.in(), active));
00049   // Configure link with transport configuration and reactor task:
00050 
00051   // Open logical connection:
00052   if (link->open(remote_address)) {
00053     return link;
00054   }
00055 
00056   ACE_ERROR((LM_ERROR,
00057               ACE_TEXT("(%P|%t) ERROR: ")
00058               ACE_TEXT("UdpTransport::make_datalink: ")
00059               ACE_TEXT("failed to open DataLink!\n")));
00060 
00061   return UdpDataLink_rch();
00062 }
00063 
00064 TransportImpl::AcceptConnectResult
00065 UdpTransport::connect_datalink(const RemoteTransport& remote,
00066                                const ConnectionAttribs& attribs,
00067                                const TransportClient_rch& )
00068 {
00069 
00070   if (this->is_shut_down()) {
00071     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00072   }
00073   const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_);
00074   const bool active = true;
00075   const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, this->config().local_address(), active);
00076 
00077   GuardType guard(client_links_lock_);
00078   if (this->is_shut_down()) {
00079     return AcceptConnectResult(AcceptConnectResult::ACR_FAILED);
00080   }
00081 
00082   const UdpDataLinkMap::iterator it(client_links_.find(key));
00083   if (it != client_links_.end()) {
00084     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n"));
00085     return AcceptConnectResult(UdpDataLink_rch(it->second));
00086   }
00087 
00088   // Create new DataLink for logical connection:
00089   UdpDataLink_rch link (make_datalink(remote_address,
00090                                        attribs.priority_,
00091                                        active));
00092 
00093   if (!link.is_nil()) {
00094     client_links_.insert(UdpDataLinkMap::value_type(key, link));
00095     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n"));
00096   }
00097 
00098   return AcceptConnectResult(link);
00099 }
00100 
00101 TransportImpl::AcceptConnectResult
00102 UdpTransport::accept_datalink(const RemoteTransport& remote,
00103                               const ConnectionAttribs& attribs,
00104                               const TransportClient_rch& client)
00105 {
00106   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00107   //GuardType guard(connections_lock_);
00108   const PriorityKey key = blob_to_key(remote.blob_,
00109                                       attribs.priority_, config().local_address(), false /* !active */);
00110   if (server_link_keys_.count(key)) {
00111     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n"));
00112     return AcceptConnectResult(UdpDataLink_rch(server_link_));
00113   }
00114 
00115   else if (pending_server_link_keys_.count(key)) {
00116     pending_server_link_keys_.erase(key);
00117     server_link_keys_.insert(key);
00118     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n"));
00119     return AcceptConnectResult(UdpDataLink_rch(server_link_));
00120   } else {
00121     const DataLink::OnStartCallback callback(client, remote.repo_id_);
00122     pending_connections_[key].push_back(callback);
00123     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n"));
00124     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00125   }
00126      return AcceptConnectResult();
00127 }
00128 
00129 void
00130 UdpTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00131                                            const RepoId& remote_id)
00132 {
00133   VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
00134 
00135   //GuardType guard(connections_lock_);
00136   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00137 
00138   for (PendConnMap::iterator it = pending_connections_.begin();
00139        it != pending_connections_.end(); ++it) {
00140     for (size_t i = 0; i < it->second.size(); ++i) {
00141       if (it->second[i].first == client && it->second[i].second == remote_id) {
00142         it->second.erase(it->second.begin() + i);
00143         break;
00144       }
00145     }
00146     if (it->second.empty()) {
00147       pending_connections_.erase(it);
00148       return;
00149     }
00150   }
00151 }
00152 
00153 bool
00154 UdpTransport::configure_i(UdpInst& config)
00155 {
00156   create_reactor_task();
00157 
00158   // Override with DCPSDefaultAddress.
00159   if (config.local_address() == ACE_INET_Addr () &&
00160       !TheServiceParticipant->default_address ().empty ()) {
00161 
00162     config.local_address(0, TheServiceParticipant->default_address ().c_str ());
00163   }
00164 
00165   // Our "server side" data link is created here, similar to the acceptor_
00166   // in the TcpTransport implementation.  This establishes a socket as an
00167   // endpoint that we can advertise to peers via connection_info_i().
00168   server_link_ = make_datalink(config.local_address(), 0 /* priority */, false);
00169   return true;
00170 }
00171 
00172 void
00173 UdpTransport::shutdown_i()
00174 {
00175   // Shutdown reserved datalinks and release configuration:
00176   GuardType guard(client_links_lock_);
00177   for (UdpDataLinkMap::iterator it(client_links_.begin());
00178        it != client_links_.end(); ++it) {
00179     it->second->transport_shutdown();
00180   }
00181   client_links_.clear();
00182 
00183   if (server_link_) {
00184     server_link_->transport_shutdown();
00185     server_link_.reset();
00186   }
00187 }
00188 
00189 bool
00190 UdpTransport::connection_info_i(TransportLocator& info) const
00191 {
00192   this->config().populate_locator(info);
00193   return true;
00194 }
00195 
00196 ACE_INET_Addr
00197 UdpTransport::get_connection_addr(const TransportBLOB& data) const
00198 {
00199   ACE_INET_Addr local_address;
00200   NetworkAddress network_address;
00201 
00202   size_t len = data.length();
00203   const char* buffer = reinterpret_cast<const char*>(data.get_buffer());
00204 
00205   ACE_InputCDR cdr(buffer, len);
00206   if (cdr >> network_address) {
00207     network_address.to_addr(local_address);
00208   }
00209 
00210   return local_address;
00211 }
00212 
00213 void
00214 UdpTransport::release_datalink(DataLink* link)
00215 {
00216   GuardType guard(client_links_lock_);
00217   for (UdpDataLinkMap::iterator it(client_links_.begin());
00218        it != client_links_.end(); ++it) {
00219     // We are guaranteed to have exactly one matching DataLink
00220     // in the map; release any resources held and return.
00221     if (link == static_cast<DataLink*>(it->second.in())) {
00222       link->stop();
00223       client_links_.erase(it);
00224       return;
00225     }
00226   }
00227 }
00228 
00229 PriorityKey
00230 UdpTransport::blob_to_key(const TransportBLOB& remote,
00231                           Priority priority,
00232                           ACE_INET_Addr local_addr,
00233                           bool active)
00234 {
00235   NetworkAddress network_order_address;
00236   ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length());
00237 
00238   if (!(cdr >> network_order_address)) {
00239     ACE_ERROR((LM_ERROR,
00240                ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key")
00241                ACE_TEXT(" failed to de-serialize the NetworkAddress\n")));
00242   }
00243 
00244   ACE_INET_Addr remote_address;
00245   network_order_address.to_addr(remote_address);
00246   const bool is_loopback = remote_address == local_addr;
00247 
00248   return PriorityKey(priority, remote_address, is_loopback, active);
00249 }
00250 
00251 void
00252 UdpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00253                                  const Message_Block_Ptr& data)
00254 {
00255   CORBA::ULong octet_size =
00256     static_cast<CORBA::ULong>(data->length() - sizeof(Priority));
00257   Priority priority;
00258   Serializer serializer(data.get());
00259   serializer >> priority;
00260   TransportBLOB blob(octet_size);
00261   blob.length(octet_size);
00262   serializer.read_octet_array(blob.get_buffer(), octet_size);
00263 
00264   // Send an ack so that the active side can return from
00265   // connect_datalink_i().  This is just a single byte of
00266   // arbitrary data, the remote side is not yet using the
00267   // framework (TransportHeader, DataSampleHeader,
00268   // ReceiveStrategy).
00269   const char ack_data = 23;
00270   if (server_link_->socket().send(&ack_data, 1, remote_address) <= 0) {
00271     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection failed to send ack\n"));
00272   }
00273 
00274   const PriorityKey key = blob_to_key(blob, priority, config().local_address(), false /* passive */);
00275 
00276   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(connections_lock_);
00277 
00278   //GuardType guard(connections_lock_);
00279   const PendConnMap::iterator pend = pending_connections_.find(key);
00280 
00281   if (pend != pending_connections_.end()) {
00282 
00283      //don't hold connections_lock_ while calling use_datalink
00284      //guard.release();
00285 
00286     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n"));
00287 
00288     const DataLink_rch link = static_rchandle_cast<DataLink>(server_link_);
00289 
00290     //Insert key now to make sure when releasing guard to call use_datalink
00291     //if an accept_datalink obtains lock first it will see that it can proceed
00292     //with using the link and do its own use_datalink call.
00293     server_link_keys_.insert(key);
00294 
00295     //create a copy of the size of callback vector so that if use_datalink_i -> stop_accepting_or_connecting
00296     //finds that callbacks vector is empty and deletes pending connection & its callback vector for loop can
00297     //still exit the loop without checking the size of invalid memory
00298     //size_t num_callbacks = pend->second.size();
00299 
00300     //Create a copy of the vector of callbacks to process, making sure that each is
00301     //still present in the actual pending_connections_ before calling use_datalink
00302     Callbacks tmp(pend->second);
00303     for (size_t i = 0; i < tmp.size(); ++i) {
00304       const PendConnMap::iterator pend = pending_connections_.find(key);
00305       if (pend != pending_connections_.end()) {
00306         const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00307                                                   pend->second.end(),
00308                                                   tmp.at(i));
00309         if (tmp_iter != pend->second.end()) {
00310           TransportClient_wrch pend_client = tmp.at(i).first;
00311           RepoId remote_repo = tmp.at(i).second;
00312           guard.release();
00313           TransportClient_rch client = pend_client.lock();
00314           if (client)
00315             client->use_datalink(remote_repo, link);
00316           guard.acquire();
00317         }
00318       }
00319     }
00320   } else {
00321     // still hold guard(connections_lock_) at this point so
00322     // pending_server_link_keys_ is protected for insert
00323 
00324     VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n"));
00325     // accept_datalink() will complete the connection.
00326     pending_server_link_keys_.insert(key);
00327   }
00328 }
00329 
00330 } // namespace DCPS
00331 } // namespace OpenDDS
00332 
00333 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1