UpdateReceiver_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef UPDATERECEIVER_T_CPP
00009 #define UPDATERECEIVER_T_CPP
00010 
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 #pragma once
00013 #endif /* ACE_LACKS_PRAGMA_ONCE */
00014 
00015 #include "DcpsInfo_pch.h"
00016 #include "dds/DCPS/debug.h"
00017 #include "UpdateReceiver_T.h"
00018 #include "UpdateProcessor_T.h"
00019 
00020 namespace OpenDDS {
00021 namespace Federator {
00022 
00023 template<class DataType>
00024 UpdateReceiver<DataType>::UpdateReceiver(UpdateProcessor<DataType>& processor)
00025   : processor_(processor),
00026     stop_(false),
00027     workAvailable_(this->lock_)
00028 {
00029   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00030     ACE_DEBUG((LM_DEBUG,
00031                ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n")));
00032   }
00033 
00034   // Always execute the thread.
00035   this->open(0);
00036 }
00037 
00038 template<class DataType>
00039 UpdateReceiver<DataType>::~UpdateReceiver()
00040 {
00041   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00042     ACE_DEBUG((LM_DEBUG,
00043                ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n")));
00044   }
00045 
00046   // Cleanly terminate.
00047   this->stop();
00048   this->wait();
00049 }
00050 
00051 template<class DataType>
00052 int
00053 UpdateReceiver<DataType>::open(void*)
00054 {
00055   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00056     ACE_DEBUG((LM_DEBUG,
00057                ACE_TEXT("(%P|%t) UpdateReceiver::open()\n")));
00058   }
00059 
00060   // Run as a separate thread.
00061   return this->activate();
00062 }
00063 
00064 template<class DataType>
00065 int
00066 UpdateReceiver<DataType>::close(u_long /* flags */)
00067 {
00068   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00069     ACE_DEBUG((LM_DEBUG,
00070                ACE_TEXT("(%P|%t) UpdateReceiver::close()\n")));
00071   }
00072 
00073   return 0;
00074 }
00075 
00076 template<class DataType>
00077 void
00078 UpdateReceiver<DataType>::stop()
00079 {
00080   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00081     ACE_DEBUG((LM_DEBUG,
00082                ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n")));
00083   }
00084 
00085   // Indicate the thread should stop and get its attention.
00086   if (this->stop_)
00087     return;
00088 
00089   this->stop_ = true;
00090   this->workAvailable_.signal();
00091 }
00092 
00093 template<class DataType>
00094 void
00095 UpdateReceiver<DataType>::add(DataType* sample, DDS::SampleInfo* info)
00096 {
00097   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00098     ACE_DEBUG((LM_DEBUG,
00099                ACE_TEXT("(%P|%t) UpdateReceiver::add()\n")));
00100   }
00101 
00102   { // Protect the queue.
00103     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_);
00104     this->queue_.push_back(DataInfo(sample, info));
00105 
00106     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00107       ACE_DEBUG((LM_DEBUG,
00108                  ACE_TEXT("(%P|%t) UpdateReceiver::add() - ")
00109                  ACE_TEXT(" %d samples waiting to process in 0x%x.\n"),
00110                  this->queue_.size(),
00111                  (void*)this));
00112     }
00113   }
00114 
00115   this->workAvailable_.signal();
00116 }
00117 
00118 template<class DataType>
00119 int
00120 UpdateReceiver<DataType>::svc()
00121 {
00122   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00123     ACE_DEBUG((LM_DEBUG,
00124                ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n")));
00125   }
00126 
00127   // Continue until we are synchronously terminated.
00128   while (this->stop_ == false) {
00129     { // Block until there is work to do.
00130       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00131 
00132       while (this->queue_.size() == 0) {
00133         // This releases the lock while we block.
00134         this->workAvailable_.wait();
00135 
00136         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00137           ACE_DEBUG((LM_DEBUG,
00138                      ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00139                      ACE_TEXT("wakeup in 0x%x.\n"),
00140                      (void*)this));
00141         }
00142 
00143         // We were asked to stop instead.
00144         if (this->stop_ == true) {
00145           if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00146             ACE_DEBUG((LM_DEBUG,
00147                        ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00148                        ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"),
00149                        (void*)this));
00150           }
00151 
00152           return 0;
00153         }
00154       }
00155     }
00156 
00157     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00158       ACE_DEBUG((LM_DEBUG,
00159                  ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00160                  ACE_TEXT("processing a sample in 0x%x.\n"),
00161                  (void*)this));
00162     }
00163 
00164     // Delegate actual processing to the publication manager.
00165     this->processor_.processSample(
00166       this->queue_.front().first,
00167       this->queue_.front().second);
00168 
00169     { // Remove the completed work.
00170       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00171       delete this->queue_.front().first;
00172       delete this->queue_.front().second;
00173       this->queue_.pop_front();
00174     }
00175   }
00176 
00177   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00178     ACE_DEBUG((LM_DEBUG,
00179                ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00180                ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"),
00181                (void*)this));
00182   }
00183 
00184   return 0;
00185 }
00186 
00187 } // namespace Federator
00188 } // namespace OpenDDS
00189 
00190 #endif /* UPDATERECEIVER_T_CPP */

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7