OpenDDS  Snapshot(2023/04/28-20:55)
MulticastManager.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "DCPS/DdsDcps_pch.h"
7 
8 #include "MulticastManager.h"
9 
10 #include "debug.h"
11 #include "LogAddr.h"
12 
14 
15 namespace OpenDDS {
16 namespace DCPS {
17 
19  InternalSampleInfoSequence& infos,
20  const OPENDDS_STRING& multicast_interface,
21  ACE_Reactor* reactor,
22  ACE_Event_Handler* event_handler,
23  const NetworkAddress& multicast_group_address,
24  ACE_SOCK_Dgram_Mcast& multicast_socket
25 #ifdef ACE_HAS_IPV6
26  , const NetworkAddress& ipv6_multicast_group_address,
27  ACE_SOCK_Dgram_Mcast& ipv6_multicast_socket
28 #endif
29  )
30 {
31  bool any_joined = false;
32 
33  for (size_t idx = 0; idx != samples.size(); ++idx) {
34  NetworkInterfaceAddress& nia = samples[idx];
35  const DDS::SampleInfo& info = infos[idx];
36 
37  if (nia.name.empty()) {
38  nia.name = multicast_interface;
39  }
40 
42  leave(nia, multicast_group_address, multicast_socket
43 #ifdef ACE_HAS_IPV6
44  , ipv6_multicast_group_address, ipv6_multicast_socket
45 #endif
46 );
47  if (nia.exclude_from_multicast(multicast_interface.c_str())) {
48  continue;
49  }
50  const bool j = join(nia, reactor, event_handler, multicast_group_address, multicast_socket
51 #ifdef ACE_HAS_IPV6
52  , ipv6_multicast_group_address, ipv6_multicast_socket
53 #endif
54  );
55  any_joined = any_joined || j;
56  } else {
57  leave(nia, multicast_group_address, multicast_socket
58 #ifdef ACE_HAS_IPV6
59  , ipv6_multicast_group_address, ipv6_multicast_socket
60 #endif
61  );
62  }
63  }
64 
65  return any_joined;
66 }
67 
69 {
70  return joined_interfaces_.size()
71 #ifdef ACE_HAS_IPV6
72  + ipv6_joined_interfaces_.size()
73 #endif
74  ;
75 }
76 
78  ACE_Reactor* reactor,
79  ACE_Event_Handler* event_handler,
80  const NetworkAddress& multicast_group_address,
81  ACE_SOCK_Dgram_Mcast& multicast_socket
82 #ifdef ACE_HAS_IPV6
83  , const NetworkAddress& ipv6_multicast_group_address,
84  ACE_SOCK_Dgram_Mcast& ipv6_multicast_socket
85 #endif
86  )
87 {
88  bool joined = false;
89 
90  if (joined_interfaces_.count(nia.name) == 0 && nia.is_ipv4()) {
91  if (0 == multicast_socket.join(multicast_group_address.to_addr(), 1, nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
92  joined_interfaces_.insert(nia.name);
93  if (log_level >= LogLevel::Info) {
95  "(%P|%t) INFO: MulticastManager::join: joined group %C on %C/%C (%@ joined count %B)\n",
96  LogAddr(multicast_group_address).c_str(),
97  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
99  this,
101  }
102  joined = true;
103 
104  if (reactor) {
105  if (reactor->register_handler(multicast_socket.get_handle(),
106  event_handler,
108  if (log_level >= LogLevel::Error) {
109  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: failed to register multicast input handler\n"));
110  }
111  }
112  } else if (log_level >= LogLevel::Error) {
113  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: reactor is NULL\n"));
114  }
115  } else {
116  if (log_level >= LogLevel::Warning) {
118  "(%P|%t) WARNING: MulticastManager::join: failed to join group %C on %C/%C (%@ joined count %B): %m\n",
119  LogAddr(multicast_group_address).c_str(),
120  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
122  this,
124  }
125  }
126  }
127 
128 #ifdef ACE_HAS_IPV6
129  if (ipv6_joined_interfaces_.count(nia.name) == 0 && nia.is_ipv6()) {
130  // Windows 7 has an issue with different threads concurrently calling join for ipv6
131  static ACE_Thread_Mutex ipv6_static_lock;
132  ACE_GUARD_RETURN(ACE_Thread_Mutex, g3, ipv6_static_lock, false);
133 
134  if (0 == ipv6_multicast_socket.join(ipv6_multicast_group_address.to_addr(), 1, nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
135  ipv6_joined_interfaces_.insert(nia.name);
136  if (log_level >= LogLevel::Info) {
138  "(%P|%t) INFO: MulticastManager::join: joined group %C on %C/%C (%@ joined count %B)\n",
139  LogAddr(ipv6_multicast_group_address).c_str(),
140  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
142  this,
144  }
145  joined = true;
146 
147  if (reactor) {
148  if (reactor->register_handler(ipv6_multicast_socket.get_handle(),
149  event_handler,
151  if (log_level >= LogLevel::Error) {
152  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: ipv6 failed to register multicast input handler\n"));
153  }
154  }
155  } else if (log_level >= LogLevel::Error) {
156  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: ipv6 reactor is NULL\n"));
157  }
158  } else {
159  if (log_level >= LogLevel::Warning) {
161  "(%P|%t) WARNING: MulticastManager::join: failed to join group %C on %C/%C (%@ joined count %B): %m\n",
162  LogAddr(ipv6_multicast_group_address).c_str(),
163  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
165  this,
167  }
168  }
169  }
170 #endif
171 
172  return joined;
173 }
174 
176  const NetworkAddress& multicast_group_address,
177  ACE_SOCK_Dgram_Mcast& multicast_socket
178 #ifdef ACE_HAS_IPV6
179  , const NetworkAddress& ipv6_multicast_group_address,
180  ACE_SOCK_Dgram_Mcast& ipv6_multicast_socket
181 #endif
182 )
183 {
184  if (joined_interfaces_.count(nia.name) != 0 && nia.is_ipv4()) {
185  if (0 == multicast_socket.leave(multicast_group_address.to_addr(), nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
186  joined_interfaces_.erase(nia.name);
187  if (log_level >= LogLevel::Info) {
189  "(%P|%t) INFO: MulticastManager::leave: left group %C on %C/%C (%@ joined count %B)\n",
190  LogAddr(multicast_group_address).c_str(),
191  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
193  this,
195  }
196  } else {
197  if (log_level >= LogLevel::Warning) {
199  "(%P|%t) WARNING: MulticastManager::leave: failed to leave group %C on %C/%C (%@ joined count %B): %m\n",
200  LogAddr(multicast_group_address).c_str(),
201  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
203  this,
205  }
206  }
207  }
208 
209 #ifdef ACE_HAS_IPV6
210  if (ipv6_joined_interfaces_.count(nia.name) != 0 && nia.is_ipv6()) {
211  if (0 == ipv6_multicast_socket.leave(ipv6_multicast_group_address.to_addr(), nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
212  ipv6_joined_interfaces_.erase(nia.name);
213  if (log_level >= LogLevel::Info) {
215  "(%P|%t) INFO: MulticastManager::leave: left group %C on %C/%C (%@ joined count %B)\n",
216  LogAddr(ipv6_multicast_group_address).c_str(),
217  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
219  this,
221  }
222  } else {
223  if (log_level >= LogLevel::Warning) {
225  "(%P|%t) WARNING: MulticastManager::leave: failed to leave group %C on %C/%C (%@ joined count %B): %m\n",
226  LogAddr(ipv6_multicast_group_address).c_str(),
227  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
229  this,
231  }
232  }
233  }
234 #endif
235 }
236 
237 } // namespace DCPS
238 } // namespace OpenDDS
239 
bool exclude_from_multicast(const char *configured_interface) const
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
InstanceStateKind instance_state
void leave(const NetworkInterfaceAddress &nia, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
ACE_INET_Addr to_addr() const
LM_INFO
#define OPENDDS_STRING
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool join(const NetworkInterfaceAddress &nia, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
const char * c_str() const
Definition: LogAddr.h:32
LM_WARNING
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
OpenDDS_Dcps_Export LogLevel log_level
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const InstanceStateKind ALIVE_INSTANCE_STATE
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.