Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
11 :
12 : #include "OwnershipManager.h"
13 : #include "GuidConverter.h"
14 : #include "Util.h"
15 : #include "DataReaderImpl.h"
16 : #include <algorithm>
17 :
18 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
19 :
20 : namespace OpenDDS {
21 : namespace DCPS {
22 :
23 : namespace Util {
24 :
25 0 : bool DescendingOwnershipStrengthSort(const OwnershipManager::WriterInfo& w1,
26 : const OwnershipManager::WriterInfo& w2)
27 : {
28 0 : return w1.ownership_strength_ > w2.ownership_strength_;
29 : }
30 :
31 : } // namespace Util
32 :
33 0 : OwnershipManager::OwnershipManager()
34 : {
35 0 : }
36 :
37 0 : OwnershipManager::~OwnershipManager()
38 : {
39 : // The type->instance should be empty if unregister instance are performed
40 : // by all readers, but in case the instance not unregistered for some reason,
41 : // an error will be logged.
42 0 : if (!type_instance_map_.empty()) {
43 : // There is no way to pass the instance map to concrete datareader
44 : // to delete, so it will be leaked.
45 0 : ACE_DEBUG((LM_WARNING,
46 : ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ")
47 : ACE_TEXT("- non-empty type_instance_map_\n")));
48 : }
49 0 : }
50 :
51 : int
52 0 : OwnershipManager::instance_lock_acquire()
53 : {
54 0 : return instance_lock_.acquire();
55 : }
56 :
57 : int
58 0 : OwnershipManager::instance_lock_release()
59 : {
60 0 : return instance_lock_.release();
61 : }
62 :
63 : RcHandle<RcObject>
64 0 : OwnershipManager::get_instance_map(const char* type_name,
65 : DataReaderImpl* reader)
66 : {
67 0 : InstanceMap* instance = 0;
68 0 : if (0 != find(type_instance_map_, type_name, instance)) {
69 0 : return RcHandle<RcObject>();
70 : }
71 :
72 0 : instance->readers_.insert(reader);
73 0 : return instance->map_;
74 : }
75 :
76 : void
77 0 : OwnershipManager::set_instance_map(const char* type_name,
78 : const RcHandle<RcObject>& instance_map,
79 : DataReaderImpl* reader)
80 : {
81 0 : if (DCPS_debug_level >= 1) {
82 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::set_instance_map ")
83 : ACE_TEXT("instance map %X is created by reader %X\n"),
84 : instance_map.in(), reader));
85 : }
86 :
87 0 : if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name,
88 0 : InstanceMap(instance_map, reader))) {
89 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map "
90 : "failed to bind instance for type \"%C\"\n", type_name));
91 : }
92 0 : }
93 :
94 : void
95 0 : OwnershipManager::unregister_reader(const char* type_name,
96 : DataReaderImpl* reader)
97 : {
98 0 : ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
99 :
100 0 : InstanceMap* instance = 0;
101 0 : if (0 != find(type_instance_map_, type_name, instance)) {
102 0 : return;
103 : }
104 :
105 0 : instance->readers_.erase(reader);
106 :
107 0 : if (instance->readers_.empty()) {
108 0 : if (DCPS_debug_level >= 1) {
109 0 : ACE_DEBUG((LM_DEBUG,
110 : ACE_TEXT("(%P|%t) OwnershipManager::unregister_reader ")
111 : ACE_TEXT(" instance map %@ is deleted by reader %@\n"),
112 : instance->map_.in(), reader));
113 : }
114 0 : unbind(type_instance_map_, type_name);
115 : }
116 0 : }
117 :
118 : void
119 0 : OwnershipManager::remove_writer(const GUID_t& pub_id)
120 : {
121 0 : ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
122 :
123 : const InstanceOwnershipWriterInfos::iterator the_end =
124 0 : instance_ownership_infos_.end();
125 0 : for (InstanceOwnershipWriterInfos::iterator iter =
126 0 : instance_ownership_infos_.begin(); iter != the_end; ++iter) {
127 0 : remove_writer(iter->first, iter->second, pub_id);
128 : }
129 0 : }
130 :
131 : void
132 0 : OwnershipManager::remove_instance(InstanceState* instance_state)
133 : {
134 0 : ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
135 0 : const DDS::InstanceHandle_t ih = instance_state->instance_handle();
136 0 : InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih);
137 0 : if (i != instance_ownership_infos_.end()) {
138 0 : InstanceStateVec& states = i->second.instance_states_;
139 0 : for (size_t j = 0; j < states.size(); ++j) {
140 0 : if (states[j].in() == instance_state) {
141 0 : states.erase(states.begin() + j);
142 0 : break;
143 : }
144 : }
145 : }
146 0 : }
147 :
148 : void
149 0 : OwnershipManager::remove_writers(const DDS::InstanceHandle_t& instance_handle)
150 : {
151 0 : InstanceStateVec instances_to_reset;
152 : {
153 0 : ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
154 :
155 0 : if (DCPS_debug_level >= 1) {
156 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers:")
157 : ACE_TEXT(" disassociate writers with instance %d\n"),
158 : instance_handle));
159 : }
160 :
161 : InstanceOwnershipWriterInfos::iterator owner_wi =
162 0 : instance_ownership_infos_.find(instance_handle);
163 0 : if (owner_wi != instance_ownership_infos_.end()) {
164 0 : owner_wi->second.owner_ = WriterInfo();
165 0 : owner_wi->second.candidates_.clear();
166 : const InstanceStateVec::iterator end =
167 0 : owner_wi->second.instance_states_.end();
168 0 : for (InstanceStateVec::iterator iter =
169 0 : owner_wi->second.instance_states_.begin(); iter != end; ++iter) {
170 : // call after lock released, will call back to data reader
171 0 : instances_to_reset.push_back(*iter);
172 : }
173 0 : owner_wi->second.instance_states_.clear();
174 :
175 0 : instance_ownership_infos_.erase(owner_wi);
176 : }
177 0 : }
178 : // Lock released
179 0 : for (InstanceStateVec::iterator instance = instances_to_reset.begin();
180 0 : instance != instances_to_reset.end(); ++instance) {
181 0 : (*instance)->reset_ownership(instance_handle);
182 : }
183 0 : }
184 :
185 :
186 : bool
187 0 : OwnershipManager::is_owner(const DDS::InstanceHandle_t& instance_handle,
188 : const GUID_t& pub_id)
189 : {
190 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
191 :
192 : InstanceOwnershipWriterInfos::iterator iter
193 0 : = instance_ownership_infos_.find(instance_handle);
194 0 : if (iter != instance_ownership_infos_.end()) {
195 0 : return iter->second.owner_.pub_id_ == pub_id;
196 : }
197 :
198 0 : return false;
199 0 : }
200 :
201 :
202 : bool // owner unregister instance
203 0 : OwnershipManager::remove_writer(const DDS::InstanceHandle_t& instance_handle,
204 : const GUID_t& pub_id)
205 : {
206 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
207 :
208 : InstanceOwnershipWriterInfos::iterator the_iter =
209 0 : instance_ownership_infos_.find(instance_handle);
210 0 : if (the_iter != instance_ownership_infos_.end()) {
211 0 : return remove_writer(instance_handle, the_iter->second, pub_id);
212 : }
213 :
214 0 : return false;
215 0 : }
216 :
217 : bool
218 0 : OwnershipManager::remove_writer(const DDS::InstanceHandle_t& instance_handle,
219 : OwnershipWriterInfos& infos,
220 : const GUID_t& pub_id)
221 : {
222 0 : if (infos.owner_.pub_id_ == pub_id) {
223 0 : remove_owner(instance_handle, infos, false);
224 0 : return true;
225 :
226 : } else {
227 0 : remove_candidate(infos, pub_id);
228 0 : return false;
229 : }
230 : }
231 :
232 :
233 : void
234 0 : OwnershipManager::remove_owner(const DDS::InstanceHandle_t& instance_handle,
235 : OwnershipWriterInfos& infos,
236 : bool sort)
237 : {
238 : //change owner
239 0 : GUID_t new_owner(GUID_UNKNOWN);
240 0 : if (infos.candidates_.empty()) {
241 0 : infos.owner_ = WriterInfo();
242 :
243 : } else {
244 0 : if (sort) {
245 0 : std::sort(infos.candidates_.begin(), infos.candidates_.end(),
246 : Util::DescendingOwnershipStrengthSort);
247 : }
248 :
249 0 : const WriterInfos::iterator begin = infos.candidates_.begin();
250 0 : infos.owner_ = *begin;
251 0 : infos.candidates_.erase(begin);
252 0 : new_owner = infos.owner_.pub_id_;
253 : }
254 :
255 0 : broadcast_new_owner(instance_handle, infos, new_owner);
256 0 : }
257 :
258 :
259 : void
260 0 : OwnershipManager::remove_candidate(OwnershipWriterInfos& infos,
261 : const GUID_t& pub_id)
262 : {
263 0 : if (!infos.candidates_.empty()) {
264 0 : WriterInfos::iterator const the_end = infos.candidates_.end();
265 :
266 0 : WriterInfos::iterator found_candidate = the_end;
267 : // Supplied writer is not an owner, check if it exists in candidate list.
268 : // If not, add it to the candidate list and sort the list.
269 0 : for (WriterInfos::iterator iter = infos.candidates_.begin();
270 0 : iter != the_end; ++iter) {
271 0 : if (iter->pub_id_ == pub_id) {
272 0 : found_candidate = iter;
273 0 : break;
274 : }
275 : }
276 :
277 0 : if (found_candidate != the_end) {
278 0 : infos.candidates_.erase(found_candidate);
279 : }
280 : }
281 0 : }
282 :
283 : bool
284 0 : OwnershipManager::select_owner(const DDS::InstanceHandle_t& instance_handle,
285 : const GUID_t& pub_id,
286 : const CORBA::Long& ownership_strength,
287 : InstanceState_rch instance_state)
288 : {
289 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
290 :
291 : InstanceOwnershipWriterInfos::iterator iter =
292 0 : instance_ownership_infos_.find(instance_handle);
293 0 : if (iter != instance_ownership_infos_.end()) {
294 0 : OwnershipWriterInfos& infos = iter->second;
295 0 : if (!instance_state->registered()) {
296 0 : infos.instance_states_.push_back(instance_state);
297 0 : instance_state->registered(true);
298 : }
299 :
300 : // No owner at some point.
301 0 : if (infos.owner_.pub_id_ == GUID_UNKNOWN) {
302 0 : infos.owner_ = WriterInfo(pub_id, ownership_strength);
303 0 : broadcast_new_owner(instance_handle, infos, pub_id);
304 0 : return true;
305 :
306 0 : } else if (infos.owner_.pub_id_ == pub_id) { // is current owner
307 : //still owner but strength changed to be bigger..
308 0 : if (infos.owner_.ownership_strength_ <= ownership_strength) {
309 0 : infos.owner_.ownership_strength_ = ownership_strength;
310 0 : return true;
311 :
312 : } else { //update strength and reevaluate owner which broadcast new owner.
313 0 : infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
314 0 : remove_owner(instance_handle, infos, true);
315 0 : return infos.owner_.pub_id_ == pub_id;
316 : }
317 :
318 : } else { // not current owner, reevaluate the owner
319 0 : bool replace_owner = false;
320 : // Add current owner to candidate list for owner reevaluation
321 : // if provided pub has strength greater than current owner.
322 0 : if (ownership_strength > infos.owner_.ownership_strength_) {
323 0 : infos.candidates_.push_back(infos.owner_);
324 0 : replace_owner = true;
325 : }
326 :
327 0 : bool found = false;
328 0 : bool sort = true;
329 :
330 : // check if it already existed in candidate list. If not,
331 : // add it to the candidate list, otherwise update strength
332 : // if strength was changed.
333 0 : const WriterInfos::iterator the_end = infos.candidates_.end();
334 :
335 0 : for (WriterInfos::iterator iter = infos.candidates_.begin();
336 0 : iter != the_end; ++iter) {
337 :
338 0 : if (iter->pub_id_ == pub_id) {
339 0 : if (iter->ownership_strength_ != ownership_strength) {
340 0 : iter->ownership_strength_ = ownership_strength;
341 : } else {
342 0 : sort = false;
343 : }
344 0 : found = true;
345 0 : break;
346 : }
347 : }
348 :
349 0 : if (!found) {
350 0 : infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
351 : }
352 :
353 0 : if (sort) {
354 0 : std::sort(infos.candidates_.begin(), infos.candidates_.end(),
355 : Util::DescendingOwnershipStrengthSort);
356 : }
357 :
358 0 : if (replace_owner) {
359 : // Owner was already moved to candidate list and the list was sorted
360 : // already so pick owner from sorted list and replace current
361 : // owner.
362 0 : remove_owner(instance_handle, infos, false);
363 : }
364 :
365 0 : return infos.owner_.pub_id_ == pub_id;
366 : }
367 :
368 : } else {
369 : // first writer of the instance so it's owner.
370 0 : OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle];
371 0 : infos.owner_ = WriterInfo(pub_id, ownership_strength);
372 0 : if (!instance_state->registered()) {
373 0 : infos.instance_states_.push_back(instance_state);
374 0 : instance_state->registered(true);
375 : }
376 0 : broadcast_new_owner(instance_handle, infos, infos.owner_.pub_id_);
377 0 : return true;
378 : }
379 :
380 : return false;
381 0 : }
382 :
383 :
384 : void
385 0 : OwnershipManager::broadcast_new_owner(const DDS::InstanceHandle_t& instance_handle,
386 : OwnershipWriterInfos& infos,
387 : const GUID_t& owner)
388 : {
389 0 : if (DCPS_debug_level >= 1) {
390 : // This may not be an error since it could happen that the sample
391 : // is delivered to the datareader after the write is dis-associated
392 : // with this datareader.
393 0 : ACE_DEBUG((LM_DEBUG,
394 : ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ")
395 : ACE_TEXT("owner writer %C, instance handle %d strength %d num ")
396 : ACE_TEXT("of candidates %d\n"),
397 : LogGuid(owner).c_str(), instance_handle,
398 : infos.owner_.ownership_strength_,
399 : (int)infos.candidates_.size()));
400 : }
401 :
402 0 : const InstanceStateVec::iterator the_end = infos.instance_states_.end();
403 0 : for (InstanceStateVec::iterator iter = infos.instance_states_.begin();
404 0 : iter != the_end; ++iter) {
405 0 : (*iter)->set_owner(owner);
406 : }
407 0 : }
408 :
409 : void
410 0 : OwnershipManager::remove_owner(const DDS::InstanceHandle_t& instance_handle)
411 : {
412 0 : ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
413 :
414 : const InstanceOwnershipWriterInfos::iterator iter =
415 0 : instance_ownership_infos_.find(instance_handle);
416 :
417 0 : if (iter != instance_ownership_infos_.end()) {
418 0 : remove_owner(instance_handle, iter->second, false);
419 : }
420 0 : }
421 :
422 :
423 : } // namespace DCPS
424 : } // namespace OpenDDS
425 :
426 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
427 :
428 : #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
|