ReactorInterceptor.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "ace/Log_Msg.h"
00011 #include "ace/Synch.h"
00012 
00013 #include "ReactorInterceptor.h"
00014 
00015 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 namespace OpenDDS {
00018 namespace DCPS {
00019 
00020 ReactorInterceptor::ReactorInterceptor(ACE_Reactor* reactor,
00021                                        ACE_thread_t owner)
00022   : owner_(owner)
00023   , condition_(mutex_)
00024 {
00025   if (reactor == 0) {
00026     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReactorInterceptor initialized with null reactor\n"));
00027   }
00028   this->reactor(reactor);
00029 }
00030 
00031 ReactorInterceptor::~ReactorInterceptor()
00032 {
00033 }
00034 
00035 bool ReactorInterceptor::should_execute_immediately()
00036 {
00037   return ACE_OS::thr_equal(owner_, ACE_Thread::self()) ||
00038     reactor_is_shut_down();
00039 }
00040 
00041 void ReactorInterceptor::wait()
00042 {
00043   ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00044 
00045   if (should_execute_immediately()) {
00046     handle_exception_i(guard);
00047     reactor()->purge_pending_notifications(this);
00048   } else {
00049     while (!command_queue_.empty()) {
00050       condition_.wait();
00051     }
00052   }
00053 }
00054 
00055 
00056 int ReactorInterceptor::handle_exception(ACE_HANDLE /*fd*/)
00057 {
00058   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->mutex_, 0);
00059 
00060   return handle_exception_i(guard);
00061 }
00062 
00063 void ReactorInterceptor::process_command_queue()
00064 {
00065   while (!command_queue_.empty()) {
00066     CommandPtr command = move(command_queue_.front());
00067     command_queue_.pop();
00068     if (command)
00069       command->execute();
00070   }
00071 }
00072 
00073 int ReactorInterceptor::handle_exception_i(ACE_Guard<ACE_Thread_Mutex>&)
00074 {
00075   process_command_queue();
00076   condition_.signal();
00077   return 0;
00078 }
00079 
00080 }
00081 }
00082 
00083 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1