#include <UpdateReceiver_T.h>
Inheritance diagram for OpenDDS::Federator::UpdateReceiver< DataType >:
Public Member Functions
UpdateReceiver (UpdateProcessor< DataType > &processor)
    Construct with a processor reference.
virtual ~UpdateReceiver ()
    Virtual destructor.
virtual int open (void *)
virtual int svc ()
virtual int close (u_long flags=0)
void add (DataType *sample, DDS::SampleInfo *info)
    Sample enqueueing.
void stop ()
    Synchronous termination.
Private Types
typedef std::pair< DataType *, DDS::SampleInfo * > DataInfo
    Contents of the queue.
Private Attributes
UpdateProcessor< DataType > & processor_
    The object that we delegate update processing to.
bool stop_
    Termination flag.
ACE_SYNCH_MUTEX lock_
    Protect queue modifications.
ACE_Condition< ACE_SYNCH_MUTEX > workAvailable_
    Work to do indicator.
std::list< DataInfo > queue_
    Queue of publication data to process.
Definition at line 33 of file UpdateReceiver_T.h.
typedef std::pair<DataType*, DDS::SampleInfo* > OpenDDS::Federator::UpdateReceiver< DataType >::DataInfo [private]
OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver (UpdateProcessor< DataType > & processor)
Construct with a processor reference.
Definition at line 24 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::Federator::UpdateReceiver< DataType >::open().
00025 : processor_(processor), 00026 stop_(false), 00027 workAvailable_(this->lock_) 00028 { 00029 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00030 ACE_DEBUG((LM_DEBUG, 00031 ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n"))); 00032 } 00033 00034 // Always execute the thread. 00035 this->open(0); 00036 }
OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver () [virtual]
Virtual destructor.
Definition at line 39 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::Federator::UpdateReceiver< DataType >::stop().
00040 { 00041 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00042 ACE_DEBUG((LM_DEBUG, 00043 ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n"))); 00044 } 00045 00046 // Cleanly terminate. 00047 this->stop(); 00048 this->wait(); 00049 }
void OpenDDS::Federator::UpdateReceiver< DataType >::add (DataType * sample, DDS::SampleInfo * info)
Sample enqueueing.
Parameters:
sample - pointer to the received sample to be processed.
info - pointer to the info about the sample to be processed.
Definition at line 95 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.
00096 { 00097 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00098 ACE_DEBUG((LM_DEBUG, 00099 ACE_TEXT("(%P|%t) UpdateReceiver::add()\n"))); 00100 } 00101 00102 { // Protect the queue. 00103 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_); 00104 this->queue_.push_back(DataInfo(sample, info)); 00105 00106 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00107 ACE_DEBUG((LM_DEBUG, 00108 ACE_TEXT("(%P|%t) UpdateReceiver::add() - ") 00109 ACE_TEXT(" %d samples waiting to process in 0x%x.\n"), 00110 this->queue_.size(), 00111 (void*)this)); 00112 } 00113 } 00114 00115 this->workAvailable_.signal(); 00116 }
int OpenDDS::Federator::UpdateReceiver< DataType >::close (u_long flags = 0) [virtual]
Definition at line 66 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00067 { 00068 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00069 ACE_DEBUG((LM_DEBUG, 00070 ACE_TEXT("(%P|%t) UpdateReceiver::close()\n"))); 00071 } 00072 00073 return 0; 00074 }
int OpenDDS::Federator::UpdateReceiver< DataType >::open (void *) [virtual]
Definition at line 53 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver().
00054 { 00055 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00056 ACE_DEBUG((LM_DEBUG, 00057 ACE_TEXT("(%P|%t) UpdateReceiver::open()\n"))); 00058 } 00059 00060 // Run as a separate thread. 00061 return this->activate(); 00062 }
void OpenDDS::Federator::UpdateReceiver< DataType >::stop ()
Synchronous termination.
Definition at line 78 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver().
00079 { 00080 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00081 ACE_DEBUG((LM_DEBUG, 00082 ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n"))); 00083 } 00084 00085 // Indicate the thread should stop and get its attention. 00086 if (this->stop_) 00087 return; 00088 00089 this->stop_ = true; 00090 this->workAvailable_.signal(); 00091 }
int OpenDDS::Federator::UpdateReceiver< DataType >::svc () [virtual]
Definition at line 120 of file UpdateReceiver_T.cpp.
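References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::processor_, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.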
00121 { 00122 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00123 ACE_DEBUG((LM_DEBUG, 00124 ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n"))); 00125 } 00126 00127 // Continue until we are synchronously terminated. 00128 while (this->stop_ == false) { 00129 { // Block until there is work to do. 00130 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00131 00132 while (this->queue_.size() == 0) { 00133 // This releases the lock while we block. 00134 this->workAvailable_.wait(); 00135 00136 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00137 ACE_DEBUG((LM_DEBUG, 00138 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00139 ACE_TEXT("wakeup in 0x%x.\n"), 00140 (void*)this)); 00141 } 00142 00143 // We were asked to stop instead. 00144 if (this->stop_ == true) { 00145 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00146 ACE_DEBUG((LM_DEBUG, 00147 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00148 ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"), 00149 (void*)this)); 00150 } 00151 00152 return 0; 00153 } 00154 } 00155 } 00156 00157 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00158 ACE_DEBUG((LM_DEBUG, 00159 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00160 ACE_TEXT("processing a sample in 0x%x.\n"), 00161 (void*)this)); 00162 } 00163 00164 // Delegate actual processing to the publication manager. 00165 this->processor_.processSample( 00166 this->queue_.front().first, 00167 this->queue_.front().second); 00168 00169 { // Remove the completed work. 00170 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00171 delete this->queue_.front().first; 00172 delete this->queue_.front().second; 00173 this->queue_.pop_front(); 00174 } 00175 } 00176 00177 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00178 ACE_DEBUG((LM_DEBUG, 00179 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00180 ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"), 00181 (void*)this)); 00182 } 00183 00184 return 0; 00185 }
ACE_SYNCH_MUTEX OpenDDS::Federator::UpdateReceiver< DataType >::lock_ [private]
UpdateProcessor<DataType>& OpenDDS::Federator::UpdateReceiver< DataType >::processor_ [private]
The object that we delegate update processing to.
Definition at line 63 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::svc().
std::list<DataInfo> OpenDDS::Federator::UpdateReceiver< DataType >::queue_ [private]
Queue of publication data to process.
Definition at line 78 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().
bool OpenDDS::Federator::UpdateReceiver< DataType >::stop_ [private]
Termination flag.
Definition at line 66 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::stop(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().
ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_ [private]
Work to do indicator.
Definition at line 72 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), OpenDDS::Federator::UpdateReceiver< DataType >::stop(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().