#include <ScheduleOutputHandler.h>
Inheritance diagram for OpenDDS::DCPS::ScheduleOutputHandler:
Public Member Functions | |
ScheduleOutputHandler (TransportSendStrategy *strategy, ACE_Reactor *reactor) | |
Construct with the reactor and strategy. | |
void | schedule_output () |
Update output processing in the reactor. | |
ACE_Event_Handler methods | |
virtual int | handle_exception (ACE_HANDLE) |
Modify the reactor mask for the handle. | |
Private Types | |
Disabled | |
Enabled | |
enum | HandlerState { Disabled, Enabled } |
Cache the state that we have set the reactor into. More... | |
Private Attributes | |
TransportSendStrategy * | strategy_ |
Strategy sending data to be scheduled (or not). | |
HandlerState | state_ |
This class implements a simple notification handler that is used to schedule or cancel output processing for queued data according to the current mode of the TransportSendStrategy. If the send strategy is queueing data, the reactor is enabled to process output events. Otherwise the output processing callbacks are cancelled.
Definition at line 30 of file ScheduleOutputHandler.h.
enum OpenDDS::DCPS::ScheduleOutputHandler::HandlerState [private] |
Cache the state that we have set the reactor into.
Definition at line 51 of file ScheduleOutputHandler.h.
ACE_INLINE OpenDDS::DCPS::ScheduleOutputHandler::ScheduleOutputHandler | ( | TransportSendStrategy * | strategy, | |
ACE_Reactor * | reactor | |||
) |
Construct with the reactor and strategy.
Definition at line 12 of file ScheduleOutputHandler.inl.
References DBG_ENTRY_LVL and OpenDDS::DCPS::ENABLED.
00015 : ACE_Event_Handler( reactor), 00016 strategy_( strategy), 00017 state_( Disabled) 00018 { 00019 DBG_ENTRY_LVL("ScheduleOutputHandler","ScheduleOutputHandler",6); 00020 00021 reference_counting_policy().value( 00022 ACE_Event_Handler::Reference_Counting_Policy::ENABLED); 00023 }
int OpenDDS::DCPS::ScheduleOutputHandler::handle_exception | ( | ACE_HANDLE | ) | [virtual] |
Modify the reactor mask for the handle.
Definition at line 37 of file ScheduleOutputHandler.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, Disabled, Enabled, OpenDDS::DCPS::TransportSendStrategy::get_handle(), OpenDDS::DCPS::ThreadSynchWorker::id(), OpenDDS::DCPS::TransportSendStrategy::mode(), OpenDDS::DCPS::TransportSendStrategy::MODE_DIRECT, OpenDDS::DCPS::TransportSendStrategy::MODE_QUEUE, OpenDDS::DCPS::TransportSendStrategy::MODE_SUSPEND, state_, and strategy_.
00038 { 00039 DBG_ENTRY_LVL("ScheduleOutputHandler","handle_exception",6); 00040 00041 if( reference_count_ == 1) { 00042 // The containing TransportSendStrategy has unregistered, so we are 00043 // going away and can't determine the mode, don't process. 00044 return -1; 00045 } 00046 00047 // Check the *current* mode value as it might have been changed since 00048 // we were scheduled to run. 00049 // 00050 // We already hold the Reactor::token_ (since we are being called from 00051 // the reactor), and it will see the recursion and allow the call back 00052 // into the reactor. We hold no other locks, and so avoid deadlock. 00053 TransportSendStrategy::SendMode mode = strategy_->mode(); 00054 bool changed = false; 00055 00056 ACE_HANDLE handle = strategy_->get_handle(); 00057 // We need to recheck the mode here since it might have already changed. 00058 if( mode == TransportSendStrategy::MODE_DIRECT) { 00059 // Don't cancel a canceled handle. 00060 if( state_ == Enabled) { 00061 reactor()->cancel_wakeup(handle, ACE_Event_Handler::WRITE_MASK); 00062 state_ = Disabled; 00063 changed = true; 00064 } 00065 00066 } else if( (mode == TransportSendStrategy::MODE_QUEUE) 00067 || (mode == TransportSendStrategy::MODE_SUSPEND)) { 00068 00069 // Don't schedule a scheduled handle. 00070 if( state_ == Disabled) { 00071 reactor()->schedule_wakeup(handle, ACE_Event_Handler::WRITE_MASK); 00072 state_ = Enabled; 00073 changed = true; 00074 } 00075 } 00076 00077 if (DCPS_debug_level > 4) { 00078 ACE_DEBUG((LM_DEBUG, 00079 ACE_TEXT("(%P|%t) ScheduleOutputHandler::handle_exception() - [%d] ") 00080 ACE_TEXT("%C data queueing for handle %d.\n"), 00081 strategy_->id(), 00082 (changed? ((state_ == Enabled)? "starting": "canceling"): "declining to change"), 00083 handle)); 00084 } 00085 00086 // Terminate the upcall and remove from the reactor, if there (and 00087 // decrement_reference()). 00088 return -1; 00089 }
void OpenDDS::DCPS::ScheduleOutputHandler::schedule_output | ( | ) |
Update output processing in the reactor.
Filter the notifications here to reduce load.
Definition at line 18 of file ScheduleOutputHandler.cpp.
References DBG_ENTRY_LVL, Disabled, Enabled, OpenDDS::DCPS::TransportSendStrategy::mode(), OpenDDS::DCPS::TransportSendStrategy::MODE_DIRECT, OpenDDS::DCPS::TransportSendStrategy::MODE_QUEUE, OpenDDS::DCPS::TransportSendStrategy::MODE_SUSPEND, state_, and strategy_.
Referenced by OpenDDS::DCPS::ReactorSynch::work_available().
00019 { 00020 DBG_ENTRY_LVL("ScheduleOutputHandler","schedule_output",6); 00021 00022 // Only emit once into the notification queue since we check which 00023 // operation to perform when we process. 00024 if( reference_count_ == 1) { 00025 /// Filter the notifications here to reduce load. 00026 TransportSendStrategy::SendMode mode = strategy_->mode(); 00027 00028 if( ( (state_ == Enabled) && (mode == TransportSendStrategy::MODE_DIRECT)) 00029 || ( (state_ == Disabled) && ( (mode == TransportSendStrategy::MODE_QUEUE) 00030 || (mode == TransportSendStrategy::MODE_SUSPEND)))) { 00031 reactor()->notify(this); 00032 } 00033 } 00034 }
state_: Definition at line 52 of file ScheduleOutputHandler.h.
Referenced by handle_exception(), and schedule_output().
Strategy sending data to be scheduled (or not).
Definition at line 48 of file ScheduleOutputHandler.h.
Referenced by handle_exception(), and schedule_output().