00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "dds/DCPS/GuidConverter.h"
00011
00012 #include "ReceiveListenerSet.h"
00013
00014 #if !defined (__ACE_INLINE__)
00015 #include "ReceiveListenerSet.inl"
00016 #endif
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 namespace {
00022
00023 struct ReceiveListenerHandle {
00024 explicit ReceiveListenerHandle(TransportReceiveListener* trl)
00025 : listener_(trl)
00026 {
00027 if (listener_) listener_->listener_add_ref();
00028 }
00029
00030 ReceiveListenerHandle(const ReceiveListenerHandle& rhs)
00031 : listener_(rhs.listener_)
00032 {
00033 if (listener_) listener_->listener_add_ref();
00034 }
00035
00036 ReceiveListenerHandle& operator=(const ReceiveListenerHandle& rhs);
00037
00038 ~ReceiveListenerHandle()
00039 {
00040 if (listener_) listener_->listener_remove_ref();
00041 }
00042
00043 private:
00044 typedef void (ReceiveListenerHandle::*bool_t)() const;
00045 void no_implicit_conversion() const {}
00046
00047 public:
00048 operator bool_t() const
00049 {
00050 return listener_ ? &ReceiveListenerHandle::no_implicit_conversion : 0;
00051 }
00052
00053 TransportReceiveListener* operator->() const
00054 {
00055 return listener_;
00056 }
00057
00058 TransportReceiveListener* listener_;
00059 };
00060
00061 void swap(ReceiveListenerHandle& lhs, ReceiveListenerHandle& rhs)
00062 {
00063 std::swap(lhs.listener_, rhs.listener_);
00064 }
00065
00066 ReceiveListenerHandle&
00067 ReceiveListenerHandle::operator=(const ReceiveListenerHandle& rhs)
00068 {
00069 ReceiveListenerHandle cpy(rhs);
00070 swap(*this, cpy);
00071 return *this;
00072 }
00073 }
00074
00075 ReceiveListenerSet::~ReceiveListenerSet()
00076 {
00077 DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6);
00078 }
00079
00080 bool
00081 ReceiveListenerSet::exist(const RepoId& local_id, bool& last)
00082 {
00083 GuardType guard(this->lock_);
00084
00085 last = true;
00086
00087 TransportReceiveListener* listener = 0;
00088
00089 if (find(map_, local_id, listener) == -1) {
00090 GuidConverter converter(local_id);
00091 ACE_ERROR((LM_ERROR,
00092 ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00093 ACE_TEXT("could not find local %C.\n"),
00094 OPENDDS_STRING(converter).c_str()));
00095
00096 return false;
00097 }
00098
00099 if (listener == 0) {
00100 GuidConverter converter(local_id);
00101 ACE_ERROR((LM_ERROR,
00102 ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00103 ACE_TEXT("listener for local %C is nil.\n"),
00104 OPENDDS_STRING(converter).c_str()));
00105
00106 return false;
00107 }
00108
00109 last = map_.size() == 1;
00110 return true;
00111 }
00112
00113 void
00114 ReceiveListenerSet::get_keys(ReaderIdSeq & ids)
00115 {
00116 GuardType guard(this->lock_);
00117
00118 for (MapType::iterator iter = map_.begin();
00119 iter != map_.end(); ++ iter) {
00120 push_back(ids, iter->first);
00121 }
00122 }
00123
00124 bool
00125 ReceiveListenerSet::exist(const RepoId& local_id)
00126 {
00127 GuardType guard(this->lock_);
00128
00129 TransportReceiveListener* listener = 0;
00130 return (find(map_, local_id, listener) == -1 ? false : true);
00131 }
00132
00133 void
00134 ReceiveListenerSet::clear()
00135 {
00136 GuardType guard(this->lock_);
00137 this->map_.clear();
00138 }
00139
00140 void
00141 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00142 const RepoIdSet& incl_excl,
00143 ConstrainReceiveSet constrain)
00144 {
00145 DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6);
00146 OPENDDS_VECTOR(ReceiveListenerHandle) handles;
00147 {
00148 GuardType guard(this->lock_);
00149 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00150 if (constrain == ReceiveListenerSet::SET_EXCLUDED) {
00151 if (itr->second && incl_excl.count(itr->first) == 0) {
00152 handles.push_back(ReceiveListenerHandle(itr->second));
00153 }
00154 } else if (constrain == ReceiveListenerSet::SET_INCLUDED) {
00155 if (itr->second && incl_excl.count(itr->first) != 0) {
00156 handles.push_back(ReceiveListenerHandle(itr->second));
00157 }
00158 } else {
00159 ACE_DEBUG((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n"));
00160 }
00161 }
00162 }
00163
00164 for (size_t i = 0; i < handles.size(); ++i) {
00165 if (i < handles.size() - 1 && sample.sample_) {
00166
00167
00168 ReceivedDataSample rds(sample);
00169 handles[i]->data_received(rds);
00170 } else {
00171 handles[i]->data_received(sample);
00172 }
00173 }
00174 }
00175
00176 void
00177 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00178 const RepoId& readerId)
00179 {
00180 DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6);
00181 ReceiveListenerHandle h(0);
00182 {
00183 GuardType guard(this->lock_);
00184 MapType::iterator itr = map_.find(readerId);
00185 if (itr != map_.end() && itr->second) {
00186 h = ReceiveListenerHandle(itr->second);
00187 }
00188 }
00189 if (h) h->data_received(sample);
00190 }
00191
00192 }
00193 }