OpenDDS  Snapshot(2023/04/07-19:43)
Public Types | Public Member Functions | Private Types | Private Attributes | List of all members
OpenDDS::DCPS::ReceiveListenerSet Class Reference

#include <ReceiveListenerSet.h>

Inheritance diagram for OpenDDS::DCPS::ReceiveListenerSet:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ReceiveListenerSet:
Collaboration graph
[legend]

Public Types

enum  ConstrainReceiveSet { SET_EXCLUDED, SET_INCLUDED }
 

Public Member Functions

typedef OPENDDS_MAP_CMP (GUID_t, TransportReceiveListener_wrch, GUID_tKeyLessThan) MapType
 
 ReceiveListenerSet ()
 
 ReceiveListenerSet (const ReceiveListenerSet &)
 
ReceiveListenerSet & operator= (const ReceiveListenerSet &)
 
virtual ~ReceiveListenerSet ()
 
int insert (GUID_t subscriber_id, const TransportReceiveListener_wrch &listener)
 
int remove (GUID_t subscriber_id)
 
void remove_all (const GUIDSeq &to_remove)
 
ssize_t size () const
 
void data_received (const ReceivedDataSample &sample, const RepoIdSet &incl_excl, ConstrainReceiveSet constrain)
 
void data_received (const ReceivedDataSample &sample, const GUID_t &readerId)
 
MapType & map ()
 Give access to the underlying map for iteration purposes. More...
 
const MapType & map () const
 
bool exist (const GUID_t &key, bool &last)
 
bool exist (const GUID_t &local_id)
 
void get_keys (ReaderIdSeq &ids)
 
void clear ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Types

typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockType > GuardType
 

Private Attributes

LockType lock_
 This lock will protect the map. More...
 
MapType map_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

Definition at line 27 of file ReceiveListenerSet.h.

Member Typedef Documentation

◆ GuardType

Definition at line 71 of file ReceiveListenerSet.h.

◆ LockType

Definition at line 70 of file ReceiveListenerSet.h.

Member Enumeration Documentation

◆ ConstrainReceiveSet

Enumerator
SET_EXCLUDED 
SET_INCLUDED 

Definition at line 31 of file ReceiveListenerSet.h.

Constructor & Destructor Documentation

◆ ReceiveListenerSet() [1/2]

ACE_INLINE OpenDDS::DCPS::ReceiveListenerSet::ReceiveListenerSet ( )

Definition at line 20 of file ReceiveListenerSet.inl.

References ACE_INLINE, and DBG_ENTRY_LVL.

21 {
22  DBG_ENTRY_LVL("ReceiveListenerSet", "ReceiveListenerSet", 6);
23 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ ReceiveListenerSet() [2/2]

ACE_INLINE OpenDDS::DCPS::ReceiveListenerSet::ReceiveListenerSet ( const ReceiveListenerSet & rhs)

Definition at line 26 of file ReceiveListenerSet.inl.

References ACE_INLINE, and DBG_ENTRY_LVL.

27  : RcObject()
28  , lock_()
29  , map_()
30 {
31  DBG_ENTRY_LVL("ReceiveListenerSet", "ReceiveListenerSet(rhs)", 6);
32  *this = rhs;
33 }
LockType lock_
This lock will protect the map.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ ~ReceiveListenerSet()

OpenDDS::DCPS::ReceiveListenerSet::~ReceiveListenerSet ( )
virtual

Definition at line 23 of file ReceiveListenerSet.cpp.

References DBG_ENTRY_LVL.

24 {
25  DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6);
26 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ clear()

void OpenDDS::DCPS::ReceiveListenerSet::clear ( void  )

Definition at line 80 of file ReceiveListenerSet.cpp.

References lock_, and map_.

81 {
82  GuardType guard(lock_);
83  map_.clear();
84 }
LockType lock_
This lock will protect the map.

◆ data_received() [1/2]

void OpenDDS::DCPS::ReceiveListenerSet::data_received ( const ReceivedDataSample & sample,
const RepoIdSet & incl_excl,
ConstrainReceiveSet  constrain 
)

Definition at line 87 of file ReceiveListenerSet.cpp.

References ACE_ERROR, OpenDDS::DCPS::TransportReceiveListener::data_received(), DBG_ENTRY_LVL, OpenDDS::DCPS::ReceivedDataSample::has_data(), LM_ERROR, lock_, map_, OpenDDS::DCPS::OPENDDS_VECTOR(), SET_EXCLUDED, and SET_INCLUDED.

90 {
91  DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6);
92  OPENDDS_VECTOR(TransportReceiveListener_wrch) handles;
93  {
94  GuardType guard(lock_);
95  handles.reserve(map_.size());
96  for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
97  if (constrain == ReceiveListenerSet::SET_EXCLUDED) {
98  if (itr->second && incl_excl.count(itr->first) == 0) {
99  handles.push_back(itr->second);
100  }
101  } else if (constrain == ReceiveListenerSet::SET_INCLUDED) { //SET_INCLUDED
102  if (itr->second && incl_excl.count(itr->first) != 0) {
103  handles.push_back(itr->second);
104  }
105  } else {
106  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n"));
107  }
108  }
109  }
110 
111  for (size_t i = 0; i < handles.size(); ++i) {
112  TransportReceiveListener_rch listener = handles[i].lock();
113  if (!listener)
114  continue;
115  if (i < handles.size() - 1 && sample.has_data()) {
116  // demarshal (in data_received()) updates the rd_ptr() of any of
117  // the message blocks in the chain, so give it a duplicated chain.
118  ReceivedDataSample rds(sample);
119  listener->data_received(rds);
120  } else {
121  listener->data_received(sample);
122  }
123  }
124 }
#define ACE_ERROR(X)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
LockType lock_
This lock will protect the map.
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TransportReceiveListener > TransportReceiveListener_wrch

◆ data_received() [2/2]

void OpenDDS::DCPS::ReceiveListenerSet::data_received ( const ReceivedDataSample & sample,
const GUID_t & readerId 
)

Definition at line 127 of file ReceiveListenerSet.cpp.

References OpenDDS::DCPS::TransportReceiveListener::data_received(), DBG_ENTRY_LVL, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, map_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

129 {
130  DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6);
131  TransportReceiveListener_wrch h;
132  {
133  GuardType guard(lock_);
134  MapType::iterator itr = map_.find(readerId);
135  if (itr != map_.end() && itr->second) {
136  h = itr->second;
137  }
138  }
139  TransportReceiveListener_rch listener = h.lock();
140  if (listener)
141  listener->data_received(sample);
142 }
virtual void data_received(const ReceivedDataSample &sample)=0
LockType lock_
This lock will protect the map.
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TransportReceiveListener > TransportReceiveListener_wrch

◆ exist() [1/2]

bool OpenDDS::DCPS::ReceiveListenerSet::exist ( const GUID_t & key,
bool &  last 
)

Check whether the key is in the map and, if so, whether it is the only entry left in the map.

Definition at line 29 of file ReceiveListenerSet.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::find(), LM_ERROR, lock_, and map_.

30 {
31  GuardType guard(lock_);
32 
33  last = true;
34 
36 
37  if (find(map_, local_id, listener) == -1) {
38  LogGuid logger(local_id);
39  ACE_ERROR((LM_ERROR,
40  ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
41  ACE_TEXT("could not find local %C.\n"),
42  logger.c_str()));
43 
44  return false;
45  }
46 
47  if (!listener) {
48  LogGuid logger(local_id);
49  ACE_ERROR((LM_ERROR,
50  ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
51  ACE_TEXT("listener for local %C is nil.\n"),
52  logger.c_str()));
53 
54  return false;
55  }
56 
57  last = map_.size() == 1;
58  return true;
59 }
#define ACE_ERROR(X)
LockType lock_
This lock will protect the map.
ACE_TEXT("TCP_Factory")
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
WeakRcHandle< TransportReceiveListener > TransportReceiveListener_wrch

◆ exist() [2/2]

bool OpenDDS::DCPS::ReceiveListenerSet::exist ( const GUID_t & local_id)

Definition at line 73 of file ReceiveListenerSet.cpp.

References lock_, and map_.

74 {
75  GuardType guard(lock_);
76  return map_.count(local_id) > 0;
77 }
LockType lock_
This lock will protect the map.

◆ get_keys()

void OpenDDS::DCPS::ReceiveListenerSet::get_keys ( ReaderIdSeq & ids)

Definition at line 62 of file ReceiveListenerSet.cpp.

References lock_, map_, and OpenDDS::DCPS::push_back().

63 {
64  GuardType guard(lock_);
65 
66  for (MapType::iterator iter = map_.begin();
67  iter != map_.end(); ++ iter) {
68  push_back(ids, iter->first);
69  }
70 }
LockType lock_
This lock will protect the map.
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138

◆ insert()

ACE_INLINE int OpenDDS::DCPS::ReceiveListenerSet::insert ( GUID_t  subscriber_id,
const TransportReceiveListener_wrch & listener 
)

Definition at line 48 of file ReceiveListenerSet.inl.

References ACE_INLINE, DBG_ENTRY_LVL, lock_, and map_.

50 {
51  DBG_ENTRY_LVL("ReceiveListenerSet", "insert", 6);
52  GuardType guard(lock_);
53 
54  std::pair<MapType::iterator,bool> r = map_.insert(std::make_pair(subscriber_id,listener));
55  if (!r.second) {
56  // subscriber_id is already in the map
57  if (!r.first->second) {
58  // subscriber_id is in the map with a null listener, update it.
59  r.first->second = listener;
60  }
61  return 1; // 1 ==> key already existed in map
62  }
63  return 0;
64 }
LockType lock_
This lock will protect the map.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ map() [1/2]

ACE_INLINE ReceiveListenerSet::MapType & OpenDDS::DCPS::ReceiveListenerSet::map ( void  )

Give access to the underlying map for iteration purposes.

Definition at line 102 of file ReceiveListenerSet.inl.

References ACE_INLINE, and map_.

103 {
104  return map_;
105 }

◆ map() [2/2]

ACE_INLINE const ReceiveListenerSet::MapType & OpenDDS::DCPS::ReceiveListenerSet::map ( void  ) const

Definition at line 108 of file ReceiveListenerSet.inl.

References map_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

109 {
110  return map_;
111 }

◆ OPENDDS_MAP_CMP()

typedef OpenDDS::DCPS::ReceiveListenerSet::OPENDDS_MAP_CMP ( GUID_t  ,
TransportReceiveListener_wrch  ,
GUID_tKeyLessThan   
)

◆ operator=()

ACE_INLINE ReceiveListenerSet & OpenDDS::DCPS::ReceiveListenerSet::operator= ( const ReceiveListenerSet & rhs)

Definition at line 36 of file ReceiveListenerSet.inl.

References ACE_INLINE, DBG_ENTRY_LVL, lock_, and map_.

37 {
38  DBG_ENTRY_LVL("ReceiveListenerSet", "operator=", 6);
39  if (&rhs != this) {
40  GuardType guard_lt(&lock_ < &rhs.lock_ ? lock_ : rhs.lock_);
41  GuardType guard_gt(&lock_ < &rhs.lock_ ? rhs.lock_ : lock_);
42  map_ = rhs.map_;
43  }
44  return *this;
45 }
LockType lock_
This lock will protect the map.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ remove()

ACE_INLINE int OpenDDS::DCPS::ReceiveListenerSet::remove ( GUID_t  subscriber_id)

Definition at line 67 of file ReceiveListenerSet.inl.

References ACE_ERROR_RETURN, ACE_INLINE, DBG_ENTRY_LVL, LM_ERROR, lock_, map_, and OpenDDS::DCPS::unbind().

68 {
69  DBG_ENTRY_LVL("ReceiveListenerSet", "remove", 6);
70  GuardType guard(lock_);
71 
72  if (unbind(map_, subscriber_id) != 0) {
73  ACE_ERROR_RETURN((LM_ERROR,
74  "(%P|%t) ERROR: subscriber_id (%C) not found in map_.\n",
75  LogGuid(subscriber_id).c_str()),
76  -1);
77  }
78 
79  return 0;
80 }
LockType lock_
This lock will protect the map.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define ACE_ERROR_RETURN(X, Y)
int unbind(Container &c, const typename Container::key_type &k, typename Container::mapped_type &v)
Definition: Util.h:40

◆ remove_all()

ACE_INLINE void OpenDDS::DCPS::ReceiveListenerSet::remove_all ( const GUIDSeq & to_remove)

Definition at line 83 of file ReceiveListenerSet.inl.

References ACE_INLINE, DBG_ENTRY_LVL, lock_, map_, and OpenDDS::DCPS::unbind().

Referenced by OpenDDS::DCPS::DataLink::data_received_i().

84 {
85  DBG_ENTRY_LVL("ReceiveListenerSet", "remove_all", 6);
86  GuardType guard(lock_);
87  const CORBA::ULong len = to_remove.length();
88  for (CORBA::ULong i(0); i < len; ++i) {
89  unbind(map_, to_remove[i]);
90  }
91 }
LockType lock_
This lock will protect the map.
ACE_CDR::ULong ULong
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
int unbind(Container &c, const typename Container::key_type &k, typename Container::mapped_type &v)
Definition: Util.h:40

◆ size()

ACE_INLINE ssize_t OpenDDS::DCPS::ReceiveListenerSet::size ( void  ) const

Definition at line 94 of file ReceiveListenerSet.inl.

References ACE_INLINE, DBG_ENTRY_LVL, lock_, and map_.

95 {
96  DBG_ENTRY_LVL("ReceiveListenerSet", "size", 6);
97  GuardType guard(lock_);
98  return map_.size();
99 }
LockType lock_
This lock will protect the map.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Data Documentation

◆ lock_

LockType OpenDDS::DCPS::ReceiveListenerSet::lock_
mutable private

This lock will protect the map.

Definition at line 74 of file ReceiveListenerSet.h.

Referenced by clear(), data_received(), exist(), get_keys(), insert(), operator=(), remove(), remove_all(), and size().

◆ map_

MapType OpenDDS::DCPS::ReceiveListenerSet::map_
private

The documentation for this class was generated from the following files: