OpenDDS::Federator::UpdateReceiver< DataType > Class Template Reference

#include <UpdateReceiver_T.h>


Public Member Functions

 UpdateReceiver (UpdateProcessor< DataType > &processor)
 Construct with a processor reference.
virtual ~UpdateReceiver ()
 Virtual destructor.
virtual int open (void *)
virtual int svc ()
virtual int close (u_long flags=0)
void add (DataType *sample, DDS::SampleInfo *info)
 Sample enqueueing.
void stop ()
 Synchronous termination.

Private Types

typedef std::pair< DataType *, DDS::SampleInfo * > DataInfo
 Contents of the queue.

Private Attributes

UpdateProcessor< DataType > & processor_
 The object that we delegate update processing to.
bool stop_
 Termination flag.
ACE_SYNCH_MUTEX lock_
 Protect queue modifications.
ACE_Condition< ACE_SYNCH_MUTEX > workAvailable_
 Work to do indicator.
std::list< DataInfo > queue_
 Queue of publication data to process.

Detailed Description

template<class DataType>
class OpenDDS::Federator::UpdateReceiver< DataType >

Definition at line 33 of file UpdateReceiver_T.h.
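
The listings on this page show a worker-thread pattern: the constructor calls open(), which calls activate() to start a thread; add() enqueues a sample under lock_ and signals workAvailable_; the destructor calls stop() and then wait(). As a rough illustration only, the sketch below mirrors that lifecycle with standard C++ primitives instead of ACE. `MiniReceiver`, `process_all`, and the `std::function` processor are hypothetical names chosen for this example and are not part of OpenDDS.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for UpdateReceiver built on the standard library.
template <class DataType>
class MiniReceiver {
public:
  typedef std::function<void(const DataType&)> Processor;

  // Like the constructor in the listing, start the worker immediately.
  explicit MiniReceiver(Processor processor)
    : processor_(std::move(processor)),
      stop_(false),
      worker_(&MiniReceiver::svc, this)
  {}

  // Like the destructor in the listing: request a stop, then wait.
  ~MiniReceiver() {
    stop();
    worker_.join();
  }

  // Enqueue under the lock, then wake the worker.
  void add(std::unique_ptr<DataType> sample) {
    assert(sample && "add() expects a non-null sample");
    {
      std::lock_guard<std::mutex> guard(lock_);
      queue_.push_back(std::move(sample));
    }
    workAvailable_.notify_one();
  }

  // Assumption of this sketch: the flag is set while holding the lock.
  void stop() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      stop_ = true;
    }
    workAvailable_.notify_one();
  }

private:
  void svc() {
    for (;;) {
      std::unique_ptr<DataType> sample;
      {
        std::unique_lock<std::mutex> guard(lock_);
        // wait() releases the lock while blocked and re-checks the predicate.
        workAvailable_.wait(guard, [this] { return stop_ || !queue_.empty(); });
        if (queue_.empty()) return;  // stop requested and nothing left to do
        sample = std::move(queue_.front());
        queue_.pop_front();
      }
      processor_(*sample);  // process outside the lock
    }
  }

  Processor processor_;
  bool stop_;
  std::mutex lock_;
  std::condition_variable workAvailable_;
  std::list<std::unique_ptr<DataType> > queue_;
  std::thread worker_;  // declared last so the other members exist before it runs
};

// Enqueue three values, destroy the receiver, and return what was processed.
std::vector<int> process_all() {
  std::vector<int> seen;
  {
    MiniReceiver<int> receiver([&seen](const int& v) { seen.push_back(v); });
    for (int i = 1; i <= 3; ++i) {
      receiver.add(std::unique_ptr<int>(new int(i)));
    }
  }  // the destructor stops and joins the worker here
  return seen;
}
```

The sketch departs from the listings in three ways, all of them choices made for the example: it holds samples in `std::unique_ptr` rather than raw pointers, it sets the stop flag while holding the lock, and it drains the queue before exiting so that the result is deterministic.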


Member Typedef Documentation

template<class DataType>
typedef std::pair<DataType*, DDS::SampleInfo* > OpenDDS::Federator::UpdateReceiver< DataType >::DataInfo [private]

Contents of the queue.

Definition at line 75 of file UpdateReceiver_T.h.


Constructor & Destructor Documentation

template<class DataType>
OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver ( UpdateProcessor< DataType > &  processor  ) 

Construct with a processor reference.

Definition at line 24 of file UpdateReceiver_T.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::Federator::UpdateReceiver< DataType >::open().

  : processor_(processor),
    stop_(false),
    workAvailable_(this->lock_)
{
  if (OpenDDS::DCPS::DCPS_debug_level > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n")));
  }

  // Always execute the thread.
  this->open(0);
}

template<class DataType>
OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver (  )  [virtual]

Virtual destructor.

Definition at line 39 of file UpdateReceiver_T.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::Federator::UpdateReceiver< DataType >::stop().

{
  if (OpenDDS::DCPS::DCPS_debug_level > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n")));
  }

  // Cleanly terminate.
  this->stop();
  this->wait();
}


Member Function Documentation

template<class DataType>
void OpenDDS::Federator::UpdateReceiver< DataType >::add ( DataType *  sample,
DDS::SampleInfo *  info 
)

Sample enqueueing.

Parameters:
sample - pointer to the received sample to be processed.
info - pointer to the info about the sample to be processed.
NOTE: The receiver takes ownership of both objects and deletes them once it has finished processing them.
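
The ownership rule in the note can be illustrated with a small single-threaded sketch, which leaves out the locking and the worker thread. The caller allocates both objects with `new`, hands the raw pointers over, and never deletes them itself; the queue deletes them after processing, as svc() does in the listing further down. `Sample`, `Info`, and `OwningQueue` are hypothetical types invented for this example, and the instance counters exist only to make the hand-off observable.

```cpp
#include <cassert>
#include <list>
#include <utility>

// Hypothetical sample type that counts live instances.
struct Sample {
  static int live;
  int value;
  explicit Sample(int v) : value(v) { ++live; }
  ~Sample() { --live; }
};
int Sample::live = 0;

// Hypothetical stand-in for DDS::SampleInfo.
struct Info {
  static int live;
  Info() { ++live; }
  ~Info() { --live; }
};
int Info::live = 0;

// Hypothetical queue that owns whatever it is given.
class OwningQueue {
public:
  typedef std::pair<Sample*, Info*> DataInfo;

  OwningQueue() {}

  // Assumption of this sketch: entries still queued are released on destruction.
  ~OwningQueue() {
    while (!queue_.empty()) release_front();
  }

  // The caller gives up ownership of both pointers here.
  void add(Sample* sample, Info* info) {
    queue_.push_back(DataInfo(sample, info));
  }

  // Process the oldest entry, delete both objects, and return the sample value.
  int process_one() {
    assert(!queue_.empty());
    const int value = queue_.front().first->value;
    release_front();
    return value;
  }

private:
  OwningQueue(const OwningQueue&);             // not copyable
  OwningQueue& operator=(const OwningQueue&);  // not assignable

  void release_front() {
    delete queue_.front().first;
    delete queue_.front().second;
    queue_.pop_front();
  }

  std::list<DataInfo> queue_;
};
```

Because the objects are released with `delete`, pointers passed this way need to come from `new`, and the caller should not touch them after the call.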

Definition at line 95 of file UpdateReceiver_T.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.

{
  if (OpenDDS::DCPS::DCPS_debug_level > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::add()\n")));
  }

  { // Protect the queue.
    ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_);
    this->queue_.push_back(DataInfo(sample, info));

    if (OpenDDS::DCPS::DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) UpdateReceiver::add() - ")
                 ACE_TEXT(" %d samples waiting to process in 0x%x.\n"),
                 this->queue_.size(),
                 (void*)this));
    }
  }

  this->workAvailable_.signal();
}

template<class DataType>
int OpenDDS::Federator::UpdateReceiver< DataType >::close ( u_long  flags = 0  )  [virtual]

Definition at line 66 of file UpdateReceiver_T.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

{
  if (OpenDDS::DCPS::DCPS_debug_level > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::close()\n")));
  }

  return 0;
}

template<class DataType>
int OpenDDS::Federator::UpdateReceiver< DataType >::open ( void *   )  [virtual]

Definition at line 53 of file UpdateReceiver_T.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver().

{
  if (OpenDDS::DCPS::DCPS_debug_level > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::open()\n")));
  }

  // Run as a separate thread.
  return this->activate();
}

template<class DataType>
void OpenDDS::Federator::UpdateReceiver< DataType >::stop (  ) 

Synchronous termination.

Definition at line 78 of file UpdateReceiver_T.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver().

{
  if (OpenDDS::DCPS::DCPS_debug_level > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n")));
  }

  // Indicate the thread should stop and get its attention.
  if (this->stop_)
    return;

  this->stop_ = true;
  this->workAvailable_.signal();
}

template<class DataType>
int OpenDDS::Federator::UpdateReceiver< DataType >::svc (  )  [virtual]

Definition at line 120 of file UpdateReceiver_T.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::processor_, and OpenDDS::Federator::UpdateReceiver< DataType >::queue_.

{
  if (OpenDDS::DCPS::DCPS_debug_level > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n")));
  }

  // Continue until we are synchronously terminated.
  while (this->stop_ == false) {
    { // Block until there is work to do.
      ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);

      while (this->queue_.size() == 0) {
        // This releases the lock while we block.
        this->workAvailable_.wait();

        if (OpenDDS::DCPS::DCPS_debug_level > 9) {
          ACE_DEBUG((LM_DEBUG,
                     ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
                     ACE_TEXT("wakeup in 0x%x.\n"),
                     (void*)this));
        }

        // We were asked to stop instead.
        if (this->stop_ == true) {
          if (OpenDDS::DCPS::DCPS_debug_level > 4) {
            ACE_DEBUG((LM_DEBUG,
                       ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
                       ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"),
                       (void*)this));
          }

          return 0;
        }
      }
    }

    if (OpenDDS::DCPS::DCPS_debug_level > 0) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
                 ACE_TEXT("processing a sample in 0x%x.\n"),
                 (void*)this));
    }

    // Delegate actual processing to the publication manager.
    this->processor_.processSample(
      this->queue_.front().first,
      this->queue_.front().second);

    { // Remove the completed work.
      ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
      delete this->queue_.front().first;
      delete this->queue_.front().second;
      this->queue_.pop_front();
    }
  }

  if (OpenDDS::DCPS::DCPS_debug_level > 4) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
               ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"),
               (void*)this));
  }

  return 0;
}


Member Data Documentation

template<class DataType>
ACE_SYNCH_MUTEX OpenDDS::Federator::UpdateReceiver< DataType >::lock_ [private]

Protect queue modifications.

Definition at line 69 of file UpdateReceiver_T.h.

template<class DataType>
UpdateProcessor<DataType>& OpenDDS::Federator::UpdateReceiver< DataType >::processor_ [private]

The object that we delegate update processing to.

Definition at line 63 of file UpdateReceiver_T.h.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::svc().

template<class DataType>
std::list<DataInfo> OpenDDS::Federator::UpdateReceiver< DataType >::queue_ [private]

Queue of publication data to process.

Definition at line 78 of file UpdateReceiver_T.h.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().

template<class DataType>
bool OpenDDS::Federator::UpdateReceiver< DataType >::stop_ [private]

Termination flag.

Definition at line 66 of file UpdateReceiver_T.h.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::stop().

template<class DataType>
ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_ [private]

Work to do indicator.

Definition at line 72 of file UpdateReceiver_T.h.

Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), and OpenDDS::Federator::UpdateReceiver< DataType >::stop().


The documentation for this class was generated from the following files:
UpdateReceiver_T.h
UpdateReceiver_T.cpp
Generated on Fri Feb 12 20:06:47 2016 for OpenDDS by  doxygen 1.4.7