OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Public Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::ICE::AgentImpl Class Reference

#include <AgentImpl.h>

Inheritance diagram for OpenDDS::ICE::AgentImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::ICE::AgentImpl:
Collaboration graph
[legend]

Classes

struct  Item
 

Public Member Functions

 AgentImpl ()
 
void shutdown ()
 
void notify_shutdown ()
 
void add_endpoint (DCPS::WeakRcHandle< Endpoint > a_endpoint)
 
void remove_endpoint (DCPS::WeakRcHandle< Endpoint > a_endpoint)
 
AgentInfo get_local_agent_info (DCPS::WeakRcHandle< Endpoint > a_endpoint) const
 
void add_local_agent_info_listener (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, DCPS::WeakRcHandle< AgentInfoListener > a_agent_info_listener)
 
void remove_local_agent_info_listener (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid)
 
void start_ice (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid, const AgentInfo &a_remote_agent_info)
 
void stop_ice (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid)
 
ACE_INET_Addr get_address (DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid) const
 
void receive (DCPS::WeakRcHandle< Endpoint > a_endpoint, const ACE_INET_Addr &a_local_address, const ACE_INET_Addr &a_remote_address, const STUN::Message &a_message)
 
void enqueue (const DCPS::MonotonicTimePoint &a_release_time, WeakTaskPtr a_task)
 
size_t remote_peer_reflexive_counter ()
 
bool contains (const FoundationType &a_foundation) const
 
void add (const FoundationType &a_foundation)
 
void remove (const FoundationType &a_foundation)
 
void unfreeze (const FoundationType &a_foundation)
 
- Public Member Functions inherited from OpenDDS::ICE::Agent
virtual ~Agent ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::ShutdownListener
virtual ~ShutdownListener ()
 
- Public Member Functions inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
 InternalDataReaderListener ()
 
 InternalDataReaderListener (JobQueue_rch job_queue)
 
void job_queue (JobQueue_rch job_queue)
 
virtual void on_data_available (InternalDataReader_rch reader)=0
 
void schedule (InternalDataReader_rch reader)
 
- Public Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
CommandPtr execute_or_enqueue (CommandPtr command)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor () const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 

Public Attributes

ACE_Recursive_Thread_Mutex mutex
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 

Private Types

typedef std::map< DCPS::WeakRcHandle< Endpoint >, EndpointManagerPtrEndpointManagerMapType
 

Private Member Functions

void on_data_available (DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader)
 
void process_deferred ()
 
bool reactor_is_shut_down () const
 
int handle_timeout (const ACE_Time_Value &a_now, const void *)
 
void check_invariants () const
 

Private Attributes

ActiveFoundationSet active_foundations_
 
FoundationList to_unfreeze_
 
bool unfreeze_
 
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
 
bool reader_added_
 
size_t remote_peer_reflexive_counter_
 
EndpointManagerMapType endpoint_managers_
 
std::priority_queue< Itemtasks_
 
DCPS::MonotonicTimePoint last_execute_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
typedef RcHandle< InternalDataReader< DCPS::NetworkInterfaceAddress > > InternalDataReader_rch
 
- Public Types inherited from OpenDDS::DCPS::ReactorInterceptor
typedef RcHandle< CommandCommandPtr
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from OpenDDS::ICE::Agent
static DCPS::RcHandle< Agentinstance ()
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Protected Types inherited from OpenDDS::DCPS::ReactorInterceptor
enum  ReactorState { RS_NONE, RS_NOTIFIED, RS_PROCESSING }
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
 ReactorInterceptor (ACE_Reactor *reactor, ACE_thread_t owner)
 
virtual ~ReactorInterceptor ()
 
int handle_exception (ACE_HANDLE)
 
void process_command_queue_i (ACE_Guard< ACE_Thread_Mutex > &guard)
 
typedef OPENDDS_VECTOR (CommandPtr) Queue
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from OpenDDS::DCPS::ReactorInterceptor
ACE_thread_t owner_
 
