OpenDDS  Snapshot(2023/04/28-20:55)
ScheduleOutputHandler.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
10 
11 #include "TransportSendStrategy.h"
13 
14 #if !defined (__ACE_INLINE__)
16 #endif /* __ACE_INLINE__ */
17 
18 void
20 {
21  DBG_ENTRY_LVL("ScheduleOutputHandler","schedule_output",6);
22 
23  // Only emit once into the notification queue since we check which
24  // operation to perform when we process.
25  if( reference_count_ == 1) {
26  /// Filter the notifications here to reduce load.
28 
29  if( ( (state_ == Enabled) && (mode == TransportSendStrategy::MODE_DIRECT))
30  || ( (state_ == Disabled) && ( (mode == TransportSendStrategy::MODE_QUEUE)
31  || (mode == TransportSendStrategy::MODE_SUSPEND)))) {
32  reactor()->notify(this);
33  }
34  }
35 }
36 
37 int
39 {
40  DBG_ENTRY_LVL("ScheduleOutputHandler","handle_exception",6);
41 
42  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
43 
44  if( reference_count_ == 1) {
45  // The containing TransportSendStrategy has unregistered, so we are
46  // going away and can't determine the mode, don't process.
47  return -1;
48  }
49 
50  // Check the *current* mode value as it might have been changed since
51  // we were scheduled to run.
52  //
53  // We already hold the Reactor::token_ (since we are being called from
54  // the reactor), and it will see the recursion and allow the call back
55  // into the reactor. We hold no other locks, and so avoid deadlock.
57  bool changed = false;
58 
59  ACE_HANDLE handle = strategy_->get_handle();
60  // We need to recheck the mode here since it might have already changed.
62  // Don't cancel a canceled handle.
63  if( state_ == Enabled) {
65  state_ = Disabled;
66  changed = true;
67  }
68 
69  } else if( (mode == TransportSendStrategy::MODE_QUEUE)
71 
72  // Don't schedule a scheduled handle.
73  if( state_ == Disabled) {
75  state_ = Enabled;
76  changed = true;
77  }
78  }
79 
80  if (DCPS_debug_level > 4) {
82  ACE_TEXT("(%P|%t) ScheduleOutputHandler::handle_exception() - [%d] ")
83  ACE_TEXT("%C data queueing for handle %d.\n"),
84  strategy_->id(),
85  (changed? ((state_ == Enabled)? "starting": "canceling"): "declining to change"),
86  handle));
87  }
88 
89  // Terminate the upcall and remove from the reactor, if there (and
90  // decrement_reference()).
91  return -1;
92 }
#define ACE_DEBUG(X)
SendMode mode() const
Access the current sending mode.
virtual int handle_exception(ACE_HANDLE)
modify the reactor mask for the handle.
Atomic_Reference_Count reference_count_
LM_DEBUG
int notify(ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask masks=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
void schedule_output()
Update output processing in the reactor.
std::size_t id() const
DataLink reference value for diagnostics.
int cancel_wakeup(ACE_Event_Handler *event_handler, ACE_Reactor_Mask masks_to_be_cleared)
int schedule_wakeup(ACE_Event_Handler *event_handler, ACE_Reactor_Mask masks_to_be_added)
virtual ACE_Reactor * reactor(void) const
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define TheServiceParticipant
TransportSendStrategy * strategy_
Strategy sending data to be scheduled (or not).