ReactorInterceptor.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ReactorInterceptor.h"
00010 
00011 #include "ace/Log_Msg.h"
00012 
00013 namespace OpenDDS {
00014 namespace DCPS {
00015 
00016 ReactorInterceptor::ReactorInterceptor(ACE_Reactor* reactor,
00017                                        ACE_thread_t owner)
00018   : owner_(owner)
00019   , condition_(mutex_)
00020   , registration_counter_(0)
00021   , destroy_(false)
00022 {
00023   if (reactor == 0) {
00024     ACE_DEBUG((LM_ERROR, "(%P|%t) ERROR: ReactorInterceptor initialized with null reactor\n"));
00025   }
00026   this->reactor(reactor);
00027 }
00028 
00029 ReactorInterceptor::~ReactorInterceptor()
00030 {
00031   ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00032 
00033   // Dump the command queue.
00034   while (!command_queue_.empty ()) {
00035     delete command_queue_.front ();
00036     command_queue_.pop ();
00037   }
00038 }
00039 
00040 bool ReactorInterceptor::should_execute_immediately()
00041 {
00042   return ACE_OS::thr_equal(owner_, ACE_Thread::self()) ||
00043     reactor_is_shut_down();
00044 }
00045 
00046 void ReactorInterceptor::wait()
00047 {
00048   ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00049 
00050   if (should_execute_immediately()) {
00051     handle_exception_i(guard);
00052   } else {
00053     while (!command_queue_.empty()) {
00054       condition_.wait();
00055     }
00056   }
00057 }
00058 
00059 void ReactorInterceptor::destroy()
00060 {
00061   ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00062   if (!reactor_is_shut_down() && registration_counter_ > 0) {
00063     // Wait until we get handle exception.
00064     destroy_ = true;
00065   } else {
00066     guard.release();
00067     delete this;
00068   }
00069 }
00070 
00071 int ReactorInterceptor::handle_exception(ACE_HANDLE /*fd*/)
00072 {
00073   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->mutex_, 0);
00074 
00075   --registration_counter_;
00076 
00077   return handle_exception_i(guard);
00078 }
00079 
00080 void ReactorInterceptor::process_command_queue()
00081 {
00082   while (!command_queue_.empty()) {
00083     Command* command = command_queue_.front();
00084     command_queue_.pop();
00085     command->execute();
00086     delete command;
00087   }
00088 }
00089 
00090 int ReactorInterceptor::handle_exception_i(ACE_Guard<ACE_Thread_Mutex>& guard)
00091 {
00092   process_command_queue();
00093 
00094   condition_.signal();
00095 
00096   if (registration_counter_ == 0 && destroy_) {
00097     guard.release();
00098     delete this;
00099   }
00100   return 0;
00101 }
00102 
00103 }
00104 
00105 }

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7