OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Member Functions | Public Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::ICE::AgentImpl Class Reference

#include <AgentImpl.h>

Inheritance diagram for OpenDDS::ICE::AgentImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::ICE::AgentImpl:
Collaboration graph
[legend]

Classes

struct  Item
 

Public Member Functions

 AgentImpl ()
 
void shutdown ()
 
void notify_shutdown ()
 
void add_endpoint (DCPS::WeakRcHandle< Endpoint > a_endpoint)
 
void remove_endpoint (DCPS::WeakRcHandle< Endpoint > a_endpoint)
 
AgentInfo get_local_agent_info (DCPS::WeakRcHandle< Endpoint > a_endpoint) const
 
void add_local_agent_info_listener (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, DCPS::WeakRcHandle< AgentInfoListener > a_agent_info_listener)
 
void remove_local_agent_info_listener (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid)
 
void start_ice (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid, const AgentInfo &a_remote_agent_info)
 
void stop_ice (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid)
 
ACE_INET_Addr get_address (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid) const
 
void receive (DCPS::WeakRcHandle< Endpoint > a_endpoint, const ACE_INET_Addr &a_local_address, const ACE_INET_Addr &a_remote_address, const STUN::Message &a_message)
 
void enqueue (const DCPS::MonotonicTimePoint &a_release_time, WeakTaskPtr a_task)
 
size_t remote_peer_reflexive_counter ()
 
bool contains (const FoundationType &a_foundation) const
 
void add (const FoundationType &a_foundation)
 
void remove (const FoundationType &a_foundation)
 
void unfreeze (const FoundationType &a_foundation)
 
- Public Member Functions inherited from OpenDDS::ICE::Agent
virtual ~Agent ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::ShutdownListener
virtual ~ShutdownListener ()
 
- Public Member Functions inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
 InternalDataReaderListener ()
 
 InternalDataReaderListener (JobQueue_rch job_queue)
 
void job_queue (JobQueue_rch job_queue)
 
virtual void on_data_available (InternalDataReader_rch reader)=0
 
void schedule (InternalDataReader_rch reader)
 
- Public Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
CommandPtr execute_or_enqueue (CommandPtr command)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor () const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 

Public Attributes

ACE_Recursive_Thread_Mutex mutex
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 

Private Types

typedef std::map< DCPS::WeakRcHandle< Endpoint >, EndpointManagerPtr > EndpointManagerMapType
 

Private Member Functions

void on_data_available (DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader)
 
void process_deferred ()
 
bool reactor_is_shut_down () const
 
int handle_timeout (const ACE_Time_Value &a_now, const void *)
 
void check_invariants () const
 

Private Attributes

ActiveFoundationSet active_foundations_
 
FoundationList to_unfreeze_
 
bool unfreeze_
 
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
 
bool reader_added_
 
size_t remote_peer_reflexive_counter_
 
EndpointManagerMapType endpoint_managers_
 
std::priority_queue< Item > tasks_
 
DCPS::MonotonicTimePoint last_execute_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
typedef RcHandle< InternalDataReader< DCPS::NetworkInterfaceAddress > > InternalDataReader_rch
 
- Public Types inherited from OpenDDS::DCPS::ReactorInterceptor
typedef RcHandle< Command > CommandPtr
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from OpenDDS::ICE::Agent
static DCPS::RcHandle< Agent > instance ()
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Protected Types inherited from OpenDDS::DCPS::ReactorInterceptor
enum  ReactorState { RS_NONE, RS_NOTIFIED, RS_PROCESSING }
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
 ReactorInterceptor (ACE_Reactor *reactor, ACE_thread_t owner)
 
virtual ~ReactorInterceptor ()
 
int handle_exception (ACE_HANDLE)
 
void process_command_queue_i (ACE_Guard< ACE_Thread_Mutex > &guard)
 
typedef OPENDDS_VECTOR (CommandPtr) Queue
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from OpenDDS::DCPS::ReactorInterceptor
ACE_thread_t owner_
 
