OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Types | Private Attributes | List of all members
OpenDDS::DCPS::ScheduleOutputHandler Class Reference

Event handler used to enable and disable output processing. More...

#include <ScheduleOutputHandler.h>

Inheritance diagram for OpenDDS::DCPS::ScheduleOutputHandler:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ScheduleOutputHandler:
Collaboration graph
[legend]

Public Member Functions

 ScheduleOutputHandler (TransportSendStrategy *strategy, ACE_Reactor *reactor)
 Construct with the reactor and strategy. More...
 
void schedule_output ()
 Update output processing in the reactor. More...
 
ACE_Event_Handler methods
virtual int handle_exception (ACE_HANDLE)
 Modify the reactor mask for the handle. More...
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
virtual Reference_Count add_reference (void)
 
virtual Reference_Count remove_reference (void)
 
Reference_Counting_Policy & reference_counting_policy (void)
 

Private Types

enum  HandlerState { Disabled, Enabled }
 Cache the state that we have set the reactor into. More...
 

Private Attributes

TransportSendStrategy * strategy_
 Strategy sending data to be scheduled (or not). More...
 
HandlerState state_
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Event handler used to enable and disable output processing.

This class implements a simple notification handler that is used to schedule or cancel output processing for queued data according to the current mode of the TransportSendStrategy. If the send strategy is queueing data, then the reactor is enabled to process output events for it. Otherwise, the output processing callbacks are cancelled.

Definition at line 32 of file ScheduleOutputHandler.h.

Member Enumeration Documentation

◆ HandlerState

Cache the state that we have set the reactor into.

Enumerator
Disabled 
Enabled 

Definition at line 53 of file ScheduleOutputHandler.h.

Constructor & Destructor Documentation

◆ ScheduleOutputHandler()

ACE_INLINE OpenDDS::DCPS::ScheduleOutputHandler::ScheduleOutputHandler ( TransportSendStrategy * strategy,
ACE_Reactor * reactor 
)

Construct with the reactor and strategy.

Definition at line 12 of file ScheduleOutputHandler.inl.

References DBG_ENTRY_LVL, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::reference_counting_policy(), and ACE_Event_Handler::Reference_Counting_Policy::value().

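15  : ACE_Event_Handler( reactor),
16  strategy_( strategy),
17  state_( Disabled)
18 {
19  DBG_ENTRY_LVL("ScheduleOutputHandler","ScheduleOutputHandler",6);
20 
21  reference_counting_policy().value(
22  ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
23 }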
Reference_Counting_Policy & reference_counting_policy(void)
TransportSendStrategy * strategy_
Strategy sending data to be scheduled (or not).
ACE_Event_Handler(ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ handle_exception()

int OpenDDS::DCPS::ScheduleOutputHandler::handle_exception ( ACE_HANDLE  )
virtual

Modify the reactor mask for the handle.

Reimplemented from ACE_Event_Handler.

Definition at line 38 of file ScheduleOutputHandler.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_Reactor::cancel_wakeup(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, Disabled, Enabled, OpenDDS::DCPS::TransportSendStrategy::get_handle(), OpenDDS::DCPS::ThreadSynchWorker::id(), LM_DEBUG, OpenDDS::DCPS::TransportSendStrategy::mode(), OpenDDS::DCPS::TransportSendStrategy::MODE_DIRECT, OpenDDS::DCPS::TransportSendStrategy::MODE_QUEUE, OpenDDS::DCPS::TransportSendStrategy::MODE_SUSPEND, ACE_Event_Handler::reactor(), ACE_Event_Handler::reference_count_, ACE_Reactor::schedule_wakeup(), state_, strategy_, TheServiceParticipant, and ACE_Event_Handler::WRITE_MASK.

39 {
40  DBG_ENTRY_LVL("ScheduleOutputHandler","handle_exception",6);
41 
42  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
43 
44  if( reference_count_ == 1) {
45  // The containing TransportSendStrategy has unregistered, so we are
46  // going away and can't determine the mode, don't process.
47  return -1;
48  }
49 
50  // Check the *current* mode value as it might have been changed since
51  // we were scheduled to run.
52  //
53  // We already hold the Reactor::token_ (since we are being called from
54  // the reactor), and it will see the recursion and allow the call back
55  // into the reactor. We hold no other locks, and so avoid deadlock.
56  TransportSendStrategy::SendMode mode = strategy_->mode();
57  bool changed = false;
58 
59  ACE_HANDLE handle = strategy_->get_handle();
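60  // We need to recheck the mode here since it might have already changed.
61  if( mode == TransportSendStrategy::MODE_DIRECT) {
62  // Don't cancel a canceled handle.
63  if( state_ == Enabled) {
64  reactor()->cancel_wakeup(handle, ACE_Event_Handler::WRITE_MASK);
65  state_ = Disabled;
66  changed = true;
67  }
68 
69  } else if( (mode == TransportSendStrategy::MODE_QUEUE)
70  || (mode == TransportSendStrategy::MODE_SUSPEND)) {
71 
72  // Don't schedule a scheduled handle.
73  if( state_ == Disabled) {
74  reactor()->schedule_wakeup(handle, ACE_Event_Handler::WRITE_MASK);
75  state_ = Enabled;
76  changed = true;
77  }
78  }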
79 
80  if (DCPS_debug_level > 4) {
81  ACE_DEBUG((LM_DEBUG,
82  ACE_TEXT("(%P|%t) ScheduleOutputHandler::handle_exception() - [%d] ")
83  ACE_TEXT("%C data queueing for handle %d.\n"),
84  strategy_->id(),
85  (changed? ((state_ == Enabled)? "starting": "canceling"): "declining to change"),
86  handle));
87  }
88 
89  // Terminate the upcall and remove from the reactor, if there (and
90  // decrement_reference()).
91  return -1;
92 }
#define ACE_DEBUG(X)
SendMode mode() const
Access the current sending mode.
std::size_t id() const
DataLink reference value for diagnostics.
Atomic_Reference_Count reference_count_
TransportSendStrategy * strategy_
Strategy sending data to be scheduled (or not).
int cancel_wakeup(ACE_Event_Handler *event_handler, ACE_Reactor_Mask masks_to_be_cleared)
int schedule_wakeup(ACE_Event_Handler *event_handler, ACE_Reactor_Mask masks_to_be_added)
virtual ACE_Reactor * reactor(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define TheServiceParticipant

◆ schedule_output()

void OpenDDS::DCPS::ScheduleOutputHandler::schedule_output ( )

Update output processing in the reactor.

Filter the notifications here to reduce load.

Definition at line 19 of file ScheduleOutputHandler.cpp.

References DBG_ENTRY_LVL, Disabled, Enabled, OpenDDS::DCPS::TransportSendStrategy::mode(), OpenDDS::DCPS::TransportSendStrategy::MODE_DIRECT, OpenDDS::DCPS::TransportSendStrategy::MODE_QUEUE, OpenDDS::DCPS::TransportSendStrategy::MODE_SUSPEND, ACE_Reactor::notify(), ACE_Event_Handler::reactor(), ACE_Event_Handler::reference_count_, state_, and strategy_.

Referenced by OpenDDS::DCPS::ReactorSynch::work_available().

20 {
21  DBG_ENTRY_LVL("ScheduleOutputHandler","schedule_output",6);
22 
23  // Only emit once into the notification queue since we check which
24  // operation to perform when we process.
25  if( reference_count_ == 1) {
26  /// Filter the notifications here to reduce load.
27  TransportSendStrategy::SendMode mode = strategy_->mode();
28 
29  if( ( (state_ == Enabled) && (mode == TransportSendStrategy::MODE_DIRECT))
30  || ( (state_ == Disabled) && ( (mode == TransportSendStrategy::MODE_QUEUE)
31  || (mode == TransportSendStrategy::MODE_SUSPEND)))) {
32  reactor()->notify(this);
33  }
34  }
35 }
SendMode mode() const
Access the current sending mode.
Atomic_Reference_Count reference_count_
int notify(ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask masks=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
TransportSendStrategy * strategy_
Strategy sending data to be scheduled (or not).
virtual ACE_Reactor * reactor(void) const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Data Documentation

◆ state_

HandlerState OpenDDS::DCPS::ScheduleOutputHandler::state_
private

Definition at line 54 of file ScheduleOutputHandler.h.

Referenced by handle_exception(), and schedule_output().

◆ strategy_

TransportSendStrategy* OpenDDS::DCPS::ScheduleOutputHandler::strategy_
private

Strategy sending data to be scheduled (or not).

Definition at line 50 of file ScheduleOutputHandler.h.

Referenced by handle_exception(), and schedule_output().


The documentation for this class was generated from the following files: