UpdateReceiver_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef UPDATERECEIVER_T_CPP
00009 #define UPDATERECEIVER_T_CPP
00010 
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 #pragma once
00013 #endif /* ACE_LACKS_PRAGMA_ONCE */
00014 
00015 #include "DcpsInfo_pch.h"
00016 #include "dds/DCPS/debug.h"
00017 #include "UpdateReceiver_T.h"
00018 #include "UpdateProcessor_T.h"
00019 
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 namespace OpenDDS {
00023 namespace Federator {
00024 
00025 template<class DataType>
00026 UpdateReceiver<DataType>::UpdateReceiver(UpdateProcessor<DataType>& processor)
00027   : processor_(processor),
00028     stop_(false),
00029     workAvailable_(this->lock_)
00030 {
00031   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00032     ACE_DEBUG((LM_DEBUG,
00033                ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n")));
00034   }
00035 
00036   // Always execute the thread.
00037   this->open(0);
00038 }
00039 
00040 template<class DataType>
00041 UpdateReceiver<DataType>::~UpdateReceiver()
00042 {
00043   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00044     ACE_DEBUG((LM_DEBUG,
00045                ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n")));
00046   }
00047 
00048   // Cleanly terminate.
00049   this->stop();
00050   this->wait();
00051 }
00052 
00053 template<class DataType>
00054 int
00055 UpdateReceiver<DataType>::open(void*)
00056 {
00057   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00058     ACE_DEBUG((LM_DEBUG,
00059                ACE_TEXT("(%P|%t) UpdateReceiver::open()\n")));
00060   }
00061 
00062   // Run as a separate thread.
00063   return this->activate();
00064 }
00065 
00066 template<class DataType>
00067 int
00068 UpdateReceiver<DataType>::close(u_long /* flags */)
00069 {
00070   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00071     ACE_DEBUG((LM_DEBUG,
00072                ACE_TEXT("(%P|%t) UpdateReceiver::close()\n")));
00073   }
00074 
00075   while (this->queue_.size()) {
00076     delete this->queue_.front().first;
00077     delete this->queue_.front().second;
00078     this->queue_.pop_front();
00079   }
00080 
00081   return 0;
00082 }
00083 
00084 template<class DataType>
00085 void
00086 UpdateReceiver<DataType>::stop()
00087 {
00088   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00089     ACE_DEBUG((LM_DEBUG,
00090                ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n")));
00091   }
00092 
00093   // Indicate the thread should stop and get its attention.
00094   if (this->stop_)
00095     return;
00096 
00097   this->stop_ = true;
00098   this->workAvailable_.signal();
00099 }
00100 
00101 template<class DataType>
00102 void
00103 UpdateReceiver<DataType>::add(OpenDDS::DCPS::unique_ptr<DataType> sample, OpenDDS::DCPS::unique_ptr<DDS::SampleInfo> info)
00104 {
00105   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00106     ACE_DEBUG((LM_DEBUG,
00107                ACE_TEXT("(%P|%t) UpdateReceiver::add()\n")));
00108   }
00109 
00110   if (this->stop_)
00111     return;
00112 
00113   { // Protect the queue.
00114     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_);
00115     this->queue_.push_back(DataInfo(sample.release(), info.release()));
00116 
00117     if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00118       ACE_DEBUG((LM_DEBUG,
00119                  ACE_TEXT("(%P|%t) UpdateReceiver::add() - ")
00120                  ACE_TEXT(" %d samples waiting to process in 0x%x.\n"),
00121                  this->queue_.size(),
00122                  (void*)this));
00123     }
00124   }
00125 
00126   this->workAvailable_.signal();
00127 }
00128 
00129 template<class DataType>
00130 int
00131 UpdateReceiver<DataType>::svc()
00132 {
00133   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00134     ACE_DEBUG((LM_DEBUG,
00135                ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n")));
00136   }
00137 
00138   // Continue until we are synchronously terminated.
00139   while (this->stop_ == false) {
00140     { // Block until there is work to do.
00141       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00142 
00143       while (this->queue_.size() == 0) {
00144         // This releases the lock while we block.
00145         this->workAvailable_.wait();
00146 
00147         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00148           ACE_DEBUG((LM_DEBUG,
00149                      ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00150                      ACE_TEXT("wakeup in 0x%x.\n"),
00151                      (void*)this));
00152         }
00153 
00154         // We were asked to stop instead.
00155         if (this->stop_ == true) {
00156           if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00157             ACE_DEBUG((LM_DEBUG,
00158                        ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00159                        ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"),
00160                        (void*)this));
00161           }
00162 
00163           return 0;
00164         }
00165       }
00166     }
00167 
00168     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00169       ACE_DEBUG((LM_DEBUG,
00170                  ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00171                  ACE_TEXT("processing a sample in 0x%x.\n"),
00172                  (void*)this));
00173     }
00174 
00175     // Delegate actual processing to the publication manager.
00176     this->processor_.processSample(
00177       this->queue_.front().first,
00178       this->queue_.front().second);
00179 
00180     { // Remove the completed work.
00181       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00182       delete this->queue_.front().first;
00183       delete this->queue_.front().second;
00184       this->queue_.pop_front();
00185     }
00186   }
00187 
00188   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00189     ACE_DEBUG((LM_DEBUG,
00190                ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00191                ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"),
00192                (void*)this));
00193   }
00194 
00195   return 0;
00196 }
00197 
00198 } // namespace Federator
00199 } // namespace OpenDDS
00200 
00201 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00202 
00203 #endif /* UPDATERECEIVER_T_CPP */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1