#include <OwnershipManager.h>
Classes | |
struct | InstanceMap |
struct | OwnershipWriterInfos |
struct | WriterInfo |
Public Member Functions | |
typedef | OPENDDS_SET (DataReaderImpl *) ReaderSet |
typedef | OPENDDS_MAP (OPENDDS_STRING, InstanceMap) TypeInstanceMap |
typedef | OPENDDS_VECTOR (WriterInfo) WriterInfos |
typedef | OPENDDS_VECTOR (InstanceState *) InstanceStateVec |
typedef | OPENDDS_MAP (DDS::InstanceHandle_t, OwnershipWriterInfos) InstanceOwnershipWriterInfos |
OwnershipManager () | |
~OwnershipManager () | |
int | instance_lock_acquire () |
int | instance_lock_release () |
void | set_instance_map (const char *type_name, const RcHandle< RcObject > &instance_map, DataReaderImpl *reader) |
RcHandle< RcObject > | get_instance_map (const char *type_name, DataReaderImpl *reader) |
void | unregister_reader (const char *type_name, DataReaderImpl *reader) |
void | remove_writer (const PublicationId &pub_id) |
void | remove_writers (const DDS::InstanceHandle_t &instance_handle) |
bool | remove_writer (const DDS::InstanceHandle_t &instance_handle, const PublicationId &pub_id) |
bool | is_owner (const DDS::InstanceHandle_t &instance_handle, const PublicationId &pub_id) |
bool | select_owner (const DDS::InstanceHandle_t &instance_handle, const PublicationId &pub_id, const CORBA::Long &ownership_strength, InstanceState *instance_state) |
void | remove_owner (const DDS::InstanceHandle_t &instance_handle) |
void | remove_instance (InstanceState *instance_state) |
void | update_ownership_strength (const PublicationId &pub_id, const CORBA::Long &ownership_strength) |
Private Member Functions | |
bool | remove_writer (const DDS::InstanceHandle_t &instance_handle, OwnershipWriterInfos &infos, const PublicationId &pub_id) |
void | remove_owner (const DDS::InstanceHandle_t &instance_handle, OwnershipWriterInfos &infos, bool sort) |
void | remove_candidate (OwnershipWriterInfos &infos, const PublicationId &pub_id) |
void | broadcast_new_owner (const DDS::InstanceHandle_t &instance_handle, OwnershipWriterInfos &infos, const PublicationId &owner) |
Private Attributes | |
ACE_Thread_Mutex | instance_lock_ |
TypeInstanceMap | type_instance_map_ |
InstanceOwnershipWriterInfos | instance_ownership_infos_ |
Definition at line 32 of file OwnershipManager.h.
OpenDDS::DCPS::OwnershipManager::OwnershipManager | ( | ) |
Definition at line 33 of file OwnershipManager.cpp.
OpenDDS::DCPS::OwnershipManager::~OwnershipManager | ( | ) |
Definition at line 37 of file OwnershipManager.cpp.
References ACE_TEXT(), LM_WARNING, and type_instance_map_.
00038 { 00039 // The type->instance should be empty if unregister instance are performed 00040 // by all readers, but in case the instance not unregistered for some reason, 00041 // an error will be logged. 00042 if (!type_instance_map_.empty()) { 00043 // There is no way to pass the instance map to concrete datareader 00044 // to delete, so it will be leaked. 00045 ACE_DEBUG((LM_WARNING, 00046 ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ") 00047 ACE_TEXT("- non-empty type_instance_map_\n"))); 00048 } 00049 }
void OpenDDS::DCPS::OwnershipManager::broadcast_new_owner | ( | const DDS::InstanceHandle_t & | instance_handle, | |
OwnershipWriterInfos & | infos, | |||
const PublicationId & | owner | |||
) | [private] |
Definition at line 385 of file OwnershipManager.cpp.
References ACE_TEXT(), OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::instance_states_, LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, and OpenDDS::DCPS::OwnershipManager::WriterInfo::ownership_strength_.
Referenced by remove_owner(), and select_owner().
00388 { 00389 if (DCPS_debug_level >= 1) { 00390 // This may not be an error since it could happen that the sample 00391 // is delivered to the datareader after the write is dis-associated 00392 // with this datareader. 00393 GuidConverter writer_converter(owner); 00394 ACE_DEBUG((LM_DEBUG, 00395 ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ") 00396 ACE_TEXT("owner writer %C, instance handle %d strength %d num ") 00397 ACE_TEXT("of candidates %d\n"), 00398 OPENDDS_STRING(writer_converter).c_str(), instance_handle, 00399 infos.owner_.ownership_strength_, 00400 (int)infos.candidates_.size())); 00401 } 00402 00403 const InstanceStateVec::iterator the_end = infos.instance_states_.end(); 00404 for (InstanceStateVec::iterator iter = infos.instance_states_.begin(); 00405 iter != the_end; ++iter) { 00406 (*iter)->set_owner(owner); 00407 } 00408 }
RcHandle< RcObject > OpenDDS::DCPS::OwnershipManager::get_instance_map | ( | const char * | type_name, | |
DataReaderImpl * | reader | |||
) |
Accessor for the instance map of the provided type. It is called once for each new instance in a datareader.
Definition at line 64 of file OwnershipManager.cpp.
References OpenDDS::DCPS::find(), OpenDDS::DCPS::OwnershipManager::InstanceMap::map_, OpenDDS::DCPS::OwnershipManager::InstanceMap::readers_, and type_instance_map_.
00066 { 00067 InstanceMap* instance = 0; 00068 if (0 != find(type_instance_map_, type_name, instance)) { 00069 return RcHandle<RcObject>(); 00070 } 00071 00072 instance->readers_.insert(reader); 00073 return instance->map_; 00074 }
int OpenDDS::DCPS::OwnershipManager::instance_lock_acquire | ( | ) |
Acquire/release the lock for the type instance map. The following functions are synchronized by instance_lock_.
Definition at line 52 of file OwnershipManager.cpp.
References ACE_Thread_Mutex::acquire(), and instance_lock_.
00053 { 00054 return instance_lock_.acquire(); 00055 }
int OpenDDS::DCPS::OwnershipManager::instance_lock_release | ( | ) |
Definition at line 58 of file OwnershipManager.cpp.
References instance_lock_, and ACE_Thread_Mutex::release().
00059 { 00060 return instance_lock_.release(); 00061 }
bool OpenDDS::DCPS::OwnershipManager::is_owner | ( | const DDS::InstanceHandle_t & | instance_handle, | |
const PublicationId & | pub_id | |||
) |
Returns true if the provided writer is the owner of the instance.
Definition at line 187 of file OwnershipManager.cpp.
References instance_lock_, and instance_ownership_infos_.
00189 { 00190 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false); 00191 00192 InstanceOwnershipWriterInfos::iterator iter 00193 = instance_ownership_infos_.find(instance_handle); 00194 if (iter != instance_ownership_infos_.end()) { 00195 return iter->second.owner_.pub_id_ == pub_id; 00196 } 00197 00198 return false; 00199 }
typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_MAP | ( | DDS::InstanceHandle_t | , | |
OwnershipWriterInfos | ||||
) |
typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_MAP | ( | OPENDDS_STRING | , | |
InstanceMap | ||||
) |
typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_SET | ( | DataReaderImpl * | ) |
typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_VECTOR | ( | InstanceState * | ) |
typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_VECTOR | ( | WriterInfo | ) |
void OpenDDS::DCPS::OwnershipManager::remove_candidate | ( | OwnershipWriterInfos & | infos, | |
const PublicationId & | pub_id | |||
) | [private] |
Definition at line 260 of file OwnershipManager.cpp.
References OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_.
Referenced by remove_writer().
00262 { 00263 if (!infos.candidates_.empty()) { 00264 WriterInfos::iterator const the_end = infos.candidates_.end(); 00265 00266 WriterInfos::iterator found_candidate = the_end; 00267 // Supplied writer is not an owner, check if it exists in candidate list. 00268 // If not, add it to the candidate list and sort the list. 00269 for (WriterInfos::iterator iter = infos.candidates_.begin(); 00270 iter != the_end; ++iter) { 00271 if (iter->pub_id_ == pub_id) { 00272 found_candidate = iter; 00273 break; 00274 } 00275 } 00276 00277 if (found_candidate != the_end) { 00278 infos.candidates_.erase(found_candidate); 00279 } 00280 } 00281 }
void OpenDDS::DCPS::OwnershipManager::remove_instance | ( | InstanceState * | instance_state | ) |
Definition at line 132 of file OwnershipManager.cpp.
References OpenDDS::DCPS::InstanceState::instance_handle(), instance_lock_, and instance_ownership_infos_.
00133 { 00134 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_); 00135 const DDS::InstanceHandle_t ih = instance_state->instance_handle(); 00136 InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih); 00137 if (i != instance_ownership_infos_.end()) { 00138 InstanceStateVec& states = i->second.instance_states_; 00139 for (size_t j = 0; j < states.size(); ++j) { 00140 if (states[j] == instance_state) { 00141 states.erase(states.begin() + j); 00142 break; 00143 } 00144 } 00145 } 00146 }
void OpenDDS::DCPS::OwnershipManager::remove_owner | ( | const DDS::InstanceHandle_t & | instance_handle, | |
OwnershipWriterInfos & | infos, | |||
bool | sort | |||
) | [private] |
Definition at line 234 of file OwnershipManager.cpp.
References broadcast_new_owner(), OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_, OpenDDS::DCPS::Util::DescendingOwnershipStrengthSort(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, and OpenDDS::DCPS::OwnershipManager::WriterInfo::pub_id_.
00237 { 00238 //change owner 00239 PublicationId new_owner(GUID_UNKNOWN); 00240 if (infos.candidates_.empty()) { 00241 infos.owner_ = WriterInfo(); 00242 00243 } else { 00244 if (sort) { 00245 std::sort(infos.candidates_.begin(), infos.candidates_.end(), 00246 Util::DescendingOwnershipStrengthSort); 00247 } 00248 00249 const WriterInfos::iterator begin = infos.candidates_.begin(); 00250 infos.owner_ = *begin; 00251 infos.candidates_.erase(begin); 00252 new_owner = infos.owner_.pub_id_; 00253 } 00254 00255 broadcast_new_owner(instance_handle, infos, new_owner); 00256 }
void OpenDDS::DCPS::OwnershipManager::remove_owner | ( | const DDS::InstanceHandle_t & | instance_handle | ) |
Remove the current owner of the specified instance.
Definition at line 411 of file OwnershipManager.cpp.
References instance_lock_, and instance_ownership_infos_.
Referenced by remove_writer(), and select_owner().
00412 { 00413 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_); 00414 00415 const InstanceOwnershipWriterInfos::iterator iter = 00416 instance_ownership_infos_.find(instance_handle); 00417 00418 if (iter != instance_ownership_infos_.end()) { 00419 remove_owner(instance_handle, iter->second, false); 00420 } 00421 }
bool OpenDDS::DCPS::OwnershipManager::remove_writer | ( | const DDS::InstanceHandle_t & | instance_handle, | |
OwnershipWriterInfos & | infos, | |||
const PublicationId & | pub_id | |||
) | [private] |
Definition at line 218 of file OwnershipManager.cpp.
References OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, OpenDDS::DCPS::OwnershipManager::WriterInfo::pub_id_, remove_candidate(), and remove_owner().
00221 { 00222 if (infos.owner_.pub_id_ == pub_id) { 00223 remove_owner(instance_handle, infos, false); 00224 return true; 00225 00226 } else { 00227 remove_candidate(infos, pub_id); 00228 return false; 00229 } 00230 }
bool OpenDDS::DCPS::OwnershipManager::remove_writer | ( | const DDS::InstanceHandle_t & | instance_handle, | |
const PublicationId & | pub_id | |||
) |
Remove a writer that writes to the specified instance. Returns true if the removed writer was the owner.
Definition at line 203 of file OwnershipManager.cpp.
References instance_lock_, instance_ownership_infos_, and remove_writer().
00205 { 00206 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false); 00207 00208 InstanceOwnershipWriterInfos::iterator the_iter = 00209 instance_ownership_infos_.find(instance_handle); 00210 if (the_iter != instance_ownership_infos_.end()) { 00211 return remove_writer(instance_handle, the_iter->second, pub_id); 00212 } 00213 00214 return false; 00215 }
void OpenDDS::DCPS::OwnershipManager::remove_writer | ( | const PublicationId & | pub_id | ) |
Remove a writer from the ownership collections of all instances.
Definition at line 119 of file OwnershipManager.cpp.
References instance_lock_, and instance_ownership_infos_.
Referenced by remove_writer().
00120 { 00121 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_); 00122 00123 const InstanceOwnershipWriterInfos::iterator the_end = 00124 instance_ownership_infos_.end(); 00125 for (InstanceOwnershipWriterInfos::iterator iter = 00126 instance_ownership_infos_.begin(); iter != the_end; ++iter) { 00127 remove_writer(iter->first, iter->second, pub_id); 00128 } 00129 }
void OpenDDS::DCPS::OwnershipManager::remove_writers | ( | const DDS::InstanceHandle_t & | instance_handle | ) |
Remove all writers that write to the specified instance.
Definition at line 149 of file OwnershipManager.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, instance_lock_, instance_ownership_infos_, and LM_DEBUG.
00150 { 00151 InstanceStateVec instances_to_reset; 00152 { 00153 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_); 00154 00155 if (DCPS_debug_level >= 1) { 00156 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers:") 00157 ACE_TEXT(" disassociate writers with instance %d\n"), 00158 instance_handle)); 00159 } 00160 00161 InstanceOwnershipWriterInfos::iterator owner_wi = 00162 instance_ownership_infos_.find(instance_handle); 00163 if (owner_wi != instance_ownership_infos_.end()) { 00164 owner_wi->second.owner_ = WriterInfo(); 00165 owner_wi->second.candidates_.clear(); 00166 const InstanceStateVec::iterator end = 00167 owner_wi->second.instance_states_.end(); 00168 for (InstanceStateVec::iterator iter = 00169 owner_wi->second.instance_states_.begin(); iter != end; ++iter) { 00170 // call after lock released, will call back to data reader 00171 instances_to_reset.push_back(*iter); 00172 } 00173 owner_wi->second.instance_states_.clear(); 00174 00175 instance_ownership_infos_.erase(owner_wi); 00176 } 00177 } 00178 // Lock released 00179 for (InstanceStateVec::iterator instance = instances_to_reset.begin(); 00180 instance != instances_to_reset.end(); ++instance) { 00181 (*instance)->reset_ownership(instance_handle); 00182 } 00183 }
bool OpenDDS::DCPS::OwnershipManager::select_owner | ( | const DDS::InstanceHandle_t & | instance_handle, | |
const PublicationId & | pub_id, | |||
const CORBA::Long & | ownership_strength, | |||
InstanceState * | instance_state | |||
) |
Determine if the provided publication can be the owner.
Definition at line 284 of file OwnershipManager.cpp.
References broadcast_new_owner(), OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_, OpenDDS::DCPS::Util::DescendingOwnershipStrengthSort(), OpenDDS::DCPS::GUID_UNKNOWN, instance_lock_, instance_ownership_infos_, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::instance_states_, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, OpenDDS::DCPS::OwnershipManager::WriterInfo::ownership_strength_, OpenDDS::DCPS::OwnershipManager::WriterInfo::pub_id_, OpenDDS::DCPS::InstanceState::registered(), and remove_owner().
00288 { 00289 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false); 00290 00291 InstanceOwnershipWriterInfos::iterator iter = 00292 instance_ownership_infos_.find(instance_handle); 00293 if (iter != instance_ownership_infos_.end()) { 00294 OwnershipWriterInfos& infos = iter->second; 00295 if (!instance_state->registered()) { 00296 infos.instance_states_.push_back(instance_state); 00297 instance_state->registered(true); 00298 } 00299 00300 // No owner at some point. 00301 if (infos.owner_.pub_id_ == GUID_UNKNOWN) { 00302 infos.owner_ = WriterInfo(pub_id, ownership_strength); 00303 broadcast_new_owner(instance_handle, infos, pub_id); 00304 return true; 00305 00306 } else if (infos.owner_.pub_id_ == pub_id) { // is current owner 00307 //still owner but strength changed to be bigger.. 00308 if (infos.owner_.ownership_strength_ <= ownership_strength) { 00309 infos.owner_.ownership_strength_ = ownership_strength; 00310 return true; 00311 00312 } else { //update strength and reevaluate owner which broadcast new owner. 00313 infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength)); 00314 remove_owner(instance_handle, infos, true); 00315 return infos.owner_.pub_id_ == pub_id; 00316 } 00317 00318 } else { // not current owner, reevaluate the owner 00319 bool replace_owner = false; 00320 // Add current owner to candidate list for owner reevaluation 00321 // if provided pub has strength greater than current owner. 00322 if (ownership_strength > infos.owner_.ownership_strength_) { 00323 infos.candidates_.push_back(infos.owner_); 00324 replace_owner = true; 00325 } 00326 00327 bool found = false; 00328 bool sort = true; 00329 00330 // check if it already existed in candidate list. If not, 00331 // add it to the candidate list, otherwise update strength 00332 // if strength was changed. 
00333 const WriterInfos::iterator the_end = infos.candidates_.end(); 00334 00335 for (WriterInfos::iterator iter = infos.candidates_.begin(); 00336 iter != the_end; ++iter) { 00337 00338 if (iter->pub_id_ == pub_id) { 00339 if (iter->ownership_strength_ != ownership_strength) { 00340 iter->ownership_strength_ = ownership_strength; 00341 } else { 00342 sort = false; 00343 } 00344 found = true; 00345 break; 00346 } 00347 } 00348 00349 if (!found) { 00350 infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength)); 00351 } 00352 00353 if (sort) { 00354 std::sort(infos.candidates_.begin(), infos.candidates_.end(), 00355 Util::DescendingOwnershipStrengthSort); 00356 } 00357 00358 if (replace_owner) { 00359 // Owner was already moved to candidate list and the list was sorted 00360 // already so pick owner from sorted list and replace current 00361 // owner. 00362 remove_owner(instance_handle, infos, false); 00363 } 00364 00365 return infos.owner_.pub_id_ == pub_id; 00366 } 00367 00368 } else { 00369 // first writer of the instance so it's owner. 00370 OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle]; 00371 infos.owner_ = WriterInfo(pub_id, ownership_strength); 00372 if (!instance_state->registered()) { 00373 infos.instance_states_.push_back(instance_state); 00374 instance_state->registered(true); 00375 } 00376 broadcast_new_owner(instance_handle, infos, infos.owner_.pub_id_); 00377 return true; 00378 } 00379 00380 return false; 00381 }
void OpenDDS::DCPS::OwnershipManager::set_instance_map | ( | const char * | type_name, | |
const RcHandle< RcObject > & | instance_map, | |||
DataReaderImpl * | reader | |||
) |
The per-type instance map is created by the concrete datareader when the first sample of that type is received.
Definition at line 77 of file OwnershipManager.cpp.
References ACE_TEXT(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, and type_instance_map_.
00080 { 00081 if (DCPS_debug_level >= 1) { 00082 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::set_instance_map ") 00083 ACE_TEXT("instance map %X is created by reader %X \n"), 00084 instance_map.in(), reader)); 00085 } 00086 00087 if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name, 00088 InstanceMap(instance_map, reader))) { 00089 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map " 00090 "failed to bind instance for type \"%C\"\n", type_name)); 00091 } 00092 }
void OpenDDS::DCPS::OwnershipManager::unregister_reader | ( | const char * | type_name, | |
DataReaderImpl * | reader | |||
) |
The readers that access the instance map are tracked as a form of reference counting on the instance map. Each reader needs to unregister itself from the instance map upon unregistering an instance. The instance map is deleted when the last reader unregisters an instance of the type.
Definition at line 95 of file OwnershipManager.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::find(), OpenDDS::DCPS::RcHandle< T >::in(), instance_lock_, LM_DEBUG, OpenDDS::DCPS::OwnershipManager::InstanceMap::map_, OpenDDS::DCPS::OwnershipManager::InstanceMap::readers_, type_instance_map_, and OpenDDS::DCPS::unbind().
00097 { 00098 ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_); 00099 00100 InstanceMap* instance = 0; 00101 if (0 != find(type_instance_map_, type_name, instance)) { 00102 return; 00103 } 00104 00105 instance->readers_.erase(reader); 00106 00107 if (instance->readers_.empty()) { 00108 if (DCPS_debug_level >= 1) { 00109 ACE_DEBUG((LM_DEBUG, 00110 ACE_TEXT("(%P|%t) OwnershipManager::unregister_reader ") 00111 ACE_TEXT(" instance map %@ is deleted by reader %@\n"), 00112 instance->map_.in(), reader)); 00113 } 00114 unbind(type_instance_map_, type_name); 00115 } 00116 }
void OpenDDS::DCPS::OwnershipManager::update_ownership_strength | ( | const PublicationId & | pub_id, | |
const CORBA::Long & | ownership_strength | |||
) |
Update the ownership strength of a publication.
ACE_Thread_Mutex OpenDDS::DCPS::OwnershipManager::instance_lock_ [private] |
Definition at line 174 of file OwnershipManager.h.
Referenced by instance_lock_acquire(), instance_lock_release(), is_owner(), remove_instance(), remove_owner(), remove_writer(), remove_writers(), select_owner(), and unregister_reader().
InstanceOwnershipWriterInfos OpenDDS::DCPS::OwnershipManager::instance_ownership_infos_ [private] |
Definition at line 176 of file OwnershipManager.h.
Referenced by is_owner(), remove_instance(), remove_owner(), remove_writer(), remove_writers(), and select_owner().
TypeInstanceMap OpenDDS::DCPS::OwnershipManager::type_instance_map_ [private] |
Definition at line 175 of file OwnershipManager.h.
Referenced by get_instance_map(), set_instance_map(), unregister_reader(), and ~OwnershipManager().