ReceiveListenerSet.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "dds/DCPS/GuidConverter.h"
00011 
00012 #include "ReceiveListenerSet.h"
00013 
00014 #if !defined (__ACE_INLINE__)
00015 #include "ReceiveListenerSet.inl"
00016 #endif /* __ACE_INLINE__ */
00017 
00018 namespace OpenDDS {
00019 namespace DCPS {
00020 
00021 namespace {
00022 
00023   struct ReceiveListenerHandle {
00024     explicit ReceiveListenerHandle(TransportReceiveListener* trl)
00025       : listener_(trl)
00026     {
00027       if (listener_) listener_->listener_add_ref();
00028     }
00029 
00030     ReceiveListenerHandle(const ReceiveListenerHandle& rhs)
00031       : listener_(rhs.listener_)
00032     {
00033       if (listener_) listener_->listener_add_ref();
00034     }
00035 
00036     ReceiveListenerHandle& operator=(const ReceiveListenerHandle& rhs);
00037 
00038     ~ReceiveListenerHandle()
00039     {
00040       if (listener_) listener_->listener_remove_ref();
00041     }
00042 
00043   private:
00044     typedef void (ReceiveListenerHandle::*bool_t)() const;
00045     void no_implicit_conversion() const {}
00046 
00047   public:
00048     operator bool_t() const
00049     {
00050       return listener_ ? &ReceiveListenerHandle::no_implicit_conversion : 0;
00051     }
00052 
00053     TransportReceiveListener* operator->() const
00054     {
00055       return listener_;
00056     }
00057 
00058     TransportReceiveListener* listener_;
00059   };
00060 
00061   void swap(ReceiveListenerHandle& lhs, ReceiveListenerHandle& rhs)
00062   {
00063     std::swap(lhs.listener_, rhs.listener_);
00064   }
00065 
00066   ReceiveListenerHandle&
00067   ReceiveListenerHandle::operator=(const ReceiveListenerHandle& rhs)
00068   {
00069     ReceiveListenerHandle cpy(rhs);
00070     swap(*this, cpy);
00071     return *this;
00072   }
00073 }
00074 
00075 ReceiveListenerSet::~ReceiveListenerSet()
00076 {
00077   DBG_ENTRY_LVL("ReceiveListenerSet","~ReceiveListenerSet",6);
00078 }
00079 
00080 bool
00081 ReceiveListenerSet::exist(const RepoId& local_id, bool& last)
00082 {
00083   GuardType guard(this->lock_);
00084 
00085   last = true;
00086 
00087   TransportReceiveListener* listener = 0;
00088 
00089   if (find(map_, local_id, listener) == -1) {
00090     GuidConverter converter(local_id);
00091     ACE_ERROR((LM_ERROR,
00092                ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00093                ACE_TEXT("could not find local %C.\n"),
00094                OPENDDS_STRING(converter).c_str()));
00095 
00096     return false;
00097   }
00098 
00099   if (listener == 0) {
00100     GuidConverter converter(local_id);
00101     ACE_ERROR((LM_ERROR,
00102                ACE_TEXT("(%P|%t) ReceiveListenerSet::exist: ")
00103                ACE_TEXT("listener for local %C is nil.\n"),
00104                OPENDDS_STRING(converter).c_str()));
00105 
00106     return false;
00107   }
00108 
00109   last = map_.size() == 1;
00110   return true;
00111 }
00112 
00113 void
00114 ReceiveListenerSet::get_keys(ReaderIdSeq & ids)
00115 {
00116   GuardType guard(this->lock_);
00117 
00118   for (MapType::iterator iter = map_.begin();
00119        iter != map_.end(); ++ iter) {
00120     push_back(ids, iter->first);
00121   }
00122 }
00123 
00124 bool
00125 ReceiveListenerSet::exist(const RepoId& local_id)
00126 {
00127   GuardType guard(this->lock_);
00128 
00129   TransportReceiveListener* listener = 0;
00130   return (find(map_, local_id, listener) == -1 ? false : true);
00131 }
00132 
00133 void
00134 ReceiveListenerSet::clear()
00135 {
00136   GuardType guard(this->lock_);
00137   this->map_.clear();
00138 }
00139 
00140 void
00141 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00142                                   const RepoIdSet& incl_excl,
00143                                   ConstrainReceiveSet constrain)
00144 {
00145   DBG_ENTRY_LVL("ReceiveListenerSet", "data_received", 6);
00146   OPENDDS_VECTOR(ReceiveListenerHandle) handles;
00147   {
00148     GuardType guard(this->lock_);
00149     for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) {
00150       if (constrain == ReceiveListenerSet::SET_EXCLUDED) {
00151         if (itr->second && incl_excl.count(itr->first) == 0) {
00152           handles.push_back(ReceiveListenerHandle(itr->second));
00153         }
00154       } else if (constrain == ReceiveListenerSet::SET_INCLUDED) { //SET_INCLUDED
00155         if (itr->second && incl_excl.count(itr->first) != 0) {
00156           handles.push_back(ReceiveListenerHandle(itr->second));
00157         }
00158       } else {
00159         ACE_DEBUG((LM_ERROR, "(%P|%t) ERROR: ReceiveListenerSet::data_received - NOTHING\n"));
00160       }
00161     }
00162   }
00163 
00164   for (size_t i = 0; i < handles.size(); ++i) {
00165     if (i < handles.size() - 1 && sample.sample_) {
00166       // demarshal (in data_received()) updates the rd_ptr() of any of
00167       // the message blocks in the chain, so give it a duplicated chain.
00168       ReceivedDataSample rds(sample);
00169       handles[i]->data_received(rds);
00170     } else {
00171       handles[i]->data_received(sample);
00172     }
00173   }
00174 }
00175 
00176 void
00177 ReceiveListenerSet::data_received(const ReceivedDataSample& sample,
00178                                   const RepoId& readerId)
00179 {
00180   DBG_ENTRY_LVL("ReceiveListenerSet", "data_received(sample, readerId)", 6);
00181   ReceiveListenerHandle h(0);
00182   {
00183     GuardType guard(this->lock_);
00184     MapType::iterator itr = map_.find(readerId);
00185     if (itr != map_.end() && itr->second) {
00186       h = ReceiveListenerHandle(itr->second);
00187     }
00188   }
00189   if (h) h->data_received(sample);
00190 }
00191 
00192 }
00193 }

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7