#include <UpdateReceiver_T.h>
Inheritance diagram for OpenDDS::Federator::UpdateReceiver< DataType >:


| Public Member Functions | |
| UpdateReceiver (UpdateProcessor< DataType > &processor) | |
| Construct with a processor reference. | |
| virtual | ~UpdateReceiver () | 
| Virtual destructor. | |
| virtual int | open (void *) | 
| virtual int | svc () | 
| virtual int | close (u_long flags=0) | 
| void | add (DataType *sample, DDS::SampleInfo *info) | 
| Sample enqueueing. | |
| void | stop () | 
| Synchronous termination. | |
| Private Types | |
| typedef std::pair< DataType *, DDS::SampleInfo * > | DataInfo | 
| Contents of the queue. | |
| Private Attributes | |
| UpdateProcessor< DataType > & | processor_ | 
| The object that we delegate update processing to. | |
| bool | stop_ | 
| Termination flag. | |
| ACE_SYNCH_MUTEX | lock_ | 
| Protect queue modifications. | |
| ACE_Condition< ACE_SYNCH_MUTEX > | workAvailable_ | 
| Work to do indicator. | |
| std::list< DataInfo > | queue_ | 
| Queue of publication data to process. | |
Definition at line 33 of file UpdateReceiver_T.h.
| typedef std::pair<DataType*, DDS::SampleInfo* > OpenDDS::Federator::UpdateReceiver< DataType >::DataInfo  [private] | 
| OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver | ( | UpdateProcessor< DataType > & | processor | ) | 
Construct with a processor reference.
Definition at line 24 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::Federator::UpdateReceiver< DataType >::open().
00025 : processor_(processor), 00026 stop_(false), 00027 workAvailable_(this->lock_) 00028 { 00029 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00030 ACE_DEBUG((LM_DEBUG, 00031 ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n"))); 00032 } 00033 00034 // Always execute the thread. 00035 this->open(0); 00036 }
| OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver | ( | ) |  [virtual] | 
Virtual destructor.
Definition at line 39 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::Federator::UpdateReceiver< DataType >::stop().
00040 { 00041 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00042 ACE_DEBUG((LM_DEBUG, 00043 ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n"))); 00044 } 00045 00046 // Cleanly terminate. 00047 this->stop(); 00048 this->wait(); 00049 }
| void OpenDDS::Federator::UpdateReceiver< DataType >::add | ( | DataType * | sample, | |
| DDS::SampleInfo * | info | |||
| ) | 
Sample enqueueing.
| sample | - pointer to the received sample to be processed | |
| info | - pointer to the info about the sample to be processed. | 
Definition at line 95 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::queue_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.
00096 { 00097 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00098 ACE_DEBUG((LM_DEBUG, 00099 ACE_TEXT("(%P|%t) UpdateReceiver::add()\n"))); 00100 } 00101 00102 { // Protect the queue. 00103 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_); 00104 this->queue_.push_back(DataInfo(sample, info)); 00105 00106 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00107 ACE_DEBUG((LM_DEBUG, 00108 ACE_TEXT("(%P|%t) UpdateReceiver::add() - ") 00109 ACE_TEXT(" %d samples waiting to process in 0x%x.\n"), 00110 this->queue_.size(), 00111 (void*)this)); 00112 } 00113 } 00114 00115 this->workAvailable_.signal(); 00116 }
| int OpenDDS::Federator::UpdateReceiver< DataType >::close | ( | u_long | flags = 0 | ) |  [virtual] | 
Definition at line 66 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00067 { 00068 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00069 ACE_DEBUG((LM_DEBUG, 00070 ACE_TEXT("(%P|%t) UpdateReceiver::close()\n"))); 00071 } 00072 00073 return 0; 00074 }
| int OpenDDS::Federator::UpdateReceiver< DataType >::open | ( | void * | ) |  [virtual] | 
Definition at line 53 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::UpdateReceiver().
00054 { 00055 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00056 ACE_DEBUG((LM_DEBUG, 00057 ACE_TEXT("(%P|%t) UpdateReceiver::open()\n"))); 00058 } 00059 00060 // Run as a separate thread. 00061 return this->activate(); 00062 }
| void OpenDDS::Federator::UpdateReceiver< DataType >::stop | ( | ) | 
Synchronous termination.
Definition at line 78 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::stop_, and OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::~UpdateReceiver().
00079 { 00080 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00081 ACE_DEBUG((LM_DEBUG, 00082 ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n"))); 00083 } 00084 00085 // Indicate the thread should stop and get its attention. 00086 if (this->stop_) 00087 return; 00088 00089 this->stop_ = true; 00090 this->workAvailable_.signal(); 00091 }
| int OpenDDS::Federator::UpdateReceiver< DataType >::svc | ( | ) |  [virtual] | 
Definition at line 120 of file UpdateReceiver_T.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Federator::UpdateReceiver< DataType >::processor_, and OpenDDS::Federator::UpdateReceiver< DataType >::queue_.
00121 { 00122 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00123 ACE_DEBUG((LM_DEBUG, 00124 ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n"))); 00125 } 00126 00127 // Continue until we are synchronously terminated. 00128 while (this->stop_ == false) { 00129 { // Block until there is work to do. 00130 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00131 00132 while (this->queue_.size() == 0) { 00133 // This releases the lock while we block. 00134 this->workAvailable_.wait(); 00135 00136 if (OpenDDS::DCPS::DCPS_debug_level > 9) { 00137 ACE_DEBUG((LM_DEBUG, 00138 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00139 ACE_TEXT("wakeup in 0x%x.\n"), 00140 (void*)this)); 00141 } 00142 00143 // We were asked to stop instead. 00144 if (this->stop_ == true) { 00145 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00146 ACE_DEBUG((LM_DEBUG, 00147 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00148 ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"), 00149 (void*)this)); 00150 } 00151 00152 return 0; 00153 } 00154 } 00155 } 00156 00157 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00158 ACE_DEBUG((LM_DEBUG, 00159 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00160 ACE_TEXT("processing a sample in 0x%x.\n"), 00161 (void*)this)); 00162 } 00163 00164 // Delegate actual processing to the publication manager. 00165 this->processor_.processSample( 00166 this->queue_.front().first, 00167 this->queue_.front().second); 00168 00169 { // Remove the completed work. 00170 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0); 00171 delete this->queue_.front().first; 00172 delete this->queue_.front().second; 00173 this->queue_.pop_front(); 00174 } 00175 } 00176 00177 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00178 ACE_DEBUG((LM_DEBUG, 00179 ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ") 00180 ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"), 00181 (void*)this)); 00182 } 00183 00184 return 0; 00185 }
| ACE_SYNCH_MUTEX OpenDDS::Federator::UpdateReceiver< DataType >::lock_  [private] | 
| UpdateProcessor<DataType>& OpenDDS::Federator::UpdateReceiver< DataType >::processor_  [private] | 
The object that we delegate update processing to.
Definition at line 63 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::svc().
| std::list<DataInfo> OpenDDS::Federator::UpdateReceiver< DataType >::queue_  [private] | 
Queue of publication data to process.
Definition at line 78 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), and OpenDDS::Federator::UpdateReceiver< DataType >::svc().
| bool OpenDDS::Federator::UpdateReceiver< DataType >::stop_  [private] | 
Termination flag.
Definition at line 66 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::stop().
| ACE_Condition<ACE_SYNCH_MUTEX> OpenDDS::Federator::UpdateReceiver< DataType >::workAvailable_  [private] | 
Work to do indicator.
Definition at line 72 of file UpdateReceiver_T.h.
Referenced by OpenDDS::Federator::UpdateReceiver< DataType >::add(), and OpenDDS::Federator::UpdateReceiver< DataType >::stop().
 1.4.7
 1.4.7