ReactorInterceptor.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_REACTORINTERCEPTOR_H
00009 #define OPENDDS_DCPS_REACTORINTERCEPTOR_H
00010 
00011 #include "PoolAllocator.h"
00012 #include "PoolAllocationBase.h"
00013 #include "ace/Reactor.h"
00014 #include "ace/Thread.h"
00015 #include "ace/Condition_Thread_Mutex.h"
00016 #include "dcps_export.h"
00017 
00018 namespace OpenDDS {
00019 namespace DCPS {
00020 
/**
 * Event handler through which callers hand Command objects to a reactor.
 *
 * execute_or_enqueue() either runs a command inline (when
 * should_execute_immediately() returns true) or, via enqueue(), stores a
 * heap-allocated copy in command_queue_ and calls notify() on the reactor.
 * All access to the queue in this header happens with mutex_ held.
 *
 * The destructor is protected, so code outside the class hierarchy cannot
 * delete an instance directly.
 * NOTE(review): destroy() is presumably the intended way to dispose of an
 * instance -- confirm against the implementation file.
 */
00021 class OpenDDS_Dcps_Export ReactorInterceptor : public ACE_Event_Handler, public PoolAllocationBase {
00022 public:
00023 
  /// Abstract unit of work. Concrete commands derive from this class and
  /// implement execute().
00024   class Command : public PoolAllocationBase {
00025   public:
    /// Virtual so that a command can be destroyed through a Command*.
00026     virtual ~Command() { }
    /// Performs the command's work; must be provided by the derived class.
00027     virtual void execute() = 0;
00028   };
00029 
  /// @param reactor reactor this handler is associated with.
  /// @param owner   thread id; NOTE(review): presumably the thread that runs
  ///                the reactor's event loop and is stored in owner_ --
  ///                confirm against the implementation file.
00030   ReactorInterceptor(ACE_Reactor* reactor,
00031                      ACE_thread_t owner);
00032 
  /// Decides whether execute_or_enqueue() runs a command inline (true) or
  /// defers it through enqueue() (false). Defined outside this header.
00033   bool should_execute_immediately();
  /// Defined outside this header. execute_or_enqueue() calls it with mutex_
  /// already held, before running the new command.
  /// NOTE(review): presumably executes and removes the commands pending in
  /// command_queue_ -- confirm.
00034   void process_command_queue();
00035 
  /// Runs @a t inline or defers it.
  ///
  /// When should_execute_immediately() returns true, mutex_ is acquired,
  /// process_command_queue() is called first, and then t.execute() is
  /// invoked on the caller's own object (no copy is made). Otherwise the
  /// call is forwarded to enqueue().
  ///
  /// @tparam T type providing execute(); the deferred path additionally has
  ///           the requirements listed on enqueue().
00036   template<typename T>
00037   void execute_or_enqueue(T& t)
00038   {
00039     if (should_execute_immediately()) {
      // ACE_GUARD declares `guard` and, per ACE's Guard_T.h, returns from
      // this function if the lock could not be acquired.
00040       ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
      // Process what is already queued before running the new command.
00041       process_command_queue();
00042       t.execute();
00043     } else {
00044       enqueue(t);
00045     }
00046   }
00047 
  /// Queues a copy of @a t and notifies the reactor.
  ///
  /// With mutex_ held, a copy of @a t is created with `new T(t)` and pushed
  /// onto command_queue_, registration_counter_ is incremented, and
  /// reactor()->notify(this) is called. The caller's @a t is not retained.
  ///
  /// @tparam T must be copy-constructible and convertible to Command*.
  /// NOTE(review): the queue stores raw pointers; the code that deletes the
  /// copies is not visible in this header -- confirm where ownership ends.
00048   template<typename T>
00049   void enqueue(T& t)
00050   {
    // As above: returns without queuing if the lock could not be acquired.
00051     ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00052     command_queue_.push(new T(t));
00053     ++registration_counter_;
    // NOTE(review): presumably leads to handle_exception() being called by
    // the reactor -- confirm. The return value of notify() is not checked.
00054     this->reactor()->notify(this);
00055   }
00056 
  /// Defined outside this header.
  /// NOTE(review): presumably blocks until queued commands have been
  /// processed -- confirm.
00057   void wait();
00058 
  /// Defined outside this header.
  /// NOTE(review): presumably arranges for this object to be deleted --
  /// confirm.
00059   void destroy();
00060 
  /// Must be implemented by derived classes to report whether the reactor
  /// has been shut down.
00061   virtual bool reactor_is_shut_down() const = 0;
00062 
00063 protected:
  /// Protected: instances cannot be deleted directly by outside code.
00064   virtual ~ReactorInterceptor();
00065 
00066 private:
  /// NOTE(review): presumably the ACE_Event_Handler callback triggered by
  /// the notify() call in enqueue() -- confirm.
00067   int handle_exception(ACE_HANDLE /*fd*/);
  /// Helper taking a guard by reference; defined outside this header.
00068   int handle_exception_i(ACE_Guard<ACE_Thread_Mutex>& guard);
  /// Thread id; NOTE(review): presumably set from the constructor's `owner`.
00069   ACE_thread_t owner_;
  /// Held around every use of command_queue_ and registration_counter_ in
  /// this header.
00070   ACE_Thread_Mutex mutex_;
  /// NOTE(review): presumably associated with mutex_ and used by wait() --
  /// confirm. Declared after mutex_, so it is initialized after it.
00071   ACE_Condition_Thread_Mutex condition_;
  /// Pending commands, stored as raw pointers to heap copies (see enqueue()).
00072   OPENDDS_QUEUE(Command*) command_queue_;
  /// Incremented once for every command added by enqueue().
00073   ACE_UINT64 registration_counter_;
  /// NOTE(review): presumably set by destroy() -- confirm.
00074   bool destroy_;
00075 };
00076 
00077 } // namespace DCPS
00078 } // namespace OpenDDS
00079 
00080 #endif /* OPENDDS_DCPS_REACTORINTERCEPTOR_H  */

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7