ReceiveListenerSet.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "dds/DCPS/GuidConverter.h"
00011
00012 #include "ReceiveListenerSet.h"
00013
00014 #if !defined (__ACE_INLINE__)
00015 #include "ReceiveListenerSet.inl"
00016 #endif
00017
00018 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 namespace OpenDDS {
00021 namespace DCPS {
00022
00023 ReceiveListenerSet::~ReceiveListenerSet()
00024 {
00025 DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6);
00026 }
00027
00028 bool
00029 ReceiveListenerSet::exist(const RepoId& local_id, bool& last)
00030 {
00031 GuardType guard(this->lock_);
00032
00033 last = true;
00034
00035 TransportReceiveListener_wrch listener;
00036
00037 if (find(map_, local_id, listener) == -1) {
00038 GuidConverter converter(local_id);
00039 ACE_ERROR((LM_ERROR,
00040 ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00041 ACE_TEXT("could not find local %C.\n"),
00042 OPENDDS_STRING(converter).c_str()));
00043
00044 return false;
00045 }
00046
00047 if (!listener) {
00048 GuidConverter converter(local_id);
00049 ACE_ERROR((LM_ERROR,
00050 ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00051 ACE_TEXT("listener for local %C is nil.\n"),
00052 OPENDDS_STRING(converter).c_str()));
00053
00054 return false;
00055 }
00056
00057 last = map_.size() == 1;
00058 return true;
00059 }
00060
00061 void
00062 ReceiveListenerSet::get_keys(ReaderIdSeq & ids)
00063 {
00064 GuardType guard(this->lock_);
00065
00066 for (MapType::iterator iter = map_.begin();
00067 iter != map_.end(); ++ iter) {
00068 push_back(ids, iter->first);
00069 }
00070 }
00071
00072 bool
00073 ReceiveListenerSet::exist(const RepoId& local_id)
00074 {
00075 GuardType guard(this->lock_);
00076 return map_.count(local_id) > 0;
00077 }
00078
00079 void
00080 ReceiveListenerSet::clear()
00081 {
00082 GuardType guard(this->lock_);
00083 this->map_.clear();
00084 }
00085
00086 void
00087 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00088 const RepoIdSet& incl_excl,
00089 ConstrainReceiveSet constrain)
00090 {
00091 DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6);
00092 OPENDDS_VECTOR(TransportReceiveListener_wrch) handles;
00093 {
00094 GuardType guard(this->lock_);
00095 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00096 if (constrain == ReceiveListenerSet::SET_EXCLUDED) {
00097 if (itr->second && incl_excl.count(itr->first) == 0) {
00098 handles.push_back(itr->second);
00099 }
00100 } else if (constrain == ReceiveListenerSet::SET_INCLUDED) {
00101 if (itr->second && incl_excl.count(itr->first) != 0) {
00102 handles.push_back(itr->second);
00103 }
00104 } else {
00105 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n"));
00106 }
00107 }
00108 }
00109
00110 for (size_t i = 0; i < handles.size(); ++i) {
00111 TransportReceiveListener_rch listener = handles[i].lock();
00112 if (!listener)
00113 continue;
00114 if (i < handles.size() - 1 && sample.sample_) {
00115
00116
00117 ReceivedDataSample rds(sample);
00118 listener->data_received(rds);
00119 } else {
00120 listener->data_received(sample);
00121 }
00122 }
00123 }
00124
00125 void
00126 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00127 const RepoId& readerId)
00128 {
00129 DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6);
00130 TransportReceiveListener_wrch h;
00131 {
00132 GuardType guard(this->lock_);
00133 MapType::iterator itr = map_.find(readerId);
00134 if (itr != map_.end() && itr->second) {
00135 h = itr->second;
00136 }
00137 }
00138 TransportReceiveListener_rch listener = h.lock();
00139 if (listener)
00140 listener->data_received(sample);
00141 }
00142
00143 }
00144 }
00145
00146 OPENDDS_END_VERSIONED_NAMESPACE_DECL