OpenDDS  Snapshot(2023/04/28-20:55)
MulticastTransport.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "MulticastTransport.h"
9 #include "MulticastDataLink.h"
11 #include "MulticastSendStrategy.h"
12 #include "MulticastSession.h"
14 #include "ReliableSessionFactory.h"
15 
17 #include <dds/DCPS/LogAddr.h>
22 
23 #include <ace/Log_Msg.h>
24 #include <ace/Truncate.h>
25 
27 
28 namespace OpenDDS {
29 namespace DCPS {
30 
32  : TransportImpl(inst)
33 {
34  if (! (configure_i(inst) && open())) {
36  }
37 }
38 
40 {
41 }
42 
43 
46 {
48 }
49 
52  Priority priority,
53  bool active)
54 {
55  RcHandle<MulticastSessionFactory> session_factory;
56  MulticastInst_rch cfg = config();
57  if (cfg && cfg->is_reliable()) {
58  session_factory = make_rch<ReliableSessionFactory>();
59  } else {
60  session_factory = make_rch<BestEffortSessionFactory>();
61  }
62 
63  MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32
64  | RepoIdConverter(local_id).participantId();
65 
66  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink "
67  "peers: local %#08x%08x priority %d active %d\n",
68  cfg ? cfg->name().c_str() : "", (unsigned int)(local_peer >> 32), (unsigned int)local_peer,
69  priority, active), 2);
70 
71  MulticastDataLink_rch link(make_rch<MulticastDataLink>(rchandle_from(this),
72  session_factory,
73  local_peer,
74  ref(cfg),
75  reactor_task(),
76  active));
77 
78  // Join multicast group:
79  if (!link->join(cfg->group_address_)) {
80  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MulticastTransport::make_datalink: ")
81  ACE_TEXT("failed to join multicast group: %C!\n"),
82  LogAddr(cfg->group_address_, LogAddr::HostPort).c_str()));
83  return MulticastDataLink_rch();
84  }
85 
86  return link;
87 }
88 
91  MulticastPeer remote_peer, bool active)
92 {
93  MulticastInst_rch cfg = config();
94 
95  if (link.is_nil()) {
97  ACE_TEXT("(%P|%t) ERROR: ")
98  ACE_TEXT("MulticastTransport[%C]::start_session: ")
99  ACE_TEXT("link is nil\n"),
100  cfg ? cfg->name().c_str() : ""),
102  }
103 
104  MulticastSession_rch session(link->find_or_create_session(remote_peer));
105 
106  if (session.is_nil()) {
108  ACE_TEXT("(%P|%t) ERROR: ")
109  ACE_TEXT("MulticastTransport[%C]::start_session: ")
110  ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
111  cfg ? cfg->name().c_str() : "",
112  (unsigned int)(remote_peer >> 32),
113  (unsigned int) remote_peer),
115  }
116 
117  const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));
118 
119  if (!session->start(active, acked)) {
121  ACE_TEXT("(%P|%t) ERROR: ")
122  ACE_TEXT("MulticastTransport[%C]::start_session: ")
123  ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"),
124  cfg ? cfg->name().c_str() : "",
125  (unsigned int)(remote_peer >> 32),
126  (unsigned int) remote_peer),
128  }
129 
130  return session;
131 }
132 
133 static bool
135 {
136  NetworkResource network_resource;
137  ACE_CDR::Boolean reliable;
138 
139  const size_t len = remote.blob_.length();
140  const char* buffer = reinterpret_cast<const char*>(remote.blob_.get_buffer());
141 
142  ACE_InputCDR cdr(buffer, len);
143  cdr >> network_resource;
144  cdr >> ACE_InputCDR::to_boolean(reliable);
145 
146  return reliable;
147 }
148 
151  const ConnectionAttribs& attribs,
152  const TransportClient_rch& client)
153 {
154  MulticastInst_rch cfg = config();
155 
156  if (!cfg) {
158  }
159 
160  // Check that the remote reliability matches.
161  if (get_remote_reliability(remote) != cfg->is_reliable()) {
162  return AcceptConnectResult();
163  }
164 
165  GuardThreadType guard_links(this->links_lock_);
166  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
168  Links::const_iterator link_iter = this->client_links_.find(local_peer);
170 
171  if (link_iter == this->client_links_.end()) {
172  link = this->make_datalink(attribs.local_id_, attribs.priority_, true /*active*/);
173  this->client_links_[local_peer] = link;
174  } else {
175  link = link_iter->second;
176  }
177 
178  MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
180 
181  if (cfg->is_reliable()) {
182  link->add_on_start_callback(client, remote.repo_id_);
183  }
184 
185  MulticastSession_rch session(
186  this->start_session(link, remote_peer, true /*active*/));
187 
188  if (session.is_nil()) {
189  Links::iterator to_remove = this->client_links_.find(local_peer);
190  if (to_remove != this->client_links_.end()) {
191  this->client_links_.erase(to_remove);
192  }
193  link->remove_on_start_callback(client, remote.repo_id_);
194  return AcceptConnectResult();
195  }
196 
197  if (cfg->is_reliable()) {
198  session->add_remote(attribs.local_id_, remote.repo_id_);
199  if (remote_peer != local_peer) {
201  }
202  } else {
203  session->add_remote(attribs.local_id_);
204  }
205 
206  return AcceptConnectResult(link);
207 }
208 
211  const ConnectionAttribs& attribs,
212  const TransportClient_rch& client)
213 {
214  MulticastInst_rch cfg = config();
215 
216  if (!cfg) {
218  }
219 
220  // Check that the remote reliability matches.
221  if (get_remote_reliability(remote) != cfg->is_reliable()) {
222  return AcceptConnectResult();
223  }
224 
225  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
227 
228  GuardThreadType guard_links(this->links_lock_);
229 
230  Links::const_iterator link_iter = this->server_links_.find(local_peer);
232 
233  if (link_iter == this->server_links_.end()) {
234 
235  link = this->make_datalink(attribs.local_id_, attribs.priority_, false /*passive*/);
236  this->server_links_[local_peer] = link;
237  } else {
238  link = link_iter->second;
239  }
240 
241  guard_links.release();
242 
243  MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
245 
246  GuardThreadType guard(this->connections_lock_);
247 
248  if (connections_.count(std::make_pair(remote_peer, local_peer))) {
249  //can't call start session with connections_lock_ due to reactor
250  //call in session->start which could deadlock with passive_connection
251  guard.release();
252 
253  VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n"));
254  MulticastSession_rch session(
255  this->start_session(link, remote_peer, false /*!active*/));
256 
257  if (session.is_nil()) {
258  link.reset();
259  }
260  return AcceptConnectResult(link);
261 
262  } else {
263 
264  if (cfg->is_reliable()) {
265  pending_connections_[std::make_pair(remote_peer, local_peer)].
266  push_back(std::make_pair(client, remote.repo_id_));
267  }
268 
269  //can't call start session with connections_lock_ due to reactor
270  //call in session->start which could deadlock with passive_connection
271  guard.release();
272  MulticastSession_rch session(
273  this->start_session(link, remote_peer, false /*!active*/));
274 
275  if (!session) {
277  } else if (cfg->is_reliable()) {
279  } else {
280  return AcceptConnectResult(link);
281  }
282  }
283 }
284 
285 void
287  const GUID_t& remote_id,
288  bool /*disassociate*/,
289  bool /*association_failed*/)
290 {
291  VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));
292 
293  GuardThreadType guard(this->connections_lock_);
294 
295  for (PendConnMap::iterator it = this->pending_connections_.begin();
296  it != this->pending_connections_.end(); ++it) {
297  bool erased_from_it = false;
298  for (size_t i = 0; i < it->second.size(); ++i) {
299  if (it->second[i].first == client && it->second[i].second == remote_id) {
300  erased_from_it = true;
301  it->second.erase(it->second.begin() + i);
302  break;
303  }
304  }
305 
306  if (erased_from_it && it->second.empty()) {
307  this->pending_connections_.erase(it);
308  return;
309  }
310  }
311 }
312 
313 void
315 {
316  GuardThreadType guard(this->connections_lock_);
317 
318  MulticastInst_rch cfg = config();
319 
320  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection "
321  "from remote peer %#08x%08x to local peer %#08x%08x\n",
322  cfg ? cfg->name().c_str() : "",
323  (unsigned int) (remote_peer >> 32),
324  (unsigned int) remote_peer,
325  (unsigned int) (local_peer >> 32),
326  (unsigned int) local_peer), 2);
327 
328  const Peers peers(remote_peer, local_peer);
329  const PendConnMap::iterator pend = this->pending_connections_.find(peers);
330  //if connection was pending, calls to use_datalink finalized the connection
331  //if it was not previously pending, accept_datalink() will finalize connection
332 
333  this->connections_.insert(peers);
334 
335  Links::const_iterator server_link = this->server_links_.find(local_peer);
336  DataLink_rch link;
337 
338  if (server_link != this->server_links_.end()) {
339  link = static_rchandle_cast<DataLink>(server_link->second);
340  MulticastSession_rch session (server_link->second->find_or_create_session(remote_peer));
341  session->set_acked();
342  }
343 
344  if (pend != pending_connections_.end()) {
345  Callbacks tmp(pend->second);
346  for (size_t i = 0; i < tmp.size(); ++i) {
347  const PendConnMap::iterator pend = pending_connections_.find(peers);
348  if (pend != pending_connections_.end()) {
349  const Callbacks::iterator tmp_iter = find(pend->second.begin(),
350  pend->second.end(),
351  tmp.at(i));
352  if (tmp_iter != pend->second.end()) {
353  TransportClient_wrch pend_client = tmp.at(i).first;
354  GUID_t remote_repo = tmp.at(i).second;
355  guard.release();
356  TransportClient_rch client = pend_client.lock();
357  if (client) {
358  client->use_datalink(remote_repo, link);
359  }
360  guard.acquire();
361  }
362  }
363  }
364  }
365 }
366 
367 bool
369 {
370  if (!config) {
371  return false;
372  }
373 
374  // Override with DCPSDefaultAddress.
375  if (config->local_address_.empty() &&
376  TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
377  char buffer[INET6_ADDRSTRLEN];
378  config->local_address_ = TheServiceParticipant->default_address().to_addr().get_host_addr(buffer, sizeof buffer);
379  }
380 
381  if (!config->group_address_.is_multicast()) {
382  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MulticastTransport[%@]::configure_i: ")
383  ACE_TEXT("invalid configuration: address %C is not multicast.\n"),
384  this, LogAddr::ip(config->group_address_).c_str()), false);
385  }
386 
387  this->create_reactor_task(config->async_send_, "MulticastTransport" + config->name());
388 
389  return true;
390 }
391 
392 void
394 {
395  GuardThreadType guard_links(this->links_lock_);
396  Links::iterator link;
397 
398  for (link = this->client_links_.begin();
399  link != this->client_links_.end();
400  ++link) {
401  if (link->second.in()) {
402  link->second->transport_shutdown();
403  }
404  }
405  client_links_.clear();
406 
407  for (link = this->server_links_.begin();
408  link != this->server_links_.end();
409  ++link) {
410  if (link->second.in()) {
411  link->second->transport_shutdown();
412  }
413  }
414  server_links_.clear();
415 }
416 
417 bool
419 {
420  MulticastInst_rch cfg = config();
421  if (cfg) {
422  cfg->populate_locator(info, flags);
423  return true;
424  }
425  return false;
426 }
427 
428 void
430 {
431  // No-op for multicast: keep both the client_link_ and server_link_ around
432  // until the transport is shut down.
433 }
434 
436 {
437  GuardThreadType guard_links(this->links_lock_);
438  const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(localId).federationId() << 32
439  | RepoIdConverter(localId).participantId();
440  Links::const_iterator link_iter = this->client_links_.find(local_peer);
442 
443  if (link_iter != this->client_links_.end()) {
444  link = link_iter->second;
445  }
446  guard_links.release();
447 
448  if (link) {
449  link->client_stop(localId);
450  }
451 }
452 
453 } // namespace DCPS
454 } // namespace OpenDDS
455 
ACE_INT64 MulticastPeer
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
#define INET6_ADDRSTRLEN
virtual void release_datalink(DataLink *link)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
static bool get_remote_reliability(const TransportImpl::RemoteTransport &remote)
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
bool configure_i(const MulticastInst_rch &config)
ParticipantId participantId() const
Get the participant id from the GUID.
MulticastTransport(const MulticastInst_rch &inst)
RcHandle< MulticastSession > MulticastSession_rch
int acquire(void)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
void passive_connection(MulticastPeer local_peer, MulticastPeer remote_peer)
OpenDDS::Federator::RepoKey federationId() const
Get the federeation id from the GUID.
int release(void)
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
LM_DEBUG
MulticastDataLink_rch make_datalink(const GUID_t &local_id, Priority priority, bool active)
#define VDBG(DBG_ARGS)
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
Defines a wrapper around address info which is used for advertise.
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
const char * c_str() const
Definition: LogAddr.h:32
void client_stop(const GUID_t &localId)
std::vector< DataLink::OnStartCallback > Callbacks
ACE_TEXT("TCP_Factory")
ReactorTask_rch reactor_task()
MulticastInst_rch config() const
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
ACE_CDR::Long Priority
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
MulticastSession_rch start_session(const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
long long ACE_INT64
std::pair< MulticastPeer, MulticastPeer > Peers
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > lock() const
Definition: RcObject.h:188
static const String ip(const ACE_INET_Addr &addr)
Definition: LogAddr.cpp:15
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
#define TheServiceParticipant
bool Boolean
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
RcHandle< MulticastDataLink > MulticastDataLink_rch
size_t ConnectionInfoFlags
TransportInst_rch config() const
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71