#include <PerConnectionSynch.h>
Public Member Functions | |
PerConnectionSynch (ThreadSynchResource *synch_resource, long priority, int scheduler) | |
virtual | ~PerConnectionSynch () |
virtual void | work_available () |
virtual int | open (void *) |
virtual int | svc () |
virtual int | close (u_long) |
Protected Member Functions | |
virtual int | register_worker_i () |
virtual void | unregister_worker_i () |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
Private Attributes | |
LockType | lock_ |
ConditionType | condition_ |
int | work_available_ |
int | shutdown_ |
long | dds_priority_ |
long | scheduler_ |
Definition at line 23 of file PerConnectionSynch.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::PerConnectionSynch::ConditionType [private] |
Definition at line 47 of file PerConnectionSynch.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::PerConnectionSynch::GuardType [private] |
Definition at line 46 of file PerConnectionSynch.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::PerConnectionSynch::LockType [private] |
Definition at line 45 of file PerConnectionSynch.h.
ACE_INLINE OpenDDS::DCPS::PerConnectionSynch::PerConnectionSynch | ( | ThreadSynchResource * | synch_resource, | |
long | priority, | |||
int | scheduler | |||
) |
Definition at line 11 of file PerConnectionSynch.inl.
References DBG_ENTRY_LVL.
00015 : ThreadSynch(synch_resource), 00016 condition_(this->lock_), 00017 work_available_(0), 00018 shutdown_(0), 00019 dds_priority_(priority), 00020 scheduler_(scheduler) 00021 { 00022 DBG_ENTRY_LVL("PerConnectionSynch","PerConnectionSynch",6); 00023 }
OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch | ( | ) | [virtual] |
Definition at line 16 of file PerConnectionSynch.cpp.
References DBG_ENTRY_LVL.
00017 { 00018 DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6); 00019 }
int OpenDDS::DCPS::PerConnectionSynch::close | ( | u_long | ) | [virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 164 of file PerConnectionSynch.cpp.
References DBG_ENTRY_LVL.
00165 { 00166 DBG_ENTRY_LVL("PerConnectionSynch","close",6); 00167 return 0; 00168 }
int OpenDDS::DCPS::PerConnectionSynch::open | ( | void * | ) | [virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 31 of file PerConnectionSynch.cpp.
References ACE_TEXT(), ACE_Task_Base::activate(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_priority_, LM_DEBUG, scheduler_, and shutdown_.
Referenced by register_worker_i().
00032 { 00033 DBG_ENTRY_LVL("PerConnectionSynch","open",6); 00034 // Activate this object to start a new thread that will call 00035 // our svc() method, and then our close() method. 00036 this->shutdown_ = 0; 00037 00038 long flags; 00039 flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD; 00040 00041 if (this->scheduler_ >= 0) { 00042 flags |= THR_EXPLICIT_SCHED | this->scheduler_; 00043 00044 } else { 00045 flags |= THR_INHERIT_SCHED; 00046 } 00047 00048 if (DCPS_debug_level > 0) { 00049 ACE_DEBUG((LM_DEBUG, 00050 ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ") 00051 ACE_TEXT("activating thread with flags 0x%08.8x ") 00052 ACE_TEXT("and priority %d.\n"), 00053 flags, 00054 this->dds_priority_)); 00055 } 00056 00057 return this->activate(flags, 1, 0, this->dds_priority_); 00058 }
int OpenDDS::DCPS::PerConnectionSynch::register_worker_i | ( | ) | [protected, virtual] |
The default implementation (in ThreadSynch) is to do nothing here. The subclass may override the implementation in order to do something when the worker registers. This override calls open(0), which activates the thread that runs svc(). Returns 0 for success, -1 for failure.
Reimplemented from OpenDDS::DCPS::ThreadSynch.
Definition at line 171 of file PerConnectionSynch.cpp.
References DBG_ENTRY_LVL, and open().
00172 { 00173 DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6); 00174 return this->open(0); 00175 }
int OpenDDS::DCPS::PerConnectionSynch::svc | ( | void | ) | [virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 61 of file PerConnectionSynch.cpp.
References condition_, DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, shutdown_, SIG_SETMASK, ACE_OS::sigfillset(), sigset_t, ACE_OS::thr_sigsetmask(), VDBG, VDBG_LVL, ACE_Condition< MUTEX >::wait(), OpenDDS::DCPS::ThreadSynch::wait_on_clogged_resource(), work_available_, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynch::worker().
00062 { 00063 DBG_ENTRY_LVL("PerConnectionSynch","svc",6); 00064 00065 // Ignore all signals to avoid 00066 // ERROR: ACE::handle_write_ready return -1 while waiting to unclog. handle_write_ready: Interrupted system call 00067 // The main thread will handle signals. 00068 sigset_t set; 00069 ACE_OS::sigfillset(&set); 00070 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 00071 00072 ThreadSynchWorker::WorkOutcome work_outcome = 00073 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO; 00074 00075 00076 RcHandle<ThreadSynchWorker> worker = this->worker().lock(); 00077 00078 if (! worker) 00079 return ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO; 00080 00081 // Loop until we honor the shutdown_ flag. 00082 while (1) { 00083 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00084 "Top of infinite svc() loop\n")); 00085 00086 { 00087 GuardType guard(this->lock_); 00088 00089 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00090 "Lock acquired. Check to see what to do next.\n")); 00091 00092 // We will wait on the condition_ if all of the following are true: 00093 // 00094 // 1) The last time the perform_work() method was called, it 00095 // indicated that there was no more work to do. 00096 // 2) Since we last invoked perform_work(), we have not been 00097 // informed that there is work_available(). 00098 // 3) We have not been asked to shutdown_ the svc() loop. 00099 // 00100 while ((work_outcome == 00101 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) && 00102 (this->work_available_ == 0) && 00103 (this->shutdown_ == 0)) { 00104 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00105 "No work to do. Just wait on the condition.\n")); 00106 this->condition_.wait(); 00107 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00108 "We are awake from waiting on the condition.\n")); 00109 } 00110 00111 // Maybe we have been asked to shutdown_ the svc() loop. 00112 if (this->shutdown_ == 1) { 00113 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00114 "Honoring the shutdown request.\n"), 5); 00115 // We are honoring the request to shutdown_ the svc() loop. 
00116 break; 00117 } 00118 00119 // Or, perhaps we experienced a fatal error on our last call to 00120 // perform_work(). 00121 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) { 00122 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00123 "Fatal error - Broken SynchResounce.\n"), 5); 00124 // Stop the svc() loop. 00125 break; 00126 } 00127 00128 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00129 "Reset our work_available_ flag to 0, and release lock.\n"), 5); 00130 00131 // Set our work_available_ flag to false (0) before we release the 00132 // lock so that we will only count any work_available() calls that 00133 // happen after this point. 00134 this->work_available_ = 0; 00135 } 00136 00137 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) { 00138 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Need to wait for clogged resources to open up.\n"), 5); 00139 00140 // Ask the ThreadSynchResource to block us until the clog situation 00141 // clears up. 00142 if (this->wait_on_clogged_resource() == -1) { 00143 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00144 "Fatal error - wait_on_clogged_resource fails.\n"), 5); 00145 break; 00146 } 00147 } 00148 00149 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00150 "Call perform_work()\n")); 00151 00152 // Without the lock, ask the worker to perform some work. It tells 00153 // us if it completed with more work to still be performed (or not). 00154 work_outcome = worker->perform_work(); 00155 00156 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00157 "call to perform_work() returned %d\n",work_outcome), 5); 00158 } 00159 00160 return 0; 00161 }
void OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i | ( | ) | [protected, virtual] |
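The default implementation (in ThreadSynch) is to do nothing here. The subclass may override the implementation in order to do something when the worker unregisters. This override sets the shutdown_ flag, signals condition_ so that a blocked svc() loop wakes up, and then waits for the task's thread to finish.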
Reimplemented from OpenDDS::DCPS::ThreadSynch.
Definition at line 178 of file PerConnectionSynch.cpp.
References condition_, DBG_ENTRY_LVL, lock_, shutdown_, ACE_Condition< MUTEX >::signal(), and ACE_Task_Base::wait().
00179 { 00180 DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6); 00181 // It is at this point that we need to stop the thread that 00182 // was activated when our open() method was called. 00183 { 00184 // Acquire the lock 00185 GuardType guard(this->lock_); 00186 00187 // Set the shutdown_ flag to true (1) to shut down the svc() method loop. 00188 this->shutdown_ = 1; 00189 00190 // Signal the condition_ object in case the svc() method is currently 00191 // blocked wait()'ing on the condition. 00192 this->condition_.signal(); 00193 } 00194 00195 // Wait for all threads running this task (there should just be one thread) 00196 // to finish. 00197 this->wait(); 00198 }
void OpenDDS::DCPS::PerConnectionSynch::work_available | ( | ) | [virtual] |
The ThreadSynchWorker would like to have its perform_work() called from the appropriate thread once the ThreadSynchResource claims that it is_ready_for_work().
Implements OpenDDS::DCPS::ThreadSynch.
Definition at line 22 of file PerConnectionSynch.cpp.
References condition_, DBG_ENTRY_LVL, lock_, ACE_Condition< MUTEX >::signal(), and work_available_.
00023 { 00024 DBG_ENTRY_LVL("PerConnectionSynch","work_available",6); 00025 GuardType guard(this->lock_); 00026 this->work_available_ = 1; 00027 this->condition_.signal(); 00028 }
ConditionType OpenDDS::DCPS::PerConnectionSynch::condition_ [private] |
Definition at line 50 of file PerConnectionSynch.h.
Referenced by svc(), unregister_worker_i(), and work_available().
long OpenDDS::DCPS::PerConnectionSynch::dds_priority_ [private] |
Definition at line 53 of file PerConnectionSynch.h.
Referenced by open().
LockType OpenDDS::DCPS::PerConnectionSynch::lock_ [private] |
Definition at line 49 of file PerConnectionSynch.h.
Referenced by svc(), unregister_worker_i(), and work_available().
long OpenDDS::DCPS::PerConnectionSynch::scheduler_ [private] |
Definition at line 54 of file PerConnectionSynch.h.
Referenced by open().
int OpenDDS::DCPS::PerConnectionSynch::shutdown_ [private] |
Definition at line 52 of file PerConnectionSynch.h.
Referenced by open(), svc(), and unregister_worker_i().
int OpenDDS::DCPS::PerConnectionSynch::work_available_ [private] |
Definition at line 51 of file PerConnectionSynch.h.
Referenced by svc(), and work_available().