ReceiveListenerSet.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "dds/DCPS/GuidConverter.h"
00011 
00012 #include "ReceiveListenerSet.h"
00013 
00014 #if !defined (__ACE_INLINE__)
00015 #include "ReceiveListenerSet.inl"
00016 #endif /* __ACE_INLINE__ */
00017 
00018 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00019 
00020 namespace OpenDDS {
00021 namespace DCPS {
00022 
00023 ReceiveListenerSet::~ReceiveListenerSet()
00024 {
00025   DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6);
00026 }
00027 
00028 bool
00029 ReceiveListenerSet::exist(const RepoId& local_id, bool& last)
00030 {
00031   GuardType guard(this->lock_);
00032 
00033   last = true;
00034 
00035   TransportReceiveListener_wrch listener;
00036 
00037   if (find(map_, local_id, listener) == -1) {
00038     GuidConverter converter(local_id);
00039     ACE_ERROR((LM_ERROR,
00040                ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00041                ACE_TEXT("could not find local %C.\n"),
00042                OPENDDS_STRING(converter).c_str()));
00043 
00044     return false;
00045   }
00046 
00047   if (!listener) {
00048     GuidConverter converter(local_id);
00049     ACE_ERROR((LM_ERROR,
00050                ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00051                ACE_TEXT("listener for local %C is nil.\n"),
00052                OPENDDS_STRING(converter).c_str()));
00053 
00054     return false;
00055   }
00056 
00057   last = map_.size() == 1;
00058   return true;
00059 }
00060 
00061 void
00062 ReceiveListenerSet::get_keys(ReaderIdSeq & ids)
00063 {
00064   GuardType guard(this->lock_);
00065 
00066   for (MapType::iterator iter = map_.begin();
00067        iter != map_.end(); ++ iter) {
00068     push_back(ids, iter->first);
00069   }
00070 }
00071 
00072 bool
00073 ReceiveListenerSet::exist(const RepoId& local_id)
00074 {
00075   GuardType guard(this->lock_);
00076   return map_.count(local_id) > 0;
00077 }
00078 
00079 void
00080 ReceiveListenerSet::clear()
00081 {
00082   GuardType guard(this->lock_);
00083   this->map_.clear();
00084 }
00085 
00086 void
00087 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00088                                   const RepoIdSet& incl_excl,
00089                                   ConstrainReceiveSet constrain)
00090 {
00091   DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6);
00092   OPENDDS_VECTOR(TransportReceiveListener_wrch) handles;
00093   {
00094     GuardType guard(this->lock_);
00095     for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00096       if (constrain == ReceiveListenerSet::SET_EXCLUDED) {
00097         if (itr->second && incl_excl.count(itr->first) == 0) {
00098           handles.push_back(itr->second);
00099         }
00100       } else if (constrain == ReceiveListenerSet::SET_INCLUDED) { //SET_INCLUDED
00101         if (itr->second && incl_excl.count(itr->first) != 0) {
00102           handles.push_back(itr->second);
00103         }
00104       } else {
00105         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n"));
00106       }
00107     }
00108   }
00109 
00110   for (size_t i = 0; i < handles.size(); ++i) {
00111     TransportReceiveListener_rch listener = handles[i].lock();
00112     if (!listener)
00113       continue;
00114     if (i < handles.size() - 1 && sample.sample_) {
00115       // demarshal (in data_received()) updates the rd_ptr() of any of
00116       // the message blocks in the chain, so give it a duplicated chain.
00117       ReceivedDataSample rds(sample);
00118       listener->data_received(rds);
00119     } else {
00120       listener->data_received(sample);
00121     }
00122   }
00123 }
00124 
00125 void
00126 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00127                                   const RepoId& readerId)
00128 {
00129   DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6);
00130   TransportReceiveListener_wrch h;
00131   {
00132     GuardType guard(this->lock_);
00133     MapType::iterator itr = map_.find(readerId);
00134     if (itr != map_.end() && itr->second) {
00135       h = itr->second;
00136     }
00137   }
00138   TransportReceiveListener_rch listener = h.lock();
00139   if (listener)
00140     listener->data_received(sample);
00141 }
00142 
00143 }
00144 }
00145 
00146 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1