00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_TRANSPORTREACTORTASK_H
00009 #define OPENDDS_DCPS_TRANSPORTREACTORTASK_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/RcObject_T.h"
00013 #include "ace/Condition_T.h"
00014 #include "ace/Task.h"
00015 #include "ace/Barrier.h"
00016
00017 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00018 class ACE_Proactor;
00019 class ACE_Reactor;
00020 ACE_END_VERSIONED_NAMESPACE_DECL
00021
00022 namespace OpenDDS {
00023 namespace DCPS {
00024
00025 class OpenDDS_Dcps_Export TransportReactorTask : public virtual ACE_Task_Base,
00026 public virtual RcObject<ACE_SYNCH_MUTEX> {
00027 public:
00028
00029 TransportReactorTask(bool useAsyncSend);
00030 virtual ~TransportReactorTask();
00031
00032 virtual int open(void*);
00033 virtual int svc();
00034 virtual int close(u_long flags = 0);
00035
00036 void stop();
00037
00038 ACE_Reactor* get_reactor();
00039 const ACE_Reactor* get_reactor() const;
00040
00041 ACE_thread_t get_reactor_owner() const;
00042
00043 ACE_Proactor* get_proactor();
00044 const ACE_Proactor* get_proactor() const;
00045
00046 void wait_for_startup() { barrier_.wait(); }
00047
00048 bool is_shut_down() const { return state_ == STATE_NOT_RUNNING; }
00049
00050 OPENDDS_POOL_ALLOCATION_FWD
00051
00052 private:
00053
00054 typedef ACE_SYNCH_MUTEX LockType;
00055 typedef ACE_Guard<LockType> GuardType;
00056 typedef ACE_Condition<LockType> ConditionType;
00057
00058 enum State { STATE_NOT_RUNNING, STATE_OPENING, STATE_RUNNING };
00059
00060 ACE_Barrier barrier_;
00061 LockType lock_;
00062 State state_;
00063 ConditionType condition_;
00064 ACE_Reactor* reactor_;
00065 ACE_thread_t reactor_owner_;
00066 ACE_Proactor* proactor_;
00067 };
00068
00069 }
00070 }
00071
00072 #if defined (__ACE_INLINE__)
00073 #include "TransportReactorTask.inl"
00074 #endif
00075
00076 #endif