Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #include "debug.h" 11 : #include "DispatchService.h" 12 : #include "Service_Participant.h" 13 : #include "TimeDuration.h" 14 : 15 : #include <ace/Reverse_Lock_T.h> 16 : 17 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 18 : 19 : namespace OpenDDS { 20 : namespace DCPS { 21 : 22 48 : DispatchService::DispatchService(size_t count) 23 48 : : cv_(mutex_) 24 48 : , allow_dispatch_(true) 25 48 : , stop_when_empty_(false) 26 48 : , running_(true) 27 48 : , running_threads_(0) 28 48 : , max_timer_id_(LONG_MAX) 29 96 : , pool_(count, run, this) 30 : { 31 48 : } 32 : 33 76 : DispatchService::~DispatchService() 34 : { 35 48 : shutdown(); 36 76 : } 37 : 38 84 : void DispatchService::shutdown(bool immediate, EventQueue* const pending) 39 : { 40 84 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 41 84 : allow_dispatch_ = false; 42 84 : stop_when_empty_ = true; 43 84 : running_ = running_ && !immediate; // && with existing state in case shutdown has already been called 44 84 : cv_.notify_all(); 45 : 46 84 : if (pool_.contains(ACE_Thread::self())) { 47 0 : if (log_level >= LogLevel::Error) { 48 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR :: DispatchService::shutdown: Contained Thread Attempting To Call Shutdown.\n")); 49 : } 50 0 : if (pending) { 51 0 : pending->clear(); 52 : } 53 0 : return; 54 : } 55 : 56 180 : while (running_threads_) { 57 96 : cv_.wait(TheServiceParticipant->get_thread_status_manager()); 58 : } 59 : 60 84 : if (pending) { 61 31 : pending->clear(); 62 31 : pending->swap(event_queue_); 63 31 : const TimerQueueMap& cmap = timer_queue_map_; 64 35 : for (TimerQueueMap::const_iterator it = cmap.begin(), limit = cmap.end(); it != limit; ++it) { 65 4 : pending->push_back(it->second.first); 66 : } 67 : } else { 68 53 : event_queue_.clear(); 69 : } 70 84 : timer_queue_map_.clear(); 71 84 : timer_id_map_.clear(); 72 84 : } 73 : 74 4147 : DispatchService::DispatchStatus DispatchService::dispatch(FunPtr fun, void* arg) 75 : { 76 4147 : if (!fun) { 77 0 : return DS_ERROR; 78 : } 79 : 80 4147 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 81 4147 : if (allow_dispatch_) { 82 4146 : event_queue_.push_back(std::make_pair(fun, arg)); 83 4146 : cv_.notify_one(); 84 4146 : return DS_SUCCESS; 85 : } 86 1 : return DS_ERROR; 87 4147 : } 88 : 89 84 : DispatchService::TimerId DispatchService::schedule(FunPtr fun, void* arg, const MonotonicTimePoint& expiration) 90 : { 91 84 : if (!fun) { 92 0 : return TI_FAILURE; 93 : } 94 : 95 84 : TimerId id = 0; 96 84 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 97 84 : if (allow_dispatch_) { 98 83 : TimerQueueMap::iterator pos = timer_queue_map_.insert(std::make_pair(expiration, std::make_pair(std::make_pair(fun, arg), 0))); 99 : // Make it a loop in case we ever recycle timer ids 100 83 : const TimerId starting_id = max_timer_id_; 101 : do { 102 83 : id = max_timer_id_ = max_timer_id_ == LONG_MAX ? 1 : max_timer_id_ + 1; 103 83 : if (id == starting_id) { 104 0 : return TI_FAILURE; // all ids in use ?! 105 : } 106 83 : pos->second.second = id; 107 83 : } while (timer_id_map_.insert(std::make_pair(id, pos)).second == false); 108 83 : cv_.notify_one(); 109 83 : return id; 110 : } 111 1 : return TI_FAILURE; 112 84 : } 113 : 114 25 : size_t DispatchService::cancel(DispatchService::TimerId id, void** arg) 115 : { 116 25 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 117 25 : TimerIdMap::iterator pos = timer_id_map_.find(id); 118 25 : if (pos != timer_id_map_.end()) { 119 23 : if (pos->second == timer_queue_map_.begin()) { 120 13 : cv_.notify_all(); 121 : } 122 23 : if (arg) { 123 17 : *arg = pos->second->second.first.second; 124 : } 125 23 : timer_queue_map_.erase(pos->second); 126 23 : timer_id_map_.erase(pos); 127 23 : return 1; 128 : } 129 2 : return 0; 130 25 : } 131 : 132 4 : size_t DispatchService::cancel(FunPtr fun, void* arg) 133 : { 134 4 : OPENDDS_ASSERT(fun); 135 4 : size_t count = 0; 136 4 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 137 22 : for (TimerQueueMap::iterator it = timer_queue_map_.begin(); it != timer_queue_map_.end();) { 138 18 : if (it->second.first.first == fun && it->second.first.second == arg) { 139 12 : if (it == timer_queue_map_.begin()) { 140 6 : cv_.notify_all(); 141 : } 142 12 : timer_id_map_.erase(it->second.second); 143 12 : timer_queue_map_.erase(it++); 144 12 : ++count; 145 : } else { 146 6 : ++it; 147 : } 148 : } 149 4 : return count; 150 4 : } 151 : 152 134 : ACE_THR_FUNC_RETURN DispatchService::run(void* arg) 153 : { 154 134 : DispatchService& dispatcher = *static_cast<DispatchService*>(arg); 155 134 : dispatcher.run_event_loop(); 156 134 : return 0; 157 : } 158 : 159 134 : void DispatchService::run_event_loop() 160 : { 161 134 : ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(mutex_); 162 134 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 163 134 : ++running_threads_; 164 4645 : while (running_) { 165 : 166 : // Logical Order: 167 : // - Move expired timer events into normal event queue 168 : // - Wait appropriate length if there's nothing to do 169 : // - Check for early exit before execution 170 : // - Run first task from event queue 171 : 172 4511 : if (allow_dispatch_ && !timer_queue_map_.empty()) { 173 185 : const MonotonicTimePoint now = MonotonicTimePoint::now(); 174 : 175 185 : TimerQueueMap::iterator last = timer_queue_map_.upper_bound(now), pos = last; 176 229 : while (pos != timer_queue_map_.begin()) { 177 44 : --pos; 178 44 : event_queue_.push_back(pos->second.first); 179 44 : timer_id_map_.erase(pos->second.second); 180 : } 181 185 : if (last != timer_queue_map_.begin()) { 182 44 : timer_queue_map_.erase(timer_queue_map_.begin(), last); 183 : } 184 185 : } 185 : 186 4511 : if (event_queue_.empty()) { 187 386 : if (stop_when_empty_) { 188 38 : running_ = false; 189 38 : cv_.notify_all(); 190 348 : } else if (allow_dispatch_ && timer_queue_map_.size()) { 191 126 : MonotonicTimePoint deadline(timer_queue_map_.begin()->first); 192 126 : cv_.wait_until(deadline, TheServiceParticipant->get_thread_status_manager()); 193 126 : } else { 194 222 : cv_.wait(TheServiceParticipant->get_thread_status_manager()); 195 : } 196 : } 197 : 198 4511 : if (!running_ || event_queue_.empty()) continue; 199 : 200 4189 : FunArgPair pair = event_queue_.front(); 201 4189 : event_queue_.pop_front(); 202 4189 : ACE_Guard<ACE_Reverse_Lock<ACE_Thread_Mutex> > rev_guard(rev_lock); 203 4189 : pair.first(pair.second); 204 4189 : } 205 134 : --running_threads_; 206 134 : cv_.notify_all(); 207 134 : } 208 : 209 : } // DCPS 210 : } // OpenDDS 211 : 212 : OPENDDS_END_VERSIONED_NAMESPACE_DECL