OpenDDS  Snapshot(2023/04/28-20:55)
UdpTransport.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "UdpTransport.h"
9 
10 #include "UdpInst_rch.h"
11 #include "UdpDataLink.h"
12 #include "UdpInst.h"
13 #include "UdpSendStrategy.h"
14 #include "UdpReceiveStrategy.h"
15 
16 #include <dds/DCPS/LogAddr.h>
22 
23 #include <ace/CDR_Base.h>
24 #include <ace/Log_Msg.h>
25 
27 
28 namespace OpenDDS {
29 namespace DCPS {
30 
// File-local constant: the encoding kind passed to the Serializer that
// passive_connection() uses to read the priority and address blob.
31 namespace {
32  const Encoding::Kind encoding_kind = Encoding::KIND_UNALIGNED_CDR;
33 }
34 
// Constructor fragment. The signature (listing line 35) is absent from this
// extract; the declaration index at the end of the page lists it as
// UdpTransport(const UdpInst_rch &inst).
// The instance handle is forwarded to the TransportImpl base.
36  : TransportImpl(inst)
37 {
// Configure first, then open; open() is not called when configure_i()
// returns false (short-circuit &&).
38  if (!(configure_i(inst) && open())) {
// NOTE(review): listing line 39 (the failure action) is missing here.
40  }
41 }
42 
45 {
47 }
48 
49 
// make_datalink fragment (first signature lines are absent from this extract;
// the declaration index lists make_datalink(const ACE_INET_Addr&
// remote_address, Priority priority, bool active)).
// Creates a UdpDataLink and opens it against remote_address.
// Returns the opened link, or an empty UdpDataLink_rch when open() fails.
52  Priority priority, bool active)
53 {
54  UdpDataLink_rch link(make_rch<UdpDataLink>(rchandle_from(this), priority, reactor_task(), active));
55  // Configure link with transport configuration and reactor task:
56 
57  // Open logical connection:
58  if (link->open(remote_address)) {
59  return link;
60  }
61 
// NOTE(review): listing line 62 (presumably the opening of the error-log
// macro call) is missing; only its message arguments remain below.
63  ACE_TEXT("(%P|%t) ERROR: ")
64  ACE_TEXT("UdpTransport::make_datalink: ")
65  ACE_TEXT("failed to open DataLink!\n")));
66 
// An empty handle tells the caller that no link could be opened.
67  return UdpDataLink_rch();
68 }
69 
// connect_datalink fragment (active side). The first signature lines are
// absent from this extract; the declaration index lists
// connect_datalink(const RemoteTransport& remote,
//                  const ConnectionAttribs& attribs,
//                  const TransportClient_rch& client).
// Looks up an existing client link for the (priority, remote address) key and
// returns it; otherwise creates a new link with make_datalink(), records it in
// client_links_ when it is non-nil, and returns it wrapped in an
// AcceptConnectResult.
72  const ConnectionAttribs& attribs,
73  const TransportClient_rch& )
74 {
75  UdpInst_rch cfg = config();
// Take the early-exit branch when the configuration is unavailable OR the
// transport is shut down. cfg is dereferenced unconditionally below
// (cfg->local_address()), so a null cfg must never get past this check; the
// previous "&&" only took this branch when both conditions held.
// NOTE(review): listing line 77 (the body of this branch) is missing here.
76  if (!cfg || is_shut_down()) {
78  }
79  const ACE_INET_Addr remote_address = get_connection_addr(remote.blob_);
80  const bool active = true;
81  const PriorityKey key = blob_to_key(remote.blob_, attribs.priority_, cfg->local_address(), active);
82 
83  VDBG_LVL((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink PriorityKey "
84  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
85  key.priority(), LogAddr(key.address()).c_str(), key.is_loopback(),
86  key.is_active()), 2);
87 
// NOTE(review): listing line 88 is missing (presumably the guard on
// client_links_lock_), as is line 90 (the body of the shutdown check).
89  if (this->is_shut_down()) {
91  }
92 
// Reuse an existing link for this key when one is already registered.
93  const UdpDataLinkMap::iterator it(client_links_.find(key));
94  if (it != client_links_.end()) {
95  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink found\n"));
96  return AcceptConnectResult(UdpDataLink_rch(it->second));
97  }
98 
99  // Create new DataLink for logical connection:
100  UdpDataLink_rch link (make_datalink(remote_address,
101  attribs.priority_,
102  active));
103 
// Only a successfully opened link is remembered; a nil link is still
// returned to the caller inside the result.
104  if (!link.is_nil()) {
105  client_links_.insert(UdpDataLinkMap::value_type(key, link));
106  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::connect_datalink connected\n"));
107  }
108 
109  return AcceptConnectResult(link);
110 }
111 
// accept_datalink fragment (passive side). The first signature lines are
// absent from this extract; the declaration index lists
// accept_datalink(const RemoteTransport& remote,
//                 const ConnectionAttribs& attribs,
//                 const TransportClient_rch& client).
// Classifies the (priority, remote address) key into one of three states:
//   - already in server_link_keys_          -> "found"
//   - present in pending_server_link_keys_  -> moved to server_link_keys_
//                                              ("completed")
//   - unknown                               -> the (client, remote id) pair is
//                                              queued in pending_connections_
//                                              ("pending")
114  const ConnectionAttribs& attribs,
115  const TransportClient_rch& client)
116 {
117  UdpInst_rch cfg = config();
// Take the early-exit branch when the configuration is unavailable OR the
// transport is shut down. cfg is dereferenced unconditionally below
// (cfg->local_address()), so a null cfg must never get past this check; the
// previous "&&" only took this branch when both conditions held.
// NOTE(review): listing line 119 (the body of this branch) is missing here,
// as is line 121 (presumably the guard on connections_lock_).
118  if (!cfg || is_shut_down()) {
120  }
122 
123  const PriorityKey key = blob_to_key(remote.blob_,
124  attribs.priority_, cfg->local_address(), false /* !active */);
125 
126  VDBG_LVL((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink PriorityKey "
127  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
128  key.priority(), LogAddr(key.address()).c_str(), key.is_loopback(),
129  key.is_active()), 2);
130 
131  if (server_link_keys_.count(key)) {
132  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink found\n"));
// NOTE(review): listing line 133 (presumably this branch's return) is missing.
134  } else if (pending_server_link_keys_.count(key)) {
// passive_connection() saw this peer first; promote the key to established.
135  pending_server_link_keys_.erase(key);
136  server_link_keys_.insert(key);
137  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink completed\n"));
// NOTE(review): listing line 138 (presumably this branch's return) is missing.
139  } else {
// Nothing received from this peer yet: remember who is waiting so that
// passive_connection() can notify the client later.
140  const DataLink::OnStartCallback callback(client, remote.repo_id_);
141  pending_connections_[key].push_back(callback);
142  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::accept_datalink pending\n"));
// NOTE(review): listing line 143 is missing here.
144  }
145  return AcceptConnectResult();
146 }
147 
// stop_accepting_or_connecting fragment. The first signature line is absent
// from this extract; the declaration index lists
// stop_accepting_or_connecting(const TransportClient_wrch& client,
//                              const GUID_t& remote_id,
//                              bool disassociate, bool association_failed).
// Removes the (client, remote_id) callback from the pending-connection
// vectors; the two bool parameters are unused.
148 void
150  const GUID_t& remote_id,
151  bool /*disassociate*/,
152  bool /*association_failed*/)
153 {
154  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
155 
// NOTE(review): listing line 156 is missing (presumably the lock guard).
157 
158  for (PendConnMap::iterator it = pending_connections_.begin();
159  it != pending_connections_.end(); ++it) {
// Drop at most one matching callback per key (first match, then break).
160  for (size_t i = 0; i < it->second.size(); ++i) {
161  if (it->second[i].first == client && it->second[i].second == remote_id) {
162  it->second.erase(it->second.begin() + i);
163  break;
164  }
165  }
// An emptied vector's map entry is erased. The function returns right after
// the erase, so the erased iterator is never incremented; any remaining
// keys are not visited in this call.
166  if (it->second.empty()) {
167  pending_connections_.erase(it);
168  return;
169  }
170  }
171 }
172 
// configure_i fragment. The signature line is absent from this extract; the
// declaration index lists configure_i(const UdpInst_rch& config).
// Returns false for a null config. Otherwise creates the reactor task,
// applies the default-address override, creates the server-side data link,
// and returns true.
173 bool
175 {
176  if (!config) {
177  return false;
178  }
// First argument is useAsyncSend (false); the task name is derived from the
// configuration name.
179  create_reactor_task(false, "UdpTransport" + config->name());
180 
181  // Override with DCPSDefaultAddress.
// Applied only when the configured local address is still the
// default-constructed ACE_INET_Addr and the participant's default address
// is not.
182  if (config->local_address() == ACE_INET_Addr() &&
183  TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
184  config->local_address(TheServiceParticipant->default_address().to_addr());
185  }
186 
187  // Our "server side" data link is created here, similar to the acceptor_
188  // in the TcpTransport implementation. This establishes a socket as an
189  // endpoint that we can advertise to peers via connection_info_i().
// NOTE(review): make_datalink() returns an empty handle when the link cannot
// be opened; that result is not checked here and true is returned regardless.
190  server_link_ = make_datalink(config->local_address(), 0 /* priority */, false);
191  return true;
192 }
193 
// Shutdown fragment. The signature line is absent from this extract
// (presumably UdpTransport::shutdown_i() - TODO confirm against the header).
// Calls transport_shutdown() on every client link, empties client_links_,
// then handles the server link.
194 void
196 {
197  // Shutdown reserved datalinks and release configuration:
// NOTE(review): listing line 198 is missing (presumably the guard on
// client_links_lock_).
199  for (UdpDataLinkMap::iterator it(client_links_.begin());
200  it != client_links_.end(); ++it) {
201  it->second->transport_shutdown();
202  }
203  client_links_.clear();
204 
205  if (server_link_) {
// NOTE(review): listing lines 206-207 (the server-link teardown) are missing.
208  }
209 }
210 
// connection_info_i fragment. The signature line is absent from this extract;
// the declaration index lists
// connection_info_i(TransportLocator& info, ConnectionInfoFlags flags) const.
// Delegates to the configuration's populate_locator(). Returns true when a
// configuration is available, false otherwise (info is left untouched then).
211 bool
213 {
214  UdpInst_rch cfg = config();
215  if (cfg) {
216  cfg->populate_locator(info, flags);
217  return true;
218  }
219  return false;
220 }
221 
// get_connection_addr fragment. The signature lines are absent from this
// extract; the declaration index lists
// get_connection_addr(const TransportBLOB& data) const.
// Deserializes a NetworkResource from the blob and converts it to an
// ACE_INET_Addr. When deserialization fails, the default-constructed address
// is returned unchanged.
224 {
225  ACE_INET_Addr local_address;
226  NetworkResource network_resource;
227 
228  size_t len = data.length();
229  const char* buffer = reinterpret_cast<const char*>(data.get_buffer());
230 
// Read the blob's octet buffer through a CDR input stream.
231  ACE_InputCDR cdr(buffer, len);
232  if (cdr >> network_resource) {
233  network_resource.to_addr(local_address);
234  }
235 
236  return local_address;
237 }
238 
// release_datalink fragment. The signature line is absent from this extract;
// the declaration index lists release_datalink(DataLink* link).
// Finds the client link whose pointer equals `link`, stops it, and removes it
// from client_links_. Does nothing when no entry matches.
239 void
241 {
// NOTE(review): listing line 242 is missing (presumably the guard on
// client_links_lock_).
243  for (UdpDataLinkMap::iterator it(client_links_.begin());
244  it != client_links_.end(); ++it) {
245  // We are guaranteed to have exactly one matching DataLink
246  // in the map; release any resources held and return.
247  if (link == static_cast<DataLink*>(it->second.in())) {
248  link->stop();
// Returning straight after erase() means the erased iterator is never
// advanced.
249  client_links_.erase(it);
250  return;
251  }
252  }
253 }
254 
// blob_to_key fragment. The first signature lines are absent from this
// extract; the declaration index lists
// blob_to_key(const TransportBLOB& remote, Priority priority,
//             ACE_INET_Addr local_addr, bool active).
// Builds a PriorityKey from the address serialized in `remote`. The key's
// loopback flag is set when that address equals local_addr.
257  Priority priority,
258  ACE_INET_Addr local_addr,
259  bool active)
260 {
261  NetworkResource network_resource;
262  ACE_InputCDR cdr((const char*)remote.get_buffer(), remote.length());
263 
// A deserialization failure does not return early: the key is still built
// below from the default-constructed network_resource.
// NOTE(review): listing line 265 (presumably the opening of the error-log
// macro call) is missing; only its message arguments remain.
264  if (!(cdr >> network_resource)) {
266  ACE_TEXT("(%P|%t) ERROR: UdpTransport::blob_to_key")
267  ACE_TEXT(" failed to de-serialize the NetworkResource\n")));
268  }
269 
270  ACE_INET_Addr remote_address;
271  network_resource.to_addr(remote_address);
272  const bool is_loopback = remote_address == local_addr;
273 
274  return PriorityKey(priority, remote_address, is_loopback, active);
275 }
276 
// passive_connection fragment. The first signature line is absent from this
// extract; the declaration index lists
// passive_connection(const ACE_INET_Addr& remote_address,
//                    const ReceivedDataSample& data).
// Handles the first datagram from an active peer:
//   1. reads a Priority followed by an address blob from the sample,
//   2. sends a one-byte ack back to remote_address over the server link,
//   3. if accept_datalink() already queued callbacks for the resulting key,
//      marks the key established and calls use_datalink() on each waiting
//      client; otherwise records the key in pending_server_link_keys_ so
//      accept_datalink() can complete the connection later.
277 void
279  const ReceivedDataSample& data)
280 {
281  UdpInst_rch cfg = config();
282  if (!cfg) {
283  return;
284  }
// The payload is a Priority followed by the blob bytes.
// NOTE(review): data_length() is a size_t, so this subtraction would wrap if
// a sample shorter than sizeof(Priority) reached this point - confirm the
// caller rules that out.
285  const size_t blob_len = data.data_length() - sizeof(Priority);
286  Message_Block_Ptr payload(data.data());
287  Priority priority;
288  Serializer serializer(payload.get(), encoding_kind);
289  serializer >> priority;
290  TransportBLOB blob(static_cast<CORBA::ULong>(blob_len));
291  blob.length(blob.maximum());
292  serializer.read_octet_array(blob.get_buffer(), blob.length());
293 
294  // Send an ack so that the active side can return from
295  // connect_datalink_i(). This is just a single byte of
296  // arbitrary data, the remote side is not yet using the
297  // framework (TransportHeader, DataSampleHeader,
298  // ReceiveStrategy).
299  const char ack_data = 23;
// A failed send is only logged at debug level; processing continues.
300  if (server_link_->socket().send(&ack_data, 1, remote_address) <= 0) {
301  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection failed to send ack\n"));
302  }
303 
304  const PriorityKey key = blob_to_key(blob, priority, cfg->local_address(), false /* passive */);
305 
// NOTE(review): listing line 306 is missing (presumably the declaration of
// `guard` on connections_lock_, which is released and re-acquired below).
307 
308  const PendConnMap::iterator pend = pending_connections_.find(key);
309 
310  if (pend != pending_connections_.end()) {
311 
312  //don't hold connections_lock_ while calling use_datalink
313  //guard.release();
314 
315  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection completing\n"));
316 
// NOTE(review): listing line 317 is missing (presumably the declaration of
// `link`, which is passed to use_datalink() below).
318 
319  //Insert key now to make sure when releasing guard to call use_datalink
320  //if an accept_datalink obtains lock first it will see that it can proceed
321  //with using the link and do its own use_datalink call.
322  server_link_keys_.insert(key);
323 
324  //create a copy of the size of callback vector so that if use_datalink_i -> stop_accepting_or_connecting
325  //finds that callbacks vector is empty and deletes pending connection & its callback vector for loop can
326  //still exit the loop without checking the size of invalid memory
327  //size_t num_callbacks = pend->second.size();
328 
329  //Create a copy of the vector of callbacks to process, making sure that each is
330  //still present in the actual pending_connections_ before calling use_datalink
331  Callbacks tmp(pend->second);
332  for (size_t i = 0; i < tmp.size(); ++i) {
// This inner `pend` shadows the outer one on purpose: the map entry is looked
// up again on every iteration because the guard is dropped inside the loop
// and the entry may have been changed or removed in the meantime.
333  const PendConnMap::iterator pend = pending_connections_.find(key);
334  if (pend != pending_connections_.end()) {
335  const Callbacks::iterator tmp_iter = find(pend->second.begin(),
336  pend->second.end(),
337  tmp.at(i));
338  if (tmp_iter != pend->second.end()) {
// Copy what is needed out of the snapshot before releasing the guard.
339  TransportClient_wrch pend_client = tmp.at(i).first;
340  GUID_t remote_repo = tmp.at(i).second;
341  guard.release();
// The client is held weakly; lock() yields a null handle if it is gone,
// in which case the callback is skipped.
342  TransportClient_rch client = pend_client.lock();
343  if (client)
344  client->use_datalink(remote_repo, link);
345  guard.acquire();
346  }
347  }
348  }
349  } else {
350  // still hold guard(connections_lock_) at this point so
351  // pending_server_link_keys_ is protected for insert
352 
353  VDBG((LM_DEBUG, "(%P|%t) UdpTransport::passive_connection pending\n"));
354  // accept_datalink() will complete the connection.
355  pending_server_link_keys_.insert(key);
356  }
357 }
358 
359 } // namespace DCPS
360 } // namespace OpenDDS
361 
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
virtual void release_datalink(DataLink *link)
std::pair< TransportClient_wrch, GUID_t > OnStartCallback
Definition: DataLink.h:259
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
bool open(const ACE_INET_Addr &remote_address)
Definition: UdpDataLink.cpp:46
sequence< octet > key
Encapsulate a priority value and internet address as a key.
Definition: PriorityKey.h:52
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
UdpTransport(const UdpInst_rch &inst)
void passive_connection(const ACE_INET_Addr &remote_address, const ReceivedDataSample &data)
RcHandle< UdpDataLink > UdpDataLink_rch
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
ACE_INET_Addr & address()
Definition: PriorityKey.inl:72
std::vector< DataLink::OnStartCallback > Callbacks
Definition: UdpTransport.h:94
LM_DEBUG
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
#define VDBG(DBG_ARGS)
ACE_INET_Addr get_connection_addr(const TransportBLOB &data) const
Holds a data sample received by the transport.
std::set< PriorityKey > pending_server_link_keys_
Definition: UdpTransport.h:104
Defines a wrapper around address info which is used for advertise.
UdpDataLink_rch server_link_
The single datalink for the passive side. No locking required.
Definition: UdpTransport.h:81
bool configure_i(const UdpInst_rch &config)
PendConnMap pending_connections_
Definition: UdpTransport.h:99
ACE_TEXT("TCP_Factory")
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
ACE_SOCK_Dgram & socket()
Definition: UdpDataLink.inl:40
ReactorTask_rch reactor_task()
ACE_Recursive_Thread_Mutex connections_lock_
Definition: UdpTransport.h:86
std::set< PriorityKey > server_link_keys_
Definition: UdpTransport.h:92
ACE_CDR::Long Priority
PriorityKey blob_to_key(const TransportBLOB &remote, Priority priority, ACE_INET_Addr local_addr, bool active)
size_t data_length() const
total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks
void stop()
The stop method is used to stop the DataLink prior to shutdown.
Definition: DataLink.cpp:355
UdpDataLink_rch make_datalink(const ACE_INET_Addr &remote_address, Priority priority, bool active)
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
LockType client_links_lock_
This lock is used to protect the client_links_ data member.
Definition: UdpTransport.h:73
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
UdpDataLinkMap client_links_
Definition: UdpTransport.h:78
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
RcHandle< T > lock() const
Definition: RcObject.h:188
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
void to_addr(ACE_INET_Addr &addr) const
#define TheServiceParticipant
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
DDS::OctetSeq TransportBLOB
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
UdpInst_rch config() const
size_t ConnectionInfoFlags
TransportInst_rch config() const
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71