Line data Source code
1 : /* 2 : * Distributed under the OpenDDS License. 3 : * See: http://www.opendds.org/license.html 4 : */ 5 : 6 : #include "TransactionalRtpsSendQueue.h" 7 : 8 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 9 : 10 : namespace OpenDDS { 11 : namespace DCPS { 12 : 13 6 : TransactionalRtpsSendQueue::TransactionalRtpsSendQueue() 14 6 : : ready_to_send_(false) 15 6 : , active_transaction_count_(0) 16 : { 17 6 : } 18 : 19 10 : bool TransactionalRtpsSendQueue::enqueue(const MetaSubmessage& ms) 20 : { 21 10 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 22 10 : const bool empty_before = queue_.empty(); 23 10 : queue_.push_back(ms); 24 10 : return empty_before; 25 10 : } 26 : 27 2 : bool TransactionalRtpsSendQueue::enqueue(const MetaSubmessageVec& vec) 28 : { 29 2 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 30 2 : const bool empty_before = queue_.empty(); 31 4 : for (MetaSubmessageVec::const_iterator it = vec.begin(), limit = vec.end(); it != limit; ++it) { 32 2 : queue_.push_back(*it); 33 : } 34 4 : return empty_before && !queue_.empty(); 35 2 : } 36 : 37 7 : void TransactionalRtpsSendQueue::begin_transaction() 38 : { 39 7 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 40 7 : ++active_transaction_count_; 41 7 : } 42 : 43 6 : void TransactionalRtpsSendQueue::ready_to_send() 44 : { 45 6 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 46 6 : ready_to_send_ = true; 47 6 : } 48 : 49 : 50 7 : void TransactionalRtpsSendQueue::end_transaction(MetaSubmessageVec& vec) 51 : { 52 7 : vec.clear(); 53 : 54 7 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 55 7 : --active_transaction_count_; 56 7 : if (active_transaction_count_ == 0 && ready_to_send_) { 57 6 : queue_.swap(vec); 58 6 : ready_to_send_ = false; 59 : } 60 7 : } 61 : 62 1 : void TransactionalRtpsSendQueue::ignore(const GUID_t& local, const GUID_t& remote) 63 : { 64 1 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 65 3 : for (MetaSubmessageVec::iterator pos = queue_.begin(), limit = queue_.end(); pos != limit; ++pos) { 66 2 : if (pos->src_guid_ == local && pos->dst_guid_ == remote) { 67 1 : pos->ignore_ = true; 68 : } 69 : } 70 1 : } 71 : 72 1 : void TransactionalRtpsSendQueue::ignore_remote(const GUID_t& id) 73 : { 74 1 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 75 3 : for (MetaSubmessageVec::iterator pos = queue_.begin(), limit = queue_.end(); pos != limit; ++pos) { 76 2 : if (pos->dst_guid_ == id) { 77 1 : pos->ignore_ = true; 78 : } 79 : } 80 1 : } 81 : 82 1 : void TransactionalRtpsSendQueue::ignore_local(const GUID_t& id) 83 : { 84 1 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 85 3 : for (MetaSubmessageVec::iterator pos = queue_.begin(), limit = queue_.end(); pos != limit; ++pos) { 86 2 : if (pos->src_guid_ == id) { 87 1 : pos->ignore_ = true; 88 : } 89 : } 90 1 : } 91 : 92 : } // namespace DCPS 93 : } // namespace OpenDDS 94 : 95 : OPENDDS_END_VERSIONED_NAMESPACE_DECL