ACE_Thread_Mutex mutex_
 
Queue command_queue_
 
ReactorState state_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 35 of file AgentImpl.h.

Member Typedef Documentation

◆ EndpointManagerMapType

Definition at line 112 of file AgentImpl.h.

Constructor & Destructor Documentation

◆ AgentImpl()

OpenDDS::ICE::AgentImpl::AgentImpl ( )

Definition at line 90 of file AgentImpl.cpp.

References OpenDDS::DCPS::rchandle_from(), and TheServiceParticipant.

91  : DCPS::InternalDataReaderListener<DCPS::NetworkInterfaceAddress>(TheServiceParticipant->job_queue())
92  , ReactorInterceptor(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner())
93  , unfreeze_(false)
94  , reader_(DCPS::make_rch<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> >(DCPS::DataReaderQosBuilder().reliability_reliable().durability_transient_local(), DCPS::rchandle_from(this)))
95  , reader_added_(false)
97 {
98  // Bind the lifetime of this to the service participant.
99  TheServiceParticipant->set_shutdown_listener(DCPS::static_rchandle_cast<ShutdownListener>(rchandle_from(this)));
100 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
size_t remote_peer_reflexive_counter_
Definition: AgentImpl.h:111
#define TheServiceParticipant
ReactorInterceptor(ACE_Reactor *reactor, ACE_thread_t owner)
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109

Member Function Documentation

◆ add()

void OpenDDS::ICE::AgentImpl::add ( const FoundationType & a_foundation)
inline

Definition at line 91 of file AgentImpl.h.

References active_foundations_, OpenDDS::ICE::ActiveFoundationSet::add(), and unfreeze().

Referenced by OpenDDS::ICE::Checklist::add_triggered_check(), and OpenDDS::ICE::Checklist::unfreeze().

92  {
93  active_foundations_.add(a_foundation);
94  }
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
void add(const FoundationType &a_foundation)
Definition: Checklist.h:38

◆ add_endpoint()

void OpenDDS::ICE::AgentImpl::add_endpoint ( DCPS::WeakRcHandle< Endpoint > a_endpoint)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 102 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, reader_, reader_added_, and TheServiceParticipant.

103 {
106 
107  if (endpoint_managers_.find(a_endpoint) == endpoint_managers_.end()) {
108  EndpointManagerPtr em = DCPS::make_rch<EndpointManager>(this, a_endpoint);
109  endpoint_managers_[a_endpoint] = em;
110  }
111 
113 
114  if (!endpoint_managers_.empty() && !reader_added_) {
115  TheServiceParticipant->network_interface_address_topic()->connect(reader_);
116  reader_added_ = true;
117  }
118 }
DCPS::RcHandle< EndpointManager > EndpointManagerPtr
#define ACE_GUARD(MUTEX, OBJ, LOCK)
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define TheServiceParticipant
void check_invariants() const
Definition: AgentImpl.cpp:239
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109

◆ add_local_agent_info_listener()

void OpenDDS::ICE::AgentImpl::add_local_agent_info_listener ( DCPS::WeakRcHandle< Endpoint > a_endpoint,
const DCPS::GUID_t & a_local_guid,
DCPS::WeakRcHandle< AgentInfoListener > a_agent_info_listener
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 149 of file AgentImpl.cpp.

References ACE_GUARD, endpoint_managers_, mutex, and OPENDDS_ASSERT.

152 {
154  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
155  OPENDDS_ASSERT(pos != endpoint_managers_.end());
156  pos->second->add_agent_info_listener(a_local_guid, a_agent_info_listener);
157 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100

◆ check_invariants()

void OpenDDS::ICE::AgentImpl::check_invariants ( ) const
private

Definition at line 239 of file AgentImpl.cpp.

References active_foundations_, endpoint_managers_, and OPENDDS_ASSERT.

Referenced by add_endpoint(), receive(), remove_endpoint(), start_ice(), and stop_ice().

240 {
241  ActiveFoundationSet expected;
242 
243  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
244  limit = endpoint_managers_.end(); pos != limit; ++pos) {
245  pos->second->compute_active_foundations(expected);
246  pos->second->check_invariants();
247  }
248 
249  OPENDDS_ASSERT(expected == active_foundations_);
250 }
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ contains()

bool OpenDDS::ICE::AgentImpl::contains ( const FoundationType & a_foundation) const
inline

Definition at line 86 of file AgentImpl.h.

References active_foundations_, and OpenDDS::ICE::ActiveFoundationSet::contains().

Referenced by OpenDDS::ICE::Checklist::unfreeze().

87  {
88  return active_foundations_.contains(a_foundation);
89  }
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
bool contains(const FoundationType &a_foundation) const
Definition: Checklist.h:58

◆ enqueue()

void OpenDDS::ICE::AgentImpl::enqueue ( const DCPS::MonotonicTimePoint & a_release_time,
WeakTaskPtr  a_task 
)

Definition at line 42 of file AgentImpl.cpp.

References OpenDDS::ICE::Configuration::instance(), OpenDDS::DCPS::ReactorInterceptor::Command::reactor(), and release().

Referenced by OpenDDS::ICE::Task::enqueue().

44 {
45  if (tasks_.empty() || a_release_time < tasks_.top().release_time_) {
46  const MonotonicTimePoint release = std::max(last_execute_ + ICE::Configuration::instance()->T_a(), a_release_time);
47  execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(reactor(), this, release - MonotonicTimePoint::now()));
48  }
49  tasks_.push(Item(a_release_time, wtask));
50 }
void release(T x)
CommandPtr execute_or_enqueue(CommandPtr command)
virtual ACE_Reactor * reactor() const
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
std::priority_queue< Item > tasks_
Definition: AgentImpl.h:127
static Configuration * instance()
Definition: Ice.cpp:109
DCPS::MonotonicTimePoint last_execute_
Definition: AgentImpl.h:128

◆ get_address()

ACE_INET_Addr OpenDDS::ICE::AgentImpl::get_address ( DCPS::WeakRcHandle< Endpoint > a_endpoint,
const DCPS::GUID_t & a_local_guid,
const DCPS::GUID_t & a_remote_guid
) const
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 193 of file AgentImpl.cpp.

References ACE_GUARD_RETURN, endpoint_managers_, mutex, and OPENDDS_ASSERT.

196 {
198  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
199  OPENDDS_ASSERT(pos != endpoint_managers_.end());
200  return pos->second->get_address(a_local_guid, a_remote_guid);
201 }
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100

◆ get_local_agent_info()

AgentInfo OpenDDS::ICE::AgentImpl::get_local_agent_info ( DCPS::WeakRcHandle< Endpoint > a_endpoint) const
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 141 of file AgentImpl.cpp.

References ACE_GUARD_RETURN, endpoint_managers_, mutex, and OPENDDS_ASSERT.

142 {
143  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, mutex, AgentInfo());
144  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
145  OPENDDS_ASSERT(pos != endpoint_managers_.end());
146  return pos->second->agent_info();
147 }
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100

