ReactorInterceptor.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_REACTORINTERCEPTOR_H
00009 #define OPENDDS_DCPS_REACTORINTERCEPTOR_H
00010 
00011 #include "PoolAllocator.h"
00012 #include "PoolAllocationBase.h"
00013 #include "ace/Reactor.h"
00014 #include "ace/Thread.h"
00015 #include "ace/Condition_Thread_Mutex.h"
00016 #include "RcEventHandler.h"
00017 #include "dcps_export.h"
00018 #include "unique_ptr.h"
00019 
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 namespace OpenDDS {
00023 namespace DCPS {
00024 
00025 class OpenDDS_Dcps_Export ReactorInterceptor : public RcEventHandler {
00026 public:
00027 
00028   class Command
00029   : public PoolAllocationBase
00030   , public EnableContainerSupportedUniquePtr<Command> {
00031   public:
00032     virtual ~Command() { }
00033     virtual void execute() = 0;
00034   };
00035   typedef container_supported_unique_ptr<Command> CommandPtr;
00036 
00037   bool should_execute_immediately();
00038   void process_command_queue();
00039 
00040   template<typename T>
00041   void execute_or_enqueue(T& t)
00042   {
00043     if (should_execute_immediately()) {
00044       ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00045       process_command_queue();
00046       t.execute();
00047     } else {
00048       enqueue(t);
00049     }
00050   }
00051 
00052   template<typename T>
00053   void enqueue(T& t)
00054   {
00055     ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00056     command_queue_.push(CommandPtr(new T(t)));
00057     this->reactor()->notify(this);
00058   }
00059 
00060   void wait();
00061 
00062   virtual bool reactor_is_shut_down() const = 0;
00063 
00064 protected:
00065   ReactorInterceptor(ACE_Reactor* reactor,
00066                      ACE_thread_t owner);
00067 
00068   virtual ~ReactorInterceptor();
00069   int handle_exception(ACE_HANDLE /*fd*/);
00070   int handle_exception_i(ACE_Guard<ACE_Thread_Mutex>& guard);
00071   ACE_thread_t owner_;
00072   ACE_Thread_Mutex mutex_;
00073   ACE_Condition_Thread_Mutex condition_;
00074   OPENDDS_QUEUE(CommandPtr) command_queue_;
00075 };
00076 
00077 } // namespace DCPS
00078 } // namespace OpenDDS
00079 
00080 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00081 
00082 #endif /* OPENDDS_DCPS_REACTORINTERCEPTOR_H  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1