OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Member Functions | List of all members
OpenDDS::DCPS::MulticastManager Class Reference

#include <MulticastManager.h>

Public Member Functions

bool process (InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
 Returns true if at least one group was joined. More...
 

Private Member Functions

size_t joined_interface_count () const
 
 OPENDDS_SET (OPENDDS_STRING) joined_interfaces_
 

Detailed Description

Definition at line 24 of file MulticastManager.h.

Member Function Documentation

◆ joined_interface_count()

size_t OpenDDS::DCPS::MulticastManager::joined_interface_count ( ) const
private

Definition at line 194 of file MulticastManager.cpp.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL.

Referenced by process().

195 {
196  return joined_interfaces_.size()
197 #ifdef ACE_HAS_IPV6
198  + ipv6_joined_interfaces_.size()
199 #endif
200  ;
201 }

◆ OPENDDS_SET()

OpenDDS::DCPS::MulticastManager::OPENDDS_SET ( OPENDDS_STRING  )
private

◆ process()

bool OpenDDS::DCPS::MulticastManager::process ( InternalDataReader< NetworkInterfaceAddress >::SampleSequence &  samples,
InternalSampleInfoSequence &  infos,
const OPENDDS_STRING multicast_interface,
ACE_Reactor reactor,
ACE_Event_Handler event_handler,
const NetworkAddress multicast_group_address,
ACE_SOCK_Dgram_Mcast multicast_socket 
)

Returns true if at least one group was joined.

Definition at line 18 of file MulticastManager.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT_CHAR_TO_TCHAR, OpenDDS::DCPS::NetworkInterfaceAddress::address, OpenDDS::DCPS::LogAddr::c_str(), OpenDDS::DCPS::LogLevel::Error, OpenDDS::DCPS::NetworkInterfaceAddress::exclude_from_multicast(), OpenDDS::DCPS::LogLevel::Info, OpenDDS::DCPS::LogAddr::Ip, OpenDDS::DCPS::NetworkInterfaceAddress::is_ipv4(), OpenDDS::DCPS::NetworkInterfaceAddress::is_ipv6(), OpenDDS::DCPS::ISIK_DISPOSE, OpenDDS::DCPS::ISIK_REGISTER, OpenDDS::DCPS::ISIK_SAMPLE, OpenDDS::DCPS::ISIK_UNREGISTER, joined_interface_count(), OpenDDS::DCPS::InternalSampleInfo::kind, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::DCPS::NetworkInterfaceAddress::name, ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), OpenDDS::DCPS::NetworkAddress::to_addr(), and OpenDDS::DCPS::LogLevel::Warning.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::on_data_available(), and OpenDDS::RTPS::Spdp::SpdpTransport::on_data_available().