◆ handle_timeout()

int OpenDDS::ICE::AgentImpl::handle_timeout ( const ACE_Time_Value & a_now,
const void *   
)
private virtual

Reimplemented from ACE_Event_Handler.

Definition at line 57 of file AgentImpl.cpp.

References ACE_GUARD_RETURN, OpenDDS::ICE::Configuration::instance(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::ReactorInterceptor::Command::reactor(), release(), OpenDDS::ICE::AgentImpl::Item::task_, and TheServiceParticipant.

58 {
59  DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
60 
61  const MonotonicTimePoint now(a_now);
64 
65  if (tasks_.empty()) {
66  return 0;
67  }
68 
69  if (tasks_.top().release_time_ <= now) {
70  Item item = tasks_.top();
71  tasks_.pop();
72 
73  TaskPtr task = item.task_.lock();
74  if (task) {
75  task->execute(now);
76  last_execute_ = now;
77  }
78  }
81 
82  if (!tasks_.empty()) {
83  const MonotonicTimePoint release = std::max(last_execute_ + ICE::Configuration::instance()->T_a(), tasks_.top().release_time_);
84  execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(reactor(), this, release - now));
85  }
86 
87  return 0;
88 }
void release(T x)
DCPS::RcHandle< Task > TaskPtr
Definition: Task.h:38
CommandPtr execute_or_enqueue(CommandPtr command)
virtual ACE_Reactor * reactor() const
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
std::priority_queue< Item > tasks_
Definition: AgentImpl.h:127
static Configuration * instance()
Definition: Ice.cpp:109
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
DCPS::MonotonicTimePoint last_execute_
Definition: AgentImpl.h:128
#define TheServiceParticipant
void check_invariants() const
Definition: AgentImpl.cpp:239

◆ notify_shutdown()

void OpenDDS::ICE::AgentImpl::notify_shutdown ( void  )
virtual

Implements OpenDDS::DCPS::ShutdownListener.

Definition at line 257 of file AgentImpl.cpp.

References shutdown().

258 {
259  shutdown();
260 }

◆ on_data_available()

void OpenDDS::ICE::AgentImpl::on_data_available ( DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > >  reader)
private

Definition at line 262 of file AgentImpl.cpp.

References ACE_GUARD, DDS::ANY_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, endpoint_managers_, DDS::LENGTH_UNLIMITED, mutex, and reader_.

263 {
264  DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress>::SampleSequence samples;
265  DCPS::InternalSampleInfoSequence infos;
267 
268  // FUTURE: This polls the endpoints. The endpoints should just publish the change.
270  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
271  limit = endpoint_managers_.end(); pos != limit; ++pos) {
272  pos->second->network_change();
273  }
274 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const SampleStateMask ANY_SAMPLE_STATE
const InstanceStateMask ANY_INSTANCE_STATE
const ViewStateMask ANY_VIEW_STATE
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
const long LENGTH_UNLIMITED
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109

◆ process_deferred()

void OpenDDS::ICE::AgentImpl::process_deferred ( )
private

Definition at line 276 of file AgentImpl.cpp.

References endpoint_managers_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, to_unfreeze_, and unfreeze_.

Referenced by receive().

277 {
278  // A successful connectivity check unfreezed a foundation.
279  // Communicate this to all endpoints and checklists.
280  for (FoundationList::const_iterator fpos = to_unfreeze_.begin(), flimit = to_unfreeze_.end(); fpos != flimit; ++fpos) {
281  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
282  limit = endpoint_managers_.end(); pos != limit; ++pos) {
283  pos->second->unfreeze(*fpos);
284  }
285  }
286  to_unfreeze_.clear();
287 
288  // A foundation was completely removed.
289  // Communicate this to all endpoints and checklists.
290  if (unfreeze_) {
291  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
292  limit = endpoint_managers_.end(); pos != limit; ++pos) {
293  pos->second->unfreeze();
294  }
295  unfreeze_ = false;
296  }
297 }
FoundationList to_unfreeze_
Definition: AgentImpl.h:107
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ reactor_is_shut_down()

bool OpenDDS::ICE::AgentImpl::reactor_is_shut_down ( ) const
private virtual

Implements OpenDDS::DCPS::ReactorInterceptor.

Definition at line 52 of file AgentImpl.cpp.

References TheServiceParticipant.

53 {
54  return TheServiceParticipant->is_shut_down();
55 }
#define TheServiceParticipant

◆ receive()

void OpenDDS::ICE::AgentImpl::receive ( DCPS::WeakRcHandle< Endpoint > a_endpoint,
const ACE_INET_Addr & a_local_address,
const ACE_INET_Addr & a_remote_address,
const STUN::Message & a_message
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 204 of file AgentImpl.cpp.

References ACE_ERROR, ACE_GUARD, ACE_TEXT(), check_invariants(), endpoint_managers_, ACE_INET_Addr::is_any(), LM_ERROR, mutex, OPENDDS_ASSERT, and process_deferred().

208 {
209  if (a_local_address.is_any()) {
210  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) AgentImpl::receive: ERROR local_address is empty, ICE will not work on this platform\n")));
211  return;
212  }
213 
214  if (a_remote_address.is_any()) {
215  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) AgentImpl::receive: ERROR remote_address is empty, ICE will not work on this platform\n")));
216  return;
217  }
218 
221  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
222  OPENDDS_ASSERT(pos != endpoint_managers_.end());
223  pos->second->receive(a_local_address, a_remote_address, a_message);
226 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
bool is_any(void) const
ACE_TEXT("TCP_Factory")
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
void check_invariants() const
Definition: AgentImpl.cpp:239

◆ remote_peer_reflexive_counter()

size_t OpenDDS::ICE::AgentImpl::remote_peer_reflexive_counter ( )
inline

Definition at line 81 of file AgentImpl.h.

References remote_peer_reflexive_counter_.

Referenced by OpenDDS::ICE::Checklist::generate_triggered_check().

82  {
84  }
size_t remote_peer_reflexive_counter_
Definition: AgentImpl.h:111

◆ remove()

void OpenDDS::ICE::AgentImpl::remove ( const FoundationType & a_foundation)

Definition at line 228 of file AgentImpl.cpp.

References active_foundations_, OpenDDS::ICE::ActiveFoundationSet::remove(), and unfreeze_.

Referenced by OpenDDS::ICE::Checklist::fix_foundations(), OpenDDS::ICE::Checklist::remove_from_in_progress(), and OpenDDS::ICE::Checklist::succeeded().

229 {
230  // Foundations that are completely removed from the set of active foundations may unfreeze a checklist.
231  unfreeze_ = active_foundations_.remove(a_foundation) || unfreeze_;
232 }
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
bool remove(const FoundationType &a_foundation)
Definition: Checklist.h:44

◆ remove_endpoint()

void OpenDDS::ICE::AgentImpl::remove_endpoint ( DCPS::WeakRcHandle< Endpoint > a_endpoint)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 120 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, reader_, reader_added_, and TheServiceParticipant.

121 {
124 
125  EndpointManagerMapType::iterator pos = endpoint_managers_.find(a_endpoint);
126 
127  if (pos != endpoint_managers_.end()) {
128  EndpointManagerPtr em = pos->second;
129  em->purge();
130  endpoint_managers_.erase(pos);
131  }
132 
134 
135  if (endpoint_managers_.empty() && reader_added_) {
136  TheServiceParticipant->network_interface_address_topic()->disconnect(reader_);
137  reader_added_ = false;
138  }
139 }
DCPS::RcHandle< EndpointManager > EndpointManagerPtr
#define ACE_GUARD(MUTEX, OBJ, LOCK)
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define TheServiceParticipant
void check_invariants() const
Definition: AgentImpl.cpp:239
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109

◆ remove_local_agent_info_listener()

