OpenDDS::Federator::UpdateReceiver< DataType > Class Template Reference

#include <UpdateReceiver_T.h>



Public Member Functions

 UpdateReceiver (UpdateProcessor< DataType > &processor)
 Construct with a processor reference.
virtual ~UpdateReceiver ()
virtual int open (void *)
virtual int svc ()
virtual int close (u_long flags=0)
void add (OpenDDS::DCPS::unique_ptr< DataType > sample, OpenDDS::DCPS::unique_ptr< DDS::SampleInfo > info)
 Sample enqueueing.
void stop ()
 Synchronous termination.

Private Types

typedef std::pair< DataType *, DDS::SampleInfo * > DataInfo
 Contents of the queue.

Private Attributes

UpdateProcessor< DataType > & processor_
 The object that we delegate update processing to.
bool stop_
 Termination flag.
ACE_SYNCH_MUTEX lock_
 Protect queue modifications.
ACE_Condition< ACE_SYNCH_MUTEX > workAvailable_
 Work to do indicator.
std::list< DataInfo > queue_
 Queue of publication data to process.

Detailed Description

template<class DataType>
class OpenDDS::Federator::UpdateReceiver< DataType >

Definition at line 35 of file UpdateReceiver_T.h.
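
The listings on this page show an active-object arrangement: the constructor starts a thread through open(), add() enqueues samples it now owns, svc() hands each queued sample to the processor, and the destructor calls stop() and then wait(). The following is a minimal standard-library sketch of that lifecycle, not the OpenDDS code. MiniReceiver, Processor and demo_sum are hypothetical names, std::thread and std::condition_variable stand in for ACE_Task_Base and ACE_Condition, and the sketch differs from the listings in that it holds smart pointers in the queue, pops before processing, and changes the stop flag under the lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for UpdateReceiver; standard C++11, not OpenDDS code.
template <class DataType>
class MiniReceiver {
public:
  typedef std::function<void(const DataType&)> Processor;

  explicit MiniReceiver(Processor processor)
    : processor_(std::move(processor)),
      stop_(false),
      worker_(&MiniReceiver::svc, this) {}  // always start the worker thread

  ~MiniReceiver() {
    stop();          // ask the worker to finish
    worker_.join();  // then wait for it to exit
  }

  // Takes ownership of the sample; it is dropped if stop() was already called.
  void add(std::unique_ptr<DataType> sample) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      if (stop_) return;
      queue_.push_back(std::move(sample));
    }
    workAvailable_.notify_one();
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      stop_ = true;
    }
    workAvailable_.notify_one();
  }

private:
  void svc() {
    for (;;) {
      std::unique_ptr<DataType> sample;
      {
        std::unique_lock<std::mutex> guard(lock_);
        while (!stop_ && queue_.empty()) workAvailable_.wait(guard);
        if (stop_) return;  // anything still queued is freed with the list
        sample = std::move(queue_.front());
        queue_.pop_front();
      }
      processor_(*sample);  // process without holding the lock
    }
  }

  Processor processor_;
  bool stop_;
  std::mutex lock_;
  std::condition_variable workAvailable_;
  std::list<std::unique_ptr<DataType> > queue_;
  std::thread worker_;  // declared last so the other members exist first
};

// Feeds three samples and returns their sum once all have been processed.
int demo_sum() {
  std::mutex m;
  std::condition_variable done;
  int sum = 0;
  int seen = 0;
  {
    MiniReceiver<int> receiver([&](const int& v) {
      std::lock_guard<std::mutex> g(m);
      sum += v;
      ++seen;
      done.notify_one();
    });
    for (int i = 1; i <= 3; ++i) {
      receiver.add(std::unique_ptr<int>(new int(i)));
    }
    std::unique_lock<std::mutex> g(m);
    while (seen < 3) done.wait(g);
  }  // g unlocks first, then ~MiniReceiver stops and joins the worker
  return sum;
}
```

Calling demo_sum() from a test or from main() exercises construction, enqueueing, processing and shutdown in one pass; on most toolchains the program needs to be linked with thread support (for example -pthread).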


Member Typedef Documentation

template<class DataType>
typedef std::pair<DataType*, DDS::SampleInfo* > OpenDDS::Federator::UpdateReceiver< DataType >::DataInfo [private]

Contents of the queue.

Definition at line 76 of file UpdateReceiver_T.h.


Constructor & Destructor Documentation

template<class DataType>
OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver ( UpdateProcessor< DataType > &  processor  )  [inline]

Construct with a processor reference.

Definition at line 26 of file UpdateReceiver_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and OpenDDS::Federator::UpdateReceiver< DataType >::open().

00027   : processor_(processor),
00028     stop_(false),
00029     workAvailable_(this->lock_)
00030 {
00031   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00032     ACE_DEBUG((LM_DEBUG,
00033                ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n")));
00034   }
00035 
00036   // Always execute the thread.
00037   this->open(0);
00038 }

template<class DataType >
OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver (  )  [inline, virtual]

Definition at line 41 of file UpdateReceiver_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::Federator::UpdateReceiver< DataType >::stop(), and ACE_Task_Base::wait().

00042 {
00043   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00044     ACE_DEBUG((LM_DEBUG,
00045                ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n")));
00046   }
00047 
00048   // Cleanly terminate.
00049   this->stop();
00050   this->wait();
00051 }


Member Function Documentation

template<class DataType>
void OpenDDS::Federator::UpdateReceiver< DataType >::add ( OpenDDS::DCPS::unique_ptr< DataType >  sample,
OpenDDS::DCPS::unique_ptr< DDS::SampleInfo >  info 
) [inline]

Sample enqueueing.

Parameters:
sample - pointer to the received sample to be processed
info - pointer to the info about the sample to be processed.

NOTE: We take ownership of this data and delete it when we are done processing it.

Definition at line 103 of file UpdateReceiver_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::Federator::UpdateReceiver< DataType >::lock_, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), ACE_Condition< MUTEX >::signal(), OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.

00104 {
00105   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00106     ACE_DEBUG((LM_DEBUG,
00107                ACE_TEXT("(%P|%t) UpdateReceiver::add()\n")));
00108   }
00109 
00110   if (this->stop_)
00111     return;
00112 
00113   { // Protect the queue.
00114     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_);
00115     this->queue_.push_back(DataInfo(sample.release(), info.release()));
00116 
00117     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00118       ACE_DEBUG((LM_DEBUG,
00119                  ACE_TEXT("(%P|%t) UpdateReceiver::add() - ")
00120                  ACE_TEXT(" %d samples waiting to process in 0x%x.\n"),
00121                  this->queue_.size(),
00122                  (void*)this));
00123     }
00124   }
00125 
00126   this->workAvailable_.signal();
00127 }
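
The NOTE above says the receiver takes ownership of both pointers, and the listing does this by calling release() on each smart pointer and storing the raw pointers in a DataInfo pair. The sketch below illustrates that hand-off with std::unique_ptr, on the assumption that OpenDDS::DCPS::unique_ptr behaves the same way for release(). Sample, Info, enqueue, drain and demo_ownership are hypothetical names, and the clean-up function mirrors the loop in the close() listing.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <utility>

struct Sample { int value; };   // hypothetical payload type
struct Info { bool valid; };    // hypothetical stand-in for DDS::SampleInfo

typedef std::pair<Sample*, Info*> DataInfo;

// Both parameters are taken by value, so the caller has to std::move them in.
void enqueue(std::list<DataInfo>& queue,
             std::unique_ptr<Sample> sample,
             std::unique_ptr<Info> info) {
  // Create the list node first so a failed allocation cannot leak the
  // released pointers, then hand ownership to the raw-pointer pair.
  queue.push_back(DataInfo());
  queue.back().first = sample.release();
  queue.back().second = info.release();
}

// The queue owner is now responsible for deleting both halves of each entry.
void drain(std::list<DataInfo>& queue) {
  while (!queue.empty()) {
    delete queue.front().first;
    delete queue.front().second;
    queue.pop_front();
  }
}

// Returns true when the caller's pointers were emptied by the hand-off and
// the queue was left empty after clean-up.
bool demo_ownership() {
  std::list<DataInfo> queue;
  std::unique_ptr<Sample> s(new Sample{7});
  std::unique_ptr<Info> i(new Info{true});
  enqueue(queue, std::move(s), std::move(i));
  const bool handed_over =
      !s && !i && queue.size() == 1 && queue.front().first->value == 7;
  drain(queue);
  return handed_over && queue.empty();
}
```

After the call to enqueue(), the caller's smart pointers are empty, so any later access to the sample has to go through the queue entry.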

template<class DataType >
int OpenDDS::Federator::UpdateReceiver< DataType >::close ( u_long  flags = 0  )  [inline, virtual]

Reimplemented from ACE_Task_Base.

Definition at line 68 of file UpdateReceiver_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and OpenDDS::Federator::UpdateReceiver< DataType >::queue_.

00069 {
00070   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00071     ACE_DEBUG((LM_DEBUG,
00072                ACE_TEXT("(%P|%t) UpdateReceiver::close()\n")));
00073   }
00074 
00075   while (this->queue_.size()) {
00076     delete this->queue_.front().first;
00077     delete this->queue_.front().second;
00078     this->queue_.pop_front();
00079   }
00080 
00081   return 0;
00082 }

template<class DataType >
int OpenDDS::Federator::UpdateReceiver< DataType >::open ( void *   )  [inline, virtual]

Reimplemented from ACE_Task_Base.

Definition at line 55 of file UpdateReceiver_T.cpp.

References ACE_TEXT(), ACE_Task_Base::activate(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver().

00056 {
00057   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00058     ACE_DEBUG((LM_DEBUG,
00059                ACE_TEXT("(%P|%t) UpdateReceiver::open()\n")));
00060   }
00061 
00062   // Run as a separate thread.
00063   return this->activate();
00064 }

template<class DataType >
void OpenDDS::Federator::UpdateReceiver< DataType >::stop ( void   )  [inline]

Synchronous termination.

Definition at line 86 of file UpdateReceiver_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, ACE_Condition< MUTEX >::signal(), OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver().

00087 {
00088   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00089     ACE_DEBUG((LM_DEBUG,
00090                ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n")));
00091   }
00092 
00093   // Indicate the thread should stop and get its attention.
00094   if (this->stop_)
00095     return;
00096 
00097   this->stop_ = true;
00098   this->workAvailable_.signal();
00099 }

template<class DataType >
int OpenDDS::Federator::UpdateReceiver< DataType >::svc ( void   )  [inline, virtual]

Reimplemented from ACE_Task_Base.

Definition at line 131 of file UpdateReceiver_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::Federator::UpdateReceiver< DataType >::lock_, OpenDDS::Federator::UpdateReceiver< DataType >::processor_, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, OpenDDS::Federator::UpdateReceiver< DataType >::stop_, ACE_Condition< MUTEX >::wait(), and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.

00132 {
00133   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00134     ACE_DEBUG((LM_DEBUG,
00135                ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n")));
00136   }
00137 
00138   // Continue until we are synchronously terminated.
00139   while (this->stop_ == false) {
00140     { // Block until there is work to do.
00141       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00142 
00143       while (this->queue_.size() == 0) {
00144         // This releases the lock while we block.
00145         this->workAvailable_.wait();
00146 
00147         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00148           ACE_DEBUG((LM_DEBUG,
00149                      ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00150                      ACE_TEXT("wakeup in 0x%x.\n"),
00151                      (void*)this));
00152         }
00153 
00154         // We were asked to stop instead.
00155         if (this->stop_ == true) {
00156           if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00157             ACE_DEBUG((LM_DEBUG,
00158                        ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00159                        ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"),
00160                        (void*)this));
00161           }
00162 
00163           return 0;
00164         }
00165       }
00166     }
00167 
00168     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00169       ACE_DEBUG((LM_DEBUG,
00170                  ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00171                  ACE_TEXT("processing a sample in 0x%x.\n"),
00172                  (void*)this));
00173     }
00174 
00175     // Delegate actual processing to the publication manager.
00176     this->processor_.processSample(
00177       this->queue_.front().first,
00178       this->queue_.front().second);
00179 
00180     { // Remove the completed work.
00181       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00182       delete this->queue_.front().first;
00183       delete this->queue_.front().second;
00184       this->queue_.pop_front();
00185     }
00186   }
00187 
00188   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00189     ACE_DEBUG((LM_DEBUG,
00190                ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00191                ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"),
00192                (void*)this));
00193   }
00194 
00195   return 0;
00196 }
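
In the listing, svc() tests stop_ at the top of the outer loop without holding lock_, and the inner loop calls wait() before it looks at stop_ again. One possible reading is that a stop() request arriving between those two points would go unnoticed until the next signal. The sketch below is a hedged standard-library variation, not the OpenDDS code, that tests both conditions under the lock before every wait. WorkState, wait_for_work and request_stop are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <list>
#include <mutex>

struct WorkState {                 // hypothetical bundle of the shared members
  std::mutex lock;
  std::condition_variable workAvailable;
  std::list<int> queue;
  bool stop = false;
};

// Returns true with `out` filled when a sample was dequeued,
// false when the worker was asked to stop.
bool wait_for_work(WorkState& s, int& out) {
  std::unique_lock<std::mutex> guard(s.lock);
  // Check both conditions before every wait, so a stop request or a sample
  // that arrived before this call is seen without needing a fresh signal.
  while (!s.stop && s.queue.empty()) {
    s.workAvailable.wait(guard);   // releases the lock while blocked
  }
  if (s.stop) return false;
  out = s.queue.front();
  s.queue.pop_front();
  return true;
}

void request_stop(WorkState& s) {
  {
    std::lock_guard<std::mutex> guard(s.lock);  // flag changes under the lock
    s.stop = true;
  }
  s.workAvailable.notify_one();
}
```

Because the flag is only written while the lock is held, a worker that has already taken the lock either sees the request before it waits or is woken by the notification that follows.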


Member Data Documentation

template<class DataType>
ACE_SYNCH_MUTEX OpenDDS::Federator::UpdateReceiver< DataType >::lock_ [private]

Protect queue modifications.

template<class DataType>
UpdateProcessor<DataType>& OpenDDS::Federator::UpdateReceiver< DataType >::processor_ [private]

The object that we delegate update processing to.

Definition at line 64 of file UpdateReceiver_T.h.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::svc().

template<class DataType>
std::list<DataInfo> OpenDDS::Federator::UpdateReceiver< DataType >::queue_ [private]

Queue of publication data to process.

template<class DataType>
bool OpenDDS::Federator::UpdateReceiver< DataType >::stop_ [private]

Termination flag.

template<class DataType>
ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_ [private]

Work to do indicator.

The documentation for this class was generated from the following files:
 UpdateReceiver_T.h
 UpdateReceiver_T.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1