Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #include "dds/DCPS/GuidConverter.h" 11 : 12 : #include "ReceiveListenerSet.h" 13 : 14 : #if !defined (__ACE_INLINE__) 15 : #include "ReceiveListenerSet.inl" 16 : #endif /* __ACE_INLINE__ */ 17 : 18 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 19 : 20 : namespace OpenDDS { 21 : namespace DCPS { 22 : 23 0 : ReceiveListenerSet::~ReceiveListenerSet() 24 : { 25 : DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6); 26 0 : } 27 : 28 : bool 29 0 : ReceiveListenerSet::exist(const GUID_t& local_id, bool& last) 30 : { 31 0 : GuardType guard(lock_); 32 : 33 0 : last = true; 34 : 35 0 : TransportReceiveListener_wrch listener; 36 : 37 0 : if (find(map_, local_id, listener) == -1) { 38 0 : LogGuid logger(local_id); 39 0 : ACE_ERROR((LM_ERROR, 40 : ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ") 41 : ACE_TEXT("could not find local %C.\n"), 42 : logger.c_str())); 43 : 44 0 : return false; 45 0 : } 46 : 47 0 : if (!listener) { 48 0 : LogGuid logger(local_id); 49 0 : ACE_ERROR((LM_ERROR, 50 : ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ") 51 : ACE_TEXT("listener for local %C is nil.\n"), 52 : logger.c_str())); 53 : 54 0 : return false; 55 0 : } 56 : 57 0 : last = map_.size() == 1; 58 0 : return true; 59 0 : } 60 : 61 : void 62 0 : ReceiveListenerSet::get_keys(ReaderIdSeq & ids) 63 : { 64 0 : GuardType guard(lock_); 65 : 66 0 : for (MapType::iterator iter = map_.begin(); 67 0 : iter != map_.end(); ++ iter) { 68 0 : push_back(ids, iter->first); 69 : } 70 0 : } 71 : 72 : bool 73 0 : ReceiveListenerSet::exist(const GUID_t& local_id) 74 : { 75 0 : GuardType guard(lock_); 76 0 : return map_.count(local_id) > 0; 77 0 : } 78 : 79 : void 80 0 : ReceiveListenerSet::clear() 81 : { 82 0 : GuardType guard(lock_); 83 0 : map_.clear(); 84 0 : } 85 : 86 : void 87 0 : ReceiveListenerSet::data_received(const ReceivedDataSample& sample, 88 : const RepoIdSet& incl_excl, 89 : ConstrainReceiveSet constrain) 90 : { 91 : DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6); 92 0 : OPENDDS_VECTOR(TransportReceiveListener_wrch) handles; 93 : { 94 0 : GuardType guard(lock_); 95 0 : handles.reserve(map_.size()); 96 0 : for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) { 97 0 : if (constrain == ReceiveListenerSet::SET_EXCLUDED) { 98 0 : if (itr->second && incl_excl.count(itr->first) == 0) { 99 0 : handles.push_back(itr->second); 100 : } 101 0 : } else if (constrain == ReceiveListenerSet::SET_INCLUDED) { //SET_INCLUDED 102 0 : if (itr->second && incl_excl.count(itr->first) != 0) { 103 0 : handles.push_back(itr->second); 104 : } 105 : } else { 106 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n")); 107 : } 108 : } 109 0 : } 110 : 111 0 : for (size_t i = 0; i < handles.size(); ++i) { 112 0 : TransportReceiveListener_rch listener = handles[i].lock(); 113 0 : if (!listener) 114 0 : continue; 115 0 : if (i < handles.size() - 1 && sample.has_data()) { 116 : // demarshal (in data_received()) updates the rd_ptr() of any of 117 : // the message blocks in the chain, so give it a duplicated chain. 118 0 : ReceivedDataSample rds(sample); 119 0 : listener->data_received(rds); 120 0 : } else { 121 0 : listener->data_received(sample); 122 : } 123 0 : } 124 0 : } 125 : 126 : void 127 0 : ReceiveListenerSet::data_received(const ReceivedDataSample& sample, 128 : const GUID_t& readerId) 129 : { 130 : DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6); 131 0 : TransportReceiveListener_wrch h; 132 : { 133 0 : GuardType guard(lock_); 134 0 : MapType::iterator itr = map_.find(readerId); 135 0 : if (itr != map_.end() && itr->second) { 136 0 : h = itr->second; 137 : } 138 0 : } 139 0 : TransportReceiveListener_rch listener = h.lock(); 140 0 : if (listener) 141 0 : listener->data_received(sample); 142 0 : } 143 : 144 : } 145 : } 146 : 147 : OPENDDS_END_VERSIONED_NAMESPACE_DECL