OwnershipManager.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00011 
00012 #include "OwnershipManager.h"
00013 #include "GuidConverter.h"
00014 #include "Util.h"
00015 #include "DataReaderImpl.h"
00016 #include <algorithm>
00017 
00018 
00019 namespace Util {
00020 
00021 bool DescendingOwnershipStrengthSort (const OpenDDS::DCPS::OwnershipManager::WriterInfo& w1, const OpenDDS::DCPS::OwnershipManager::WriterInfo& w2)
00022 {
00023   return w1.ownership_strength_ > w2.ownership_strength_;
00024 };
00025 
00026 } // namespace Util
00027 
00028 namespace OpenDDS {
00029 namespace DCPS {
00030 
00031 //TBD - add check for enabled in most methods.
00032 //      Currently this is not needed because auto_enable_created_entities
00033 //      cannot be false.
00034 
00035 // Implementation skeleton constructor
00036 OwnershipManager::OwnershipManager()
00037 {
00038 }
00039 
00040 // Implementation skeleton destructor
00041 OwnershipManager::~OwnershipManager()
00042 {
00043   // The type->instance should be empty if unregister instance are performed
00044   // by all readers, but in case the instance not unregistered for some reason,
00045   // an error will be logged.
00046   TypeInstanceMap::iterator const the_end = type_instance_map_.end ();
00047   TypeInstanceMap::iterator iter = type_instance_map_.begin ();
00048   while (iter != the_end)
00049   {
00050     // There is no way to pass the instance map to concrete datareader
00051     // to delete, so it will be leaked.
00052     // delete iter->second.map_;
00053     ACE_DEBUG((LM_WARNING,
00054       ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ")
00055       ACE_TEXT("- non-empty type_instance_map_\n")));
00056     ++ iter;
00057   }
00058 
00059   type_instance_map_.clear ();
00060 }
00061 
00062 
00063 int
00064 OwnershipManager::instance_lock_acquire ()
00065 {
00066   return this->instance_lock_.acquire ();
00067 }
00068 
00069 int
00070 OwnershipManager::instance_lock_release ()
00071 {
00072   return this->instance_lock_.release ();
00073 }
00074 
00075 void*
00076 OwnershipManager::get_instance_map (const char* type_name,
00077                                          DataReaderImpl* reader)
00078 {
00079   InstanceMap* instance = 0;
00080   if (0 != find(type_instance_map_, type_name, instance)) {
00081     return 0;
00082   }
00083 
00084   instance->readers_.push_back (reader);
00085   return instance->map_;
00086 }
00087 
00088 void
00089 OwnershipManager::set_instance_map (const char* type_name,
00090                                     void* instance_map,
00091                                     DataReaderImpl* reader)
00092 {
00093   if (DCPS_debug_level >= 1) {
00094     ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) OwnershipManager::set_instance_map ")
00095                           ACE_TEXT (" instance map %X is created by reader %X \n"),
00096                 instance_map, reader));
00097   }
00098 
00099   if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name, InstanceMap (instance_map, reader))) {
00100     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map failed to "
00101                          "bind instance for type \"%s\"\n",type_name));
00102   }
00103 }
00104 
00105 void
00106 OwnershipManager::unregister_reader (const char* type_name,
00107                                      DataReaderImpl* reader)
00108 {
00109   ACE_GUARD(ACE_Thread_Mutex,
00110             guard,
00111             this->instance_lock_);
00112 
00113   InstanceMap* instance = 0;
00114   if (0 != find(type_instance_map_, type_name, instance)) {
00115     return;
00116   }
00117 
00118   ReaderVec::iterator end = instance->readers_.end();
00119 
00120   for (ReaderVec::iterator it(instance->readers_.begin());
00121       it != end; ++it) {
00122     if (*it == reader) {
00123       instance->readers_.erase (it);
00124       break;
00125     }
00126   }
00127 
00128   if (instance->readers_.empty ()) {
00129     if (DCPS_debug_level >= 1) {
00130       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) OwnershipManager::unregister_reader ")
00131                             ACE_TEXT (" instance map %X is deleted by reader %X \n"),
00132                   instance->map_, reader));
00133     }
00134     reader->delete_instance_map (instance->map_);
00135     unbind (type_instance_map_, type_name);
00136   }
00137 }
00138 
00139 void
00140 OwnershipManager::remove_writer (const PublicationId& pub_id)
00141 {
00142   ACE_GUARD(ACE_Thread_Mutex,
00143             guard,
00144             this->instance_lock_);
00145 
00146   InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00147   for (InstanceOwnershipWriterInfos::iterator iter = instance_ownership_infos_.begin ();
00148         iter != the_end; ++ iter) {
00149     this->remove_writer (iter->first, iter->second, pub_id);
00150   }
00151 }
00152 
00153 void
00154 OwnershipManager::remove_instance(InstanceState* instance_state)
00155 {
00156   ACE_GUARD(ACE_Thread_Mutex, guard, this->instance_lock_);
00157   DDS::InstanceHandle_t ih = instance_state->instance_handle();
00158   InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih);
00159   if (i != instance_ownership_infos_.end()) {
00160     for (size_t j = 0; j < i->second.instance_states_.size(); ++j) {
00161       if (i->second.instance_states_[j] == instance_state) {
00162         i->second.instance_states_.erase(i->second.instance_states_.begin() + j);
00163         break;
00164       }
00165     }
00166   }
00167 }
00168 
00169 void
00170 OwnershipManager::remove_writers (const ::DDS::InstanceHandle_t& instance_handle)
00171 {
00172   InstanceStateVec instances_to_reset;
00173   {
00174     ACE_GUARD(ACE_Thread_Mutex,
00175               guard,
00176               this->instance_lock_);
00177 
00178     if (DCPS_debug_level >= 1) {
00179         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers: ")
00180                              ACE_TEXT("disassociate writers with instance %d\n"),
00181                              instance_handle));
00182     }
00183 
00184     InstanceOwnershipWriterInfos::iterator owner_wi
00185       = instance_ownership_infos_.find (instance_handle);
00186     if (owner_wi != instance_ownership_infos_.end()) {
00187       owner_wi->second.owner_ = WriterInfo();
00188       owner_wi->second.candidates_.clear ();
00189       InstanceStateVec::iterator const end = owner_wi->second.instance_states_.end();
00190       for (InstanceStateVec::iterator iter = owner_wi->second.instance_states_.begin();
00191         iter != end; ++iter) {
00192           // call after lock released, will call back to data reader
00193           instances_to_reset.push_back(*iter);
00194       }
00195       owner_wi->second.instance_states_.clear ();
00196 
00197       instance_ownership_infos_.erase(owner_wi);
00198     }
00199   }
00200   // Lock released
00201   InstanceStateVec::iterator instance;
00202   for (instance  = instances_to_reset.begin();
00203        instance != instances_to_reset.end(); ++instance)
00204   {
00205     (*instance)->reset_ownership(instance_handle);
00206   }
00207 }
00208 
00209 
00210 bool
00211 OwnershipManager::is_owner (const ::DDS::InstanceHandle_t& instance_handle,
00212                                  const PublicationId& pub_id)
00213 {
00214   ACE_GUARD_RETURN (ACE_Thread_Mutex,
00215                     guard,
00216                     this->instance_lock_,
00217                     false);
00218 
00219   InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00220 
00221   InstanceOwnershipWriterInfos::iterator iter
00222     = instance_ownership_infos_.find (instance_handle);
00223   if (iter != the_end) {
00224     return iter->second.owner_.pub_id_ == pub_id;
00225   }
00226 
00227   return false;
00228 }
00229 
00230 
00231 bool // owner unregister instance
00232 OwnershipManager::remove_writer (
00233                                  const ::DDS::InstanceHandle_t& instance_handle,
00234                                  const PublicationId& pub_id)
00235 {
00236   ACE_GUARD_RETURN (ACE_Thread_Mutex,
00237                     guard,
00238                     this->instance_lock_,
00239                     false);
00240 
00241   InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00242 
00243   InstanceOwnershipWriterInfos::iterator the_iter
00244     = instance_ownership_infos_.find (instance_handle);
00245   if (the_iter != the_end) {
00246     return this->remove_writer (instance_handle, the_iter->second, pub_id);
00247   }
00248 
00249   return false;
00250 }
00251 
00252 bool
00253 OwnershipManager::remove_writer (const ::DDS::InstanceHandle_t& instance_handle,
00254                                  OwnershipWriterInfos& infos,
00255                                  const PublicationId& pub_id)
00256 {
00257   if (infos.owner_.pub_id_ == pub_id) {
00258     this->remove_owner (instance_handle, infos, false);
00259     return true;
00260   }
00261   else {
00262     this->remove_candidate (infos, pub_id);
00263     return false;
00264   }
00265 
00266   return false;
00267 }
00268 
00269 
00270 void
00271 OwnershipManager::remove_owner (const ::DDS::InstanceHandle_t& instance_handle,
00272                                 OwnershipWriterInfos& infos,
00273                                 bool sort)
00274 {
00275   //change owner
00276   PublicationId new_owner(GUID_UNKNOWN);
00277   if (infos.candidates_.empty ()) {
00278     infos.owner_ = WriterInfo();
00279   }
00280   else {
00281     if (sort) {
00282       std::sort (infos.candidates_.begin(), infos.candidates_.end(),
00283                  ::Util::DescendingOwnershipStrengthSort);
00284     }
00285 
00286     WriterInfos::iterator begin = infos.candidates_.begin();
00287     infos.owner_ = *begin;
00288     infos.candidates_.erase (begin);
00289     new_owner = infos.owner_.pub_id_;
00290   }
00291 
00292   this->broadcast_new_owner (instance_handle, infos, new_owner);
00293 }
00294 
00295 
00296 void
00297 OwnershipManager::remove_candidate (OwnershipWriterInfos& infos,const PublicationId& pub_id)
00298 {
00299   if (! infos.candidates_.empty ()) {
00300     WriterInfos::iterator const the_end = infos.candidates_.end();
00301 
00302     WriterInfos::iterator found_candidate = the_end;
00303     // Supplied writer is not an owner, check if it exists in candicate list.If not,
00304     // add it to the candidate list and sort the list.
00305     for (WriterInfos::iterator iter = infos.candidates_.begin ();
00306       iter != the_end; ++iter) {
00307 
00308       if (iter->pub_id_ == pub_id) {
00309         found_candidate = iter;
00310         break;
00311       }
00312     }
00313 
00314     if (found_candidate != the_end) {
00315       infos.candidates_.erase (found_candidate);
00316     }
00317   }
00318 }
00319 
00320 bool
00321 OwnershipManager::select_owner (const ::DDS::InstanceHandle_t& instance_handle,
00322                                      const PublicationId& pub_id,
00323                                      const CORBA::Long& ownership_strength,
00324                                      InstanceState* instance_state)
00325 {
00326   ACE_GUARD_RETURN (ACE_Thread_Mutex,
00327                     guard,
00328                     this->instance_lock_,
00329                     false);
00330 
00331   InstanceOwnershipWriterInfos::iterator const the_end = instance_ownership_infos_.end ();
00332 
00333   InstanceOwnershipWriterInfos::iterator iter
00334     = instance_ownership_infos_.find (instance_handle);
00335   if (iter != the_end) {
00336     OwnershipWriterInfos& infos = iter->second;
00337     if (!instance_state->registered()) {
00338       infos.instance_states_.push_back (instance_state);
00339       instance_state->registered(true);
00340     }
00341 
00342     // No owner at some point.
00343     if (infos.owner_.pub_id_ == GUID_UNKNOWN) {
00344       infos.owner_ = WriterInfo(pub_id,ownership_strength);
00345       this->broadcast_new_owner (instance_handle, infos, pub_id);
00346 
00347       return true;
00348     }
00349     else if (infos.owner_.pub_id_ == pub_id) { // is current owner
00350       //still owner but strength changed to be bigger..
00351       if (infos.owner_.ownership_strength_ <= ownership_strength) {
00352         infos.owner_.ownership_strength_ = ownership_strength;
00353         return true;
00354       }
00355       else { //update strength and reevaluate owner which broadcast new owner.
00356         infos.candidates_.push_back (WriterInfo(pub_id,ownership_strength));
00357         this->remove_owner (instance_handle, infos, true);
00358         return infos.owner_.pub_id_ == pub_id;
00359       }
00360     }
00361     else { // not current owner, reevaluate the owner
00362       bool replace_owner = false;
00363       // Add current owner to candidate list for owner reevaluation
00364       // if provided pub has strength greater than currrent owner.
00365       if (ownership_strength > infos.owner_.ownership_strength_) {
00366         infos.candidates_.push_back (infos.owner_);
00367         replace_owner = true;
00368       }
00369 
00370       bool found = false;
00371       bool sort = true;
00372 
00373       // check if it already existed in candicate list. If not,
00374       // add it to the candidate list, otherwise update strength
00375       // if strength was changed.
00376       WriterInfos::iterator const the_end = infos.candidates_.end();
00377 
00378       for (WriterInfos::iterator iter = infos.candidates_.begin();
00379         iter != the_end; ++iter) {
00380 
00381         if (iter->pub_id_ == pub_id) {
00382           if (iter->ownership_strength_ != ownership_strength) {
00383             iter->ownership_strength_ = ownership_strength;
00384           }
00385           else {
00386             sort = false;
00387           }
00388           found = true;
00389           break;
00390         }
00391       }
00392 
00393       if (!found) {
00394         infos.candidates_.push_back (WriterInfo(pub_id,ownership_strength));
00395       }
00396 
00397       if (sort) {
00398         std::sort (infos.candidates_.begin(), infos.candidates_.end(), ::Util::DescendingOwnershipStrengthSort);
00399       }
00400 
00401       if (replace_owner) {
00402         // Owner was already moved to candidate list and the list was sorted
00403         // already so pick owner from sorted list and replace current
00404         // owner.
00405         this->remove_owner (instance_handle, infos, false);
00406       }
00407 
00408       return infos.owner_.pub_id_ == pub_id;
00409     }
00410   }
00411   else {
00412     // first writer of the instance so it's owner.
00413     OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle];
00414     infos.owner_ = WriterInfo(pub_id,ownership_strength);
00415     if (!instance_state->registered()) {
00416       infos.instance_states_.push_back (instance_state);
00417       instance_state->registered(true);
00418     }
00419     this->broadcast_new_owner (instance_handle, infos, infos.owner_.pub_id_);
00420     return true;
00421   }
00422 
00423   return false;
00424 }
00425 
00426 
00427 void
00428 OwnershipManager::broadcast_new_owner ( const ::DDS::InstanceHandle_t& instance_handle,
00429                                         OwnershipWriterInfos& infos,
00430                                         const PublicationId& owner)
00431 {
00432   if (DCPS_debug_level >= 1) {
00433     // This may not be an error since it could happen that the sample
00434     // is delivered to the datareader after the write is dis-associated
00435     // with this datareader.
00436     GuidConverter writer_converter(owner);
00437     ACE_DEBUG((LM_DEBUG,
00438       ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ")
00439       ACE_TEXT("owner writer %C, instance handle %d strength %d num of candidates %d\n"),
00440       OPENDDS_STRING(writer_converter).c_str(),
00441       instance_handle, infos.owner_.ownership_strength_, infos.candidates_.size()));
00442   }
00443 
00444   InstanceStateVec::iterator const the_end = infos.instance_states_.end();
00445   for (InstanceStateVec::iterator iter = infos.instance_states_.begin ();
00446     iter != the_end; ++iter) {
00447     (*iter)->set_owner (owner);
00448   }
00449 }
00450 
00451 void
00452 OwnershipManager::remove_owner (const ::DDS::InstanceHandle_t& instance_handle)
00453 {
00454   ACE_GUARD(ACE_Thread_Mutex,
00455             guard,
00456             this->instance_lock_);
00457 
00458   InstanceOwnershipWriterInfos::iterator iter
00459     = instance_ownership_infos_.find (instance_handle);
00460 
00461   this->remove_owner (instance_handle, iter->second, false);
00462 }
00463 
00464 
00465 } // namespace DCPS
00466 } // namespace OpenDDS
00467 
00468 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7