OpenDDS::DCPS::PerConnectionSynch Class Reference

#include <PerConnectionSynch.h>


Public Member Functions

 PerConnectionSynch (ThreadSynchResource *synch_resource, long priority, int scheduler)
virtual ~PerConnectionSynch ()
virtual void work_available ()
virtual int open (void *)
virtual int svc ()
virtual int close (u_long)

Protected Member Functions

virtual int register_worker_i ()
virtual void unregister_worker_i ()

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef ACE_Condition< LockType > ConditionType

Private Attributes

LockType lock_
ConditionType condition_
int work_available_
int shutdown_
long dds_priority_
long scheduler_

Detailed Description

Definition at line 23 of file PerConnectionSynch.h.


Member Typedef Documentation

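typedef ACE_Condition<LockType> OpenDDS::DCPS::PerConnectionSynch::ConditionType [private]

Definition at line 47 of file PerConnectionSynch.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::PerConnectionSynch::GuardType [private]

Definition at line 46 of file PerConnectionSynch.h.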

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::PerConnectionSynch::LockType [private]

Definition at line 45 of file PerConnectionSynch.h.


Constructor & Destructor Documentation

ACE_INLINE OpenDDS::DCPS::PerConnectionSynch::PerConnectionSynch ( ThreadSynchResource *  synch_resource,
long  priority,
int  scheduler 
)

Definition at line 11 of file PerConnectionSynch.inl.

References DBG_ENTRY_LVL.

00015   : ThreadSynch(synch_resource),
00016     condition_(this->lock_),
00017     work_available_(0),
00018     shutdown_(0),
00019     dds_priority_(priority),
00020     scheduler_(scheduler)
00021 {
00022   DBG_ENTRY_LVL("PerConnectionSynch","PerConnectionSynch",6);
00023 }

OpenDDS::DCPS::PerConnectionSynch::~PerConnectionSynch (  )  [virtual]

Definition at line 16 of file PerConnectionSynch.cpp.

References DBG_ENTRY_LVL.

00017 {
00018   DBG_ENTRY_LVL("PerConnectionSynch","~PerConnectionSynch",6);
00019 }


Member Function Documentation

int OpenDDS::DCPS::PerConnectionSynch::close ( u_long   )  [virtual]

Reimplemented from ACE_Task_Base.

Definition at line 164 of file PerConnectionSynch.cpp.

References DBG_ENTRY_LVL.

00165 {
00166   DBG_ENTRY_LVL("PerConnectionSynch","close",6);
00167   return 0;
00168 }

int OpenDDS::DCPS::PerConnectionSynch::open ( void *   )  [virtual]

Reimplemented from ACE_Task_Base.

Definition at line 31 of file PerConnectionSynch.cpp.

References ACE_TEXT(), ACE_Task_Base::activate(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_priority_, LM_DEBUG, scheduler_, and shutdown_.

Referenced by register_worker_i().

00032 {
00033   DBG_ENTRY_LVL("PerConnectionSynch","open",6);
00034   // Activate this object to start a new thread that will call
00035   // our svc() method, and then our close() method.
00036   this->shutdown_ = 0;
00037 
00038   long flags;
00039   flags  = THR_NEW_LWP | THR_JOINABLE ;//|THR_SCOPE_PROCESS | THR_SCOPE_THREAD;
00040 
00041   if (this->scheduler_ >= 0) {
00042     flags |= THR_EXPLICIT_SCHED | this->scheduler_;
00043 
00044   } else {
00045     flags |= THR_INHERIT_SCHED;
00046   }
00047 
00048   if (DCPS_debug_level > 0) {
00049     ACE_DEBUG((LM_DEBUG,
00050                ACE_TEXT("(%P|%t) PerConnectionSynch::open(): ")
00051                ACE_TEXT("activating thread with flags 0x%08.8x ")
00052                ACE_TEXT("and priority %d.\n"),
00053                flags,
00054                this->dds_priority_));
00055   }
00056 
00057   return this->activate(flags, 1, 0, this->dds_priority_);
00058 }
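
The flag selection in open() can be read as a small pure function. The following is a minimal sketch of that branch only. The constants and the name compose_thread_flags are hypothetical stand-ins introduced for illustration; the real THR_* values come from ACE and are platform-specific.

```cpp
#include <cassert>

namespace sketch {

// Hypothetical stand-ins for ACE's THR_* constants (assumed values).
constexpr long NEW_LWP        = 0x01;
constexpr long JOINABLE       = 0x02;
constexpr long EXPLICIT_SCHED = 0x04;
constexpr long INHERIT_SCHED  = 0x08;
constexpr long EXAMPLE_POLICY = 0x10;  // one example scheduling-policy bit

// Mirrors the branch in open(): a non-negative scheduler value is OR-ed in
// together with the explicit-scheduling bit; a negative value means the new
// thread inherits the creating thread's scheduling.
// Precondition (of this sketch only): scheduler does not contain the
// INHERIT_SCHED bit.
inline long compose_thread_flags(long scheduler)
{
  long flags = NEW_LWP | JOINABLE;
  if (scheduler >= 0) {
    flags |= EXPLICIT_SCHED | scheduler;
  } else {
    flags |= INHERIT_SCHED;
  }
  // Exactly one of the two scheduling modes is selected.
  assert(((flags & EXPLICIT_SCHED) != 0) != ((flags & INHERIT_SCHED) != 0));
  return flags;
}

}  // namespace sketch
```

Note that a scheduler value of 0 still selects explicit scheduling, because the test is scheduler >= 0 rather than scheduler > 0.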

int OpenDDS::DCPS::PerConnectionSynch::register_worker_i (  )  [protected, virtual]

Starts the worker thread by calling open(). This overrides the ThreadSynch hook, whose default implementation does nothing. Returns 0 for success, -1 for failure.

Reimplemented from OpenDDS::DCPS::ThreadSynch.

Definition at line 171 of file PerConnectionSynch.cpp.

References DBG_ENTRY_LVL, and open().

00172 {
00173   DBG_ENTRY_LVL("PerConnectionSynch","register_worker_i",6);
00174   return this->open(0);
00175 }

int OpenDDS::DCPS::PerConnectionSynch::svc ( void   )  [virtual]

Reimplemented from ACE_Task_Base.

Definition at line 61 of file PerConnectionSynch.cpp.

References condition_, DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, shutdown_, SIG_SETMASK, ACE_OS::sigfillset(), sigset_t, ACE_OS::thr_sigsetmask(), VDBG, VDBG_LVL, ACE_Condition< MUTEX >::wait(), OpenDDS::DCPS::ThreadSynch::wait_on_clogged_resource(), work_available_, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynch::worker().

00062 {
00063   DBG_ENTRY_LVL("PerConnectionSynch","svc",6);
00064 
00065   // Ignore all signals to avoid
00066   // ERROR: ACE::handle_write_ready return -1 while waiting  to unclog. handle_write_ready: Interrupted system call
00067   // The main thread will handle signals.
00068   sigset_t set;
00069   ACE_OS::sigfillset(&set);
00070   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00071 
00072   ThreadSynchWorker::WorkOutcome work_outcome =
00073     ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00074 
00075 
00076   RcHandle<ThreadSynchWorker> worker = this->worker().lock();
00077 
00078   if (! worker)
00079     return ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO;
00080 
00081   // Loop until we honor the shutdown_ flag.
00082   while (1) {
00083     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00084           "Top of infinite svc() loop\n"));
00085 
00086     {
00087       GuardType guard(this->lock_);
00088 
00089       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00090             "Lock acquired.  Check to see what to do next.\n"));
00091 
00092       // We will wait on the condition_ if all of the following are true:
00093       //
00094       //   1) The last time the perform_work() method was called, it
00095       //      indicated that there was no more work to do.
00096       //   2) Since we last invoked perform_work(), we have not been
00097       //      informed that there is work_available().
00098       //   3) We have not been asked to shutdown_ the svc() loop.
00099       //
00100       while ((work_outcome ==
00101               ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO) &&
00102              (this->work_available_ == 0) &&
00103              (this->shutdown_       == 0)) {
00104         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00105               "No work to do.  Just wait on the condition.\n"));
00106         this->condition_.wait();
00107         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00108               "We are awake from waiting on the condition.\n"));
00109       }
00110 
00111       // Maybe we have been asked to shutdown_ the svc() loop.
00112       if (this->shutdown_ == 1) {
00113         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00114                   "Honoring the shutdown request.\n"), 5);
00115         // We are honoring the request to shutdown_ the svc() loop.
00116         break;
00117       }
00118 
00119       // Or, perhaps we experienced a fatal error on our last call to
00120       // perform_work().
00121       if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE) {
00122         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00123                   "Fatal error - Broken SynchResource.\n"), 5);
00124         // Stop the svc() loop.
00125         break;
00126       }
00127 
00128       VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00129                 "Reset our work_available_ flag to 0, and release lock.\n"), 5);
00130 
00131       // Set our work_available_ flag to false (0) before we release the
00132       // lock so that we will only count any work_available() calls that
00133       // happen after this point.
00134       this->work_available_ = 0;
00135     }
00136 
00137     if (work_outcome == ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE) {
00138       VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   Need to wait for clogged resources to open up.\n"), 5);
00139 
00140       // Ask the ThreadSynchResource to block us until the clog situation
00141       // clears up.
00142       if (this->wait_on_clogged_resource() == -1) {
00143         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00144                   "Fatal error - wait_on_clogged_resource fails.\n"), 5);
00145         break;
00146       }
00147     }
00148 
00149     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00150           "Call perform_work()\n"));
00151 
00152     // Without the lock, ask the worker to perform some work.  It tells
00153     // us if it completed with more work to still be performed (or not).
00154     work_outcome = worker->perform_work();
00155 
00156     VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00157               "call to perform_work() returned %d\n",work_outcome), 5);
00158   }
00159 
00160   return 0;
00161 }
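
The wait conditions in svc() follow a common condition-variable pattern: sleep only while the last pass reported no more work, no new work was announced, and no shutdown was requested. The following is a minimal sketch of that loop using the C++ standard library instead of ACE. The names MiniSynch, Outcome, request_shutdown, and run are hypothetical, and the clogged-resource wait from the listing above is deliberately left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>

namespace sketch {

enum class Outcome { NoMoreToDo, MoreToDo, Broken };

class MiniSynch {
public:
  // Producer side: record that work exists and wake the loop.
  void work_available()
  {
    std::lock_guard<std::mutex> guard(lock_);
    work_available_ = true;
    condition_.notify_one();
  }

  // Ask run() to return the next time it checks the flag.
  void request_shutdown()
  {
    std::lock_guard<std::mutex> guard(lock_);
    shutdown_ = true;
    condition_.notify_one();
  }

  // Consumer side, same shape as svc(). Returns how many times the
  // worker callback was invoked.
  int run(const std::function<Outcome()>& perform_work)
  {
    assert(perform_work);  // an empty callback cannot be invoked
    Outcome outcome = Outcome::NoMoreToDo;
    int calls = 0;
    for (;;) {
      {
        std::unique_lock<std::mutex> guard(lock_);
        // Block only if the last pass finished its work, nothing new was
        // announced, and nobody asked us to stop.
        condition_.wait(guard, [&] {
          return outcome != Outcome::NoMoreToDo || work_available_ || shutdown_;
        });
        if (shutdown_ || outcome == Outcome::Broken) {
          break;
        }
        // Clear the flag before unlocking so that only work_available()
        // calls made after this point are counted.
        work_available_ = false;
      }
      // The lock is released here; the worker runs without it.
      outcome = perform_work();
      ++calls;
    }
    return calls;
  }

private:
  std::mutex lock_;
  std::condition_variable condition_;
  bool work_available_ = false;
  bool shutdown_ = false;
};

}  // namespace sketch
```

Because the callback runs without the lock held, it may itself call work_available() or request_shutdown() without deadlocking, which is the property the listing's "Without the lock" comment relies on.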

void OpenDDS::DCPS::PerConnectionSynch::unregister_worker_i (  )  [protected, virtual]

Stops the thread that open() activated: sets shutdown_ under the lock, signals condition_ in case svc() is blocked waiting on it, then waits for the thread to finish. This overrides the ThreadSynch hook, whose default implementation does nothing.

Reimplemented from OpenDDS::DCPS::ThreadSynch.

Definition at line 178 of file PerConnectionSynch.cpp.

References condition_, DBG_ENTRY_LVL, lock_, shutdown_, ACE_Condition< MUTEX >::signal(), and ACE_Task_Base::wait().

00179 {
00180   DBG_ENTRY_LVL("PerConnectionSynch","unregister_worker_i",6);
00181   // It is at this point that we need to stop the thread that
00182   // was activated when our open() method was called.
00183   {
00184     // Acquire the lock
00185     GuardType guard(this->lock_);
00186 
00187     // Set the shutdown_ flag to true (1) to shut down the svc() method loop.
00188     this->shutdown_ = 1;
00189 
00190     // Signal the condition_ object in case the svc() method is currently
00191     // blocked wait()'ing on the condition.
00192     this->condition_.signal();
00193   }
00194 
00195   // Wait for all threads running this task (there should just be one thread)
00196   // to finish.
00197   this->wait();
00198 }
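
The ordering above matters: the flag is set and the condition signalled while the lock is held, and the join happens only after the lock is released, since the service thread needs that lock to wake up. The following is a minimal sketch of the same handshake with std::thread. The class and member names are hypothetical, and it assumes a toolchain where std::thread is usable (some platforms need a threading flag at link time).

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

namespace sketch {

class StoppableWorker {
public:
  ~StoppableWorker() { stop(); }

  // Counterpart of open(): start the single service thread.
  void start()
  {
    assert(!thread_.joinable());  // only one thread per object
    shutdown_ = false;            // safe: no service thread exists yet
    thread_ = std::thread([this] { loop(); });
  }

  // Counterpart of unregister_worker_i(): set the flag and signal while
  // holding the lock, then join after the lock has been released.
  void stop()
  {
    {
      std::lock_guard<std::mutex> guard(lock_);
      shutdown_ = true;
      condition_.notify_one();
    }
    if (thread_.joinable()) {
      thread_.join();
    }
  }

  // Meaningful only after stop() has returned.
  bool finished() const { return finished_; }

private:
  void loop()
  {
    std::unique_lock<std::mutex> guard(lock_);
    // The predicate form re-checks shutdown_, so a signal sent before the
    // thread starts waiting is not lost.
    condition_.wait(guard, [this] { return shutdown_; });
    finished_ = true;
  }

  std::mutex lock_;
  std::condition_variable condition_;
  std::thread thread_;
  bool shutdown_ = false;
  bool finished_ = false;
};

}  // namespace sketch
```

Joining while still holding the lock would deadlock in this sketch, because loop() must reacquire lock_ when it wakes from the wait.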

void OpenDDS::DCPS::PerConnectionSynch::work_available (  )  [virtual]

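The ThreadSynchWorker would like to have its perform_work() called from the appropriate thread once the ThreadSynchResource claims that it is_ready_for_work(). This implementation sets work_available_ under the lock and signals condition_ to wake the svc() thread.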

Implements OpenDDS::DCPS::ThreadSynch.

Definition at line 22 of file PerConnectionSynch.cpp.

References condition_, DBG_ENTRY_LVL, lock_, ACE_Condition< MUTEX >::signal(), and work_available_.

00023 {
00024   DBG_ENTRY_LVL("PerConnectionSynch","work_available",6);
00025   GuardType guard(this->lock_);
00026   this->work_available_ = 1;
00027   this->condition_.signal();
00028 }


Member Data Documentation

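ConditionType OpenDDS::DCPS::PerConnectionSynch::condition_ [private]

Definition at line 50 of file PerConnectionSynch.h.

Referenced by svc(), unregister_worker_i(), and work_available().

long OpenDDS::DCPS::PerConnectionSynch::dds_priority_ [private]

Definition at line 53 of file PerConnectionSynch.h.

Referenced by open().

LockType OpenDDS::DCPS::PerConnectionSynch::lock_ [private]

Definition at line 49 of file PerConnectionSynch.h.

Referenced by svc(), unregister_worker_i(), and work_available().

long OpenDDS::DCPS::PerConnectionSynch::scheduler_ [private]

Definition at line 54 of file PerConnectionSynch.h.

Referenced by open().

int OpenDDS::DCPS::PerConnectionSynch::shutdown_ [private]

Definition at line 52 of file PerConnectionSynch.h.

Referenced by open(), svc(), and unregister_worker_i().

int OpenDDS::DCPS::PerConnectionSynch::work_available_ [private]

Definition at line 51 of file PerConnectionSynch.h.

Referenced by svc(), and work_available().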


The documentation for this class was generated from the following files:
PerConnectionSynch.h
PerConnectionSynch.inl
PerConnectionSynch.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1