#include <UpdateReceiver_T.h>
Public Member Functions | |
UpdateReceiver (UpdateProcessor< DataType > &processor) | |
Construct with a processor reference. | |
virtual | ~UpdateReceiver () |
virtual int | open (void *) |
virtual int | svc () |
virtual int | close (u_long flags=0) |
void | add (OpenDDS::DCPS::unique_ptr< DataType > sample, OpenDDS::DCPS::unique_ptr< DDS::SampleInfo > info) |
Sample enqueueing. | |
void | stop () |
Request termination of the processing thread (returns without waiting for the thread to exit). | |
Private Types | |
typedef std::pair< DataType *, DDS::SampleInfo * > | DataInfo |
Contents of the queue. | |
Private Attributes | |
UpdateProcessor< DataType > & | processor_ |
The object that we delegate update processing to. | |
bool | stop_ |
Termination flag. | |
ACE_SYNCH_MUTEX | lock_ |
Protect queue modifications. | |
ACE_Condition< ACE_SYNCH_MUTEX > | workAvailable_ |
Work to do indicator. | |
std::list< DataInfo > | queue_ |
Queue of publication data to process. |
Definition at line 35 of file UpdateReceiver_T.h.
typedef std::pair<DataType*, DDS::SampleInfo* > OpenDDS::Federator::UpdateReceiver< DataType >::DataInfo [private] |
Contents of the queue.
Definition at line 76 of file UpdateReceiver_T.h.
OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver | ( | UpdateProcessor< DataType > & | processor | ) | [inline] |
Construct with a processor reference.
Definition at line 26 of file UpdateReceiver_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and OpenDDS::Federator::UpdateReceiver< DataType >::open().
00027 : processor_(processor), 00028 stop_(false), 00029 workAvailable_(this->lock_) 00030 { 00031 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00032 ACE_DEBUG((LM_DEBUG, 00033 ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n"))); 00034 } 00035 00036 // Always execute the thread. 00037 this->open(0); 00038 }
OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver | ( | ) | [inline, virtual] |
Definition at line 41 of file UpdateReceiver_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::Federator::UpdateReceiver< DataType >::stop(), and ACE_Task_Base::wait().
00042 { 00043 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00044 ACE_DEBUG((LM_DEBUG, 00045 ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n"))); 00046 } 00047 00048 // Cleanly terminate. 00049 this->stop(); 00050 this->wait(); 00051 }
void OpenDDS::Federator::UpdateReceiver< DataType >::add | ( | OpenDDS::DCPS::unique_ptr< DataType > | sample, | |
OpenDDS::DCPS::unique_ptr< DDS::SampleInfo > | info | |||
) | [inline] |
Sample enqueueing.
sample | - pointer to the received sample to be processed | |
info | - pointer to the info about the sample to be processed. |
NOTE: We take ownership of this data and delete it when we are done processing it.
Definition at line 103 of file UpdateReceiver_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::Federator::UpdateReceiver< DataType >::lock_, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), ACE_Condition< MUTEX >::signal(), OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.
00104 { 00105 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00106 ACE_DEBUG((LM_DEBUG, 00107 ACE_TEXT("(%P|%t) UpdateReceiver::add()\n"))); 00108 } 00109 00110 if (this->stop_) 00111 return; 00112 00113 { // Protect the queue. 00114 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_); 00115 this->queue_.push_back(DataInfo(sample.release(), info.release())); 00116 00117 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00118 ACE_DEBUG((LM_DEBUG, 00119 ACE_TEXT("(%P|%t) UpdateReceiver::add() - ") 00120 ACE_TEXT(" %d samples waiting to process in 0x%x.\n"), 00121 this->queue_.size(), 00122 (void*)this)); 00123 } 00124 } 00125 00126 this->workAvailable_.signal(); 00127 }
int OpenDDS::Federator::UpdateReceiver< DataType >::close | ( | u_long | flags = 0 |
) | [inline, virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 68 of file UpdateReceiver_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and OpenDDS::Federator::UpdateReceiver< DataType >::queue_.
00069 { 00070 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00071 ACE_DEBUG((LM_DEBUG, 00072 ACE_TEXT("(%P|%t) UpdateReceiver::close()\n"))); 00073 } 00074 00075 while (this->queue_.size()) { 00076 delete this->queue_.front().first; 00077 delete this->queue_.front().second; 00078 this->queue_.pop_front(); 00079 } 00080 00081 return 0; 00082 }
int OpenDDS::Federator::UpdateReceiver< DataType >::open | ( | void * | ) | [inline, virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 55 of file UpdateReceiver_T.cpp.
References ACE_TEXT(), ACE_Task_Base::activate(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver().
00056 { 00057 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00058 ACE_DEBUG((LM_DEBUG, 00059 ACE_TEXT("(%P|%t) UpdateReceiver::open()\n"))); 00060 } 00061 00062 // Run as a separate thread. 00063 return this->activate(); 00064 }
void OpenDDS::Federator::UpdateReceiver< DataType >::stop | ( | void | ) | [inline] |
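Request termination of the processing thread. This call sets the termination flag and signals the thread; it returns without waiting for the thread to exit (the destructor performs the wait).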
Definition at line 86 of file UpdateReceiver_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, ACE_Condition< MUTEX >::signal(), OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver().
00087 { 00088 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00089 ACE_DEBUG((LM_DEBUG, 00090 ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n"))); 00091 } 00092 00093 // Indicate the thread should stop and get its attention. 00094 if (this->stop_) 00095 return; 00096 00097 this->stop_ = true; 00098 this->workAvailable_.signal(); 00099 }
int OpenDDS::Federator::UpdateReceiver< DataType >::svc | ( | void | ) | [inline, virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 131 of file UpdateReceiver_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::Federator::UpdateReceiver< DataType >::lock_, OpenDDS::Federator::UpdateReceiver< DataType >::processor_, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, OpenDDS::Federator::UpdateReceiver< DataType >::stop_, ACE_Condition< MUTEX >::wait(), and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.
00132 { 00133 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00134 ACE_DEBUG((LM_DEBUG, 00135 ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n"))); 00136 } 00137 00138 // Continue until we are synchronously terminated. 00139 while (this->stop_ == false) { 00140 { // Block until there is work to do. 00141 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00142 00143 while (this->queue_.size() == 0) { 00144 // This releases the lock while we block. 00145 this->workAvailable_.wait(); 00146 00147 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00148 ACE_DEBUG((LM_DEBUG, 00149 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00150 ACE_TEXT("wakeup in 0x%x.\n"), 00151 (void*)this)); 00152 } 00153 00154 // We were asked to stop instead. 00155 if (this->stop_ == true) { 00156 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00157 ACE_DEBUG((LM_DEBUG, 00158 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00159 ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"), 00160 (void*)this)); 00161 } 00162 00163 return 0; 00164 } 00165 } 00166 } 00167 00168 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00169 ACE_DEBUG((LM_DEBUG, 00170 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00171 ACE_TEXT("processing a sample in 0x%x.\n"), 00172 (void*)this)); 00173 } 00174 00175 // Delegate actual processing to the publication manager. 00176 this->processor_.processSample( 00177 this->queue_.front().first, 00178 this->queue_.front().second); 00179 00180 { // Remove the completed work. 00181 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00182 delete this->queue_.front().first; 00183 delete this->queue_.front().second; 00184 this->queue_.pop_front(); 00185 } 00186 } 00187 00188 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00189 ACE_DEBUG((LM_DEBUG, 00190 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00191 ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"), 00192 (void*)this)); 00193 } 00194 00195 return 0; 00196 }
ACE_SYNCH_MUTEX OpenDDS::Federator::UpdateReceiver< DataType >::lock_ [private] |
Protect queue modifications.
Definition at line 70 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().
UpdateProcessor<DataType>& OpenDDS::Federator::UpdateReceiver< DataType >::processor_ [private] |
The object that we delegate update processing to.
Definition at line 64 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::svc().
std::list<DataInfo> OpenDDS::Federator::UpdateReceiver< DataType >::queue_ [private] |
Queue of publication data to process.
Definition at line 79 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), OpenDDS::Federator::UpdateReceiver< DataType >::close(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().
bool OpenDDS::Federator::UpdateReceiver< DataType >::stop_ [private] |
Termination flag.
Definition at line 67 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), OpenDDS::Federator::UpdateReceiver< DataType >::stop(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().
ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_ [private] |
Work to do indicator.
Definition at line 73 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), OpenDDS::Federator::UpdateReceiver< DataType >::stop(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().