22 #ifdef OPENDDS_SECURITY 24 using DCPS::TimeDuration;
33 reactor(a_reactor), event_handler(a_event_handler), delay(a_delay) {}
45 if (tasks_.empty() || a_release_time < tasks_.top().release_time_) {
47 execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(
reactor(),
this, release - MonotonicTimePoint::now()));
49 tasks_.push(
Item(a_release_time, wtask));
69 if (tasks_.top().release_time_ <= now) {
70 Item item = tasks_.top();
82 if (!tasks_.empty()) {
84 execute_or_enqueue(DCPS::make_rch<ScheduleTimerCommand>(
reactor(),
this, release - now));
94 , reader_(DCPS::
make_rch<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> >(DCPS::DataReaderQosBuilder().reliability_reliable().durability_transient_local(), DCPS::
rchandle_from(this)))
95 , reader_added_(false)
96 , remote_peer_reflexive_counter_(0)
146 return pos->second->agent_info();
156 pos->second->add_agent_info_listener(a_local_guid, a_agent_info_listener);
165 pos->second->remove_agent_info_listener(a_local_guid);
177 pos->second->start_ice(a_local_guid, a_remote_guid, a_remote_agent_info);
189 pos->second->stop_ice(a_local_guid, a_remote_guid);
200 return pos->second->get_address(a_local_guid, a_remote_guid);
209 if (a_local_address.
is_any()) {
210 ACE_ERROR((
LM_ERROR,
ACE_TEXT(
"(%P|%t) AgentImpl::receive: ERROR local_address is empty, ICE will not work on this platform\n")));
214 if (a_remote_address.
is_any()) {
215 ACE_ERROR((
LM_ERROR,
ACE_TEXT(
"(%P|%t) AgentImpl::receive: ERROR remote_address is empty, ICE will not work on this platform\n")));
223 pos->second->receive(a_local_address, a_remote_address, a_message);
245 pos->second->compute_active_foundations(expected);
246 pos->second->check_invariants();
265 DCPS::InternalSampleInfoSequence infos;
272 pos->second->network_change();
280 for (FoundationList::const_iterator fpos =
to_unfreeze_.begin(), flimit =
to_unfreeze_.end(); fpos != flimit; ++fpos) {
283 pos->second->unfreeze(*fpos);
293 pos->second->unfreeze();
void remove_endpoint(DCPS::WeakRcHandle< Endpoint > a_endpoint)
void add_local_agent_info_listener(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, DCPS::WeakRcHandle< AgentInfoListener > a_agent_info_listener)
RcHandle< T > rchandle_from(T *pointer)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ActiveFoundationSet active_foundations_
AgentInfo get_local_agent_info(DCPS::WeakRcHandle< Endpoint > a_endpoint) const
bool remove(const FoundationType &a_foundation)
void unfreeze(const FoundationType &a_foundation)
std::pair< std::string, std::string > FoundationType
FoundationList to_unfreeze_
void on_data_available(DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader)
#define OPENDDS_ASSERT(C)
virtual ACE_Reactor * reactor() const
const SampleStateMask ANY_SAMPLE_STATE
ACE_INET_Addr get_address(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid) const
const ACE_Time_Value & value() const
void add_endpoint(DCPS::WeakRcHandle< Endpoint > a_endpoint)
void stop_ice(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
const InstanceStateMask ANY_INSTANCE_STATE
ScheduleTimerCommand(ACE_Reactor *a_reactor, ACE_Event_Handler *a_event_handler, const TimeDuration &a_delay)
const ViewStateMask ANY_VIEW_STATE
TimePoint_T< MonotonicClock > MonotonicTimePoint
void remove_local_agent_info_listener(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid)
const ACE_Reactor * reactor() const
EndpointManagerMapType endpoint_managers_
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
void remove(const FoundationType &a_foundation)
static Configuration * instance()
bool reactor_is_shut_down() const
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void receive(DCPS::WeakRcHandle< Endpoint > a_endpoint, const ACE_INET_Addr &a_local_address, const ACE_INET_Addr &a_remote_address, const STUN::Message &a_message)
ACE_Recursive_Thread_Mutex mutex
const long LENGTH_UNLIMITED
RcHandle< T > lock() const
ACE_Event_Handler * event_handler
#define TheServiceParticipant
void enqueue(const DCPS::MonotonicTimePoint &a_release_time, WeakTaskPtr a_task)
void check_invariants() const
void start_ice(DCPS::WeakRcHandle< Endpoint > a_endpoint, const DCPS::GUID_t &a_local_guid, const DCPS::GUID_t &a_remote_guid, const AgentInfo &a_remote_agent_info)
The Internal API and Implementation of OpenDDS.
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader_
int handle_timeout(const ACE_Time_Value &a_now, const void *)