ACE_Thread_Mutex mutex_
 
Queue command_queue_
 
ReactorState state_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 35 of file AgentImpl.h.

Member Typedef Documentation

◆ EndpointManagerMapType

Definition at line 112 of file AgentImpl.h.

Constructor & Destructor Documentation

◆ AgentImpl()

OpenDDS::ICE::AgentImpl::AgentImpl ( )

Definition at line 89 of file AgentImpl.cpp.

References OpenDDS::DCPS::rchandle_from(), and TheServiceParticipant.

90  : DCPS::InternalDataReaderListener<DCPS::NetworkInterfaceAddress>(TheServiceParticipant->job_queue())
91  , ReactorInterceptor(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner())
92  , unfreeze_(false)
93  , reader_(DCPS::make_rch<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> >(true, DCPS::rchandle_from(this)))
94  , reader_added_(false)
96 {
97  // Bind the lifetime of this to the service participant.
98  TheServiceParticipant->set_shutdown_listener(DCPS::static_rchandle_cast<ShutdownListener>(rchandle_from(this)));
99 }
ReactorInterceptor(ACE_Reactor *reactor, ACE_thread_t owner)
size_t remote_peer_reflexive_counter_
Definition: AgentImpl.h:111
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109
#define TheServiceParticipant
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256

Member Function Documentation

◆ add()

void OpenDDS::ICE::AgentImpl::add ( const FoundationType a_foundation)
inline

Definition at line 91 of file AgentImpl.h.

References active_foundations_, OpenDDS::ICE::ActiveFoundationSet::add(), and unfreeze().

Referenced by OpenDDS::ICE::Checklist::add_triggered_check(), and OpenDDS::ICE::Checklist::unfreeze().

92  {
93  active_foundations_.add(a_foundation);
94  }
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
void add(const FoundationType &a_foundation)
Definition: Checklist.h:38

◆ add_endpoint()

void OpenDDS::ICE::AgentImpl::add_endpoint ( DCPS::WeakRcHandle< Endpoint a_endpoint)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 101 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, reader_, reader_added_, and TheServiceParticipant.

102 {
105 
106  if (endpoint_managers_.find(a_endpoint) == endpoint_managers_.end()) {
107  EndpointManagerPtr em = DCPS::make_rch<EndpointManager>(this, a_endpoint);
108  endpoint_managers_[a_endpoint] = em;
109  }
110 
112 
113  if (!endpoint_managers_.empty() && !reader_added_) {
114  TheServiceParticipant->network_interface_address_topic()->connect(reader_);
115  reader_added_ = true;
116  }
117 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void check_invariants() const
Definition: AgentImpl.cpp:238
DCPS::RcHandle< EndpointManager > EndpointManagerPtr
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109
#define TheServiceParticipant

◆ add_local_agent_info_listener()

void OpenDDS::ICE::AgentImpl::add_local_agent_info_listener ( DCPS::WeakRcHandle< Endpoint a_endpoint,
const DCPS::GUID_t a_local_guid,
DCPS::WeakRcHandle< AgentInfoListener a_agent_info_listener 
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 148 of file AgentImpl.cpp.

References ACE_GUARD, endpoint_managers_, mutex, and OPENDDS_ASSERT.

151 {
153  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
154  OPENDDS_ASSERT(pos != endpoint_managers_.end());
155  pos->second->add_agent_info_listener(a_local_guid, a_agent_info_listener);
156 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ check_invariants()

void OpenDDS::ICE::AgentImpl::check_invariants ( ) const
private

Definition at line 238 of file AgentImpl.cpp.

References active_foundations_, endpoint_managers_, and OPENDDS_ASSERT.

Referenced by add_endpoint(), receive(), remove_endpoint(), start_ice(), and stop_ice().

239 {
240  ActiveFoundationSet expected;
241 
242  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
243  limit = endpoint_managers_.end(); pos != limit; ++pos) {
244  pos->second->compute_active_foundations(expected);
245  pos->second->check_invariants();
246  }
247 
248  OPENDDS_ASSERT(expected == active_foundations_);
249 }
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ contains()

bool OpenDDS::ICE::AgentImpl::contains ( const FoundationType a_foundation) const
inline

Definition at line 86 of file AgentImpl.h.

References active_foundations_, and OpenDDS::ICE::ActiveFoundationSet::contains().

Referenced by OpenDDS::ICE::Checklist::unfreeze().

87  {
88  return active_foundations_.contains(a_foundation);
89  }
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
bool contains(const FoundationType &a_foundation) const
Definition: Checklist.h:58

◆ enqueue()

void OpenDDS::ICE::AgentImpl::enqueue ( const DCPS::MonotonicTimePoint a_release_time,
WeakTaskPtr  a_task 
)

Definition at line 41 of file AgentImpl.cpp.

References OpenDDS::ICE::Configuration::instance(), OpenDDS::DCPS::ReactorInterceptor::Command::reactor(), and release().

Referenced by OpenDDS::ICE::Task::enqueue().

43 {
44  if (tasks_.empty() || a_release_time < tasks_.top().release_time_) {
45  const MonotonicTimePoint release = std::max(last_execute_ + ICE::Configuration::instance()->T_a(), a_release_time);
46  execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(reactor(), this, release - MonotonicTimePoint::now()));
47  }
48  tasks_.push(Item(a_release_time, wtask));
49 }
void release(T x)
static Configuration * instance()
Definition: Ice.cpp:109
std::priority_queue< Item > tasks_
Definition: AgentImpl.h:127
virtual ACE_Reactor * reactor() const
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
CommandPtr execute_or_enqueue(CommandPtr command)
DCPS::MonotonicTimePoint last_execute_
Definition: AgentImpl.h:128

◆ get_address()

ACE_INET_Addr OpenDDS::ICE::AgentImpl::get_address ( DCPS::WeakRcHandle< Endpoint a_endpoint,
const DCPS::GUID_t a_local_guid,
const DCPS::GUID_t a_remote_guid 
) const
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 192 of file AgentImpl.cpp.

References ACE_GUARD_RETURN, endpoint_managers_, mutex, and OPENDDS_ASSERT.

195 {
197  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
198  OPENDDS_ASSERT(pos != endpoint_managers_.end());
199  return pos->second->get_address(a_local_guid, a_remote_guid);
200 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ get_local_agent_info()

AgentInfo OpenDDS::ICE::AgentImpl::get_local_agent_info ( DCPS::WeakRcHandle< Endpoint a_endpoint) const
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 140 of file AgentImpl.cpp.

References ACE_GUARD_RETURN, endpoint_managers_, mutex, and OPENDDS_ASSERT.

141 {
142  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, mutex, AgentInfo());
143  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
144  OPENDDS_ASSERT(pos != endpoint_managers_.end());
145  return pos->second->agent_info();
146 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ handle_timeout()

int OpenDDS::ICE::AgentImpl::handle_timeout ( const ACE_Time_Value a_now,
const void *   
)
privatevirtual

Reimplemented from ACE_Event_Handler.

Definition at line 56 of file AgentImpl.cpp.

References ACE_GUARD_RETURN, OpenDDS::ICE::Configuration::instance(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::ReactorInterceptor::Command::reactor(), release(), OpenDDS::ICE::AgentImpl::Item::task_, and TheServiceParticipant.

57 {
58  DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
59 
60  const MonotonicTimePoint now(a_now);
63 
64  if (tasks_.empty()) {
65  return 0;
66  }
67 
68  if (tasks_.top().release_time_ <= now) {
69  Item item = tasks_.top();
70  tasks_.pop();
71 
72  TaskPtr task = item.task_.lock();
73  if (task) {
74  task->execute(now);
75  last_execute_ = now;
76  }
77  }
80 
81  if (!tasks_.empty()) {
82  const MonotonicTimePoint release = std::max(last_execute_ + ICE::Configuration::instance()->T_a(), tasks_.top().release_time_);
83  execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(reactor(), this, release - now));
84  }
85 
86  return 0;
87 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
void release(T x)
DCPS::RcHandle< Task > TaskPtr
Definition: Task.h:38
static Configuration * instance()
Definition: Ice.cpp:109
std::priority_queue< Item > tasks_
Definition: AgentImpl.h:127
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual ACE_Reactor * reactor() const
void check_invariants() const
Definition: AgentImpl.cpp:238
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
CommandPtr execute_or_enqueue(CommandPtr command)
DCPS::MonotonicTimePoint last_execute_
Definition: AgentImpl.h:128
#define TheServiceParticipant

◆ notify_shutdown()

void OpenDDS::ICE::AgentImpl::notify_shutdown ( void  )
virtual

Implements OpenDDS::DCPS::ShutdownListener.

Definition at line 256 of file AgentImpl.cpp.

References shutdown().

257 {
258  shutdown();
259 }

◆ on_data_available()

void OpenDDS::ICE::AgentImpl::on_data_available ( DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > >  reader)
private

Definition at line 261 of file AgentImpl.cpp.

References ACE_GUARD, endpoint_managers_, mutex, and reader_.

262 {
263  DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress>::SampleSequence samples;
264  DCPS::InternalSampleInfoSequence infos;
265  reader_->take(samples, infos);
266 
267  // FUTURE: This polls the endpoints. The endpoints should just publish the change.
269  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
270  limit = endpoint_managers_.end(); pos != limit; ++pos) {
271  pos->second->network_change();
272  }
273 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_GUARD(MUTEX, OBJ, LOCK)
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109

◆ process_deferred()

void OpenDDS::ICE::AgentImpl::process_deferred ( )
private

Definition at line 275 of file AgentImpl.cpp.

References endpoint_managers_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, to_unfreeze_, and unfreeze_.

Referenced by receive().

276 {
277  // A successful connectivity check unfreezed a foundation.
278  // Communicate this to all endpoints and checklists.
279  for (FoundationList::const_iterator fpos = to_unfreeze_.begin(), flimit = to_unfreeze_.end(); fpos != flimit; ++fpos) {
280  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
281  limit = endpoint_managers_.end(); pos != limit; ++pos) {
282  pos->second->unfreeze(*fpos);
283  }
284  }
285  to_unfreeze_.clear();
286 
287  // A foundation was completely removed.
288  // Communicate this to all endpoints and checklists.
289  if (unfreeze_) {
290  for (EndpointManagerMapType::const_iterator pos = endpoint_managers_.begin(),
291  limit = endpoint_managers_.end(); pos != limit; ++pos) {
292  pos->second->unfreeze();
293  }
294  unfreeze_ = false;
295  }
296 }
FoundationList to_unfreeze_
Definition: AgentImpl.h:107
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ reactor_is_shut_down()

bool OpenDDS::ICE::AgentImpl::reactor_is_shut_down ( ) const
privatevirtual

Implements OpenDDS::DCPS::ReactorInterceptor.

Definition at line 51 of file AgentImpl.cpp.

References TheServiceParticipant.

52 {
53  return TheServiceParticipant->is_shut_down();
54 }
#define TheServiceParticipant

◆ receive()

void OpenDDS::ICE::AgentImpl::receive ( DCPS::WeakRcHandle< Endpoint a_endpoint,
const ACE_INET_Addr a_local_address,
const ACE_INET_Addr a_remote_address,
const STUN::Message a_message 
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 203 of file AgentImpl.cpp.

References ACE_ERROR, ACE_GUARD, ACE_TEXT(), check_invariants(), endpoint_managers_, ACE_INET_Addr::is_any(), LM_ERROR, mutex, OPENDDS_ASSERT, and process_deferred().

207 {
208  if (a_local_address.is_any()) {
209  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) AgentImpl::receive: ERROR local_address is empty, ICE will not work on this platform\n")));
210  return;
211  }
212 
213  if (a_remote_address.is_any()) {
214  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) AgentImpl::receive: ERROR remote_address is empty, ICE will not work on this platform\n")));
215  return;
216  }
217 
220  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
221  OPENDDS_ASSERT(pos != endpoint_managers_.end());
222  pos->second->receive(a_local_address, a_remote_address, a_message);
225 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
bool is_any(void) const
void check_invariants() const
Definition: AgentImpl.cpp:238
ACE_TEXT("TCP_Factory")
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ remote_peer_reflexive_counter()

size_t OpenDDS::ICE::AgentImpl::remote_peer_reflexive_counter ( )
inline

Definition at line 81 of file AgentImpl.h.

References remote_peer_reflexive_counter_.

Referenced by OpenDDS::ICE::Checklist::generate_triggered_check().

82  {
84  }
size_t remote_peer_reflexive_counter_
Definition: AgentImpl.h:111

◆ remove()

void OpenDDS::ICE::AgentImpl::remove ( const FoundationType a_foundation)

Definition at line 227 of file AgentImpl.cpp.

References active_foundations_, OpenDDS::ICE::ActiveFoundationSet::remove(), and unfreeze_.

Referenced by OpenDDS::ICE::Checklist::fix_foundations(), OpenDDS::ICE::Checklist::remove_from_in_progress(), and OpenDDS::ICE::Checklist::succeeded().

228 {
229  // Foundations that are completely removed from the set of active foundations may unfreeze a checklist.
230  unfreeze_ = active_foundations_.remove(a_foundation) || unfreeze_;
231 }
ActiveFoundationSet active_foundations_
Definition: AgentImpl.h:106
bool remove(const FoundationType &a_foundation)
Definition: Checklist.h:44

◆ remove_endpoint()

void OpenDDS::ICE::AgentImpl::remove_endpoint ( DCPS::WeakRcHandle< Endpoint a_endpoint)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 119 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, reader_, reader_added_, and TheServiceParticipant.

120 {
123 
124  EndpointManagerMapType::iterator pos = endpoint_managers_.find(a_endpoint);
125 
126  if (pos != endpoint_managers_.end()) {
127  EndpointManagerPtr em = pos->second;
128  em->purge();
129  endpoint_managers_.erase(pos);
130  }
131 
133 
134  if (endpoint_managers_.empty() && reader_added_) {
135  TheServiceParticipant->network_interface_address_topic()->disconnect(reader_);
136  reader_added_ = false;
137  }
138 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void check_invariants() const
Definition: AgentImpl.cpp:238
DCPS::RcHandle< EndpointManager > EndpointManagerPtr
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
Definition: AgentImpl.h:109
#define TheServiceParticipant

◆ remove_local_agent_info_listener()

void OpenDDS::ICE::AgentImpl::remove_local_agent_info_listener ( DCPS::WeakRcHandle< Endpoint a_endpoint,
const DCPS::GUID_t a_local_guid 
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 158 of file AgentImpl.cpp.

References ACE_GUARD, endpoint_managers_, mutex, and OPENDDS_ASSERT.

160 {
162  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
163  OPENDDS_ASSERT(pos != endpoint_managers_.end());
164  pos->second->remove_agent_info_listener(a_local_guid);
165 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ shutdown()

void OpenDDS::ICE::AgentImpl::shutdown ( void  )
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 251 of file AgentImpl.cpp.

References ACE_Reactor::cancel_timer(), and OpenDDS::DCPS::ReactorInterceptor::reactor().

Referenced by notify_shutdown().

252 {
253  reactor()->cancel_timer(this, 0);
254 }
virtual ACE_Reactor * reactor() const
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)

◆ start_ice()

void OpenDDS::ICE::AgentImpl::start_ice ( DCPS::WeakRcHandle< Endpoint a_endpoint,
const DCPS::GUID_t a_local_guid,
const DCPS::GUID_t a_remote_guid,
const AgentInfo a_remote_agent_info 
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 167 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, and OPENDDS_ASSERT.

171 {
174  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
175  OPENDDS_ASSERT(pos != endpoint_managers_.end());
176  pos->second->start_ice(a_local_guid, a_remote_guid, a_remote_agent_info);
178 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
void check_invariants() const
Definition: AgentImpl.cpp:238
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ stop_ice()

void OpenDDS::ICE::AgentImpl::stop_ice ( DCPS::WeakRcHandle< Endpoint a_endpoint,
const DCPS::GUID_t a_local_guid,
const DCPS::GUID_t a_remote_guid 
)
virtual

Implements OpenDDS::ICE::Agent.

Definition at line 180 of file AgentImpl.cpp.

References ACE_GUARD, check_invariants(), endpoint_managers_, mutex, and OPENDDS_ASSERT.

183 {
186  EndpointManagerMapType::const_iterator pos = endpoint_managers_.find(a_endpoint);
187  OPENDDS_ASSERT(pos != endpoint_managers_.end());
188  pos->second->stop_ice(a_local_guid, a_remote_guid);
190 }
ACE_Recursive_Thread_Mutex mutex
Definition: AgentImpl.h:100
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
void check_invariants() const
Definition: AgentImpl.cpp:238
EndpointManagerMapType endpoint_managers_
Definition: AgentImpl.h:113

◆ unfreeze()

void OpenDDS::ICE::AgentImpl::unfreeze ( const FoundationType a_foundation)

Definition at line 233 of file AgentImpl.cpp.

References to_unfreeze_.

Referenced by add(), and OpenDDS::ICE::Checklist::succeeded().

234 {
235  to_unfreeze_.push_back(a_foundation);
236 }
FoundationList to_unfreeze_
Definition: AgentImpl.h:107

Member Data Documentation

◆ active_foundations_

ActiveFoundationSet OpenDDS::ICE::AgentImpl::active_foundations_
private

Definition at line 106 of file AgentImpl.h.

Referenced by add(), check_invariants(), contains(), and remove().

◆ endpoint_managers_

EndpointManagerMapType OpenDDS::ICE::AgentImpl::endpoint_managers_
private

◆ last_execute_

DCPS::MonotonicTimePoint OpenDDS::ICE::AgentImpl::last_execute_
private

Definition at line 128 of file AgentImpl.h.

◆ mutex

ACE_Recursive_Thread_Mutex OpenDDS::ICE::AgentImpl::mutex
mutable

◆ reader_

DCPS::RcHandle<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> > OpenDDS::ICE::AgentImpl::reader_
private

Definition at line 109 of file AgentImpl.h.

Referenced by add_endpoint(), on_data_available(), and remove_endpoint().

◆ reader_added_

bool OpenDDS::ICE::AgentImpl::reader_added_
private

Definition at line 110 of file AgentImpl.h.

Referenced by add_endpoint(), and remove_endpoint().

◆ remote_peer_reflexive_counter_

size_t OpenDDS::ICE::AgentImpl::remote_peer_reflexive_counter_
private

Definition at line 111 of file AgentImpl.h.

Referenced by remote_peer_reflexive_counter().

◆ tasks_

std::priority_queue<Item> OpenDDS::ICE::AgentImpl::tasks_
private

Definition at line 127 of file AgentImpl.h.

◆ to_unfreeze_

FoundationList OpenDDS::ICE::AgentImpl::to_unfreeze_
private

Definition at line 107 of file AgentImpl.h.

Referenced by process_deferred(), and unfreeze().

◆ unfreeze_

bool OpenDDS::ICE::AgentImpl::unfreeze_
private

Definition at line 108 of file AgentImpl.h.

Referenced by process_deferred(), and remove().


The documentation for this class was generated from the following files: