OpenDDS  Snapshot(2023/04/28-20:55)
ReceiveListenerSet.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "dds/DCPS/GuidConverter.h"
11 
12 #include "ReceiveListenerSet.h"
13 
14 #if !defined (__ACE_INLINE__)
15 #include "ReceiveListenerSet.inl"
16 #endif /* __ACE_INLINE__ */
17 
19 
20 namespace OpenDDS {
21 namespace DCPS {
22 
24 {
// NOTE(review): the declarator for this body (listing line 23) is absent from
// this extract; the trace label below suggests this is the destructor --
// confirm against the real source file. The body only emits the entry trace.
25  DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6);
26 }
27 
// Checks whether map_ holds a non-nil listener for local_id.
//
// The guard on lock_ is held for the whole call. `last` is set to true up
// front; when the lookup succeeds it is overwritten with whether map_ holds
// exactly one entry at that moment. Returns false when find() reports -1 for
// local_id or when the listener found is nil -- in both of those cases `last`
// is left at true. Returns true otherwise.
//
// NOTE(review): listing lines 35, 39 and 49 are absent from this extract --
// presumably the declaration of `listener` and the openings of the two log
// statements whose arguments appear below. Confirm against the real source.
28 bool
29 ReceiveListenerSet::exist(const GUID_t& local_id, bool& last)
30 {
31  GuardType guard(lock_);
32 
// Default reported to the caller on every early-return path below.
33  last = true;
34 
36 
37  if (find(map_, local_id, listener) == -1) {
38  LogGuid logger(local_id);
40  ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
41  ACE_TEXT("could not find local %C.\n"),
42  logger.c_str()));
43 
44  return false;
45  }
46 
// An entry exists for local_id but its listener is nil: treated as "not found".
47  if (!listener) {
48  LogGuid logger(local_id);
50  ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
51  ACE_TEXT("listener for local %C is nil.\n"),
52  logger.c_str()));
53 
54  return false;
55  }
56 
// `last` is true only when the entry just found is the sole entry in map_.
57  last = map_.size() == 1;
58  return true;
59 }
60 
// Walks map_ from begin() to end() while holding the guard on lock_ and
// push_back()s the key (iter->first) of every entry onto `ids`.
//
// NOTE(review): the declarator (listing line 62) is absent from this extract,
// so the function name and the declared type of `ids` are not visible here --
// confirm against the real source. Nothing in this body empties `ids` first,
// so any elements it already holds are kept.
61 void
63 {
64  GuardType guard(lock_);
65 
66  for (MapType::iterator iter = map_.begin();
67  iter != map_.end(); ++ iter) {
68  push_back(ids, iter->first);
69  }
70 }
71 
// Returns true when map_ contains at least one entry keyed by local_id.
// The check is made while holding the guard on lock_. Unlike the two-argument
// exist() in this file, the mapped listener is not inspected here, so an entry
// with a nil listener still counts.
//
// NOTE(review): the declarator (listing line 73) is absent from this extract;
// the function name and the type of `local_id` are not visible -- confirm.
72 bool
74 {
75  GuardType guard(lock_);
76  return map_.count(local_id) > 0;
77 }
78 
// Removes every entry from map_ while holding the guard on lock_.
//
// NOTE(review): the declarator (listing line 80) is absent from this extract,
// so the function name is not visible here -- confirm against the real source.
79 void
81 {
82  GuardType guard(lock_);
83  map_.clear();
84 }
85 
// Delivers `sample` to the listeners in map_ selected by incl_excl/constrain.
//
// Step 1 (inside the inner braces, with the guard on lock_): collect into
// `handles` every map_ entry whose value is non-nil and whose key is
//   - absent from incl_excl  when constrain == SET_EXCLUDED, or
//   - present in incl_excl   when constrain == SET_INCLUDED.
// Any other `constrain` value collects nothing and logs the error once per
// map_ entry, because the check sits inside the loop.
//
// Step 2 (after the guard has gone out of scope): lock() each collected
// handle, skip those that yield a nil listener, and call data_received() on
// the rest.
//
// NOTE(review): listing lines 87 and 92 are absent from this extract --
// presumably the first line of the signature (declaring `sample`) and the
// declaration of `handles`. Confirm against the real source.
86 void
88  const RepoIdSet& incl_excl,
89  ConstrainReceiveSet constrain)
90 {
91  DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6);
93  {
94  GuardType guard(lock_);
95  handles.reserve(map_.size());
96  for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
97  if (constrain == ReceiveListenerSet::SET_EXCLUDED) {
98  if (itr->second && incl_excl.count(itr->first) == 0) {
99  handles.push_back(itr->second);
100  }
101  } else if (constrain == ReceiveListenerSet::SET_INCLUDED) { //SET_INCLUDED
102  if (itr->second && incl_excl.count(itr->first) != 0) {
103  handles.push_back(itr->second);
104  }
105  } else {
106  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n"));
107  }
108  }
109  }
110 
111  for (size_t i = 0; i < handles.size(); ++i) {
112  TransportReceiveListener_rch listener = handles[i].lock();
113  if (!listener)
114  continue;
// Every position except the final index of `handles` gets its own copy of the
// sample when it carries data; the final index (or a sample without data) is
// handed the caller's `sample` directly. "Final" is by index in `handles`,
// regardless of whether that last handle still yields a listener.
115  if (i < handles.size() - 1 && sample.has_data()) {
116  // demarshal (in data_received()) updates the rd_ptr() of any of
117  // the message blocks in the chain, so give it a duplicated chain.
118  ReceivedDataSample rds(sample);
119  listener->data_received(rds);
120  } else {
121  listener->data_received(sample);
122  }
123  }
124 }
125 
// Delivers `sample` to the single listener registered under readerId, if any.
//
// With the guard on lock_ held (inner braces only), map_ is searched for
// readerId and, when an entry with a non-nil value is found, that value is
// copied into `h`. After the guard has gone out of scope, `h` is lock()ed and
// data_received(sample) is called only if that yields a non-nil listener.
// When there is no matching entry nothing is assigned to `h`, and the call
// then depends on what lock() returns for `h` in its initial state.
//
// NOTE(review): listing lines 127 and 131 are absent from this extract --
// presumably the first line of the signature (declaring `sample`) and the
// declaration of `h`. The function name is taken from the trace label below;
// confirm against the real source.
126 void
128  const GUID_t& readerId)
129 {
130  DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6);
132  {
133  GuardType guard(lock_);
134  MapType::iterator itr = map_.find(readerId);
135  if (itr != map_.end() && itr->second) {
136  h = itr->second;
137  }
138  }
139  TransportReceiveListener_rch listener = h.lock();
140  if (listener)
141  listener->data_received(sample);
142 }
143 
144 }
145 }
146 
#define ACE_ERROR(X)
bool has_data() const
true if at least one Data Block is stored (even if it has 0 useable bytes)
bool exist(const GUID_t &key, bool &last)
GuidSet RepoIdSet
Definition: GuidUtils.h:113
const char * c_str() const
sequence< GUID_t > ReaderIdSeq
LockType lock_
This lock will protect the map.
void data_received(const ReceivedDataSample &sample, const RepoIdSet &incl_excl, ConstrainReceiveSet constrain)
Holds a data sample received by the transport.
virtual void data_received(const ReceivedDataSample &sample)=0
ACE_TEXT("TCP_Factory")
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
RcHandle< T > lock() const
Definition: RcObject.h:188
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71