OpenDDS  Snapshot(2023/04/28-20:55)
Public Member Functions | Private Member Functions | List of all members
OpenDDS::DCPS::MulticastManager Class Reference

#include <MulticastManager.h>

Public Member Functions

bool process (InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
 Returns true if at least one group was joined. More...
 

Private Member Functions

size_t joined_interface_count () const
 
bool join (const NetworkInterfaceAddress &nia, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
 
void leave (const NetworkInterfaceAddress &nia, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
 
 OPENDDS_SET (OPENDDS_STRING) joined_interfaces_
 

Detailed Description

Definition at line 24 of file MulticastManager.h.

Member Function Documentation

◆ join()

bool OpenDDS::DCPS::MulticastManager::join ( const NetworkInterfaceAddress &  nia,
ACE_Reactor *  reactor,
ACE_Event_Handler *  event_handler,
const NetworkAddress &  multicast_group_address,
ACE_SOCK_Dgram_Mcast &  multicast_socket 
)
private

Definition at line 77 of file MulticastManager.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT_CHAR_TO_TCHAR, OpenDDS::DCPS::NetworkInterfaceAddress::address, OpenDDS::DCPS::LogAddr::c_str(), OpenDDS::DCPS::LogLevel::Error, OpenDDS::DCPS::LogLevel::Info, OpenDDS::DCPS::LogAddr::Ip, OpenDDS::DCPS::NetworkInterfaceAddress::is_ipv4(), OpenDDS::DCPS::NetworkInterfaceAddress::is_ipv6(), joined_interface_count(), LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::DCPS::NetworkInterfaceAddress::name, ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), OpenDDS::DCPS::NetworkAddress::to_addr(), and OpenDDS::DCPS::LogLevel::Warning.

Referenced by process().

87 {
88  bool joined = false;
89 
90  if (joined_interfaces_.count(nia.name) == 0 && nia.is_ipv4()) {
91  if (0 == multicast_socket.join(multicast_group_address.to_addr(), 1, nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
92  joined_interfaces_.insert(nia.name);
93  if (log_level >= LogLevel::Info) {
94  ACE_DEBUG((LM_INFO,
95  "(%P|%t) INFO: MulticastManager::join: joined group %C on %C/%C (%@ joined count %B)\n",
96  LogAddr(multicast_group_address).c_str(),
97  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
98  LogAddr(nia.address, LogAddr::Ip).c_str(),
99  this,
101  }
102  joined = true;
103 
104  if (reactor) {
105  if (reactor->register_handler(multicast_socket.get_handle(),
106  event_handler,
108  if (log_level >= LogLevel::Error) {
109  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: failed to register multicast input handler\n"));
110  }
111  }
112  } else if (log_level >= LogLevel::Error) {
113  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: reactor is NULL\n"));
114  }
115  } else {
116  if (log_level >= LogLevel::Warning) {
117  ACE_ERROR((LM_WARNING,
118  "(%P|%t) WARNING: MulticastManager::join: failed to join group %C on %C/%C (%@ joined count %B): %m\n",
119  LogAddr(multicast_group_address).c_str(),
120  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
121  LogAddr(nia.address, LogAddr::Ip).c_str(),
122  this,
124  }
125  }
126  }
127 
128 #ifdef ACE_HAS_IPV6
129  if (ipv6_joined_interfaces_.count(nia.name) == 0 && nia.is_ipv6()) {
130  // Windows 7 has an issue with different threads concurrently calling join for ipv6
131  static ACE_Thread_Mutex ipv6_static_lock;
132  ACE_GUARD_RETURN(ACE_Thread_Mutex, g3, ipv6_static_lock, false);
133 
134  if (0 == ipv6_multicast_socket.join(ipv6_multicast_group_address.to_addr(), 1, nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
135  ipv6_joined_interfaces_.insert(nia.name);
136  if (log_level >= LogLevel::Info) {
137  ACE_DEBUG((LM_INFO,
138  "(%P|%t) INFO: MulticastManager::join: joined group %C on %C/%C (%@ joined count %B)\n",
139  LogAddr(ipv6_multicast_group_address).c_str(),
140  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
141  LogAddr(nia.address, LogAddr::Ip).c_str(),
142  this,
144  }
145  joined = true;
146 
147  if (reactor) {
148  if (reactor->register_handler(ipv6_multicast_socket.get_handle(),
149  event_handler,
151  if (log_level >= LogLevel::Error) {
152  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: ipv6 failed to register multicast input handler\n"));
153  }
154  }
155  } else if (log_level >= LogLevel::Error) {
156  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: MulticastManager::join: ipv6 reactor is NULL\n"));
157  }
158  } else {
159  if (log_level >= LogLevel::Warning) {
160  ACE_ERROR((LM_WARNING,
161  "(%P|%t) WARNING: MulticastManager::join: failed to join group %C on %C/%C (%@ joined count %B): %m\n",
162  LogAddr(ipv6_multicast_group_address).c_str(),
163  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
164  LogAddr(nia.address, LogAddr::Ip).c_str(),
165  this,
167  }
168  }
169  }
170 #endif
171 
172  return joined;
173 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int join(const ACE_INET_Addr &mcast_addr, int reuse_addr=1, const ACE_TCHAR *net_if=0)
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_HANDLE get_handle(void) const
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
OpenDDS_Dcps_Export LogLevel log_level

◆ joined_interface_count()

size_t OpenDDS::DCPS::MulticastManager::joined_interface_count ( ) const
private

Definition at line 68 of file MulticastManager.cpp.

Referenced by join(), and leave().

69 { // Returns the number of interface names currently recorded as joined.
70  return joined_interfaces_.size() // names inserted by join() for IPv4 interfaces
71 #ifdef ACE_HAS_IPV6
72  + ipv6_joined_interfaces_.size() // plus names inserted for IPv6 interfaces, when compiled with ACE_HAS_IPV6
73 #endif
74  ;
75 }

◆ leave()

void OpenDDS::DCPS::MulticastManager::leave ( const NetworkInterfaceAddress &  nia,
const NetworkAddress &  multicast_group_address,
ACE_SOCK_Dgram_Mcast &  multicast_socket 
)
private

Definition at line 175 of file MulticastManager.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT_CHAR_TO_TCHAR, OpenDDS::DCPS::NetworkInterfaceAddress::address, OpenDDS::DCPS::LogAddr::c_str(), OpenDDS::DCPS::LogLevel::Info, OpenDDS::DCPS::LogAddr::Ip, OpenDDS::DCPS::NetworkInterfaceAddress::is_ipv4(), OpenDDS::DCPS::NetworkInterfaceAddress::is_ipv6(), joined_interface_count(), LM_INFO, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::DCPS::NetworkInterfaceAddress::name, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OpenDDS::DCPS::NetworkAddress::to_addr(), and OpenDDS::DCPS::LogLevel::Warning.

Referenced by process().

183 { // Leaves the multicast group on nia's interface (IPv4, plus IPv6 when ACE_HAS_IPV6 is defined) if that interface is recorded as joined.
184  if (joined_interfaces_.count(nia.name) != 0 && nia.is_ipv4()) { // nothing to do unless join() recorded this interface
185  if (0 == multicast_socket.leave(multicast_group_address.to_addr(), nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
186  joined_interfaces_.erase(nia.name); // the record is dropped only when the socket-level leave succeeds
187  if (log_level >= LogLevel::Info) {
188  ACE_DEBUG((LM_INFO,
189  "(%P|%t) INFO: MulticastManager::leave: left group %C on %C/%C (%@ joined count %B)\n",
190  LogAddr(multicast_group_address).c_str(),
191  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
192  LogAddr(nia.address, LogAddr::Ip).c_str(),
193  this,
194  joined_interface_count()));
195  }
196  } else {
197  if (log_level >= LogLevel::Warning) {
198  ACE_ERROR((LM_WARNING,
199  "(%P|%t) WARNING: MulticastManager::leave: failed to leave group %C on %C/%C (%@ joined count %B): %m\n",
200  LogAddr(multicast_group_address).c_str(),
201  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
202  LogAddr(nia.address, LogAddr::Ip).c_str(),
203  this,
204  joined_interface_count()));
205  }
206  }
207  }
208 
209 #ifdef ACE_HAS_IPV6
210  if (ipv6_joined_interfaces_.count(nia.name) != 0 && nia.is_ipv6()) { // same sequence as above, against the IPv6 socket and group
211  if (0 == ipv6_multicast_socket.leave(ipv6_multicast_group_address.to_addr(), nia.name.empty() ? 0 : ACE_TEXT_CHAR_TO_TCHAR(nia.name.c_str()))) {
212  ipv6_joined_interfaces_.erase(nia.name);
213  if (log_level >= LogLevel::Info) {
214  ACE_DEBUG((LM_INFO,
215  "(%P|%t) INFO: MulticastManager::leave: left group %C on %C/%C (%@ joined count %B)\n",
216  LogAddr(ipv6_multicast_group_address).c_str(),
217  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
218  LogAddr(nia.address, LogAddr::Ip).c_str(),
219  this,
220  joined_interface_count()));
221  }
222  } else {
223  if (log_level >= LogLevel::Warning) {
224  ACE_ERROR((LM_WARNING,
225  "(%P|%t) WARNING: MulticastManager::leave: failed to leave group %C on %C/%C (%@ joined count %B): %m\n",
226  LogAddr(ipv6_multicast_group_address).c_str(),
227  nia.name.empty() ? "all interfaces" : nia.name.c_str(),
228  LogAddr(nia.address, LogAddr::Ip).c_str(),
229  this,
230  joined_interface_count()));
231  }
232  }
233  }
234 #endif
235 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int leave(const ACE_INET_Addr &mcast_addr, const ACE_TCHAR *net_if=0)
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
OpenDDS_Dcps_Export LogLevel log_level

◆ OPENDDS_SET()

OpenDDS::DCPS::MulticastManager::OPENDDS_SET ( OPENDDS_STRING  )
private

◆ process()

bool OpenDDS::DCPS::MulticastManager::process ( InternalDataReader< NetworkInterfaceAddress >::SampleSequence &  samples,
InternalSampleInfoSequence &  infos,
const OPENDDS_STRING &  multicast_interface,
ACE_Reactor *  reactor,
ACE_Event_Handler *  event_handler,
const NetworkAddress &  multicast_group_address,
ACE_SOCK_Dgram_Mcast &  multicast_socket 
)

Returns true if at least one group was joined.

Definition at line 18 of file MulticastManager.cpp.

References DDS::ALIVE_INSTANCE_STATE, OpenDDS::DCPS::NetworkInterfaceAddress::exclude_from_multicast(), DDS::SampleInfo::instance_state, join(), leave(), and OpenDDS::DCPS::NetworkInterfaceAddress::name.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::on_data_available(), and OpenDDS::RTPS::Spdp::SpdpTransport::on_data_available().

30 { // Applies each network-interface sample to the multicast membership; returns true if join() succeeded for at least one sample.
31  bool any_joined = false;
32 
33  for (size_t idx = 0; idx != samples.size(); ++idx) {
34  NetworkInterfaceAddress& nia = samples[idx];
35  const DDS::SampleInfo& info = infos[idx]; // infos is indexed in parallel with samples
36 
37  if (nia.name.empty()) {
38  nia.name = multicast_interface; // unnamed samples take the configured interface name (mutates the sample in place)
39  }
40 
41  if (info.instance_state == DDS::ALIVE_INSTANCE_STATE) {
42  leave(nia, multicast_group_address, multicast_socket // alive sample: leave() is called before any join() attempt
43 #ifdef ACE_HAS_IPV6
44  , ipv6_multicast_group_address, ipv6_multicast_socket
45 #endif
46 );
47  if (nia.exclude_from_multicast(multicast_interface.c_str())) {
48  continue; // excluded interfaces are left but never joined
49  }
50  const bool j = join(nia, reactor, event_handler, multicast_group_address, multicast_socket
51 #ifdef ACE_HAS_IPV6
52  , ipv6_multicast_group_address, ipv6_multicast_socket
53 #endif
54  );
55  any_joined = any_joined || j;
56  } else { // any other instance state: only leave
57  leave(nia, multicast_group_address, multicast_socket
58 #ifdef ACE_HAS_IPV6
59  , ipv6_multicast_group_address, ipv6_multicast_socket
60 #endif
61  );
62  }
63  }
64 
65  return any_joined;
66 }
InstanceStateKind instance_state
void leave(const NetworkInterfaceAddress &nia, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
bool join(const NetworkInterfaceAddress &nia, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
const InstanceStateKind ALIVE_INSTANCE_STATE

The documentation for this class was generated from the following files: