OpenDDS  Snapshot(2023/04/28-20:55)
AgentImpl.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "AgentImpl.h"
9 
10 #include "Task.h"
11 #include "EndpointManager.h"
12 
13 #include <dds/DCPS/Qos_Helper.h>
15 #include <dds/DCPS/TimeTypes.h>
16 
18 
19 namespace OpenDDS {
20 namespace ICE {
21 
22 #ifdef OPENDDS_SECURITY
23 
24 using DCPS::TimeDuration;
26 
31 
32  ScheduleTimerCommand(ACE_Reactor* a_reactor, ACE_Event_Handler* a_event_handler, const TimeDuration& a_delay) :
33  reactor(a_reactor), event_handler(a_event_handler), delay(a_delay) {}
34 
35  void execute()
36  {
37  reactor->cancel_timer(event_handler, 0);
38  reactor->schedule_timer(event_handler, 0, delay.value());
39  }
40 };
41 
42 void AgentImpl::enqueue(const DCPS::MonotonicTimePoint& a_release_time,
43  WeakTaskPtr wtask)
44 {
45  if (tasks_.empty() || a_release_time < tasks_.top().release_time_) {
46  const MonotonicTimePoint release = std::max(last_execute_ + ICE::Configuration::instance()->T_a(), a_release_time);
47  execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(reactor(), this, release - MonotonicTimePoint::now()));
48  }
49  tasks_.push(Item(a_release_time, wtask));
50 }
51 
53 {
54  return TheServiceParticipant->is_shut_down();
55 }
56 
// Reactor timer callback for the agent's task queue.
//
// Runs at most one task per invocation: the task at the top of tasks_, and
// only if its release_time_ is not later than a_now.  Afterwards deferred
// work is processed and, if tasks remain, the timer is re-armed.
//
// a_now  time supplied by the reactor; converted to a MonotonicTimePoint.
// (second argument, the timer "act", is unused.)
// Returns 0 on every path.
//
// NOTE(review): this listing jumps from original line 61 to 63, so one source
// line of the body is not shown here -- confirm against the upstream file
// before reasoning about locking or ordering in this function.
57 int AgentImpl::handle_timeout(const ACE_Time_Value& a_now, const void* /*act*/)
58 {
  // Scoped object tied to the service participant's thread status manager;
  // presumably marks this thread as busy for the duration of the call -- TODO confirm.
59  DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
60 
61  const MonotonicTimePoint now(a_now);
63  check_invariants();
64 
  // Nothing queued: return without re-arming the timer.
65  if (tasks_.empty()) {
66  return 0;
67  }
68 
  // The top entry is removed once it is due, whether or not its task is still alive.
69  if (tasks_.top().release_time_ <= now) {
70  Item item = tasks_.top();
71  tasks_.pop();
72 
  // item.task_ is a weak reference; lock() yields a null handle if the task is gone.
73  TaskPtr task = item.task_.lock();
74  if (task) {
75  task->execute(now);
  // last_execute_ only advances when a task actually ran.
76  last_execute_ = now;
77  }
78  }
79  process_deferred();
80  check_invariants();
81 
  // Re-arm for the next task: no earlier than last_execute_ + T_a and no
  // earlier than the task's own release time, expressed as a delay from now.
82  if (!tasks_.empty()) {
83  const MonotonicTimePoint release = std::max(last_execute_ + ICE::Configuration::instance()->T_a(), tasks_.top().release_time_);
84  execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(reactor(), this, release - now));
85  }
86 
87  return 0;
88 }
89 
91  : DCPS::InternalDataReaderListener<DCPS::NetworkInterfaceAddress>(TheServiceParticipant->job_queue())
92  , ReactorInterceptor(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner())
93  , unfreeze_(false)
94  , reader_(DCPS::make_rch<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> >(DCPS::DataReaderQosBuilder().reliability_reliable().durability_transient_local(), DCPS::rchandle_from(this)))
95  , reader_added_(false)
96  , remote_peer_reflexive_counter_(0)
97 {
98  // Bind the lifetime of this to the service participant.
99  TheServiceParticipant->set_shutdown_listener(DCPS::static_rchandle_cast<ShutdownListener>(rchandle_from(this)));
100 }
101 
103 {
106 
107  if (endpoint_managers_.find(a_endpoint) == endpoint_managers_.end()) {
108  EndpointManagerPtr em = DCPS::make_rch<EndpointManager>(this, a_endpoint);
109  endpoint_managers_[a_endpoint] = em;
110  }
111 
113 
114  if (!endpoint_managers_.empty() && !reader_added_) {
115  TheServiceParticipant->network_interface_address_topic()->connect(reader_);
116  reader_added_ = true;
117  }
118 }
119 
121 {
124 
125  EndpointManagerMapType::iterator pos = endpoint_managers_.find(a_endpoint);
126 
127  if (pos != endpoint_managers_.end()) {
128  EndpointManagerPtr em = pos->second;
129  em->purge();
130  endpoint_managers_.erase(pos);
131  }
132 
134 
135  if (endpoint_managers_.empty() && reader_added_) {
136  TheServiceParticipant->network_interface_address_topic()->disconnect(reader_);
137  reader_added_ = false;
138  }
139 }
140 
142 {
144  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
145  OPENDDS_ASSERT(pos != endpoint_managers_.end());
146  return pos->second->agent_info();
147 }
148 
150  const DCPS::GUID_t& a_local_guid,
151  DCPS::WeakRcHandle<AgentInfoListener> a_agent_info_listener)
152 {
154  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
155  OPENDDS_ASSERT(pos != endpoint_managers_.end());
156  pos->second->add_agent_info_listener(a_local_guid, a_agent_info_listener);
157 }
158 
160  const DCPS::GUID_t& a_local_guid)
161 {
163  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
164  OPENDDS_ASSERT(pos != endpoint_managers_.end());
165  pos->second->remove_agent_info_listener(a_local_guid);
166 }
167 
169  const DCPS::GUID_t& a_local_guid,
170  const DCPS::GUID_t& a_remote_guid,
171  const AgentInfo& a_remote_agent_info)
172 {
175  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
176  OPENDDS_ASSERT(pos != endpoint_managers_.end());
177  pos->second->start_ice(a_local_guid, a_remote_guid, a_remote_agent_info);
179 }
180 
182  const DCPS::GUID_t& a_local_guid,
183  const DCPS::GUID_t& a_remote_guid)
184 {
187  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
188  OPENDDS_ASSERT(pos != endpoint_managers_.end());
189  pos->second->stop_ice(a_local_guid, a_remote_guid);
191 }
192 
194  const DCPS::GUID_t& a_local_guid,
195  const DCPS::GUID_t& a_remote_guid) const
196 {
198  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
199  OPENDDS_ASSERT(pos != endpoint_managers_.end());
200  return pos->second->get_address(a_local_guid, a_remote_guid);
201 }
202 
203 // Receive a STUN message.
205  const ACE_INET_Addr& a_local_address,
206  const ACE_INET_Addr& a_remote_address,
207  const STUN::Message& a_message)
208 {
209  if (a_local_address.is_any()) {
210  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) AgentImpl::receive: ERROR local_address is empty, ICE will not work on this platform\n")));
211  return;
212  }
213 
214  if (a_remote_address.is_any()) {
215  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) AgentImpl::receive: ERROR remote_address is empty, ICE will not work on this platform\n")));
216  return;
217  }
218 
221  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
222  OPENDDS_ASSERT(pos != endpoint_managers_.end());
223  pos->second->receive(a_local_address, a_remote_address, a_message);
226 }
227 
228 void AgentImpl::remove(const FoundationType& a_foundation)
229 {
230  // Foundations that are completely removed from the set of active foundations may unfreeze a checklist.
231  unfreeze_ = active_foundations_.remove(a_foundation) || unfreeze_;
232 }
233 
// Record that a_foundation should be unfrozen.
//
// The foundation is only appended to to_unfreeze_ here; nothing is notified
// immediately.  The list is drained later in this file by the loop that calls
// unfreeze(*fpos) on every endpoint manager and then clears to_unfreeze_.
//
// a_foundation  foundation to queue for unfreezing
234 void AgentImpl::unfreeze(const FoundationType& a_foundation)
235 {
236  to_unfreeze_.push_back(a_foundation);
237 }
238 
240 {
241  ActiveFoundationSet expected;
242 
243  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
244  limit = endpoint_managers_.end(); pos != limit; ++pos) {
245  pos->second->compute_active_foundations(expected);
246  pos->second->check_invariants();
247  }
248 
249  OPENDDS_ASSERT(expected == active_foundations_);
250 }
251 
253 {
254  reactor()->cancel_timer(this, 0);
255 }
256 
258 {
259  shutdown();
260 }
261 
263 {
265  DCPS::InternalSampleInfoSequence infos;
267 
268  // FUTURE: This polls the endpoints. The endpoints should just publish the change.
270  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
271  limit = endpoint_managers_.end(); pos != limit; ++pos) {
272  pos->second->network_change();
273  }
274 }
275 
277 {
278  // A successful connectivity check unfroze a foundation.
279  // Communicate this to all endpoints and checklists.
280  for (FoundationList::const_iterator fpos = to_unfreeze_.begin(), flimit = to_unfreeze_.end(); fpos != flimit; ++fpos) {
281  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
282  limit = endpoint_managers_.end(); pos != limit; ++pos) {
283  pos->second->unfreeze(*fpos);
284  }
285  }
286  to_unfreeze_.clear();
287 
288  // A foundation was completely removed.
289  // Communicate this to all endpoints and checklists.
290  if (unfreeze_) {
291  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
292  limit = endpoint_managers_.end(); pos != limit; ++pos) {
293  pos->second->unfreeze();
294  }
295  unfreeze_ = false;
296  }
297 }
298 
299 #endif /* OPENDDS_SECURITY */
300 
301 } // namespace ICE
302 } // namespace OpenDDS
303 
void remove_endpoint(DCPS::WeakRcHandle< Endpoint > a_endpoint)
Definition: AgentImpl.cpp:120
void add_local_agent_info_listener(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, DCPS::WeakRcHandle< AgentInfoListener > a_agent_info_listener)
Definition: AgentImpl.cpp:149
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void release(T x)
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
AgentInfo get_local_agent_info(DCPS::WeakRcHandle< Endpoint > a_endpoint) const
Definition: AgentImpl.cpp:141
bool remove(const FoundationType &a_foundation)
Definition: Checklist.h:44
void unfreeze(const FoundationType &a_foundation)
Definition: AgentImpl.cpp:234
std::pair< std::string, std::string > FoundationType
Definition: Checklist.h:34
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
FoundationList to_unfreeze_
Definition: AgentImpl.h:107
void on_data_available(DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader)
Definition: AgentImpl.cpp:262
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
virtual ACE_Reactor * reactor() const
const SampleStateMask ANY_SAMPLE_STATE
ACE_INET_Addr get_address(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid) const
Definition: AgentImpl.cpp:193
const ACE_Time_Value & value() const
void add_endpoint(DCPS::WeakRcHandle< Endpoint > a_endpoint)
Definition: AgentImpl.cpp:102
bool is_any(void) const
void stop_ice(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid)
Definition: AgentImpl.cpp:181
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
const InstanceStateMask ANY_INSTANCE_STATE
ScheduleTimerCommand(ACE_Reactor *a_reactor, ACE_Event_Handler *a_event_handler, const TimeDuration &a_delay)
Definition: AgentImpl.cpp:32
const ViewStateMask ANY_VIEW_STATE
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
void remove_local_agent_info_listener(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid)
Definition: AgentImpl.cpp:159
ACE_TEXT("TCP_Factory")
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
void remove(const FoundationType &a_foundation)
Definition: AgentImpl.cpp:228
static Configuration * instance()
Definition: Ice.cpp:109
bool reactor_is_shut_down() const
Definition: AgentImpl.cpp:52
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void receive(DCPS::WeakRcHandle< Endpoint > a_endpoint, const ACE_INET_Addr &a_local_address, const ACE_INET_Addr &a_remote_address, const STUN::Message &a_message)
Definition: AgentImpl.cpp:204
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
const long LENGTH_UNLIMITED
RcHandle< T > lock() const
Definition: RcObject.h:188
ACE_Event_Handler * event_handler
Definition: AgentImpl.cpp:29
#define TheServiceParticipant
void enqueue(const DCPS::MonotonicTimePoint &a_release_time, WeakTaskPtr a_task)
Definition: AgentImpl.cpp:42
void check_invariants() const
Definition: AgentImpl.cpp:239
LM_ERROR
void start_ice(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid, const AgentInfo &a_remote_agent_info)
Definition: AgentImpl.cpp:168
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109
int handle_timeout(const ACE_Time_Value &a_now, const void *)
Definition: AgentImpl.cpp:57