Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/ 9 : 10 : #include "JobQueue.h" 11 : 12 : #include "Service_Participant.h" 13 : 14 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 15 : 16 : namespace OpenDDS { 17 : namespace DCPS { 18 : 19 10 : JobQueue::JobQueue(ACE_Reactor* reactor) 20 : { 21 10 : this->reactor(reactor); 22 10 : } 23 : 24 1 : int JobQueue::handle_exception(ACE_HANDLE /*fd*/) 25 : { 26 1 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager()); 27 : 28 1 : Queue q; 29 : 30 1 : ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(mutex_); 31 1 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, mutex_, -1); 32 1 : q.swap(job_queue_); 33 2 : for (Queue::const_iterator pos = q.begin(), limit = q.end(); pos != limit; ++pos) { 34 1 : ACE_GUARD_RETURN(ACE_Reverse_Lock<ACE_Thread_Mutex>, rev_guard, rev_lock, -1); 35 1 : (*pos)->execute(); 36 1 : } 37 : 38 1 : if (!job_queue_.empty()) { 39 0 : guard.release(); 40 0 : reactor()->notify(this); 41 : } 42 : 43 1 : return 0; 44 1 : } 45 : 46 : } // namespace DCPS 47 : } // namespace OpenDDS 48 : 49 : OPENDDS_END_VERSIONED_NAMESPACE_DECL