00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "PerConnectionSynch.h"
00010 #include "dds/DCPS/debug.h"
00011
00012 #if !defined (__ACE_INLINE__)
00013 #include "PerConnectionSynch.inl"
00014 #endif
00015
00016 OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch()
00017 {
00018 DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6);
00019 }
00020
00021 void
00022 OpenDDS::DCPS::PerConnectionSynch::work_available()
00023 {
00024 DBG_ENTRY_LVL("PerConnectionSynch","work_available",6);
00025 GuardType guard(this->lock_);
00026 this->work_available_ = 1;
00027 this->condition_.signal();
00028 }
00029
00030 int
00031 OpenDDS::DCPS::PerConnectionSynch::open(void*)
00032 {
00033 DBG_ENTRY_LVL("PerConnectionSynch","open",6);
00034
00035
00036 this->shutdown_ = 0;
00037
00038 long flags;
00039 flags = THR_NEW_LWP | THR_JOINABLE ;
00040
00041 if (this->scheduler_ >= 0) {
00042 flags |= THR_EXPLICIT_SCHED | this->scheduler_;
00043
00044 } else {
00045 flags |= THR_INHERIT_SCHED;
00046 }
00047
00048 if (DCPS_debug_level > 0) {
00049 ACE_DEBUG((LM_DEBUG,
00050 ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ")
00051 ACE_TEXT("activating thread with flags 0x%08.8x ")
00052 ACE_TEXT("and priority %d.\n"),
00053 flags,
00054 this->dds_priority_));
00055 }
00056
00057 return this->activate(flags, 1, 0, this->dds_priority_);
00058 }
00059
00060 int
00061 OpenDDS::DCPS::PerConnectionSynch::svc()
00062 {
00063 DBG_ENTRY_LVL("PerConnectionSynch","svc",6);
00064
00065
00066
00067
00068 sigset_t set;
00069 ACE_OS::sigfillset(&set);
00070 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00071
00072 ThreadSynchWorker::WorkOutcome work_outcome =
00073 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00074
00075
00076 while (1) {
00077 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00078 "Top of infinite svc() loop\n"));
00079
00080 {
00081 GuardType guard(this->lock_);
00082
00083 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00084 "Lock acquired. Check to see what to do next.\n"));
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094 while ((work_outcome ==
00095 ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) &&
00096 (this->work_available_ == 0) &&
00097 (this->shutdown_ == 0)) {
00098 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00099 "No work to do. Just wait on the condition.\n"));
00100 this->condition_.wait();
00101 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00102 "We are awake from waiting on the condition.\n"));
00103 }
00104
00105
00106 if (this->shutdown_ == 1) {
00107 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00108 "Honoring the shutdown request.\n"), 5);
00109
00110 break;
00111 }
00112
00113
00114
00115 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) {
00116 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00117 "Fatal error - Broken SynchResounce.\n"), 5);
00118
00119 break;
00120 }
00121
00122 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00123 "Reset our work_available_ flag to 0, and release lock.\n"), 5);
00124
00125
00126
00127
00128 this->work_available_ = 0;
00129 }
00130
00131 if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) {
00132 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Need to wait for clogged resources to open up.\n"), 5);
00133
00134
00135
00136 if (this->wait_on_clogged_resource() == -1) {
00137 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00138 "Fatal error - wait_on_clogged_resource fails.\n"), 5);
00139 break;
00140 }
00141 }
00142
00143 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00144 "Call perform_work()\n"));
00145
00146
00147
00148 work_outcome = this->perform_work();
00149
00150 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00151 "call to perform_work() returned %d\n",work_outcome), 5);
00152 }
00153
00154 return 0;
00155 }
00156
00157 int
00158 OpenDDS::DCPS::PerConnectionSynch::close(u_long)
00159 {
00160 DBG_ENTRY_LVL("PerConnectionSynch","close",6);
00161 return 0;
00162 }
00163
00164 int
00165 OpenDDS::DCPS::PerConnectionSynch::register_worker_i()
00166 {
00167 DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6);
00168 return this->open(0);
00169 }
00170
00171 void
00172 OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i()
00173 {
00174 DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6);
00175
00176
00177 {
00178
00179 GuardType guard(this->lock_);
00180
00181
00182 this->shutdown_ = 1;
00183
00184
00185
00186 this->condition_.signal();
00187 }
00188
00189
00190
00191 this->wait();
00192 }