UpdateReceiver_T.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef UPDATERECEIVER_T_CPP
00009 #define UPDATERECEIVER_T_CPP
00010
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 #pragma once
00013 #endif
00014
00015 #include "DcpsInfo_pch.h"
00016 #include "dds/DCPS/debug.h"
00017 #include "UpdateReceiver_T.h"
00018 #include "UpdateProcessor_T.h"
00019
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 namespace OpenDDS {
00023 namespace Federator {
00024
00025 template<class DataType>
00026 UpdateReceiver<DataType>::UpdateReceiver(UpdateProcessor<DataType>& processor)
00027 : processor_(processor),
00028 stop_(false),
00029 workAvailable_(this->lock_)
00030 {
00031 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00032 ACE_DEBUG((LM_DEBUG,
00033 ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n")));
00034 }
00035
00036
00037 this->open(0);
00038 }
00039
00040 template<class DataType>
00041 UpdateReceiver<DataType>::~UpdateReceiver()
00042 {
00043 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00044 ACE_DEBUG((LM_DEBUG,
00045 ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n")));
00046 }
00047
00048
00049 this->stop();
00050 this->wait();
00051 }
00052
00053 template<class DataType>
00054 int
00055 UpdateReceiver<DataType>::open(void*)
00056 {
00057 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00058 ACE_DEBUG((LM_DEBUG,
00059 ACE_TEXT("(%P|%t) UpdateReceiver::open()\n")));
00060 }
00061
00062
00063 return this->activate();
00064 }
00065
00066 template<class DataType>
00067 int
00068 UpdateReceiver<DataType>::close(u_long )
00069 {
00070 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00071 ACE_DEBUG((LM_DEBUG,
00072 ACE_TEXT("(%P|%t) UpdateReceiver::close()\n")));
00073 }
00074
00075 while (this->queue_.size()) {
00076 delete this->queue_.front().first;
00077 delete this->queue_.front().second;
00078 this->queue_.pop_front();
00079 }
00080
00081 return 0;
00082 }
00083
00084 template<class DataType>
00085 void
00086 UpdateReceiver<DataType>::stop()
00087 {
00088 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00089 ACE_DEBUG((LM_DEBUG,
00090 ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n")));
00091 }
00092
00093
00094 if (this->stop_)
00095 return;
00096
00097 this->stop_ = true;
00098 this->workAvailable_.signal();
00099 }
00100
00101 template<class DataType>
00102 void
00103 UpdateReceiver<DataType>::add(OpenDDS::DCPS::unique_ptr<DataType> sample, OpenDDS::DCPS::unique_ptr<DDS::SampleInfo> info)
00104 {
00105 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00106 ACE_DEBUG((LM_DEBUG,
00107 ACE_TEXT("(%P|%t) UpdateReceiver::add()\n")));
00108 }
00109
00110 if (this->stop_)
00111 return;
00112
00113 {
00114 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_);
00115 this->queue_.push_back(DataInfo(sample.release(), info.release()));
00116
00117 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00118 ACE_DEBUG((LM_DEBUG,
00119 ACE_TEXT("(%P|%t) UpdateReceiver::add() - ")
00120 ACE_TEXT(" %d samples waiting to process in 0x%x.\n"),
00121 this->queue_.size(),
00122 (void*)this));
00123 }
00124 }
00125
00126 this->workAvailable_.signal();
00127 }
00128
00129 template<class DataType>
00130 int
00131 UpdateReceiver<DataType>::svc()
00132 {
00133 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00134 ACE_DEBUG((LM_DEBUG,
00135 ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n")));
00136 }
00137
00138
00139 while (this->stop_ == false) {
00140 {
00141 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00142
00143 while (this->queue_.size() == 0) {
00144
00145 this->workAvailable_.wait();
00146
00147 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00148 ACE_DEBUG((LM_DEBUG,
00149 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00150 ACE_TEXT("wakeup in 0x%x.\n"),
00151 (void*)this));
00152 }
00153
00154
00155 if (this->stop_ == true) {
00156 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00157 ACE_DEBUG((LM_DEBUG,
00158 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00159 ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"),
00160 (void*)this));
00161 }
00162
00163 return 0;
00164 }
00165 }
00166 }
00167
00168 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00169 ACE_DEBUG((LM_DEBUG,
00170 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00171 ACE_TEXT("processing a sample in 0x%x.\n"),
00172 (void*)this));
00173 }
00174
00175
00176 this->processor_.processSample(
00177 this->queue_.front().first,
00178 this->queue_.front().second);
00179
00180 {
00181 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00182 delete this->queue_.front().first;
00183 delete this->queue_.front().second;
00184 this->queue_.pop_front();
00185 }
00186 }
00187
00188 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00189 ACE_DEBUG((LM_DEBUG,
00190 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00191 ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"),
00192 (void*)this));
00193 }
00194
00195 return 0;
00196 }
00197
00198 }
00199 }
00200
00201 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00202
00203 #endif