OpenDDS  Snapshot(2023/04/28-20:55)
UpdateReceiver_T.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef UPDATERECEIVER_T_CPP
9 #define UPDATERECEIVER_T_CPP
10 
11 #if !defined (ACE_LACKS_PRAGMA_ONCE)
12 #pragma once
13 #endif /* ACE_LACKS_PRAGMA_ONCE */
14 
15 #include "DcpsInfo_pch.h"
16 #include "dds/DCPS/debug.h"
17 #include "UpdateReceiver_T.h"
18 #include "UpdateProcessor_T.h"
19 
21 
22 namespace OpenDDS {
23 namespace Federator {
24 
// Constructor of the UpdateReceiver template (identified by the trace
// message below). NOTE: this extracted listing has lost the declarator
// (listing line 26) and the opening of the debug-trace statement (listing
// lines 31-32); only the trace string and its closing brace survive.
25 template<class DataType>
// Member initializers: processor_ is initialized from the processor
// argument, the stop flag starts cleared, and workAvailable_ is constructed
// from this object's lock_.
27  : processor_(processor),
28  stop_(false),
29  workAvailable_(this->lock_)
30 {
33  ACE_TEXT("(%P|%t) UpdateReceiver::UpdateReceiver()\n")));
34  }
35 
36  // Always execute the thread.
// open() in this file returns the result of activate(); that result is
// not checked here.
37  this->open(0);
38 }
39 
// Destructor of the UpdateReceiver template (identified by the trace
// message below). NOTE: this extracted listing has lost the declarator
// (listing line 41) and the opening of the debug-trace statement (listing
// lines 43-44).
40 template<class DataType>
42 {
45  ACE_TEXT("(%P|%t) UpdateReceiver::~UpdateReceiver()\n")));
46  }
47 
48  // Cleanly terminate.
// stop() sets the stop flag and signals workAvailable_; wait() is then
// called before the destructor returns.
// NOTE(review): presumably wait() blocks until the svc() thread has
// exited - confirm against the base class, which is not visible here.
49  this->stop();
50  this->wait();
51 }
52 
// open(): emits a trace message and returns whatever activate() returns.
// NOTE: this extracted listing has lost the declarator (listing line 55)
// and the opening of the debug-trace statement (listing lines 57-58), so
// the parameter list is not visible; the constructor calls it as open(0).
53 template<class DataType>
54 int
56 {
59  ACE_TEXT("(%P|%t) UpdateReceiver::open()\n")));
60  }
61 
62  // Run as a separate thread.
63  return this->activate();
64 }
65 
// close(): discards every entry still in queue_ and returns 0.
// NOTE: this extracted listing has lost the declarator (listing line 68)
// and the opening of the debug-trace statement (listing lines 70-71), so
// the parameter list is not visible.
66 template<class DataType>
67 int
69 {
72  ACE_TEXT("(%P|%t) UpdateReceiver::close()\n")));
73  }
74 
// Drain the queue: delete both pointers held by the front entry, then
// remove the entry, until the queue is empty.
// NOTE(review): no guard on lock_ is taken in this block, unlike add() and
// svc(); presumably this runs only after the service thread has finished -
// confirm against the caller.
75  while (this->queue_.size()) {
76  delete this->queue_.front().first;
77  delete this->queue_.front().second;
78  this->queue_.pop_front();
79  }
80 
81  return 0;
82 }
83 
// stop(): requests termination of the processing loop. The first call sets
// stop_ and signals workAvailable_; later calls return immediately because
// stop_ is already set.
// NOTE: this extracted listing has lost the declarator (listing line 86)
// and the opening of the debug-trace statement (listing lines 88-89).
84 template<class DataType>
85 void
87 {
90  ACE_TEXT("(%P|%t) UpdateReceiver::stop()\n")));
91  }
92 
93  // Indicate the thread should stop and get its attention.
94  if (this->stop_)
95  return;
96 
// NOTE(review): stop_ is written and the condition signalled without a
// guard on lock_ in this block, while svc() tests stop_ around its wait;
// whether a wakeup can be missed depends on members not visible here -
// confirm.
97  this->stop_ = true;
98  this->workAvailable_.signal();
99 }
100 
// add(): appends one (sample, info) pair to queue_ and signals
// workAvailable_. Does nothing if stop_ is already set.
// NOTE: this extracted listing has lost the declarator (listing lines
// 103) and the openings of both debug-trace statements (listing lines
// 105-106 and 117-118), so the types of sample and info are not visible.
101 template<class DataType>
102 void
104 {
107  ACE_TEXT("(%P|%t) UpdateReceiver::add()\n")));
108  }
109 
// Once a stop has been requested nothing more is queued; sample and info
// are not released on this path.
// NOTE(review): stop_ is read here without holding lock_ - confirm this is
// intended.
110  if (this->stop_)
111  return;
112 
113  { // Protect the queue.
114  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->lock_);
// release() is called on both arguments and the results are stored in the
// queue entry; close() and svc() later delete them.
115  this->queue_.push_back(DataInfo(sample.release(), info.release()));
116 
// NOTE(review): the trace below pairs %d with queue_.size() and 0x%x with
// a pointer - verify these specifiers against the ACE logging format rules.
119  ACE_TEXT("(%P|%t) UpdateReceiver::add() - ")
120  ACE_TEXT(" %d samples waiting to process in 0x%x.\n"),
121  this->queue_.size(),
122  (void*)this));
123  }
124  }
125 
// Signalled after the guard scope above has closed.
126  this->workAvailable_.signal();
127 }
128 
// svc(): processing loop. Repeatedly waits for queue_ to become non-empty,
// hands the front entry to processor_.processSample(), then deletes and
// removes that entry. Returns 0 on every visible path: when stop_ is seen
// after a wakeup, when the outer loop condition fails, and when either
// ACE_GUARD_RETURN takes its return branch.
// NOTE: this extracted listing has lost the declarator (listing line 131)
// and the openings of the debug-trace statements (listing lines 133-134,
// 147-148, 156-157, 168-169 and 188-189); the surviving closing braces
// belong to those missing openings.
// NOTE(review): presumably this is the thread body started by activate()
// in open() - confirm against the base class, which is not visible here.
129 template<class DataType>
130 int
132 {
135  ACE_TEXT("(%P|%t) UpdateReceiver::svc()\n")));
136  }
137 
138  // Continue until we are synchronously terminated.
// NOTE(review): stop_ is tested here without holding lock_.
139  while (this->stop_ == false) {
140  { // Block until there is work to do.
141  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
142 
// Keep waiting while the queue is empty; each wakeup re-checks stop_ and
// otherwise loops back to the emptiness test.
143  while (this->queue_.size() == 0) {
144  // This releases the lock while we block.
145  this->workAvailable_.wait();
146 
149  ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
150  ACE_TEXT("wakeup in 0x%x.\n"),
151  (void*)this));
152  }
153 
154  // We were asked to stop instead.
155  if (this->stop_ == true) {
158  ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
159  ACE_TEXT("discontinuing processing after wakeup in 0x%x.\n"),
160  (void*)this));
161  }
162 
// Leaves svc() from inside the wait loop; entries still queued are not
// touched here (close() drains the queue).
163  return 0;
164  }
165  }
166  }
167 
170  ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
171  ACE_TEXT("processing a sample in 0x%x.\n"),
172  (void*)this));
173  }
174 
175  // Delegate actual processing to the publication manager.
// The guard scope above has already closed, so lock_ is not held while
// processSample() runs and add() can append concurrently.
// NOTE(review): queue_.front() is read here outside the guard; whether
// that is safe depends on the container type of queue_, which is not
// visible in this listing - confirm.
176  this->processor_.processSample(
177  this->queue_.front().first,
178  this->queue_.front().second);
179 
180  { // Remove the completed work.
181  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, 0);
// The entry is deleted only after processing has returned.
182  delete this->queue_.front().first;
183  delete this->queue_.front().second;
184  this->queue_.pop_front();
185  }
186  }
187 
190  ACE_TEXT("(%P|%t) UpdateReceiver::svc() - ")
191  ACE_TEXT("discontinuing processing after sample complete in 0x%x.\n"),
192  (void*)this));
193  }
194 
195  return 0;
196 }
197 
198 } // namespace Federator
199 } // namespace OpenDDS
200 
202 
203 #endif /* UPDATERECEIVER_T_CPP */
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define ACE_SYNCH_MUTEX
Interface for managing update publications.
pid_t wait(int *=0)
ACE_Guard< ACE_Thread_Mutex > lock_
LM_DEBUG
ACE_HANDLE open(const char *filename, int mode, mode_t perms=ACE_DEFAULT_OPEN_PERMS, LPSECURITY_ATTRIBUTES sa=0)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
std::pair< OpenDDS::Federator::OwnerUpdate *, DDS::SampleInfo * > DataInfo
Contents of the queue.
void activate(T &obj_ref, PortableServer::POA_ptr poa, PortableServer::ServantBase *servant, TAO_EC_Object_Deactivator &suggested_object_deactivator)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
UpdateReceiver(UpdateProcessor< DataType > &processor)
Construct with a processor reference.