#include <ReceiveListenerSet.h>
Public Types | |
enum | ConstrainReceiveSet { SET_EXCLUDED, SET_INCLUDED } |
Public Member Functions | |
typedef | OPENDDS_MAP_CMP (RepoId, TransportReceiveListener_wrch, GUID_tKeyLessThan) MapType |
ReceiveListenerSet () | |
ReceiveListenerSet (const ReceiveListenerSet &) | |
ReceiveListenerSet & | operator= (const ReceiveListenerSet &) |
virtual | ~ReceiveListenerSet () |
int | insert (RepoId subscriber_id, const TransportReceiveListener_wrch &listener) |
int | remove (RepoId subscriber_id) |
void | remove_all (const GUIDSeq &to_remove) |
ssize_t | size () const |
void | data_received (const ReceivedDataSample &sample, const RepoIdSet &incl_excl, ConstrainReceiveSet constrain) |
void | data_received (const ReceivedDataSample &sample, const RepoId &readerId) |
MapType & | map () |
Give access to the underlying map for iteration purposes. | |
const MapType & | map () const |
bool | exist (const RepoId &key, bool &last) |
bool | exist (const RepoId &local_id) |
void | get_keys (ReaderIdSeq &ids) |
void | clear () |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
Private Attributes | |
LockType | lock_ |
This lock will protect the map. | |
MapType | map_ |
Definition at line 27 of file ReceiveListenerSet.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::ReceiveListenerSet::GuardType [private] |
Definition at line 71 of file ReceiveListenerSet.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::ReceiveListenerSet::LockType [private] |
Definition at line 70 of file ReceiveListenerSet.h.
Definition at line 31 of file ReceiveListenerSet.h.
00031 { 00032 SET_EXCLUDED, 00033 SET_INCLUDED 00034 };
ACE_INLINE OpenDDS::DCPS::ReceiveListenerSet::ReceiveListenerSet | ( | ) |
Definition at line 20 of file ReceiveListenerSet.inl.
References DBG_ENTRY_LVL.
00021 { 00022 DBG_ENTRY_LVL("ReceiveListenerSet", "ReceiveListenerSet", 6); 00023 }
ACE_INLINE OpenDDS::DCPS::ReceiveListenerSet::ReceiveListenerSet | ( | const ReceiveListenerSet & | rhs | ) |
Definition at line 26 of file ReceiveListenerSet.inl.
References DBG_ENTRY_LVL.
00027 : RcObject() 00028 , lock_() 00029 , map_(rhs.map_) 00030 { 00031 DBG_ENTRY_LVL("ReceiveListenerSet", "ReceiveListenerSet(rhs)", 6); 00032 }
OpenDDS::DCPS::ReceiveListenerSet::~ReceiveListenerSet | ( | ) | [virtual] |
Definition at line 23 of file ReceiveListenerSet.cpp.
References DBG_ENTRY_LVL.
00024 { 00025 DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6); 00026 }
void OpenDDS::DCPS::ReceiveListenerSet::clear | ( | void | ) |
void OpenDDS::DCPS::ReceiveListenerSet::data_received | ( | const ReceivedDataSample & | sample, | |
const RepoId & | readerId | |||
) |
Definition at line 126 of file ReceiveListenerSet.cpp.
References DBG_ENTRY_LVL, h, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, and map_.
00128 { 00129 DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6); 00130 TransportReceiveListener_wrch h; 00131 { 00132 GuardType guard(this->lock_); 00133 MapType::iterator itr = map_.find(readerId); 00134 if (itr != map_.end() && itr->second) { 00135 h = itr->second; 00136 } 00137 } 00138 TransportReceiveListener_rch listener = h.lock(); 00139 if (listener) 00140 listener->data_received(sample); 00141 }
void OpenDDS::DCPS::ReceiveListenerSet::data_received | ( | const ReceivedDataSample & | sample, | |
const RepoIdSet & | incl_excl, | |||
ConstrainReceiveSet | constrain | |||
) |
Definition at line 87 of file ReceiveListenerSet.cpp.
References DBG_ENTRY_LVL, LM_ERROR, lock_, map_, OpenDDS::DCPS::OPENDDS_VECTOR(), OpenDDS::DCPS::ReceivedDataSample::sample_, SET_EXCLUDED, and SET_INCLUDED.
00090 { 00091 DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6); 00092 OPENDDS_VECTOR(TransportReceiveListener_wrch) handles; 00093 { 00094 GuardType guard(this->lock_); 00095 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) { 00096 if (constrain == ReceiveListenerSet::SET_EXCLUDED) { 00097 if (itr->second && incl_excl.count(itr->first) == 0) { 00098 handles.push_back(itr->second); 00099 } 00100 } else if (constrain == ReceiveListenerSet::SET_INCLUDED) { //SET_INCLUDED 00101 if (itr->second && incl_excl.count(itr->first) != 0) { 00102 handles.push_back(itr->second); 00103 } 00104 } else { 00105 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n")); 00106 } 00107 } 00108 } 00109 00110 for (size_t i = 0; i < handles.size(); ++i) { 00111 TransportReceiveListener_rch listener = handles[i].lock(); 00112 if (!listener) 00113 continue; 00114 if (i < handles.size() - 1 && sample.sample_) { 00115 // demarshal (in data_received()) updates the rd_ptr() of any of 00116 // the message blocks in the chain, so give it a duplicated chain. 00117 ReceivedDataSample rds(sample); 00118 listener->data_received(rds); 00119 } else { 00120 listener->data_received(sample); 00121 } 00122 } 00123 }
bool OpenDDS::DCPS::ReceiveListenerSet::exist | ( | const RepoId & | local_id | ) |
bool OpenDDS::DCPS::ReceiveListenerSet::exist | ( | const RepoId & | key, | |
bool & | last | |||
) |
Check if the key is in the map and if it's the only left entry in the map.
Definition at line 29 of file ReceiveListenerSet.cpp.
References ACE_TEXT(), OpenDDS::DCPS::find(), LM_ERROR, lock_, map_, and OPENDDS_STRING.
00030 { 00031 GuardType guard(this->lock_); 00032 00033 last = true; 00034 00035 TransportReceiveListener_wrch listener; 00036 00037 if (find(map_, local_id, listener) == -1) { 00038 GuidConverter converter(local_id); 00039 ACE_ERROR((LM_ERROR, 00040 ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ") 00041 ACE_TEXT("could not find local %C.\n"), 00042 OPENDDS_STRING(converter).c_str())); 00043 00044 return false; 00045 } 00046 00047 if (!listener) { 00048 GuidConverter converter(local_id); 00049 ACE_ERROR((LM_ERROR, 00050 ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ") 00051 ACE_TEXT("listener for local %C is nil.\n"), 00052 OPENDDS_STRING(converter).c_str())); 00053 00054 return false; 00055 } 00056 00057 last = map_.size() == 1; 00058 return true; 00059 }
void OpenDDS::DCPS::ReceiveListenerSet::get_keys | ( | ReaderIdSeq & | ids | ) |
Definition at line 62 of file ReceiveListenerSet.cpp.
References lock_, map_, and OpenDDS::DCPS::push_back().
00063 { 00064 GuardType guard(this->lock_); 00065 00066 for (MapType::iterator iter = map_.begin(); 00067 iter != map_.end(); ++ iter) { 00068 push_back(ids, iter->first); 00069 } 00070 }
ACE_INLINE int OpenDDS::DCPS::ReceiveListenerSet::insert | ( | RepoId | subscriber_id, | |
const TransportReceiveListener_wrch & | listener | |||
) |
Definition at line 44 of file ReceiveListenerSet.inl.
References DBG_ENTRY_LVL, lock_, and map_.
00046 { 00047 DBG_ENTRY_LVL("ReceiveListenerSet", "insert", 6); 00048 GuardType guard(this->lock_); 00049 00050 std::pair<MapType::iterator,bool> r = map_.insert(std::make_pair(subscriber_id,listener)); 00051 if (!r.second) { 00052 // subscriber_id is already in the map 00053 if (!r.first->second) { 00054 // subscriber_id is in the map with a null listener, update it. 00055 r.first->second = listener; 00056 } 00057 return 1; // 1 ==> key already existed in map 00058 } 00059 return 0; 00060 }
ACE_INLINE const ReceiveListenerSet::MapType & OpenDDS::DCPS::ReceiveListenerSet::map | ( | void | ) | const |
Definition at line 105 of file ReceiveListenerSet.inl.
References map_.
00106 { 00107 return this->map_; 00108 }
ACE_INLINE ReceiveListenerSet::MapType & OpenDDS::DCPS::ReceiveListenerSet::map | ( | void | ) |
Give access to the underlying map for iteration purposes.
Definition at line 99 of file ReceiveListenerSet.inl.
References map_.
00100 { 00101 return this->map_; 00102 }
typedef OpenDDS::DCPS::ReceiveListenerSet::OPENDDS_MAP_CMP | ( | RepoId | , | |
TransportReceiveListener_wrch | , | |||
GUID_tKeyLessThan | ||||
) |
ACE_INLINE ReceiveListenerSet & OpenDDS::DCPS::ReceiveListenerSet::operator= | ( | const ReceiveListenerSet & | rhs | ) |
Reimplemented from OpenDDS::DCPS::RcObject.
Definition at line 35 of file ReceiveListenerSet.inl.
References DBG_ENTRY_LVL, and map_.
00036 { 00037 DBG_ENTRY_LVL("ReceiveListenerSet", "operator=", 6); 00038 map_ = rhs.map_; 00039 return *this; 00040 }
ACE_INLINE int OpenDDS::DCPS::ReceiveListenerSet::remove | ( | RepoId | subscriber_id | ) |
Definition at line 64 of file ReceiveListenerSet.inl.
References DBG_ENTRY_LVL, LM_DEBUG, lock_, map_, and OpenDDS::DCPS::unbind().
00065 { 00066 DBG_ENTRY_LVL("ReceiveListenerSet", "remove", 6); 00067 GuardType guard(this->lock_); 00068 00069 if (unbind(map_, subscriber_id) != 0) { 00070 ACE_ERROR_RETURN((LM_DEBUG, 00071 "(%P|%t) subscriber_id (%C) not found in map_.\n", 00072 LogGuid(subscriber_id).c_str()), 00073 -1); 00074 } 00075 00076 return 0; 00077 }
ACE_INLINE void OpenDDS::DCPS::ReceiveListenerSet::remove_all | ( | const GUIDSeq & | to_remove | ) |
Definition at line 80 of file ReceiveListenerSet.inl.
References DBG_ENTRY_LVL, len, lock_, map_, and OpenDDS::DCPS::unbind().
00081 { 00082 DBG_ENTRY_LVL("ReceiveListenerSet", "remove_all", 6); 00083 GuardType guard(this->lock_); 00084 const CORBA::ULong len = to_remove.length(); 00085 for (CORBA::ULong i(0); i < len; ++i) { 00086 unbind(map_, to_remove[i]); 00087 } 00088 }
ACE_INLINE ssize_t OpenDDS::DCPS::ReceiveListenerSet::size | ( | void | ) | const |
Definition at line 91 of file ReceiveListenerSet.inl.
References DBG_ENTRY_LVL, lock_, and map_.
00092 { 00093 DBG_ENTRY_LVL("ReceiveListenerSet", "size", 6); 00094 GuardType guard(this->lock_); 00095 return map_.size(); 00096 }
LockType OpenDDS::DCPS::ReceiveListenerSet::lock_ [mutable, private] |
This lock will protect the map.
Definition at line 74 of file ReceiveListenerSet.h.
Referenced by clear(), data_received(), exist(), get_keys(), insert(), remove(), remove_all(), and size().
MapType OpenDDS::DCPS::ReceiveListenerSet::map_ [private] |
Definition at line 76 of file ReceiveListenerSet.h.
Referenced by clear(), data_received(), exist(), get_keys(), insert(), map(), operator=(), remove(), remove_all(), and size().