00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00011
00012 #include "OwnershipManager.h"
00013 #include "GuidConverter.h"
00014 #include "Util.h"
00015 #include "DataReaderImpl.h"
00016 #include <algorithm>
00017
00018
00019 namespace Util {
00020
00021 bool DescendingOwnershipStrengthSort (const OpenDDS::DCPS::OwnershipManager::WriterInfo& w1, const OpenDDS::DCPS::OwnershipManager::WriterInfo& w2)
00022 {
00023 return w1.ownership_strength_ > w2.ownership_strength_;
00024 };
00025
00026 }
00027
00028 namespace OpenDDS {
00029 namespace DCPS {
00030
00031
00032
00033
00034
00035
00036 OwnershipManager::OwnershipManager()
00037 {
00038 }
00039
00040
00041 OwnershipManager::~OwnershipManager()
00042 {
00043
00044
00045
00046 TypeInstanceMap::iterator const the_end = type_instance_map_.end ();
00047 TypeInstanceMap::iterator iter = type_instance_map_.begin ();
00048 while (iter != the_end)
00049 {
00050
00051
00052
00053 ACE_DEBUG((LM_WARNING,
00054 ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ")
00055 ACE_TEXT("- non-empty type_instance_map_\n")));
00056 ++ iter;
00057 }
00058
00059 type_instance_map_.clear ();
00060 }
00061
00062
00063 int
00064 OwnershipManager::instance_lock_acquire ()
00065 {
00066 return this->instance_lock_.acquire ();
00067 }
00068
00069 int
00070 OwnershipManager::instance_lock_release ()
00071 {
00072 return this->instance_lock_.release ();
00073 }
00074
00075 void*
00076 OwnershipManager::get_instance_map (const char* type_name,
00077 DataReaderImpl* reader)
00078 {
00079 InstanceMap* instance = 0;
00080 if (0 != find(type_instance_map_, type_name, instance)) {
00081 return 0;
00082 }
00083
00084 instance->readers_.push_back (reader);
00085 return instance->map_;
00086 }
00087
00088 void
00089 OwnershipManager::set_instance_map (const char* type_name,
00090 void* instance_map,
00091 DataReaderImpl* reader)
00092 {
00093 if (DCPS_debug_level >= 1) {
00094 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) OwnershipManager::set_instance_map ")
00095 ACE_TEXT (" instance map %X is created by reader %X \n"),
00096 instance_map, reader));
00097 }
00098
00099 if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name, InstanceMap (instance_map, reader))) {
00100 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map failed to "
00101 "bind instance for type \"%s\"\n",type_name));
00102 }
00103 }
00104
00105 void
00106 OwnershipManager::unregister_reader (const char* type_name,
00107 DataReaderImpl* reader)
00108 {
00109 ACE_GUARD(ACE_Thread_Mutex,
00110 guard,
00111 this->instance_lock_);
00112
00113 InstanceMap* instance = 0;
00114 if (0 != find(type_instance_map_, type_name, instance)) {
00115 return;
00116 }
00117
00118 ReaderVec::iterator end = instance->readers_.end();
00119
00120 for (ReaderVec::iterator it(instance->readers_.begin());
00121 it != end; ++it) {
00122 if (*it == reader) {
00123 instance->readers_.erase (it);
00124 break;
00125 }
00126 }
00127
00128 if (instance->readers_.empty ()) {
00129 if (DCPS_debug_level >= 1) {
00130 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) OwnershipManager::unregister_reader ")
00131 ACE_TEXT (" instance map %X is deleted by reader %X \n"),
00132 instance->map_, reader));
00133 }
00134 reader->delete_instance_map (instance->map_);
00135 unbind (type_instance_map_, type_name);
00136 }
00137 }
00138
00139 void
00140 OwnershipManager::remove_writer (const PublicationId& pub_id)
00141 {
00142 ACE_GUARD(ACE_Thread_Mutex,
00143 guard,
00144 this->instance_lock_);
00145
00146 InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00147 for (InstanceOwnershipWriterInfos::iterator iter = instance_ownership_infos_.begin ();
00148 iter != the_end; ++ iter) {
00149 this->remove_writer (iter->first, iter->second, pub_id);
00150 }
00151 }
00152
00153 void
00154 OwnershipManager::remove_instance(InstanceState* instance_state)
00155 {
00156 ACE_GUARD(ACE_Thread_Mutex, guard, this->instance_lock_);
00157 DDS::InstanceHandle_t ih = instance_state->instance_handle();
00158 InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih);
00159 if (i != instance_ownership_infos_.end()) {
00160 for (size_t j = 0; j < i->second.instance_states_.size(); ++j) {
00161 if (i->second.instance_states_[j] == instance_state) {
00162 i->second.instance_states_.erase(i->second.instance_states_.begin() + j);
00163 break;
00164 }
00165 }
00166 }
00167 }
00168
00169 void
00170 OwnershipManager::remove_writers (const ::DDS::InstanceHandle_t& instance_handle)
00171 {
00172 InstanceStateVec instances_to_reset;
00173 {
00174 ACE_GUARD(ACE_Thread_Mutex,
00175 guard,
00176 this->instance_lock_);
00177
00178 if (DCPS_debug_level >= 1) {
00179 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers: ")
00180 ACE_TEXT("disassociate writers with instance %d\n"),
00181 instance_handle));
00182 }
00183
00184 InstanceOwnershipWriterInfos::iterator owner_wi
00185 = instance_ownership_infos_.find (instance_handle);
00186 if (owner_wi != instance_ownership_infos_.end()) {
00187 owner_wi->second.owner_ = WriterInfo();
00188 owner_wi->second.candidates_.clear ();
00189 InstanceStateVec::iterator const end = owner_wi->second.instance_states_.end();
00190 for (InstanceStateVec::iterator iter = owner_wi->second.instance_states_.begin();
00191 iter != end; ++iter) {
00192
00193 instances_to_reset.push_back(*iter);
00194 }
00195 owner_wi->second.instance_states_.clear ();
00196
00197 instance_ownership_infos_.erase(owner_wi);
00198 }
00199 }
00200
00201 InstanceStateVec::iterator instance;
00202 for (instance = instances_to_reset.begin();
00203 instance != instances_to_reset.end(); ++instance)
00204 {
00205 (*instance)->reset_ownership(instance_handle);
00206 }
00207 }
00208
00209
00210 bool
00211 OwnershipManager::is_owner (const ::DDS::InstanceHandle_t& instance_handle,
00212 const PublicationId& pub_id)
00213 {
00214 ACE_GUARD_RETURN (ACE_Thread_Mutex,
00215 guard,
00216 this->instance_lock_,
00217 false);
00218
00219 InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00220
00221 InstanceOwnershipWriterInfos::iterator iter
00222 = instance_ownership_infos_.find (instance_handle);
00223 if (iter != the_end) {
00224 return iter->second.owner_.pub_id_ == pub_id;
00225 }
00226
00227 return false;
00228 }
00229
00230
00231 bool
00232 OwnershipManager::remove_writer (
00233 const ::DDS::InstanceHandle_t& instance_handle,
00234 const PublicationId& pub_id)
00235 {
00236 ACE_GUARD_RETURN (ACE_Thread_Mutex,
00237 guard,
00238 this->instance_lock_,
00239 false);
00240
00241 InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00242
00243 InstanceOwnershipWriterInfos::iterator the_iter
00244 = instance_ownership_infos_.find (instance_handle);
00245 if (the_iter != the_end) {
00246 return this->remove_writer (instance_handle, the_iter->second, pub_id);
00247 }
00248
00249 return false;
00250 }
00251
00252 bool
00253 OwnershipManager::remove_writer (const ::DDS::InstanceHandle_t& instance_handle,
00254 OwnershipWriterInfos& infos,
00255 const PublicationId& pub_id)
00256 {
00257 if (infos.owner_.pub_id_ == pub_id) {
00258 this->remove_owner (instance_handle, infos, false);
00259 return true;
00260 }
00261 else {
00262 this->remove_candidate (infos, pub_id);
00263 return false;
00264 }
00265
00266 return false;
00267 }
00268
00269
00270 void
00271 OwnershipManager::remove_owner (const ::DDS::InstanceHandle_t& instance_handle,
00272 OwnershipWriterInfos& infos,
00273 bool sort)
00274 {
00275
00276 PublicationId new_owner(GUID_UNKNOWN);
00277 if (infos.candidates_.empty ()) {
00278 infos.owner_ = WriterInfo();
00279 }
00280 else {
00281 if (sort) {
00282 std::sort (infos.candidates_.begin(), infos.candidates_.end(),
00283 ::Util::DescendingOwnershipStrengthSort);
00284 }
00285
00286 WriterInfos::iterator begin = infos.candidates_.begin();
00287 infos.owner_ = *begin;
00288 infos.candidates_.erase (begin);
00289 new_owner = infos.owner_.pub_id_;
00290 }
00291
00292 this->broadcast_new_owner (instance_handle, infos, new_owner);
00293 }
00294
00295
00296 void
00297 OwnershipManager::remove_candidate (OwnershipWriterInfos& infos,const PublicationId& pub_id)
00298 {
00299 if (! infos.candidates_.empty ()) {
00300 WriterInfos::iterator const the_end = infos.candidates_.end();
00301
00302 WriterInfos::iterator found_candidate = the_end;
00303
00304
00305 for (WriterInfos::iterator iter = infos.candidates_.begin ();
00306 iter != the_end; ++iter) {
00307
00308 if (iter->pub_id_ == pub_id) {
00309 found_candidate = iter;
00310 break;
00311 }
00312 }
00313
00314 if (found_candidate != the_end) {
00315 infos.candidates_.erase (found_candidate);
00316 }
00317 }
00318 }
00319
00320 bool
00321 OwnershipManager::select_owner (const ::DDS::InstanceHandle_t& instance_handle,
00322 const PublicationId& pub_id,
00323 const CORBA::Long& ownership_strength,
00324 InstanceState* instance_state)
00325 {
00326 ACE_GUARD_RETURN (ACE_Thread_Mutex,
00327 guard,
00328 this->instance_lock_,
00329 false);
00330
00331 InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00332
00333 InstanceOwnershipWriterInfos::iterator iter
00334 = instance_ownership_infos_.find (instance_handle);
00335 if (iter != the_end) {
00336 OwnershipWriterInfos& infos = iter->second;
00337 if (!instance_state->registered()) {
00338 infos.instance_states_.push_back (instance_state);
00339 instance_state->registered(true);
00340 }
00341
00342
00343 if (infos.owner_.pub_id_ == GUID_UNKNOWN) {
00344 infos.owner_ = WriterInfo(pub_id,ownership_strength);
00345 this->broadcast_new_owner (instance_handle, infos, pub_id);
00346
00347 return true;
00348 }
00349 else if (infos.owner_.pub_id_ == pub_id) {
00350
00351 if (infos.owner_.ownership_strength_ <= ownership_strength) {
00352 infos.owner_.ownership_strength_ = ownership_strength;
00353 return true;
00354 }
00355 else {
00356 infos.candidates_.push_back (WriterInfo(pub_id,ownership_strength));
00357 this->remove_owner (instance_handle, infos, true);
00358 return infos.owner_.pub_id_ == pub_id;
00359 }
00360 }
00361 else {
00362 bool replace_owner = false;
00363
00364
00365 if (ownership_strength > infos.owner_.ownership_strength_) {
00366 infos.candidates_.push_back (infos.owner_);
00367 replace_owner = true;
00368 }
00369
00370 bool found = false;
00371 bool sort = true;
00372
00373
00374
00375
00376 WriterInfos::iterator const the_end = infos.candidates_.end();
00377
00378 for (WriterInfos::iterator iter = infos.candidates_.begin();
00379 iter != the_end; ++iter) {
00380
00381 if (iter->pub_id_ == pub_id) {
00382 if (iter->ownership_strength_ != ownership_strength) {
00383 iter->ownership_strength_ = ownership_strength;
00384 }
00385 else {
00386 sort = false;
00387 }
00388 found = true;
00389 break;
00390 }
00391 }
00392
00393 if (!found) {
00394 infos.candidates_.push_back (WriterInfo(pub_id,ownership_strength));
00395 }
00396
00397 if (sort) {
00398 std::sort (infos.candidates_.begin(), infos.candidates_.end(), ::Util::DescendingOwnershipStrengthSort);
00399 }
00400
00401 if (replace_owner) {
00402
00403
00404
00405 this->remove_owner (instance_handle, infos, false);
00406 }
00407
00408 return infos.owner_.pub_id_ == pub_id;
00409 }
00410 }
00411 else {
00412
00413 OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle];
00414 infos.owner_ = WriterInfo(pub_id,ownership_strength);
00415 if (!instance_state->registered()) {
00416 infos.instance_states_.push_back (instance_state);
00417 instance_state->registered(true);
00418 }
00419 this->broadcast_new_owner (instance_handle, infos, infos.owner_.pub_id_);
00420 return true;
00421 }
00422
00423 return false;
00424 }
00425
00426
00427 void
00428 OwnershipManager::broadcast_new_owner ( const ::DDS::InstanceHandle_t& instance_handle,
00429 OwnershipWriterInfos& infos,
00430 const PublicationId& owner)
00431 {
00432 if (DCPS_debug_level >= 1) {
00433
00434
00435
00436 GuidConverter writer_converter(owner);
00437 ACE_DEBUG((LM_DEBUG,
00438 ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ")
00439 ACE_TEXT("owner writer %C, instance handle %d strength %d num of candidates %d\n"),
00440 OPENDDS_STRING(writer_converter).c_str(),
00441 instance_handle, infos.owner_.ownership_strength_, infos.candidates_.size()));
00442 }
00443
00444 InstanceStateVec::iterator const the_end = infos.instance_states_.end();
00445 for (InstanceStateVec::iterator iter = infos.instance_states_.begin ();
00446 iter != the_end; ++iter) {
00447 (*iter)->set_owner (owner);
00448 }
00449 }
00450
00451 void
00452 OwnershipManager::remove_owner (const ::DDS::InstanceHandle_t& instance_handle)
00453 {
00454 ACE_GUARD(ACE_Thread_Mutex,
00455 guard,
00456 this->instance_lock_);
00457
00458 InstanceOwnershipWriterInfos::iterator iter
00459 = instance_ownership_infos_.find (instance_handle);
00460
00461 this->remove_owner (instance_handle, iter->second, false);
00462 }
00463
00464
00465 }
00466 }
00467
00468 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE