OpenDDS::DCPS::OwnershipManager Class Reference

#include <OwnershipManager.h>

Classes

struct  InstanceMap
struct  OwnershipWriterInfos
struct  WriterInfo

Public Member Functions

typedef OPENDDS_SET (DataReaderImpl *) ReaderSet
typedef OPENDDS_MAP (OPENDDS_STRING, InstanceMap) TypeInstanceMap
typedef OPENDDS_VECTOR (WriterInfo) WriterInfos
typedef OPENDDS_VECTOR (InstanceState *) InstanceStateVec
typedef OPENDDS_MAP (DDS::InstanceHandle_t, OwnershipWriterInfos) InstanceOwnershipWriterInfos
 OwnershipManager ()
 ~OwnershipManager ()
int instance_lock_acquire ()
int instance_lock_release ()
void set_instance_map (const char *type_name, const RcHandle< RcObject > &instance_map, DataReaderImpl *reader)
RcHandle< RcObject > get_instance_map (const char *type_name, DataReaderImpl *reader)
void unregister_reader (const char *type_name, DataReaderImpl *reader)
void remove_writer (const PublicationId &pub_id)
void remove_writers (const DDS::InstanceHandle_t &instance_handle)
bool remove_writer (const DDS::InstanceHandle_t &instance_handle, const PublicationId &pub_id)
bool is_owner (const DDS::InstanceHandle_t &instance_handle, const PublicationId &pub_id)
bool select_owner (const DDS::InstanceHandle_t &instance_handle, const PublicationId &pub_id, const CORBA::Long &ownership_strength, InstanceState *instance_state)
void remove_owner (const DDS::InstanceHandle_t &instance_handle)
void remove_instance (InstanceState *instance_state)
void update_ownership_strength (const PublicationId &pub_id, const CORBA::Long &ownership_strength)

Private Member Functions

bool remove_writer (const DDS::InstanceHandle_t &instance_handle, OwnershipWriterInfos &infos, const PublicationId &pub_id)
void remove_owner (const DDS::InstanceHandle_t &instance_handle, OwnershipWriterInfos &infos, bool sort)
void remove_candidate (OwnershipWriterInfos &infos, const PublicationId &pub_id)
void broadcast_new_owner (const DDS::InstanceHandle_t &instance_handle, OwnershipWriterInfos &infos, const PublicationId &owner)

Private Attributes

ACE_Thread_Mutex instance_lock_
TypeInstanceMap type_instance_map_
InstanceOwnershipWriterInfos instance_ownership_infos_

Detailed Description

Definition at line 32 of file OwnershipManager.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::OwnershipManager::OwnershipManager (  ) 

Definition at line 33 of file OwnershipManager.cpp.

00034 {
00035 }

OpenDDS::DCPS::OwnershipManager::~OwnershipManager (  ) 

Definition at line 37 of file OwnershipManager.cpp.

References ACE_TEXT(), LM_WARNING, and type_instance_map_.

00038 {
00039   // The type->instance should be empty if unregister instance are performed
00040   // by all readers, but in case the instance not unregistered for some reason,
00041   // an error will be logged.
00042   if (!type_instance_map_.empty()) {
00043     // There is no way to pass the instance map to concrete datareader
00044     // to delete, so it will be leaked.
00045     ACE_DEBUG((LM_WARNING,
00046                ACE_TEXT("(%P|%t) OwnershipManager::~OwnershipManager ")
00047                ACE_TEXT("- non-empty type_instance_map_\n")));
00048   }
00049 }


Member Function Documentation

void OpenDDS::DCPS::OwnershipManager::broadcast_new_owner ( const DDS::InstanceHandle_t & instance_handle,
OwnershipWriterInfos & infos,
const PublicationId & owner 
) [private]

Definition at line 385 of file OwnershipManager.cpp.

References ACE_TEXT(), OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::instance_states_, LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, and OpenDDS::DCPS::OwnershipManager::WriterInfo::ownership_strength_.

Referenced by remove_owner(), and select_owner().

00388 {
00389   if (DCPS_debug_level >= 1) {
00390     // This may not be an error since it could happen that the sample
00391     // is delivered to the datareader after the write is dis-associated
00392     // with this datareader.
00393     GuidConverter writer_converter(owner);
00394     ACE_DEBUG((LM_DEBUG,
00395                ACE_TEXT("(%P|%t) OwnershipManager::broadcast_new_owner: ")
00396                ACE_TEXT("owner writer %C, instance handle %d strength %d num ")
00397                ACE_TEXT("of candidates %d\n"),
00398                OPENDDS_STRING(writer_converter).c_str(), instance_handle,
00399                infos.owner_.ownership_strength_,
00400                (int)infos.candidates_.size()));
00401   }
00402 
00403   const InstanceStateVec::iterator the_end = infos.instance_states_.end();
00404   for (InstanceStateVec::iterator iter = infos.instance_states_.begin();
00405        iter != the_end; ++iter) {
00406     (*iter)->set_owner(owner);
00407   }
00408 }

RcHandle< RcObject > OpenDDS::DCPS::OwnershipManager::get_instance_map ( const char * type_name,
DataReaderImpl * reader 
)

Accessor for the instance map of the provided type. It is called once for each new instance in a datareader.

Definition at line 64 of file OwnershipManager.cpp.

References OpenDDS::DCPS::find(), OpenDDS::DCPS::OwnershipManager::InstanceMap::map_, OpenDDS::DCPS::OwnershipManager::InstanceMap::readers_, and type_instance_map_.

00066 {
00067   InstanceMap* instance = 0;
00068   if (0 != find(type_instance_map_, type_name, instance)) {
00069     return RcHandle<RcObject>();
00070   }
00071 
00072   instance->readers_.insert(reader);
00073   return instance->map_;
00074 }

int OpenDDS::DCPS::OwnershipManager::instance_lock_acquire (  ) 

Acquire/release lock for type instance map. The following functions are synchronized by instance_lock_.

Definition at line 52 of file OwnershipManager.cpp.

References ACE_Thread_Mutex::acquire(), and instance_lock_.

00053 {
00054   return instance_lock_.acquire();
00055 }

int OpenDDS::DCPS::OwnershipManager::instance_lock_release (  ) 

Definition at line 58 of file OwnershipManager.cpp.

References instance_lock_, and ACE_Thread_Mutex::release().

00059 {
00060   return instance_lock_.release();
00061 }
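
Because instance_lock_acquire() and instance_lock_release() must be called in pairs, a caller may prefer to wrap them in a scoped guard. The following is a minimal sketch, not OpenDDS code: LockOwner is a hypothetical stand-in that exposes the same two functions over a std::mutex instead of an ACE_Thread_Mutex, and ScopedInstanceLock is an illustrative helper name.

```cpp
#include <mutex>

// Hypothetical stand-in for the public lock interface of OwnershipManager.
class LockOwner {
public:
  int instance_lock_acquire() { mutex_.lock(); ++depth_; return 0; }
  int instance_lock_release() { --depth_; mutex_.unlock(); return 0; }
  int depth() const { return depth_; }  // illustration only: 1 while held
private:
  std::mutex mutex_;
  int depth_ = 0;
};

// Hypothetical RAII helper: acquires in the constructor and releases in the
// destructor, so every exit path from the enclosing scope releases the lock.
class ScopedInstanceLock {
public:
  explicit ScopedInstanceLock(LockOwner& owner)
    : owner_(owner), held_(owner.instance_lock_acquire() == 0) {}
  ~ScopedInstanceLock() { if (held_) owner_.instance_lock_release(); }
  bool held() const { return held_; }

  ScopedInstanceLock(const ScopedInstanceLock&) = delete;
  ScopedInstanceLock& operator=(const ScopedInstanceLock&) = delete;
private:
  LockOwner& owner_;
  bool held_;
};
```

A caller would declare a ScopedInstanceLock at the top of the block that touches the instance map and check held() before proceeding.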

bool OpenDDS::DCPS::OwnershipManager::is_owner ( const DDS::InstanceHandle_t & instance_handle,
const PublicationId & pub_id 
)

Return true if the provided writer is the owner of the instance.

Definition at line 187 of file OwnershipManager.cpp.

References instance_lock_, and instance_ownership_infos_.

00189 {
00190   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00191 
00192   InstanceOwnershipWriterInfos::iterator iter
00193     = instance_ownership_infos_.find(instance_handle);
00194   if (iter != instance_ownership_infos_.end()) {
00195     return iter->second.owner_.pub_id_ == pub_id;
00196   }
00197 
00198   return false;
00199 }

typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_MAP ( DDS::InstanceHandle_t ,
OwnershipWriterInfos 
) InstanceOwnershipWriterInfos

typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_MAP ( OPENDDS_STRING ,
InstanceMap 
) TypeInstanceMap

typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_SET ( DataReaderImpl * ) ReaderSet

typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_VECTOR ( InstanceState * ) InstanceStateVec

typedef OpenDDS::DCPS::OwnershipManager::OPENDDS_VECTOR ( WriterInfo ) WriterInfos

void OpenDDS::DCPS::OwnershipManager::remove_candidate ( OwnershipWriterInfos & infos,
const PublicationId & pub_id 
) [private]

Definition at line 260 of file OwnershipManager.cpp.

References OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_.

Referenced by remove_writer().

00262 {
00263   if (!infos.candidates_.empty()) {
00264     WriterInfos::iterator const the_end = infos.candidates_.end();
00265 
00266     WriterInfos::iterator found_candidate = the_end;
00267     // Supplied writer is not the owner; look for it in the candidate list.
00268     // If it is found there, it is erased below.
00269     for (WriterInfos::iterator iter = infos.candidates_.begin();
00270          iter != the_end; ++iter) {
00271       if (iter->pub_id_ == pub_id) {
00272         found_candidate = iter;
00273         break;
00274       }
00275     }
00276 
00277     if (found_candidate != the_end) {
00278       infos.candidates_.erase(found_candidate);
00279     }
00280   }
00281 }
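
The search-then-erase loop above can also be written with std::find_if. This is a minimal sketch under stated assumptions: Writer is a hypothetical stand-in for WriterInfo with an integer id in place of PublicationId, and the free function returns whether anything was erased, which the member function above does not do.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-in for WriterInfo: an integer id replaces PublicationId.
struct Writer {
  int id;
  int strength;
};

// Erases the first candidate whose id matches; returns true if one was erased.
inline bool remove_candidate(std::vector<Writer>& candidates, int id) {
  std::vector<Writer>::iterator found =
    std::find_if(candidates.begin(), candidates.end(),
                 [id](const Writer& w) { return w.id == id; });
  if (found == candidates.end()) {
    return false;  // the writer was never a candidate; nothing to do
  }
  candidates.erase(found);
  return true;
}
```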

void OpenDDS::DCPS::OwnershipManager::remove_instance ( InstanceState * instance_state  ) 

Definition at line 132 of file OwnershipManager.cpp.

References OpenDDS::DCPS::InstanceState::instance_handle(), instance_lock_, and instance_ownership_infos_.

00133 {
00134   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00135   const DDS::InstanceHandle_t ih = instance_state->instance_handle();
00136   InstanceOwnershipWriterInfos::iterator i = instance_ownership_infos_.find(ih);
00137   if (i != instance_ownership_infos_.end()) {
00138     InstanceStateVec& states = i->second.instance_states_;
00139     for (size_t j = 0; j < states.size(); ++j) {
00140       if (states[j] == instance_state) {
00141         states.erase(states.begin() + j);
00142         break;
00143       }
00144     }
00145   }
00146 }

void OpenDDS::DCPS::OwnershipManager::remove_owner ( const DDS::InstanceHandle_t & instance_handle,
OwnershipWriterInfos & infos,
bool  sort 
) [private]

Definition at line 234 of file OwnershipManager.cpp.

References broadcast_new_owner(), OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_, OpenDDS::DCPS::Util::DescendingOwnershipStrengthSort(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, and OpenDDS::DCPS::OwnershipManager::WriterInfo::pub_id_.

00237 {
00238   //change owner
00239   PublicationId new_owner(GUID_UNKNOWN);
00240   if (infos.candidates_.empty()) {
00241     infos.owner_ = WriterInfo();
00242 
00243   } else {
00244     if (sort) {
00245       std::sort(infos.candidates_.begin(), infos.candidates_.end(),
00246                 Util::DescendingOwnershipStrengthSort);
00247     }
00248 
00249     const WriterInfos::iterator begin = infos.candidates_.begin();
00250     infos.owner_ = *begin;
00251     infos.candidates_.erase(begin);
00252     new_owner = infos.owner_.pub_id_;
00253   }
00254 
00255   broadcast_new_owner(instance_handle, infos, new_owner);
00256 }
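
The promotion step above (take the strongest candidate as the new owner, or fall back to "no owner") can be modeled in isolation. The sketch below is illustrative only: Writer and Ownership are hypothetical stand-ins for WriterInfo and OwnershipWriterInfos, id 0 plays the role of GUID_UNKNOWN, and the broadcast to instance states is omitted.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-ins; id 0 means "no writer" (the role of GUID_UNKNOWN).
struct Writer {
  int id;
  int strength;
};

struct Ownership {
  Writer owner{0, 0};
  std::vector<Writer> candidates;
};

// Orders writers from strongest to weakest.
inline bool stronger_first(const Writer& a, const Writer& b) {
  return a.strength > b.strength;
}

// Replaces the current owner with the first candidate, optionally sorting
// the candidates first. Returns the new owner's id, or 0 if none remain.
inline int promote_next_owner(Ownership& o, bool sort) {
  if (o.candidates.empty()) {
    o.owner = Writer{0, 0};
    return 0;
  }
  if (sort) {
    std::sort(o.candidates.begin(), o.candidates.end(), stronger_first);
  }
  o.owner = o.candidates.front();
  o.candidates.erase(o.candidates.begin());
  return o.owner.id;
}
```

Passing sort as false relies on the caller having kept the candidate list sorted, which is how the callers shown on this page use the flag.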

void OpenDDS::DCPS::OwnershipManager::remove_owner ( const DDS::InstanceHandle_t & instance_handle  ) 

Remove an owner of the specified instance.

Definition at line 411 of file OwnershipManager.cpp.

References instance_lock_, and instance_ownership_infos_.

Referenced by remove_writer(), and select_owner().

00412 {
00413   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00414 
00415   const InstanceOwnershipWriterInfos::iterator iter =
00416     instance_ownership_infos_.find(instance_handle);
00417 
00418   if (iter != instance_ownership_infos_.end()) {
00419     remove_owner(instance_handle, iter->second, false);
00420   }
00421 }

bool OpenDDS::DCPS::OwnershipManager::remove_writer ( const DDS::InstanceHandle_t & instance_handle,
OwnershipWriterInfos & infos,
const PublicationId & pub_id 
) [private]

Definition at line 218 of file OwnershipManager.cpp.

References OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, OpenDDS::DCPS::OwnershipManager::WriterInfo::pub_id_, remove_candidate(), and remove_owner().

00221 {
00222   if (infos.owner_.pub_id_ == pub_id) {
00223     remove_owner(instance_handle, infos, false);
00224     return true;
00225 
00226   } else {
00227     remove_candidate(infos, pub_id);
00228     return false;
00229   }
00230 }

bool OpenDDS::DCPS::OwnershipManager::remove_writer ( const DDS::InstanceHandle_t & instance_handle,
const PublicationId & pub_id 
)

Remove a writer that writes to the specified instance. Returns true if the removed writer was the owner.

Definition at line 203 of file OwnershipManager.cpp.

References instance_lock_, instance_ownership_infos_, and remove_writer().

00205 {
00206   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00207 
00208   InstanceOwnershipWriterInfos::iterator the_iter =
00209     instance_ownership_infos_.find(instance_handle);
00210   if (the_iter != instance_ownership_infos_.end()) {
00211     return remove_writer(instance_handle, the_iter->second, pub_id);
00212   }
00213 
00214   return false;
00215 }

void OpenDDS::DCPS::OwnershipManager::remove_writer ( const PublicationId & pub_id  ) 

Remove a writer from the ownership collections of all instances.

Definition at line 119 of file OwnershipManager.cpp.

References instance_lock_, and instance_ownership_infos_.

Referenced by remove_writer().

00120 {
00121   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00122 
00123   const InstanceOwnershipWriterInfos::iterator the_end =
00124     instance_ownership_infos_.end();
00125   for (InstanceOwnershipWriterInfos::iterator iter =
00126          instance_ownership_infos_.begin(); iter != the_end; ++iter) {
00127     remove_writer(iter->first, iter->second, pub_id);
00128   }
00129 }

void OpenDDS::DCPS::OwnershipManager::remove_writers ( const DDS::InstanceHandle_t & instance_handle  ) 

Remove all writers that write to the specified instance.

Definition at line 149 of file OwnershipManager.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, instance_lock_, instance_ownership_infos_, and LM_DEBUG.

00150 {
00151   InstanceStateVec instances_to_reset;
00152   {
00153     ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00154 
00155     if (DCPS_debug_level >= 1) {
00156       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::remove_writers:")
00157                  ACE_TEXT(" disassociate writers with instance %d\n"),
00158                  instance_handle));
00159     }
00160 
00161     InstanceOwnershipWriterInfos::iterator owner_wi =
00162       instance_ownership_infos_.find(instance_handle);
00163     if (owner_wi != instance_ownership_infos_.end()) {
00164       owner_wi->second.owner_ = WriterInfo();
00165       owner_wi->second.candidates_.clear();
00166       const InstanceStateVec::iterator end =
00167         owner_wi->second.instance_states_.end();
00168       for (InstanceStateVec::iterator iter =
00169              owner_wi->second.instance_states_.begin(); iter != end; ++iter) {
00170         // call after lock released, will call back to data reader
00171         instances_to_reset.push_back(*iter);
00172       }
00173       owner_wi->second.instance_states_.clear();
00174 
00175       instance_ownership_infos_.erase(owner_wi);
00176     }
00177   }
00178   // Lock released
00179   for (InstanceStateVec::iterator instance = instances_to_reset.begin();
00180        instance != instances_to_reset.end(); ++instance) {
00181     (*instance)->reset_ownership(instance_handle);
00182   }
00183 }
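
The listing above copies the affected instance states while holding the lock and calls back into them only after the lock is released, so the callbacks are free to re-enter the manager. A minimal sketch of that pattern follows. It is not OpenDDS code: ListenerRegistry, add() and clear() are hypothetical names, std::mutex replaces ACE_Thread_Mutex, and std::function callbacks replace InstanceState::reset_ownership().

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <vector>

// Hypothetical registry standing in for the per-instance bookkeeping.
class ListenerRegistry {
public:
  typedef std::function<void(int)> Callback;

  void add(int handle, const Callback& cb) {
    std::lock_guard<std::mutex> guard(lock_);
    listeners_[handle].push_back(cb);
  }

  // Removes every listener registered for `handle`, then notifies each one.
  // Returns the number of listeners notified.
  std::size_t clear(int handle) {
    std::vector<Callback> to_notify;
    {
      std::lock_guard<std::mutex> guard(lock_);
      std::map<int, std::vector<Callback> >::iterator it =
        listeners_.find(handle);
      if (it != listeners_.end()) {
        to_notify.swap(it->second);  // take the callbacks out under the lock
        listeners_.erase(it);
      }
    }  // lock released here
    for (std::size_t i = 0; i < to_notify.size(); ++i) {
      to_notify[i](handle);  // may call add() or clear() without deadlocking
    }
    return to_notify.size();
  }

private:
  std::mutex lock_;
  std::map<int, std::vector<Callback> > listeners_;
};
```

Invoking the callbacks inside the locked block would deadlock as soon as one of them called add(), since std::mutex is not recursive.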

bool OpenDDS::DCPS::OwnershipManager::select_owner ( const DDS::InstanceHandle_t & instance_handle,
const PublicationId & pub_id,
const CORBA::Long & ownership_strength,
InstanceState * instance_state 
)

Determine if the provided publication can be the owner.

Definition at line 284 of file OwnershipManager.cpp.

References broadcast_new_owner(), OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::candidates_, OpenDDS::DCPS::Util::DescendingOwnershipStrengthSort(), OpenDDS::DCPS::GUID_UNKNOWN, instance_lock_, instance_ownership_infos_, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::instance_states_, OpenDDS::DCPS::OwnershipManager::OwnershipWriterInfos::owner_, OpenDDS::DCPS::OwnershipManager::WriterInfo::ownership_strength_, OpenDDS::DCPS::OwnershipManager::WriterInfo::pub_id_, OpenDDS::DCPS::InstanceState::registered(), and remove_owner().

00288 {
00289   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, instance_lock_, false);
00290 
00291   InstanceOwnershipWriterInfos::iterator iter =
00292     instance_ownership_infos_.find(instance_handle);
00293   if (iter != instance_ownership_infos_.end()) {
00294     OwnershipWriterInfos& infos = iter->second;
00295     if (!instance_state->registered()) {
00296       infos.instance_states_.push_back(instance_state);
00297       instance_state->registered(true);
00298     }
00299 
00300     // No owner at some point.
00301     if (infos.owner_.pub_id_ == GUID_UNKNOWN) {
00302       infos.owner_ = WriterInfo(pub_id, ownership_strength);
00303       broadcast_new_owner(instance_handle, infos, pub_id);
00304       return true;
00305 
00306     } else if (infos.owner_.pub_id_ == pub_id) { // is current owner
00307       //still owner but strength changed to be bigger..
00308       if (infos.owner_.ownership_strength_ <= ownership_strength) {
00309         infos.owner_.ownership_strength_ = ownership_strength;
00310         return true;
00311 
00312       } else { //update strength and reevaluate owner which broadcast new owner.
00313         infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
00314         remove_owner(instance_handle, infos, true);
00315         return infos.owner_.pub_id_ == pub_id;
00316       }
00317 
00318     } else { // not current owner, reevaluate the owner
00319       bool replace_owner = false;
00320       // Add current owner to candidate list for owner reevaluation
00321       // if provided pub has strength greater than current owner.
00322       if (ownership_strength > infos.owner_.ownership_strength_) {
00323         infos.candidates_.push_back(infos.owner_);
00324         replace_owner = true;
00325       }
00326 
00327       bool found = false;
00328       bool sort = true;
00329 
00330       // check if it already existed in candidate list. If not,
00331       // add it to the candidate list, otherwise update strength
00332       // if strength was changed.
00333       const WriterInfos::iterator the_end = infos.candidates_.end();
00334 
00335       for (WriterInfos::iterator iter = infos.candidates_.begin();
00336            iter != the_end; ++iter) {
00337 
00338         if (iter->pub_id_ == pub_id) {
00339           if (iter->ownership_strength_ != ownership_strength) {
00340             iter->ownership_strength_ = ownership_strength;
00341           } else {
00342             sort = false;
00343           }
00344           found = true;
00345           break;
00346         }
00347       }
00348 
00349       if (!found) {
00350         infos.candidates_.push_back(WriterInfo(pub_id, ownership_strength));
00351       }
00352 
00353       if (sort) {
00354         std::sort(infos.candidates_.begin(), infos.candidates_.end(),
00355                   Util::DescendingOwnershipStrengthSort);
00356       }
00357 
00358       if (replace_owner) {
00359         // Owner was already moved to candidate list and the list was sorted
00360         // already so pick owner from sorted list and replace current
00361         // owner.
00362         remove_owner(instance_handle, infos, false);
00363       }
00364 
00365       return infos.owner_.pub_id_ == pub_id;
00366     }
00367 
00368   } else {
00369     // first writer of the instance so it's owner.
00370     OwnershipWriterInfos& infos = instance_ownership_infos_[instance_handle];
00371     infos.owner_ = WriterInfo(pub_id, ownership_strength);
00372     if (!instance_state->registered()) {
00373       infos.instance_states_.push_back(instance_state);
00374       instance_state->registered(true);
00375     }
00376     broadcast_new_owner(instance_handle, infos, infos.owner_.pub_id_);
00377     return true;
00378   }
00379 
00380   return false;
00381 }
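
The branches of select_owner() are easier to follow on a reduced model. The sketch below is a simplification under stated assumptions, not the OpenDDS implementation: Writer and Ownership are hypothetical stand-ins, id 0 plays the role of GUID_UNKNOWN, locking and instance-state registration are omitted, the candidate list is always re-sorted, and std::stable_sort is used (the listing uses std::sort) so that ties resolve deterministically.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-ins; id 0 means "no owner" (the role of GUID_UNKNOWN).
struct Writer {
  int id;
  int strength;
};

struct Ownership {
  Writer owner{0, 0};
  std::vector<Writer> candidates;
};

inline bool stronger_first(const Writer& a, const Writer& b) {
  return a.strength > b.strength;
}

// Moves the strongest candidate into the owner slot (candidates non-empty).
inline void promote_strongest(Ownership& o) {
  std::stable_sort(o.candidates.begin(), o.candidates.end(), stronger_first);
  o.owner = o.candidates.front();
  o.candidates.erase(o.candidates.begin());
}

// Returns true if writer `id` owns the instance after this call.
inline bool select_owner(Ownership& o, int id, int strength) {
  if (o.owner.id == 0) {                 // no owner yet: first writer wins
    o.owner = Writer{id, strength};
    return true;
  }
  if (o.owner.id == id) {                // the current owner reports again
    if (o.owner.strength <= strength) {  // same or stronger: stays owner
      o.owner.strength = strength;
      return true;
    }
    // Owner got weaker: it competes with the candidates again.
    o.candidates.push_back(Writer{id, strength});
    promote_strongest(o);
    return o.owner.id == id;
  }
  // A different writer: record or update it as a candidate.
  const bool replace_owner = strength > o.owner.strength;
  if (replace_owner) {
    o.candidates.push_back(o.owner);     // old owner becomes a candidate
  }
  std::vector<Writer>::iterator it =
    std::find_if(o.candidates.begin(), o.candidates.end(),
                 [id](const Writer& w) { return w.id == id; });
  if (it == o.candidates.end()) {
    o.candidates.push_back(Writer{id, strength});
  } else {
    it->strength = strength;
  }
  if (replace_owner) {
    promote_strongest(o);
  } else {
    std::stable_sort(o.candidates.begin(), o.candidates.end(), stronger_first);
  }
  return o.owner.id == id;
}
```

For example, with writer 1 at strength 5 as owner, a call for writer 3 at strength 9 makes writer 3 the owner and demotes writer 1 to a candidate. If writer 3 later reports strength 1, ownership returns to writer 1.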

void OpenDDS::DCPS::OwnershipManager::set_instance_map ( const char * type_name,
const RcHandle< RcObject > & instance_map,
DataReaderImpl * reader 
)

The per-type instance map is created by the concrete datareader when the first sample of that type is received.

Definition at line 77 of file OwnershipManager.cpp.

References ACE_TEXT(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, and type_instance_map_.

00080 {
00081   if (DCPS_debug_level >= 1) {
00082     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) OwnershipManager::set_instance_map ")
00083                ACE_TEXT("instance map %X is created by reader %X \n"),
00084                instance_map.in(), reader));
00085   }
00086 
00087   if (0 != OpenDDS::DCPS::bind(type_instance_map_, type_name,
00088                                InstanceMap(instance_map, reader))) {
00089     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: OwnershipManager::set_instance_map "
00090                "failed to bind instance for type \"%C\"\n", type_name));
00091   }
00092 }

void OpenDDS::DCPS::OwnershipManager::unregister_reader ( const char * type_name,
DataReaderImpl * reader 
)

The readers that access the instance map are tracked, and this set of readers acts as a reference count on the map. Each reader must unregister itself from the instance map when it unregisters an instance. The instance map is deleted when the last reader of the type has unregistered.

Definition at line 95 of file OwnershipManager.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::find(), OpenDDS::DCPS::RcHandle< T >::in(), instance_lock_, LM_DEBUG, OpenDDS::DCPS::OwnershipManager::InstanceMap::map_, OpenDDS::DCPS::OwnershipManager::InstanceMap::readers_, type_instance_map_, and OpenDDS::DCPS::unbind().

00097 {
00098   ACE_GUARD(ACE_Thread_Mutex, guard, instance_lock_);
00099 
00100   InstanceMap* instance = 0;
00101   if (0 != find(type_instance_map_, type_name, instance)) {
00102     return;
00103   }
00104 
00105   instance->readers_.erase(reader);
00106 
00107   if (instance->readers_.empty()) {
00108     if (DCPS_debug_level >= 1) {
00109       ACE_DEBUG((LM_DEBUG,
00110                  ACE_TEXT("(%P|%t) OwnershipManager::unregister_reader ")
00111                  ACE_TEXT(" instance map %@ is deleted by reader %@\n"),
00112                  instance->map_.in(), reader));
00113     }
00114     unbind(type_instance_map_, type_name);
00115   }
00116 }
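
The reader set works as a reference count on the per-type map: the entry is dropped when the set becomes empty. The following minimal sketch models the set_instance_map() / get_instance_map() / unregister_reader() lifecycle. All names are hypothetical stand-ins (Reader for DataReaderImpl, SharedMap for the RcObject, std::shared_ptr for RcHandle), locking is omitted, and the sketch assumes the creating reader is recorded in the set when the entry is bound.

```cpp
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

struct Reader {};     // hypothetical stand-in for DataReaderImpl
struct SharedMap {};  // hypothetical stand-in for the shared instance map

class TypeRegistry {
public:
  // Binds a new map for `type`; fails if the type is already bound.
  bool set_map(const std::string& type, std::shared_ptr<SharedMap> map,
               Reader* creator) {
    Entry entry;
    entry.map = map;
    entry.readers.insert(creator);  // assumption: the creator is tracked
    return entries_.insert(std::make_pair(type, entry)).second;
  }

  // Returns the map for `type` and records `reader` as a user of it,
  // or returns an empty pointer if the type is not bound.
  std::shared_ptr<SharedMap> get_map(const std::string& type, Reader* reader) {
    std::map<std::string, Entry>::iterator it = entries_.find(type);
    if (it == entries_.end()) {
      return std::shared_ptr<SharedMap>();
    }
    it->second.readers.insert(reader);
    return it->second.map;
  }

  // Drops `reader`; the entry is erased once no reader refers to it.
  void unregister_reader(const std::string& type, Reader* reader) {
    std::map<std::string, Entry>::iterator it = entries_.find(type);
    if (it == entries_.end()) {
      return;
    }
    it->second.readers.erase(reader);
    if (it->second.readers.empty()) {
      entries_.erase(it);
    }
  }

  bool has_type(const std::string& type) const {
    return entries_.count(type) != 0;
  }

private:
  struct Entry {
    std::shared_ptr<SharedMap> map;
    std::set<Reader*> readers;
  };
  std::map<std::string, Entry> entries_;
};
```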

void OpenDDS::DCPS::OwnershipManager::update_ownership_strength ( const PublicationId & pub_id,
const CORBA::Long & ownership_strength 
)

Update the ownership strength of a publication.


Member Data Documentation

InstanceOwnershipWriterInfos OpenDDS::DCPS::OwnershipManager::instance_ownership_infos_ [private]


Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1