00001
00002
00003
00004
00005
00006
00007
00008 #ifndef UPDATERECEIVER_T_CPP
00009 #define UPDATERECEIVER_T_CPP
00010
00011 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00012 #pragma once
00013 #endif
00014
00015 #include "DcpsInfo_pch.h"
00016 #include "dds/DCPS/debug.h"
00017 #include "UpdateReceiver_T.h"
00018 #include "UpdateProcessor_T.h"
00019
00020 namespace OpenDDS {
00021 namespace Federator {
00022
00023 template<class DataType>
00024 UpdateReceiver<DataType>::UpdateReceiver(UpdateProcessor<DataType>& processor)
00025 : processor_(processor),
00026 stop_(false),
00027 workAvailable_(this->lock_)
00028 {
00029 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00030 ACE_DEBUG((LM_DEBUG,
00031 ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n")));
00032 }
00033
00034
00035 this->open(0);
00036 }
00037
00038 template<class DataType>
00039 UpdateReceiver<DataType>::~UpdateReceiver()
00040 {
00041 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00042 ACE_DEBUG((LM_DEBUG,
00043 ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n")));
00044 }
00045
00046
00047 this->stop();
00048 this->wait();
00049 }
00050
00051 template<class DataType>
00052 int
00053 UpdateReceiver<DataType>::open(void*)
00054 {
00055 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00056 ACE_DEBUG((LM_DEBUG,
00057 ACE_TEXT("(%P|%t) UpdateReceiver::open()\n")));
00058 }
00059
00060
00061 return this->activate();
00062 }
00063
00064 template<class DataType>
00065 int
00066 UpdateReceiver<DataType>::close(u_long )
00067 {
00068 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00069 ACE_DEBUG((LM_DEBUG,
00070 ACE_TEXT("(%P|%t) UpdateReceiver::close()\n")));
00071 }
00072
00073 return 0;
00074 }
00075
00076 template<class DataType>
00077 void
00078 UpdateReceiver<DataType>::stop()
00079 {
00080 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00081 ACE_DEBUG((LM_DEBUG,
00082 ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n")));
00083 }
00084
00085
00086 if (this->stop_)
00087 return;
00088
00089 this->stop_ = true;
00090 this->workAvailable_.signal();
00091 }
00092
00093 template<class DataType>
00094 void
00095 UpdateReceiver<DataType>::add(DataType* sample, DDS::SampleInfo* info)
00096 {
00097 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00098 ACE_DEBUG((LM_DEBUG,
00099 ACE_TEXT("(%P|%t) UpdateReceiver::add()\n")));
00100 }
00101
00102 {
00103 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_);
00104 this->queue_.push_back(DataInfo(sample, info));
00105
00106 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00107 ACE_DEBUG((LM_DEBUG,
00108 ACE_TEXT("(%P|%t) UpdateReceiver::add() - ")
00109 ACE_TEXT(" %d samples waiting to process in 0x%x.\n"),
00110 this->queue_.size(),
00111 (void*)this));
00112 }
00113 }
00114
00115 this->workAvailable_.signal();
00116 }
00117
00118 template<class DataType>
00119 int
00120 UpdateReceiver<DataType>::svc()
00121 {
00122 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00123 ACE_DEBUG((LM_DEBUG,
00124 ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n")));
00125 }
00126
00127
00128 while (this->stop_ == false) {
00129 {
00130 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00131
00132 while (this->queue_.size() == 0) {
00133
00134 this->workAvailable_.wait();
00135
00136 if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00137 ACE_DEBUG((LM_DEBUG,
00138 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00139 ACE_TEXT("wakeup in 0x%x.\n"),
00140 (void*)this));
00141 }
00142
00143
00144 if (this->stop_ == true) {
00145 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00146 ACE_DEBUG((LM_DEBUG,
00147 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00148 ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"),
00149 (void*)this));
00150 }
00151
00152 return 0;
00153 }
00154 }
00155 }
00156
00157 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00158 ACE_DEBUG((LM_DEBUG,
00159 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00160 ACE_TEXT("processing a sample in 0x%x.\n"),
00161 (void*)this));
00162 }
00163
00164
00165 this->processor_.processSample(
00166 this->queue_.front().first,
00167 this->queue_.front().second);
00168
00169 {
00170 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
00171 delete this->queue_.front().first;
00172 delete this->queue_.front().second;
00173 this->queue_.pop_front();
00174 }
00175 }
00176
00177 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00178 ACE_DEBUG((LM_DEBUG,
00179 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
00180 ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"),
00181 (void*)this));
00182 }
00183
00184 return 0;
00185 }
00186
00187 }
00188 }
00189
00190 #endif