void OpenDDS::ICE::AgentImpl::remove_local_agent_info_listener ( DCPS::WeakRcHandle< Endpoint > a_endpoint,
const DCPS::GUID_t & a_local_guid
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 159 of file AgentImpl.cpp.

References ACE_GUARD, endpoint_managers_, mutex, and OPENDDS_ASSERT.

161 {
163  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
164  OPENDDS_ASSERT(pos != endpoint_managers_.end());
165  pos->second->remove_agent_info_listener(a_local_guid);
166 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100

◆ shutdown()

void OpenDDS::ICE::AgentImpl::shutdown ( void  )
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 252 of file AgentImpl.cpp.

References ACE_Reactor::cancel_timer(), and OpenDDS::DCPS::ReactorInterceptor::reactor().

Referenced by notify_shutdown().

253 {
254  reactor()->cancel_timer(this, 0);
255 }
virtual ACE_Reactor * reactor() const
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)

◆ start_ice()

void OpenDDS::ICE::AgentImpl::start_ice ( DCPS::WeakRcHandle< Endpoint > a_endpoint,
const DCPS::GUID_t & a_local_guid,
const DCPS::GUID_t & a_remote_guid,
const AgentInfo & a_remote_agent_info
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 168 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, and OPENDDS_ASSERT.

172 {
175  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
176  OPENDDS_ASSERT(pos != endpoint_managers_.end());
177  pos->second->start_ice(a_local_guid, a_remote_guid, a_remote_agent_info);
179 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
void check_invariants() const
Definition: AgentImpl.cpp:239

◆ stop_ice()

void OpenDDS::ICE::AgentImpl::stop_ice ( DCPS::WeakRcHandle< Endpoint > a_endpoint,
const DCPS::GUID_t & a_local_guid,
const DCPS::GUID_t & a_remote_guid
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 181 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, and OPENDDS_ASSERT.

184 {
187  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
188  OPENDDS_ASSERT(pos != endpoint_managers_.end());
189  pos->second->stop_ice(a_local_guid, a_remote_guid);
191 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
void check_invariants() const
Definition: AgentImpl.cpp:239

◆ unfreeze()

void OpenDDS::ICE::AgentImpl::unfreeze ( const FoundationType & a_foundation)

Definition at line 234 of file AgentImpl.cpp.

References to_unfreeze_.

Referenced by add(), and OpenDDS::ICE::Checklist::succeeded().

235 {
236  to_unfreeze_.push_back(a_foundation);
237 }
FoundationList to_unfreeze_
Definition: AgentImpl.h:107

Member Data Documentation

◆ active_foundations_

ActiveFoundationSet OpenDDS::ICE::AgentImpl::active_foundations_
private

Definition at line 106 of file AgentImpl.h.

Referenced by add(), check_invariants(), contains(), and remove().

◆ endpoint_managers_

EndpointManagerMapType OpenDDS::ICE::AgentImpl::endpoint_managers_
private

◆ last_execute_

DCPS::MonotonicTimePoint OpenDDS::ICE::AgentImpl::last_execute_
private

Definition at line 128 of file AgentImpl.h.

◆ mutex

ACE_Recursive_Thread_Mutex OpenDDS::ICE::AgentImpl::mutex
mutable

◆ reader_

DCPS::RcHandle<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> > OpenDDS::ICE::AgentImpl::reader_
private

Definition at line 109 of file AgentImpl.h.

Referenced by add_endpoint(), on_data_available(), and remove_endpoint().

◆ reader_added_

bool OpenDDS::ICE::AgentImpl::reader_added_
private

Definition at line 110 of file AgentImpl.h.

Referenced by add_endpoint(), and remove_endpoint().

◆ remote_peer_reflexive_counter_

size_t OpenDDS::ICE::AgentImpl::remote_peer_reflexive_counter_
private

Definition at line 111 of file AgentImpl.h.

Referenced by remote_peer_reflexive_counter().

◆ tasks_

std::priority_queue<Item> OpenDDS::ICE::AgentImpl::tasks_
private

Definition at line 127 of file AgentImpl.h.

◆ to_unfreeze_

FoundationList OpenDDS::ICE::AgentImpl::to_unfreeze_
private

Definition at line 107 of file AgentImpl.h.

Referenced by process_deferred(), and unfreeze().

◆ unfreeze_

bool OpenDDS::ICE::AgentImpl::unfreeze_
private

Definition at line 108 of file AgentImpl.h.

Referenced by process_deferred(), and remove().


The documentation for this class was generated from the following files: