OwnershipManager.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00011 
00012 #include "OwnershipManager.h"
00013 #include "GuidConverter.h"
00014 #include "Util.h"
00015 #include "DataReaderImpl.h"
00016 #include <algorithm>
00017 
00018 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 namespace OpenDDS {
00021 namespace DCPS {
00022 
00023 namespace Util {
00024 
00025 bool DescendingOwnershipStrengthSort(const OwnershipManager::WriterInfo& w1,
00026                                      const OwnershipManager::WriterInfo& w2)
00027 {
00028   return w1.ownership_strength_ > w2.ownership_strength_;
00029 }
00030 
00031 } // namespace Util
00032 
00033 OwnershipManager::OwnershipManager()
00034 {
00035 }
00036 
00037 OwnershipManager::~OwnershipManager()
00038 {
00039   // The type->instance should be empty if unregister instance are performed
00040   // by all readers, but in case the instance not unregistered for some reason,
00041   // an error will be logged.
00042   if (!type_instance_map_.empty()) {
00043     // There is no way to pass the instance map to concrete datareader
00044     // to delete, so it will be leaked.
00045     ACE_DEBUG((LM_WARNING,
00046                ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ")
00047                ACE_TEXT("- non-empty type_instance_map_\n")));
00048   }
00049 }
00050 
00051 int
00052 OwnershipManager::instance_lock_acquire()
00053 {
00054   return instance_lock_.acquire();
00055 }
00056 
00057 int
00058 OwnershipManager::instance_lock_release()
00059 {
00060   return instance_lock_.release();
00061 }
00062 
00063 RcHandle<RcObject>
00064 OwnershipManager::get_instance_map(const char* type_name,
00065                                    DataReaderImpl* reader)
00066 {
00067   InstanceMap* instance = 0;
00068   if (0 != find(type_instance_map_, type_name, instance)) {
00069     return RcHandle<RcObject>();
00070   }
00071 
00072   instance->readers_.insert(reader);
00073   return instance->map_;
00074 }
00075 
00076 void
00077 OwnershipManager::set_instance_map(const char* type_name,
00078                                    const RcHandle<RcObject>& instance_map,
00079                                    DataReaderImpl* reader)
00080 {
00081   if (DCPS_debug_level >= 1) {
00082     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::set_instance_map ")
00083                ACE_TEXT("instance map %X is created by reader %X \n"),
00084                instance_map.in(), reader));
00085   }
00086 
00087   if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name,
00088                                InstanceMap(instance_map, reader))) {
00089     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map "
00090                "failed to bind instance for type \"%C\"\n", type_name));
00091   }
00092 }
00093 
00094 void
00095 OwnershipManager::unregister_reader(const char* type_name,
00096                                     DataReaderImpl* reader)
00097 {
00098   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00099 
00100   InstanceMap* instance = 0;
00101   if (0 != find(type_instance_map_, type_name, instance)) {
00102     return;
00103   }
00104 
00105   instance->readers_.erase(reader);
00106 
00107   if (instance->readers_.empty()) {
00108     if (DCPS_debug_level >= 1) {
00109       ACE_DEBUG((LM_DEBUG,
00110                  ACE_TEXT("(%P|%t) OwnershipManager::unregister_reader ")
00111                  ACE_TEXT(" instance map %@ is deleted by reader %@\n"),
00112                  instance->map_.in(), reader));
00113     }
00114     unbind(type_instance_map_, type_name);
00115   }
00116 }
00117 
00118 void
00119 OwnershipManager::remove_writer(const PublicationId& pub_id)
00120 {
00121   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00122 
00123   const InstanceOwnershipWriterInfos::iterator the_end =
00124     instance_ownership_infos_.end();
00125   for (InstanceOwnershipWriterInfos::iterator iter =
00126          instance_ownership_infos_.begin(); iter != the_end; ++iter) {
00127     remove_writer(iter->first, iter->second, pub_id);
00128   }
00129 }
00130 
00131 void
00132 OwnershipManager::remove_instance(InstanceState* instance_state)
00133 {
00134   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00135   const DDS::InstanceHandle_t ih = instance_state->instance_handle();
00136   InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih);
00137   if (i != instance_ownership_infos_.end()) {
00138     InstanceStateVec& states = i->second.instance_states_;
00139     for (size_t j = 0; j < states.size(); ++j) {
00140       if (states[j] == instance_state) {
00141         states.erase(states.begin() + j);
00142         break;
00143       }
00144     }
00145   }
00146 }
00147 
00148 void
00149 OwnershipManager::remove_writers(const DDS::InstanceHandle_t& instance_handle)
00150 {
00151   InstanceStateVec instances_to_reset;
00152   {
00153     ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00154 
00155     if (DCPS_debug_level >= 1) {
00156       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers:")
00157                  ACE_TEXT(" disassociate writers with instance %d\n"),
00158                  instance_handle));
00159     }
00160 
00161     InstanceOwnershipWriterInfos::iterator owner_wi =
00162       instance_ownership_infos_.find(instance_handle);
00163     if (owner_wi != instance_ownership_infos_.end()) {
00164       owner_wi->second.owner_ = WriterInfo();
00165       owner_wi->second.candidates_.clear();
00166       const InstanceStateVec::iterator end =
00167         owner_wi->second.instance_states_.end();
00168       for (InstanceStateVec::iterator iter =
00169              owner_wi->second.instance_states_.begin(); iter != end; ++iter) {
00170         // call after lock released, will call back to data reader
00171         instances_to_reset.push_back(*iter);
00172       }
00173       owner_wi->second.instance_states_.clear();
00174 
00175       instance_ownership_infos_.erase(owner_wi);
00176     }
00177   }
00178   // Lock released
00179   for (InstanceStateVec::iterator instance = instances_to_reset.begin();
00180        instance != instances_to_reset.end(); ++instance) {
00181     (*instance)->reset_ownership(instance_handle);
00182   }
00183 }
00184 
00185 
00186 bool
00187 OwnershipManager::is_owner(const DDS::InstanceHandle_t& instance_handle,
00188                            const PublicationId& pub_id)
00189 {
00190   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00191 
00192   InstanceOwnershipWriterInfos::iterator iter
00193     = instance_ownership_infos_.find(instance_handle);
00194   if (iter != instance_ownership_infos_.end()) {
00195     return iter->second.owner_.pub_id_ == pub_id;
00196   }
00197 
00198   return false;
00199 }
00200 
00201 
00202 bool // owner unregister instance
00203 OwnershipManager::remove_writer(const DDS::InstanceHandle_t& instance_handle,
00204                                 const PublicationId& pub_id)
00205 {
00206   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00207 
00208   InstanceOwnershipWriterInfos::iterator the_iter =
00209     instance_ownership_infos_.find(instance_handle);
00210   if (the_iter != instance_ownership_infos_.end()) {
00211     return remove_writer(instance_handle, the_iter->second, pub_id);
00212   }
00213 
00214   return false;
00215 }
00216 
00217 bool
00218 OwnershipManager::remove_writer(const DDS::InstanceHandle_t& instance_handle,
00219                                 OwnershipWriterInfos& infos,
00220                                 const PublicationId& pub_id)
00221 {
00222   if (infos.owner_.pub_id_ == pub_id) {
00223     remove_owner(instance_handle, infos, false);
00224     return true;
00225 
00226   } else {
00227     remove_candidate(infos, pub_id);
00228     return false;
00229   }
00230 }
00231 
00232 
00233 void
00234 OwnershipManager::remove_owner(const DDS::InstanceHandle_t& instance_handle,
00235                                OwnershipWriterInfos& infos,
00236                                bool sort)
00237 {
00238   //change owner
00239   PublicationId new_owner(GUID_UNKNOWN);
00240   if (infos.candidates_.empty()) {
00241     infos.owner_ = WriterInfo();
00242 
00243   } else {
00244     if (sort) {
00245       std::sort(infos.candidates_.begin(), infos.candidates_.end(),
00246                 Util::DescendingOwnershipStrengthSort);
00247     }
00248 
00249     const WriterInfos::iterator begin = infos.candidates_.begin();
00250     infos.owner_ = *begin;
00251     infos.candidates_.erase(begin);
00252     new_owner = infos.owner_.pub_id_;
00253   }
00254 
00255   broadcast_new_owner(instance_handle, infos, new_owner);
00256 }
00257 
00258 
00259 void
00260 OwnershipManager::remove_candidate(OwnershipWriterInfos& infos,
00261                                    const PublicationId& pub_id)
00262 {
00263   if (!infos.candidates_.empty()) {
00264     WriterInfos::iterator const the_end = infos.candidates_.end();
00265 
00266     WriterInfos::iterator found_candidate = the_end;
00267     // Supplied writer is not an owner, check if it exists in candidate list.
00268     // If not, add it to the candidate list and sort the list.
00269     for (WriterInfos::iterator iter = infos.candidates_.begin();
00270          iter != the_end; ++iter) {
00271       if (iter->pub_id_ == pub_id) {
00272         found_candidate = iter;
00273         break;
00274       }
00275     }
00276 
00277     if (found_candidate != the_end) {
00278       infos.candidates_.erase(found_candidate);
00279     }
00280   }
00281 }
00282 
00283 bool
00284 OwnershipManager::select_owner(const DDS::InstanceHandle_t& instance_handle,
00285                                const PublicationId& pub_id,
00286                                const CORBA::Long& ownership_strength,
00287                                InstanceState* instance_state)
00288 {
00289   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00290 
00291   InstanceOwnershipWriterInfos::iterator iter =
00292     instance_ownership_infos_.find(instance_handle);
00293   if (iter != instance_ownership_infos_.end()) {
00294     OwnershipWriterInfos& infos = iter->second;
00295     if (!instance_state->registered()) {
00296       infos.instance_states_.push_back(instance_state);
00297       instance_state->registered(true);
00298     }
00299 
00300     // No owner at some point.
00301     if (infos.owner_.pub_id_ == GUID_UNKNOWN) {
00302       infos.owner_ = WriterInfo(pub_id, ownership_strength);
00303       broadcast_new_owner(instance_handle, infos, pub_id);
00304       return true;
00305 
00306     } else if (infos.owner_.pub_id_ == pub_id) { // is current owner
00307       //still owner but strength changed to be bigger..
00308       if (infos.owner_.ownership_strength_ <= ownership_strength) {
00309         infos.owner_.ownership_strength_ = ownership_strength;
00310         return true;
00311 
00312       } else { //update strength and reevaluate owner which broadcast new owner.
00313         infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
00314         remove_owner(instance_handle, infos, true);
00315         return infos.owner_.pub_id_ == pub_id;
00316       }
00317 
00318     } else { // not current owner, reevaluate the owner
00319       bool replace_owner = false;
00320       // Add current owner to candidate list for owner reevaluation
00321       // if provided pub has strength greater than current owner.
00322       if (ownership_strength > infos.owner_.ownership_strength_) {
00323         infos.candidates_.push_back(infos.owner_);
00324         replace_owner = true;
00325       }
00326 
00327       bool found = false;
00328       bool sort = true;
00329 
00330       // check if it already existed in candidate list. If not,
00331       // add it to the candidate list, otherwise update strength
00332       // if strength was changed.
00333       const WriterInfos::iterator the_end = infos.candidates_.end();
00334 
00335       for (WriterInfos::iterator iter = infos.candidates_.begin();
00336            iter != the_end; ++iter) {
00337 
00338         if (iter->pub_id_ == pub_id) {
00339           if (iter->ownership_strength_ != ownership_strength) {
00340             iter->ownership_strength_ = ownership_strength;
00341           } else {
00342             sort = false;
00343           }
00344           found = true;
00345           break;
00346         }
00347       }
00348 
00349       if (!found) {
00350         infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
00351       }
00352 
00353       if (sort) {
00354         std::sort(infos.candidates_.begin(), infos.candidates_.end(),
00355                   Util::DescendingOwnershipStrengthSort);
00356       }
00357 
00358       if (replace_owner) {
00359         // Owner was already moved to candidate list and the list was sorted
00360         // already so pick owner from sorted list and replace current
00361         // owner.
00362         remove_owner(instance_handle, infos, false);
00363       }
00364 
00365       return infos.owner_.pub_id_ == pub_id;
00366     }
00367 
00368   } else {
00369     // first writer of the instance so it's owner.
00370     OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle];
00371     infos.owner_ = WriterInfo(pub_id, ownership_strength);
00372     if (!instance_state->registered()) {
00373       infos.instance_states_.push_back(instance_state);
00374       instance_state->registered(true);
00375     }
00376     broadcast_new_owner(instance_handle, infos, infos.owner_.pub_id_);
00377     return true;
00378   }
00379 
00380   return false;
00381 }
00382 
00383 
00384 void
00385 OwnershipManager::broadcast_new_owner(const DDS::InstanceHandle_t& instance_handle,
00386                                       OwnershipWriterInfos& infos,
00387                                       const PublicationId& owner)
00388 {
00389   if (DCPS_debug_level >= 1) {
00390     // This may not be an error since it could happen that the sample
00391     // is delivered to the datareader after the write is dis-associated
00392     // with this datareader.
00393     GuidConverter writer_converter(owner);
00394     ACE_DEBUG((LM_DEBUG,
00395                ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ")
00396                ACE_TEXT("owner writer %C, instance handle %d strength %d num ")
00397                ACE_TEXT("of candidates %d\n"),
00398                OPENDDS_STRING(writer_converter).c_str(), instance_handle,
00399                infos.owner_.ownership_strength_,
00400                (int)infos.candidates_.size()));
00401   }
00402 
00403   const InstanceStateVec::iterator the_end = infos.instance_states_.end();
00404   for (InstanceStateVec::iterator iter = infos.instance_states_.begin();
00405        iter != the_end; ++iter) {
00406     (*iter)->set_owner(owner);
00407   }
00408 }
00409 
00410 void
00411 OwnershipManager::remove_owner(const DDS::InstanceHandle_t& instance_handle)
00412 {
00413   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00414 
00415   const InstanceOwnershipWriterInfos::iterator iter =
00416     instance_ownership_infos_.find(instance_handle);
00417 
00418   if (iter != instance_ownership_infos_.end()) {
00419     remove_owner(instance_handle, iter->second, false);
00420   }
00421 }
00422 
00423 
00424 } // namespace DCPS
00425 } // namespace OpenDDS
00426 
00427 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00428 
00429 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1