PerConnectionSynch.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "PerConnectionSynch.h"
00010 #include "dds/DCPS/debug.h"
00011
00012 #if !defined (__ACE_INLINE__)
00013 #include "PerConnectionSynch.inl"
00014 #endif
00015
00016 OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch()
00017 {
00018 DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6);
00019 }
00020
00021 void
00022 OpenDDS::DCPS::PerConnectionSynch::work_available()
00023 {
00024 DBG_ENTRY_LVL("PerConnectionSynch","work_available",6);
00025 GuardType guard(this->lock_);
00026 this->work_available_ = 1;
00027 this->condition_.signal();
00028 }
00029
00030 int
00031 OpenDDS::DCPS::PerConnectionSynch::open(void*)
00032 {
00033 DBG_ENTRY_LVL("PerConnectionSynch","open",6);
00034
00035
00036 this->shutdown_ = 0;
00037
00038 long flags;
00039 flags = THR_NEW_LWP | THR_JOINABLE ;
00040
00041 if (this->scheduler_ >= 0) {
00042 flags |= THR_EXPLICIT_SCHED | this->scheduler_;
00043
00044 } else {
00045 flags |= THR_INHERIT_SCHED;
00046 }
00047
00048 if (DCPS_debug_level > 0) {
00049 ACE_DEBUG((LM_DEBUG,
00050 ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ")
00051 ACE_TEXT("activating thread with flags 0x%08.8x ")
00052 ACE_TEXT("and priority %d.\n"),
00053 flags,
00054 this->dds_priority_));
00055 }
00056
00057 return this->activate(flags, 1, 0, this->dds_priority_);
00058 }
00059
00060 int
00061 OpenDDS::DCPS::PerConnectionSynch::svc()
00062 {
00063 DBG_ENTRY_LVL("PerConnectionSynch","svc",6);
00064
00065
00066
00067
00068 sigset_t set;
00069 ACE_OS::sigfillset(&set);
00070 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00071
00072 ThreadSynchWorker::WorkOutcome work_outcome =
00073 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00074
00075
00076 RcHandle<ThreadSynchWorker> worker = this->worker().lock();
00077
00078 if (! worker)
00079 return ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00080
00081
00082 while (1) {
00083 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00084 "Top of infinite svc() loop\n"));
00085
00086 {
00087 GuardType guard(this->lock_);
00088
00089 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00090 "Lock acquired. Check to see what to do next.\n"));
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100 while ((work_outcome ==
00101 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) &&
00102 (this->work_available_ == 0) &&
00103 (this->shutdown_ == 0)) {
00104 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00105 "No work to do. Just wait on the condition.\n"));
00106 this->condition_.wait();
00107 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00108 "We are awake from waiting on the condition.\n"));
00109 }
00110
00111
00112 if (this->shutdown_ == 1) {
00113 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00114 "Honoring the shutdown request.\n"), 5);
00115
00116 break;
00117 }
00118
00119
00120
00121 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) {
00122 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00123 "Fatal error - Broken SynchResounce.\n"), 5);
00124
00125 break;
00126 }
00127
00128 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00129 "Reset our work_available_ flag to 0, and release lock.\n"), 5);
00130
00131
00132
00133
00134 this->work_available_ = 0;
00135 }
00136
00137 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) {
00138 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Need to wait for clogged resources to open up.\n"), 5);
00139
00140
00141
00142 if (this->wait_on_clogged_resource() == -1) {
00143 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00144 "Fatal error - wait_on_clogged_resource fails.\n"), 5);
00145 break;
00146 }
00147 }
00148
00149 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00150 "Call perform_work()\n"));
00151
00152
00153
00154 work_outcome = worker->perform_work();
00155
00156 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00157 "call to perform_work() returned %d\n",work_outcome), 5);
00158 }
00159
00160 return 0;
00161 }
00162
00163 int
00164 OpenDDS::DCPS::PerConnectionSynch::close(u_long)
00165 {
00166 DBG_ENTRY_LVL("PerConnectionSynch","close",6);
00167 return 0;
00168 }
00169
00170 int
00171 OpenDDS::DCPS::PerConnectionSynch::register_worker_i()
00172 {
00173 DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6);
00174 return this->open(0);
00175 }
00176
00177 void
00178 OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i()
00179 {
00180 DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6);
00181
00182
00183 {
00184
00185 GuardType guard(this->lock_);
00186
00187
00188 this->shutdown_ = 1;
00189
00190
00191
00192 this->condition_.signal();
00193 }
00194
00195
00196
00197 this->wait();
00198 }