#include <PerConnectionSynch.h>
Inheritance diagram for OpenDDS::DCPS::PerConnectionSynch:
Public Member Functions | |
PerConnectionSynch (ThreadSynchResource *synch_resource, long priority, int scheduler) | |
virtual | ~PerConnectionSynch () |
virtual void | work_available () |
virtual int | open (void *) |
virtual int | svc () |
virtual int | close (u_long) |
Protected Member Functions | |
virtual int | register_worker_i () |
virtual void | unregister_worker_i () |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
Private Attributes | |
LockType | lock_ |
ConditionType | condition_ |
int | work_available_ |
int | shutdown_ |
long | dds_priority_ |
long | scheduler_ |
Definition at line 19 of file PerConnectionSynch.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::PerConnectionSynch::ConditionType [private] |
Definition at line 43 of file PerConnectionSynch.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::PerConnectionSynch::GuardType [private] |
Definition at line 42 of file PerConnectionSynch.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::PerConnectionSynch::LockType [private] |
Definition at line 41 of file PerConnectionSynch.h.
ACE_INLINE OpenDDS::DCPS::PerConnectionSynch::PerConnectionSynch | ( | ThreadSynchResource * | synch_resource, | |
long | priority, | |||
int | scheduler | |||
) |
Definition at line 11 of file PerConnectionSynch.inl.
References DBG_ENTRY_LVL.
00015 : ThreadSynch(synch_resource), 00016 condition_(this->lock_), 00017 work_available_(0), 00018 shutdown_(0), 00019 dds_priority_(priority), 00020 scheduler_(scheduler) 00021 { 00022 DBG_ENTRY_LVL("PerConnectionSynch","PerConnectionSynch",6); 00023 }
OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch | ( | ) | [virtual] |
Definition at line 16 of file PerConnectionSynch.cpp.
References DBG_ENTRY_LVL.
00017 { 00018 DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6); 00019 }
int OpenDDS::DCPS::PerConnectionSynch::close | ( | u_long | ) | [virtual] |
Definition at line 158 of file PerConnectionSynch.cpp.
References DBG_ENTRY_LVL.
00159 { 00160 DBG_ENTRY_LVL("PerConnectionSynch","close",6); 00161 return 0; 00162 }
int OpenDDS::DCPS::PerConnectionSynch::open | ( | void * | ) | [virtual] |
Definition at line 31 of file PerConnectionSynch.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, scheduler_, and shutdown_.
Referenced by register_worker_i().
00032 { 00033 DBG_ENTRY_LVL("PerConnectionSynch","open",6); 00034 // Activate this object to start a new thread that will call 00035 // our svc() method, and then our close() method. 00036 this->shutdown_ = 0; 00037 00038 long flags; 00039 flags = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD; 00040 00041 if (this->scheduler_ >= 0) { 00042 flags |= THR_EXPLICIT_SCHED | this->scheduler_; 00043 00044 } else { 00045 flags |= THR_INHERIT_SCHED; 00046 } 00047 00048 if (DCPS_debug_level > 0) { 00049 ACE_DEBUG((LM_DEBUG, 00050 ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ") 00051 ACE_TEXT("activating thread with flags 0x%08.8x ") 00052 ACE_TEXT("and priority %d.\n"), 00053 flags, 00054 this->dds_priority_)); 00055 } 00056 00057 return this->activate(flags, 1, 0, this->dds_priority_); 00058 }
int OpenDDS::DCPS::PerConnectionSynch::register_worker_i | ( | ) | [protected, virtual] |
The default implementation is to do nothing here. The subclass may override the implementation in order to do something when the worker registers. Returns 0 for success, -1 for failure.
Reimplemented from OpenDDS::DCPS::ThreadSynch.
Definition at line 165 of file PerConnectionSynch.cpp.
References DBG_ENTRY_LVL, and open().
00166 { 00167 DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6); 00168 return this->open(0); 00169 }
int OpenDDS::DCPS::PerConnectionSynch::svc | ( | ) | [virtual] |
Definition at line 61 of file PerConnectionSynch.cpp.
References condition_, DBG_ENTRY_LVL, OpenDDS::DCPS::ThreadSynch::perform_work(), VDBG, VDBG_LVL, work_available_, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO.
00062 { 00063 DBG_ENTRY_LVL("PerConnectionSynch","svc",6); 00064 00065 // Ignore all signals to avoid 00066 // ERROR: ACE::handle_write_ready return -1 while waiting to unclog. handle_write_ready: Interrupted system call 00067 // The main thread will handle signals. 00068 sigset_t set; 00069 ACE_OS::sigfillset(&set); 00070 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 00071 00072 ThreadSynchWorker::WorkOutcome work_outcome = 00073 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO; 00074 00075 // Loop until we honor the shutdown_ flag. 00076 while (1) { 00077 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00078 "Top of infinite svc() loop\n")); 00079 00080 { 00081 GuardType guard(this->lock_); 00082 00083 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00084 "Lock acquired. Check to see what to do next.\n")); 00085 00086 // We will wait on the condition_ if all of the following are true: 00087 // 00088 // 1) The last time the perform_work() method was called, it 00089 // indicated that there was no more work to do. 00090 // 2) Since we last invoked perform_work(), we have not been 00091 // informed that there is work_available(). 00092 // 3) We have not been asked to shutdown_ the svc() loop. 00093 // 00094 while ((work_outcome == 00095 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) && 00096 (this->work_available_ == 0) && 00097 (this->shutdown_ == 0)) { 00098 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00099 "No work to do. Just wait on the condition.\n")); 00100 this->condition_.wait(); 00101 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00102 "We are awake from waiting on the condition.\n")); 00103 } 00104 00105 // Maybe we have been asked to shutdown_ the svc() loop. 00106 if (this->shutdown_ == 1) { 00107 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00108 "Honoring the shutdown request.\n"), 5); 00109 // We are honoring the request to shutdown_ the svc() loop. 00110 break; 00111 } 00112 00113 // Or, perhaps we experienced a fatal error on our last call to 00114 // perform_work(). 00115 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) { 00116 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00117 "Fatal error - Broken SynchResounce.\n"), 5); 00118 // Stop the svc() loop. 00119 break; 00120 } 00121 00122 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00123 "Reset our work_available_ flag to 0, and release lock.\n"), 5); 00124 00125 // Set our work_available_ flag to false (0) before we release the 00126 // lock so that we will only count any work_available() calls that 00127 // happen after this point. 00128 this->work_available_ = 0; 00129 } 00130 00131 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) { 00132 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Need to wait for clogged resources to open up.\n"), 5); 00133 00134 // Ask the ThreadSynchResource to block us until the clog situation 00135 // clears up. 00136 if (this->wait_on_clogged_resource() == -1) { 00137 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00138 "Fatal error - wait_on_clogged_resource fails.\n"), 5); 00139 break; 00140 } 00141 } 00142 00143 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00144 "Call perform_work()\n")); 00145 00146 // Without the lock, ask the worker to perform some work. It tells 00147 // us if it completed with more work to still be performed (or not). 00148 work_outcome = this->perform_work(); 00149 00150 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00151 "call to perform_work() returned %d\n",work_outcome), 5); 00152 } 00153 00154 return 0; 00155 }
void OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i | ( | ) | [protected, virtual] |
The default implementation is to do nothing here. The subclass may override the implementation in order to do something when the worker unregisters.
Reimplemented from OpenDDS::DCPS::ThreadSynch.
Definition at line 172 of file PerConnectionSynch.cpp.
References condition_, DBG_ENTRY_LVL, and shutdown_.
00173 { 00174 DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6); 00175 // It is at this point that we need to stop the thread that 00176 // was activated when our open() method was called. 00177 { 00178 // Acquire the lock 00179 GuardType guard(this->lock_); 00180 00181 // Set the shutdown_ flag to false to shutdown the svc() method loop. 00182 this->shutdown_ = 1; 00183 00184 // Signal the condition_ object in case the svc() method is currently 00185 // blocked wait()'ing on the condition. 00186 this->condition_.signal(); 00187 } 00188 00189 // Wait for all threads running this task (there should just be one thread) 00190 // to finish. 00191 this->wait(); 00192 }
void OpenDDS::DCPS::PerConnectionSynch::work_available | ( | ) | [virtual] |
The ThreadSynchWorker would like to have its perform_work() called from the appropriate thread once the ThreadSynchResource claims that it is_ready_for_work().
Implements OpenDDS::DCPS::ThreadSynch.
Definition at line 22 of file PerConnectionSynch.cpp.
References condition_, DBG_ENTRY_LVL, and work_available_.
00023 { 00024 DBG_ENTRY_LVL("PerConnectionSynch","work_available",6); 00025 GuardType guard(this->lock_); 00026 this->work_available_ = 1; 00027 this->condition_.signal(); 00028 }
Definition at line 46 of file PerConnectionSynch.h.
Referenced by svc(), unregister_worker_i(), and work_available().
long OpenDDS::DCPS::PerConnectionSynch::dds_priority_ [private] |
Definition at line 49 of file PerConnectionSynch.h.
Definition at line 45 of file PerConnectionSynch.h.
long OpenDDS::DCPS::PerConnectionSynch::scheduler_ [private] |
int OpenDDS::DCPS::PerConnectionSynch::shutdown_ [private] |
Definition at line 48 of file PerConnectionSynch.h.
Referenced by open(), and unregister_worker_i().
int OpenDDS::DCPS::PerConnectionSynch::work_available_ [private] |