30 {
31  bool any_joined = false;
32 
33  for (size_t idx = 0; idx != samples.size(); ++idx) {
34  NetworkInterfaceAddress& nia = samples[idx];
35  const InternalSampleInfo& info = infos[idx];
36 
37  if (nia.name.empty()) {
38  nia.name = multicast_interface;
39  }
40 
41  switch (info.kind) {
42  case ISIK_REGISTER:
43  // Ignore.
44  break;
45  case ISIK_SAMPLE: {
46  if (nia.exclude_from_multicast(multicast_interface.c_str())) {
47  continue;
48  }
49 
50  if (joined_interfaces_.count(nia.name) == 0 && nia.is_ipv4()) {
51  if (0 == multicast_socket.join(multicast_group_address.to_addr(), 1, nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
52  joined_interfaces_.insert(nia.name);
53  if (log_level >= LogLevel::Info) {
54  ACE_DEBUG((LM_INFO,
55  "(%P|%t) INFO: MulticastManager::process: joined group %C on %C/%C (%@ joined count %B)\n",
56  LogAddr(multicast_group_address).c_str(),
57  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
58  LogAddr(nia.address, LogAddr::Ip).c_str(),
59  this,
61  }
62  any_joined = true;
63 
64  if (reactor) {
65  if (reactor->register_handler(multicast_socket.get_handle(),
66  event_handler,
68  if (log_level >= LogLevel::Error) {
69  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::process: failed to register multicast input handler\n"));
70  }
71  }
72  } else if (log_level >= LogLevel::Error) {
73  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::process: reactor is NULL\n"));
74  }
75  } else {
77  ACE_ERROR((LM_WARNING,
78  "(%P|%t) WARNING: MulticastManager::process: failed to join group %C on %C/%C (%@ joined count %B): %m\n",
79  LogAddr(multicast_group_address).c_str(),
80  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
81  LogAddr(nia.address, LogAddr::Ip).c_str(),
82  this,
84  }
85  }
86  }
87 
88 #ifdef ACE_HAS_IPV6
89  if (ipv6_joined_interfaces_.count(nia.name) == 0 && nia.is_ipv6()) {
90  // Windows 7 has an issue with different threads concurrently calling join for ipv6
91  static ACE_Thread_Mutex ipv6_static_lock;
92  ACE_GUARD_RETURN(ACE_Thread_Mutex, g3, ipv6_static_lock, false);
93 
94  if (0 == ipv6_multicast_socket.join(ipv6_multicast_group_address.to_addr(), 1, nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
95  ipv6_joined_interfaces_.insert(nia.name);
96  if (log_level >= LogLevel::Info) {
97  ACE_DEBUG((LM_INFO,
98  "(%P|%t) INFO: MulticastManager::process: joined group %C on %C/%C (%@ joined count %B)\n",
99  LogAddr(ipv6_multicast_group_address).c_str(),
100  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
101  LogAddr(nia.address, LogAddr::Ip).c_str(),
102  this,
104  }
105  any_joined = true;
106 
107  if (reactor) {
108  if (reactor->register_handler(ipv6_multicast_socket.get_handle(),
109  event_handler,
111  if (log_level >= LogLevel::Error) {
112  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::process: ipv6 failed to register multicast input handler\n"));
113  }
114  }
115  } else if (log_level >= LogLevel::Error) {
116  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::process: ipv6 reactor is NULL\n"));
117  }
118  } else {
119  if (log_level >= LogLevel::Warning) {
120  ACE_ERROR((LM_WARNING,
121  "(%P|%t) WARNING: MulticastManager::process: failed to join group %C on %C/%C (%@ joined count %B): %m\n",
122  LogAddr(ipv6_multicast_group_address).c_str(),
123  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
124  LogAddr(nia.address, LogAddr::Ip).c_str(),
125  this,
127  }
128  }
129  }
130 #endif
131  }
132  break;
133  case ISIK_UNREGISTER:
134  case ISIK_DISPOSE: {
135  if (joined_interfaces_.count(nia.name) != 0 && !nia.is_ipv4()) {
136  if (0 == multicast_socket.leave(multicast_group_address.to_addr(), nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
137  joined_interfaces_.erase(nia.name);
138  if (log_level >= LogLevel::Info) {
139  ACE_DEBUG((LM_INFO,
140  "(%P|%t) INFO: MulticastManager::process: left group %C on %C/%C (%@ joined count %B)\n",
141  LogAddr(multicast_group_address).c_str(),
142  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
143  LogAddr(nia.address, LogAddr::Ip).c_str(),
144  this,
146  }
147  } else {
148  if (log_level >= LogLevel::Warning) {
149  ACE_ERROR((LM_WARNING,
150  "(%P|%t) WARNING: MulticastManager::process: failed to leave group %C on %C/%C (%@ joined count %B): %m\n",
151  LogAddr(multicast_group_address).c_str(),
152  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
153  LogAddr(nia.address, LogAddr::Ip).c_str(),
154  this,
156  }
157  }
158  }
159 
160 #ifdef ACE_HAS_IPV6
161  if (ipv6_joined_interfaces_.count(nia.name) != 0 && !nia.is_ipv6()) {
162  if (0 == ipv6_multicast_socket.leave(ipv6_multicast_group_address.to_addr(), nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
163  ipv6_joined_interfaces_.erase(nia.name);
164  if (log_level >= LogLevel::Info) {
165  ACE_DEBUG((LM_INFO,
166  "(%P|%t) INFO: MulticastManager::process: left group %C on %C/%C (%@ joined count %B)\n",
167  LogAddr(ipv6_multicast_group_address).c_str(),
168  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
169  LogAddr(nia.address, LogAddr::Ip).c_str(),
170  this,
172  }
173  } else {
174  if (log_level >= LogLevel::Warning) {
175  ACE_ERROR((LM_WARNING,
176  "(%P|%t) WARNING: MulticastManager::process: failed to leave group %C on %C/%C (%@ joined count %B): %m\n",
177  LogAddr(ipv6_multicast_group_address).c_str(),
178  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
179  LogAddr(nia.address, LogAddr::Ip).c_str(),
180  this,
182  }
183  }
184  }
185 #endif
186  }
187  break;
188  }
189  }
190 
191  return any_joined;
192 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int leave(const ACE_INET_Addr &mcast_addr, const ACE_TCHAR *net_if=0)
int join(const ACE_INET_Addr &mcast_addr, int reuse_addr=1, const ACE_TCHAR *net_if=0)
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_HANDLE get_handle(void) const
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)

The documentation for this class was generated from the following files: