PerConnectionSynch.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "PerConnectionSynch.h"
00010 #include "dds/DCPS/debug.h"
00011 
00012 #if !defined (__ACE_INLINE__)
00013 #include "PerConnectionSynch.inl"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch()
00017 {
00018   DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6);
00019 }
00020 
00021 void
00022 OpenDDS::DCPS::PerConnectionSynch::work_available()
00023 {
00024   DBG_ENTRY_LVL("PerConnectionSynch","work_available",6);
00025   GuardType guard(this->lock_);
00026   this->work_available_ = 1;
00027   this->condition_.signal();
00028 }
00029 
00030 int
00031 OpenDDS::DCPS::PerConnectionSynch::open(void*)
00032 {
00033   DBG_ENTRY_LVL("PerConnectionSynch","open",6);
00034   // Activate this object to start a new thread that will call
00035   // our svc() method, and then our close() method.
00036   this->shutdown_ = 0;
00037 
00038   long flags;
00039   flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
00040 
00041   if (this->scheduler_ >= 0) {
00042     flags |= THR_EXPLICIT_SCHED | this->scheduler_;
00043 
00044   } else {
00045     flags |= THR_INHERIT_SCHED;
00046   }
00047 
00048   if (DCPS_debug_level > 0) {
00049     ACE_DEBUG((LM_DEBUG,
00050                ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ")
00051                ACE_TEXT("activating thread with flags 0x%08.8x ")
00052                ACE_TEXT("and priority %d.\n"),
00053                flags,
00054                this->dds_priority_));
00055   }
00056 
00057   return this->activate(flags, 1, 0, this->dds_priority_);
00058 }
00059 
00060 int
00061 OpenDDS::DCPS::PerConnectionSynch::svc()
00062 {
00063   DBG_ENTRY_LVL("PerConnectionSynch","svc",6);
00064 
00065   // Ignore all signals to avoid
00066   // ERROR: ACE::handle_write_ready return -1 while waiting  to unclog. handle_write_ready: Interrupted system call
00067   // The main thread will handle signals.
00068   sigset_t set;
00069   ACE_OS::sigfillset(&set);
00070   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00071 
00072   ThreadSynchWorker::WorkOutcome work_outcome =
00073     ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00074 
00075 
00076   RcHandle<ThreadSynchWorker> worker = this->worker().lock();
00077 
00078   if (! worker)
00079     return ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00080 
00081   // Loop until we honor the shutdown_ flag.
00082   while (1) {
00083     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00084           "Top of infinite svc() loop\n"));
00085 
00086     {
00087       GuardType guard(this->lock_);
00088 
00089       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00090             "Lock acquired.  Check to see what to do next.\n"));
00091 
00092       // We will wait on the condition_ if all of the following are true:
00093       //
00094       //   1) The last time the perform_work() method was called, it
00095       //      indicated that there was no more work to do.
00096       //   2) Since we last invoked perform_work(), we have not been
00097       //      informed that there is work_available().
00098       //   3) We have not been asked to shutdown_ the svc() loop.
00099       //
00100       while ((work_outcome ==
00101               ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) &&
00102              (this->work_available_ == 0) &&
00103              (this->shutdown_       == 0)) {
00104         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00105               "No work to do.  Just wait on the condition.\n"));
00106         this->condition_.wait();
00107         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00108               "We are awake from waiting on the condition.\n"));
00109       }
00110 
00111       // Maybe we have been asked to shutdown_ the svc() loop.
00112       if (this->shutdown_ == 1) {
00113         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00114                   "Honoring the shutdown request.\n"), 5);
00115         // We are honoring the request to shutdown_ the svc() loop.
00116         break;
00117       }
00118 
00119       // Or, perhaps we experienced a fatal error on our last call to
00120       // perform_work().
00121       if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) {
00122         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00123                   "Fatal error - Broken SynchResounce.\n"), 5);
00124         // Stop the svc() loop.
00125         break;
00126       }
00127 
00128       VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00129                 "Reset our work_available_ flag to 0, and release lock.\n"), 5);
00130 
00131       // Set our work_available_ flag to false (0) before we release the
00132       // lock so that we will only count any work_available() calls that
00133       // happen after this point.
00134       this->work_available_ = 0;
00135     }
00136 
00137     if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) {
00138       VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   Need to wait for clogged resources to open up.\n"), 5);
00139 
00140       // Ask the ThreadSynchResource to block us until the clog situation
00141       // clears up.
00142       if (this->wait_on_clogged_resource() == -1) {
00143         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00144                   "Fatal error - wait_on_clogged_resource fails.\n"), 5);
00145         break;
00146       }
00147     }
00148 
00149     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00150           "Call perform_work()\n"));
00151 
00152     // Without the lock, ask the worker to perform some work.  It tells
00153     // us if it completed with more work to still be performed (or not).
00154     work_outcome = worker->perform_work();
00155 
00156     VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00157               "call to perform_work() returned %d\n",work_outcome), 5);
00158   }
00159 
00160   return 0;
00161 }
00162 
00163 int
00164 OpenDDS::DCPS::PerConnectionSynch::close(u_long)
00165 {
00166   DBG_ENTRY_LVL("PerConnectionSynch","close",6);
00167   return 0;
00168 }
00169 
00170 int
00171 OpenDDS::DCPS::PerConnectionSynch::register_worker_i()
00172 {
00173   DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6);
00174   return this->open(0);
00175 }
00176 
00177 void
00178 OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i()
00179 {
00180   DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6);
00181   // It is at this point that we need to stop the thread that
00182   // was activated when our open() method was called.
00183   {
00184     // Acquire the lock
00185     GuardType guard(this->lock_);
00186 
00187     // Set the shutdown_ flag to false to shutdown the svc() method loop.
00188     this->shutdown_ = 1;
00189 
00190     // Signal the condition_ object in case the svc() method is currently
00191     // blocked wait()'ing on the condition.
00192     this->condition_.signal();
00193   }
00194 
00195   // Wait for all threads running this task (there should just be one thread)
00196   // to finish.
00197   this->wait();
00198 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1