00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00011
00012 #include "OwnershipManager.h"
00013 #include "GuidConverter.h"
00014 #include "Util.h"
00015 #include "DataReaderImpl.h"
00016 #include <algorithm>
00017
00018 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 namespace OpenDDS {
00021 namespace DCPS {
00022
00023 namespace Util {
00024
00025 bool DescendingOwnershipStrengthSort(const OwnershipManager::WriterInfo& w1,
00026 const OwnershipManager::WriterInfo& w2)
00027 {
00028 return w1.ownership_strength_ > w2.ownership_strength_;
00029 }
00030
00031 }
00032
00033 OwnershipManager::OwnershipManager()
00034 {
00035 }
00036
00037 OwnershipManager::~OwnershipManager()
00038 {
00039
00040
00041
00042 if (!type_instance_map_.empty()) {
00043
00044
00045 ACE_DEBUG((LM_WARNING,
00046 ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ")
00047 ACE_TEXT("- non-empty type_instance_map_\n")));
00048 }
00049 }
00050
00051 int
00052 OwnershipManager::instance_lock_acquire()
00053 {
00054 return instance_lock_.acquire();
00055 }
00056
00057 int
00058 OwnershipManager::instance_lock_release()
00059 {
00060 return instance_lock_.release();
00061 }
00062
00063 RcHandle<RcObject>
00064 OwnershipManager::get_instance_map(const char* type_name,
00065 DataReaderImpl* reader)
00066 {
00067 InstanceMap* instance = 0;
00068 if (0 != find(type_instance_map_, type_name, instance)) {
00069 return RcHandle<RcObject>();
00070 }
00071
00072 instance->readers_.insert(reader);
00073 return instance->map_;
00074 }
00075
00076 void
00077 OwnershipManager::set_instance_map(const char* type_name,
00078 const RcHandle<RcObject>& instance_map,
00079 DataReaderImpl* reader)
00080 {
00081 if (DCPS_debug_level >= 1) {
00082 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::set_instance_map ")
00083 ACE_TEXT("instance map %X is created by reader %X \n"),
00084 instance_map.in(), reader));
00085 }
00086
00087 if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name,
00088 InstanceMap(instance_map, reader))) {
00089 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map "
00090 "failed to bind instance for type \"%C\"\n", type_name));
00091 }
00092 }
00093
00094 void
00095 OwnershipManager::unregister_reader(const char* type_name,
00096 DataReaderImpl* reader)
00097 {
00098 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00099
00100 InstanceMap* instance = 0;
00101 if (0 != find(type_instance_map_, type_name, instance)) {
00102 return;
00103 }
00104
00105 instance->readers_.erase(reader);
00106
00107 if (instance->readers_.empty()) {
00108 if (DCPS_debug_level >= 1) {
00109 ACE_DEBUG((LM_DEBUG,
00110 ACE_TEXT("(%P|%t) OwnershipManager::unregister_reader ")
00111 ACE_TEXT(" instance map %@ is deleted by reader %@\n"),
00112 instance->map_.in(), reader));
00113 }
00114 unbind(type_instance_map_, type_name);
00115 }
00116 }
00117
00118 void
00119 OwnershipManager::remove_writer(const PublicationId& pub_id)
00120 {
00121 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00122
00123 const InstanceOwnershipWriterInfos::iterator the_end =
00124 instance_ownership_infos_.end();
00125 for (InstanceOwnershipWriterInfos::iterator iter =
00126 instance_ownership_infos_.begin(); iter != the_end; ++iter) {
00127 remove_writer(iter->first, iter->second, pub_id);
00128 }
00129 }
00130
00131 void
00132 OwnershipManager::remove_instance(InstanceState* instance_state)
00133 {
00134 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00135 const DDS::InstanceHandle_t ih = instance_state->instance_handle();
00136 InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih);
00137 if (i != instance_ownership_infos_.end()) {
00138 InstanceStateVec& states = i->second.instance_states_;
00139 for (size_t j = 0; j < states.size(); ++j) {
00140 if (states[j] == instance_state) {
00141 states.erase(states.begin() + j);
00142 break;
00143 }
00144 }
00145 }
00146 }
00147
00148 void
00149 OwnershipManager::remove_writers(const DDS::InstanceHandle_t& instance_handle)
00150 {
00151 InstanceStateVec instances_to_reset;
00152 {
00153 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00154
00155 if (DCPS_debug_level >= 1) {
00156 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers:")
00157 ACE_TEXT(" disassociate writers with instance %d\n"),
00158 instance_handle));
00159 }
00160
00161 InstanceOwnershipWriterInfos::iterator owner_wi =
00162 instance_ownership_infos_.find(instance_handle);
00163 if (owner_wi != instance_ownership_infos_.end()) {
00164 owner_wi->second.owner_ = WriterInfo();
00165 owner_wi->second.candidates_.clear();
00166 const InstanceStateVec::iterator end =
00167 owner_wi->second.instance_states_.end();
00168 for (InstanceStateVec::iterator iter =
00169 owner_wi->second.instance_states_.begin(); iter != end; ++iter) {
00170
00171 instances_to_reset.push_back(*iter);
00172 }
00173 owner_wi->second.instance_states_.clear();
00174
00175 instance_ownership_infos_.erase(owner_wi);
00176 }
00177 }
00178
00179 for (InstanceStateVec::iterator instance = instances_to_reset.begin();
00180 instance != instances_to_reset.end(); ++instance) {
00181 (*instance)->reset_ownership(instance_handle);
00182 }
00183 }
00184
00185
00186 bool
00187 OwnershipManager::is_owner(const DDS::InstanceHandle_t& instance_handle,
00188 const PublicationId& pub_id)
00189 {
00190 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00191
00192 InstanceOwnershipWriterInfos::iterator iter
00193 = instance_ownership_infos_.find(instance_handle);
00194 if (iter != instance_ownership_infos_.end()) {
00195 return iter->second.owner_.pub_id_ == pub_id;
00196 }
00197
00198 return false;
00199 }
00200
00201
00202 bool
00203 OwnershipManager::remove_writer(const DDS::InstanceHandle_t& instance_handle,
00204 const PublicationId& pub_id)
00205 {
00206 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00207
00208 InstanceOwnershipWriterInfos::iterator the_iter =
00209 instance_ownership_infos_.find(instance_handle);
00210 if (the_iter != instance_ownership_infos_.end()) {
00211 return remove_writer(instance_handle, the_iter->second, pub_id);
00212 }
00213
00214 return false;
00215 }
00216
00217 bool
00218 OwnershipManager::remove_writer(const DDS::InstanceHandle_t& instance_handle,
00219 OwnershipWriterInfos& infos,
00220 const PublicationId& pub_id)
00221 {
00222 if (infos.owner_.pub_id_ == pub_id) {
00223 remove_owner(instance_handle, infos, false);
00224 return true;
00225
00226 } else {
00227 remove_candidate(infos, pub_id);
00228 return false;
00229 }
00230 }
00231
00232
00233 void
00234 OwnershipManager::remove_owner(const DDS::InstanceHandle_t& instance_handle,
00235 OwnershipWriterInfos& infos,
00236 bool sort)
00237 {
00238
00239 PublicationId new_owner(GUID_UNKNOWN);
00240 if (infos.candidates_.empty()) {
00241 infos.owner_ = WriterInfo();
00242
00243 } else {
00244 if (sort) {
00245 std::sort(infos.candidates_.begin(), infos.candidates_.end(),
00246 Util::DescendingOwnershipStrengthSort);
00247 }
00248
00249 const WriterInfos::iterator begin = infos.candidates_.begin();
00250 infos.owner_ = *begin;
00251 infos.candidates_.erase(begin);
00252 new_owner = infos.owner_.pub_id_;
00253 }
00254
00255 broadcast_new_owner(instance_handle, infos, new_owner);
00256 }
00257
00258
00259 void
00260 OwnershipManager::remove_candidate(OwnershipWriterInfos& infos,
00261 const PublicationId& pub_id)
00262 {
00263 if (!infos.candidates_.empty()) {
00264 WriterInfos::iterator const the_end = infos.candidates_.end();
00265
00266 WriterInfos::iterator found_candidate = the_end;
00267
00268
00269 for (WriterInfos::iterator iter = infos.candidates_.begin();
00270 iter != the_end; ++iter) {
00271 if (iter->pub_id_ == pub_id) {
00272 found_candidate = iter;
00273 break;
00274 }
00275 }
00276
00277 if (found_candidate != the_end) {
00278 infos.candidates_.erase(found_candidate);
00279 }
00280 }
00281 }
00282
00283 bool
00284 OwnershipManager::select_owner(const DDS::InstanceHandle_t& instance_handle,
00285 const PublicationId& pub_id,
00286 const CORBA::Long& ownership_strength,
00287 InstanceState* instance_state)
00288 {
00289 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00290
00291 InstanceOwnershipWriterInfos::iterator iter =
00292 instance_ownership_infos_.find(instance_handle);
00293 if (iter != instance_ownership_infos_.end()) {
00294 OwnershipWriterInfos& infos = iter->second;
00295 if (!instance_state->registered()) {
00296 infos.instance_states_.push_back(instance_state);
00297 instance_state->registered(true);
00298 }
00299
00300
00301 if (infos.owner_.pub_id_ == GUID_UNKNOWN) {
00302 infos.owner_ = WriterInfo(pub_id, ownership_strength);
00303 broadcast_new_owner(instance_handle, infos, pub_id);
00304 return true;
00305
00306 } else if (infos.owner_.pub_id_ == pub_id) {
00307
00308 if (infos.owner_.ownership_strength_ <= ownership_strength) {
00309 infos.owner_.ownership_strength_ = ownership_strength;
00310 return true;
00311
00312 } else {
00313 infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
00314 remove_owner(instance_handle, infos, true);
00315 return infos.owner_.pub_id_ == pub_id;
00316 }
00317
00318 } else {
00319 bool replace_owner = false;
00320
00321
00322 if (ownership_strength > infos.owner_.ownership_strength_) {
00323 infos.candidates_.push_back(infos.owner_);
00324 replace_owner = true;
00325 }
00326
00327 bool found = false;
00328 bool sort = true;
00329
00330
00331
00332
00333 const WriterInfos::iterator the_end = infos.candidates_.end();
00334
00335 for (WriterInfos::iterator iter = infos.candidates_.begin();
00336 iter != the_end; ++iter) {
00337
00338 if (iter->pub_id_ == pub_id) {
00339 if (iter->ownership_strength_ != ownership_strength) {
00340 iter->ownership_strength_ = ownership_strength;
00341 } else {
00342 sort = false;
00343 }
00344 found = true;
00345 break;
00346 }
00347 }
00348
00349 if (!found) {
00350 infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
00351 }
00352
00353 if (sort) {
00354 std::sort(infos.candidates_.begin(), infos.candidates_.end(),
00355 Util::DescendingOwnershipStrengthSort);
00356 }
00357
00358 if (replace_owner) {
00359
00360
00361
00362 remove_owner(instance_handle, infos, false);
00363 }
00364
00365 return infos.owner_.pub_id_ == pub_id;
00366 }
00367
00368 } else {
00369
00370 OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle];
00371 infos.owner_ = WriterInfo(pub_id, ownership_strength);
00372 if (!instance_state->registered()) {
00373 infos.instance_states_.push_back(instance_state);
00374 instance_state->registered(true);
00375 }
00376 broadcast_new_owner(instance_handle, infos, infos.owner_.pub_id_);
00377 return true;
00378 }
00379
00380 return false;
00381 }
00382
00383
00384 void
00385 OwnershipManager::broadcast_new_owner(const DDS::InstanceHandle_t& instance_handle,
00386 OwnershipWriterInfos& infos,
00387 const PublicationId& owner)
00388 {
00389 if (DCPS_debug_level >= 1) {
00390
00391
00392
00393 GuidConverter writer_converter(owner);
00394 ACE_DEBUG((LM_DEBUG,
00395 ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ")
00396 ACE_TEXT("owner writer %C, instance handle %d strength %d num ")
00397 ACE_TEXT("of candidates %d\n"),
00398 OPENDDS_STRING(writer_converter).c_str(), instance_handle,
00399 infos.owner_.ownership_strength_,
00400 (int)infos.candidates_.size()));
00401 }
00402
00403 const InstanceStateVec::iterator the_end = infos.instance_states_.end();
00404 for (InstanceStateVec::iterator iter = infos.instance_states_.begin();
00405 iter != the_end; ++iter) {
00406 (*iter)->set_owner(owner);
00407 }
00408 }
00409
00410 void
00411 OwnershipManager::remove_owner(const DDS::InstanceHandle_t& instance_handle)
00412 {
00413 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00414
00415 const InstanceOwnershipWriterInfos::iterator iter =
00416 instance_ownership_infos_.find(instance_handle);
00417
00418 if (iter != instance_ownership_infos_.end()) {
00419 remove_owner(instance_handle, iter->second, false);
00420 }
00421 }
00422
00423
00424 }
00425 }
00426
00427 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00428
00429 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE