OpenDDS  Snapshot(2023/04/28-20:55)
OwnershipManager.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
11 
12 #include "OwnershipManager.h"
13 #include "GuidConverter.h"
14 #include "Util.h"
15 #include "DataReaderImpl.h"
16 #include <algorithm>
17 
19 
20 namespace OpenDDS {
21 namespace DCPS {
22 
23 namespace Util {
24 
27 {
29 }
30 
31 } // namespace Util
32 
34 {
35 }
36 
38 {
39  // The type->instance should be empty if unregister instance are performed
40  // by all readers, but in case the instance not unregistered for some reason,
41  // an error will be logged.
42  if (!type_instance_map_.empty()) {
43  // There is no way to pass the instance map to concrete datareader
44  // to delete, so it will be leaked.
46  ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ")
47  ACE_TEXT("- non-empty type_instance_map_\n")));
48  }
49 }
50 
51 int
53 {
54  return instance_lock_.acquire();
55 }
56 
57 int
59 {
60  return instance_lock_.release();
61 }
62 
64 OwnershipManager::get_instance_map(const char* type_name,
65  DataReaderImpl* reader)
66 {
67  InstanceMap* instance = 0;
68  if (0 != find(type_instance_map_, type_name, instance)) {
69  return RcHandle<RcObject>();
70  }
71 
72  instance->readers_.insert(reader);
73  return instance->map_;
74 }
75 
76 void
77 OwnershipManager::set_instance_map(const char* type_name,
78  const RcHandle<RcObject>& instance_map,
79  DataReaderImpl* reader)
80 {
81  if (DCPS_debug_level >= 1) {
82  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::set_instance_map ")
83  ACE_TEXT("instance map %X is created by reader %X\n"),
84  instance_map.in(), reader));
85  }
86 
87  if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name,
88  InstanceMap(instance_map, reader))) {
89  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map "
90  "failed to bind instance for type \"%C\"\n", type_name));
91  }
92 }
93 
94 void
96  DataReaderImpl* reader)
97 {
98  ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
99 
100  InstanceMap* instance = 0;
101  if (0 != find(type_instance_map_, type_name, instance)) {
102  return;
103  }
104 
105  instance->readers_.erase(reader);
106 
107  if (instance->readers_.empty()) {
108  if (DCPS_debug_level >= 1) {
110  ACE_TEXT("(%P|%t) OwnershipManager::unregister_reader ")
111  ACE_TEXT(" instance map %@ is deleted by reader %@\n"),
112  instance->map_.in(), reader));
113  }
114  unbind(type_instance_map_, type_name);
115  }
116 }
117 
118 void
120 {
121  ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
122 
123  const InstanceOwnershipWriterInfos::iterator the_end =
124  instance_ownership_infos_.end();
125  for (InstanceOwnershipWriterInfos::iterator iter =
126  instance_ownership_infos_.begin(); iter != the_end; ++iter) {
127  remove_writer(iter->first, iter->second, pub_id);
128  }
129 }
130 
131 void
133 {
134  ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
135  const DDS::InstanceHandle_t ih = instance_state->instance_handle();
136  InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih);
137  if (i != instance_ownership_infos_.end()) {
138  InstanceStateVec& states = i->second.instance_states_;
139  for (size_t j = 0; j < states.size(); ++j) {
140  if (states[j].in() == instance_state) {
141  states.erase(states.begin() + j);
142  break;
143  }
144  }
145  }
146 }
147 
148 void
150 {
151  InstanceStateVec instances_to_reset;
152  {
153  ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
154 
155  if (DCPS_debug_level >= 1) {
156  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers:")
157  ACE_TEXT(" disassociate writers with instance %d\n"),
158  instance_handle));
159  }
160 
161  InstanceOwnershipWriterInfos::iterator owner_wi =
162  instance_ownership_infos_.find(instance_handle);
163  if (owner_wi != instance_ownership_infos_.end()) {
164  owner_wi->second.owner_ = WriterInfo();
165  owner_wi->second.candidates_.clear();
166  const InstanceStateVec::iterator end =
167  owner_wi->second.instance_states_.end();
168  for (InstanceStateVec::iterator iter =
169  owner_wi->second.instance_states_.begin(); iter != end; ++iter) {
170  // call after lock released, will call back to data reader
171  instances_to_reset.push_back(*iter);
172  }
173  owner_wi->second.instance_states_.clear();
174 
175  instance_ownership_infos_.erase(owner_wi);
176  }
177  }
178  // Lock released
179  for (InstanceStateVec::iterator instance = instances_to_reset.begin();
180  instance != instances_to_reset.end(); ++instance) {
181  (*instance)->reset_ownership(instance_handle);
182  }
183 }
184 
185 
186 bool
188  const GUID_t& pub_id)
189 {
190  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
191 
192  InstanceOwnershipWriterInfos::iterator iter
193  = instance_ownership_infos_.find(instance_handle);
194  if (iter != instance_ownership_infos_.end()) {
195  return iter->second.owner_.pub_id_ == pub_id;
196  }
197 
198  return false;
199 }
200 
201 
202 bool // owner unregister instance
204  const GUID_t& pub_id)
205 {
206  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
207 
208  InstanceOwnershipWriterInfos::iterator the_iter =
209  instance_ownership_infos_.find(instance_handle);
210  if (the_iter != instance_ownership_infos_.end()) {
211  return remove_writer(instance_handle, the_iter->second, pub_id);
212  }
213 
214  return false;
215 }
216 
217 bool
219  OwnershipWriterInfos& infos,
220  const GUID_t& pub_id)
221 {
222  if (infos.owner_.pub_id_ == pub_id) {
223  remove_owner(instance_handle, infos, false);
224  return true;
225 
226  } else {
227  remove_candidate(infos, pub_id);
228  return false;
229  }
230 }
231 
232 
233 void
235  OwnershipWriterInfos& infos,
236  bool sort)
237 {
238  //change owner
239  GUID_t new_owner(GUID_UNKNOWN);
240  if (infos.candidates_.empty()) {
241  infos.owner_ = WriterInfo();
242 
243  } else {
244  if (sort) {
245  std::sort(infos.candidates_.begin(), infos.candidates_.end(),
247  }
248 
249  const WriterInfos::iterator begin = infos.candidates_.begin();
250  infos.owner_ = *begin;
251  infos.candidates_.erase(begin);
252  new_owner = infos.owner_.pub_id_;
253  }
254 
255  broadcast_new_owner(instance_handle, infos, new_owner);
256 }
257 
258 
259 void
261  const GUID_t& pub_id)
262 {
263  if (!infos.candidates_.empty()) {
264  WriterInfos::iterator const the_end = infos.candidates_.end();
265 
266  WriterInfos::iterator found_candidate = the_end;
267  // Supplied writer is not an owner, check if it exists in candidate list.
268  // If not, add it to the candidate list and sort the list.
269  for (WriterInfos::iterator iter = infos.candidates_.begin();
270  iter != the_end; ++iter) {
271  if (iter->pub_id_ == pub_id) {
272  found_candidate = iter;
273  break;
274  }
275  }
276 
277  if (found_candidate != the_end) {
278  infos.candidates_.erase(found_candidate);
279  }
280  }
281 }
282 
283 bool
285  const GUID_t& pub_id,
286  const CORBA::Long& ownership_strength,
287  InstanceState_rch instance_state)
288 {
289  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
290 
291  InstanceOwnershipWriterInfos::iterator iter =
292  instance_ownership_infos_.find(instance_handle);
293  if (iter != instance_ownership_infos_.end()) {
294  OwnershipWriterInfos& infos = iter->second;
295  if (!instance_state->registered()) {
296  infos.instance_states_.push_back(instance_state);
297  instance_state->registered(true);
298  }
299 
300  // No owner at some point.
301  if (infos.owner_.pub_id_ == GUID_UNKNOWN) {
302  infos.owner_ = WriterInfo(pub_id, ownership_strength);
303  broadcast_new_owner(instance_handle, infos, pub_id);
304  return true;
305 
306  } else if (infos.owner_.pub_id_ == pub_id) { // is current owner
307  //still owner but strength changed to be bigger..
308  if (infos.owner_.ownership_strength_ <= ownership_strength) {
309  infos.owner_.ownership_strength_ = ownership_strength;
310  return true;
311 
312  } else { //update strength and reevaluate owner which broadcast new owner.
313  infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
314  remove_owner(instance_handle, infos, true);
315  return infos.owner_.pub_id_ == pub_id;
316  }
317 
318  } else { // not current owner, reevaluate the owner
319  bool replace_owner = false;
320  // Add current owner to candidate list for owner reevaluation
321  // if provided pub has strength greater than current owner.
322  if (ownership_strength > infos.owner_.ownership_strength_) {
323  infos.candidates_.push_back(infos.owner_);
324  replace_owner = true;
325  }
326 
327  bool found = false;
328  bool sort = true;
329 
330  // check if it already existed in candidate list. If not,
331  // add it to the candidate list, otherwise update strength
332  // if strength was changed.
333  const WriterInfos::iterator the_end = infos.candidates_.end();
334 
335  for (WriterInfos::iterator iter = infos.candidates_.begin();
336  iter != the_end; ++iter) {
337 
338  if (iter->pub_id_ == pub_id) {
339  if (iter->ownership_strength_ != ownership_strength) {
340  iter->ownership_strength_ = ownership_strength;
341  } else {
342  sort = false;
343  }
344  found = true;
345  break;
346  }
347  }
348 
349  if (!found) {
350  infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
351  }
352 
353  if (sort) {
354  std::sort(infos.candidates_.begin(), infos.candidates_.end(),
356  }
357 
358  if (replace_owner) {
359  // Owner was already moved to candidate list and the list was sorted
360  // already so pick owner from sorted list and replace current
361  // owner.
362  remove_owner(instance_handle, infos, false);
363  }
364 
365  return infos.owner_.pub_id_ == pub_id;
366  }
367 
368  } else {
369  // first writer of the instance so it's owner.
370  OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle];
371  infos.owner_ = WriterInfo(pub_id, ownership_strength);
372  if (!instance_state->registered()) {
373  infos.instance_states_.push_back(instance_state);
374  instance_state->registered(true);
375  }
376  broadcast_new_owner(instance_handle, infos, infos.owner_.pub_id_);
377  return true;
378  }
379 
380  return false;
381 }
382 
383 
384 void
386  OwnershipWriterInfos& infos,
387  const GUID_t& owner)
388 {
389  if (DCPS_debug_level >= 1) {
390  // This may not be an error since it could happen that the sample
391  // is delivered to the datareader after the write is dis-associated
392  // with this datareader.
394  ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ")
395  ACE_TEXT("owner writer %C, instance handle %d strength %d num ")
396  ACE_TEXT("of candidates %d\n"),
397  LogGuid(owner).c_str(), instance_handle,
399  (int)infos.candidates_.size()));
400  }
401 
402  const InstanceStateVec::iterator the_end = infos.instance_states_.end();
403  for (InstanceStateVec::iterator iter = infos.instance_states_.begin();
404  iter != the_end; ++iter) {
405  (*iter)->set_owner(owner);
406  }
407 }
408 
409 void
411 {
412  ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
413 
414  const InstanceOwnershipWriterInfos::iterator iter =
415  instance_ownership_infos_.find(instance_handle);
416 
417  if (iter != instance_ownership_infos_.end()) {
418  remove_owner(instance_handle, iter->second, false);
419  }
420 }
421 
422 
423 } // namespace DCPS
424 } // namespace OpenDDS
425 
427 
428 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
manage the states of a received data instance.
Definition: InstanceState.h:49
#define ACE_DEBUG(X)
ACE_CDR::Long Long
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void set_instance_map(const char *type_name, const RcHandle< RcObject > &instance_map, DataReaderImpl *reader)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
bool is_owner(const DDS::InstanceHandle_t &instance_handle, const GUID_t &pub_id)
void unregister_reader(const char *type_name, DataReaderImpl *reader)
void remove_instance(InstanceState *instance_state)
LM_DEBUG
RcHandle< RcObject > get_instance_map(const char *type_name, DataReaderImpl *reader)
bool select_owner(const DDS::InstanceHandle_t &instance_handle, const GUID_t &pub_id, const CORBA::Long &ownership_strength, InstanceState_rch instance_state)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DDS::InstanceHandle_t instance_handle() const
Implements the DDS::DataReader interface.
LM_WARNING
bool DescendingOwnershipStrengthSort(const OwnershipManager::WriterInfo &w1, const OwnershipManager::WriterInfo &w2)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
void remove_owner(const DDS::InstanceHandle_t &instance_handle)
void remove_writer(const GUID_t &pub_id)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void remove_candidate(OwnershipWriterInfos &infos, const GUID_t &pub_id)
void remove_writers(const DDS::InstanceHandle_t &instance_handle)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int unbind(Container &c, const typename Container::key_type &k, typename Container::mapped_type &v)
Definition: Util.h:40
void broadcast_new_owner(const DDS::InstanceHandle_t &instance_handle, OwnershipWriterInfos &infos, const GUID_t &owner)
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71