PerConnectionSynch.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "PerConnectionSynch.h"
00010 #include "dds/DCPS/debug.h"
00011 
00012 #if !defined (__ACE_INLINE__)
00013 #include "PerConnectionSynch.inl"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch()
00017 {
00018   DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6);
00019 }
00020 
00021 void
00022 OpenDDS::DCPS::PerConnectionSynch::work_available()
00023 {
00024   DBG_ENTRY_LVL("PerConnectionSynch","work_available",6);
00025   GuardType guard(this->lock_);
00026   this->work_available_ = 1;
00027   this->condition_.signal();
00028 }
00029 
00030 int
00031 OpenDDS::DCPS::PerConnectionSynch::open(void*)
00032 {
00033   DBG_ENTRY_LVL("PerConnectionSynch","open",6);
00034   // Activate this object to start a new thread that will call
00035   // our svc() method, and then our close() method.
00036   this->shutdown_ = 0;
00037 
00038   long flags;
00039   flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
00040 
00041   if (this->scheduler_ >= 0) {
00042     flags |= THR_EXPLICIT_SCHED | this->scheduler_;
00043 
00044   } else {
00045     flags |= THR_INHERIT_SCHED;
00046   }
00047 
00048   if (DCPS_debug_level > 0) {
00049     ACE_DEBUG((LM_DEBUG,
00050                ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ")
00051                ACE_TEXT("activating thread with flags 0x%08.8x ")
00052                ACE_TEXT("and priority %d.\n"),
00053                flags,
00054                this->dds_priority_));
00055   }
00056 
00057   return this->activate(flags, 1, 0, this->dds_priority_);
00058 }
00059 
00060 int
00061 OpenDDS::DCPS::PerConnectionSynch::svc()
00062 {
00063   DBG_ENTRY_LVL("PerConnectionSynch","svc",6);
00064 
00065   // Ignore all signals to avoid
00066   // ERROR: ACE::handle_write_ready return -1 while waiting  to unclog. handle_write_ready: Interrupted system call
00067   // The main thread will handle signals.
00068   sigset_t set;
00069   ACE_OS::sigfillset(&set);
00070   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00071 
00072   ThreadSynchWorker::WorkOutcome work_outcome =
00073     ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00074 
00075   // Loop until we honor the shutdown_ flag.
00076   while (1) {
00077     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00078           "Top of infinite svc() loop\n"));
00079 
00080     {
00081       GuardType guard(this->lock_);
00082 
00083       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00084             "Lock acquired.  Check to see what to do next.\n"));
00085 
00086       // We will wait on the condition_ if all of the following are true:
00087       //
00088       //   1) The last time the perform_work() method was called, it
00089       //      indicated that there was no more work to do.
00090       //   2) Since we last invoked perform_work(), we have not been
00091       //      informed that there is work_available().
00092       //   3) We have not been asked to shutdown_ the svc() loop.
00093       //
00094       while ((work_outcome ==
00095               ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) &&
00096              (this->work_available_ == 0) &&
00097              (this->shutdown_       == 0)) {
00098         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00099               "No work to do.  Just wait on the condition.\n"));
00100         this->condition_.wait();
00101         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00102               "We are awake from waiting on the condition.\n"));
00103       }
00104 
00105       // Maybe we have been asked to shutdown_ the svc() loop.
00106       if (this->shutdown_ == 1) {
00107         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00108                   "Honoring the shutdown request.\n"), 5);
00109         // We are honoring the request to shutdown_ the svc() loop.
00110         break;
00111       }
00112 
00113       // Or, perhaps we experienced a fatal error on our last call to
00114       // perform_work().
00115       if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) {
00116         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00117                   "Fatal error - Broken SynchResounce.\n"), 5);
00118         // Stop the svc() loop.
00119         break;
00120       }
00121 
00122       VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00123                 "Reset our work_available_ flag to 0, and release lock.\n"), 5);
00124 
00125       // Set our work_available_ flag to false (0) before we release the
00126       // lock so that we will only count any work_available() calls that
00127       // happen after this point.
00128       this->work_available_ = 0;
00129     }
00130 
00131     if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) {
00132       VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   Need to wait for clogged resources to open up.\n"), 5);
00133 
00134       // Ask the ThreadSynchResource to block us until the clog situation
00135       // clears up.
00136       if (this->wait_on_clogged_resource() == -1) {
00137         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00138                   "Fatal error - wait_on_clogged_resource fails.\n"), 5);
00139         break;
00140       }
00141     }
00142 
00143     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00144           "Call perform_work()\n"));
00145 
00146     // Without the lock, ask the worker to perform some work.  It tells
00147     // us if it completed with more work to still be performed (or not).
00148     work_outcome = this->perform_work();
00149 
00150     VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00151               "call to perform_work() returned %d\n",work_outcome), 5);
00152   }
00153 
00154   return 0;
00155 }
00156 
00157 int
00158 OpenDDS::DCPS::PerConnectionSynch::close(u_long)
00159 {
00160   DBG_ENTRY_LVL("PerConnectionSynch","close",6);
00161   return 0;
00162 }
00163 
00164 int
00165 OpenDDS::DCPS::PerConnectionSynch::register_worker_i()
00166 {
00167   DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6);
00168   return this->open(0);
00169 }
00170 
00171 void
00172 OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i()
00173 {
00174   DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6);
00175   // It is at this point that we need to stop the thread that
00176   // was activated when our open() method was called.
00177   {
00178     // Acquire the lock
00179     GuardType guard(this->lock_);
00180 
00181     // Set the shutdown_ flag to false to shutdown the svc() method loop.
00182     this->shutdown_ = 1;
00183 
00184     // Signal the condition_ object in case the svc() method is currently
00185     // blocked wait()'ing on the condition.
00186     this->condition_.signal();
00187   }
00188 
00189   // Wait for all threads running this task (there should just be one thread)
00190   // to finish.
00191   this->wait();
00192 }

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7