ContentFilteredTopicImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00011 #include "ContentFilteredTopicImpl.h"
00012 #include "DataReaderImpl.h"
00013 
00014 #include <cstring>
00015 
00016 namespace OpenDDS {
00017 namespace DCPS {
00018 
00019 ContentFilteredTopicImpl::ContentFilteredTopicImpl(const char* name,
00020   DDS::Topic_ptr related_topic, const char* filter_expression,
00021   const DDS::StringSeq& expression_parameters,
00022   DomainParticipantImpl* participant)
00023   : TopicDescriptionImpl(name,
00024       CORBA::String_var(related_topic->get_type_name()),
00025       dynamic_cast<TopicDescriptionImpl*>(related_topic)->get_type_support(),
00026       participant)
00027   , filter_expression_(filter_expression)
00028   , filter_eval_(filter_expression, false /*allowOrderBy*/)
00029   , expression_parameters_(expression_parameters)
00030   , related_topic_(DDS::Topic::_duplicate(related_topic))
00031 {}
00032 
00033 char* ContentFilteredTopicImpl::get_filter_expression()
00034 {
00035   return CORBA::string_dup(filter_expression_.c_str());
00036 }
00037 
00038 DDS::ReturnCode_t
00039 ContentFilteredTopicImpl::get_expression_parameters(DDS::StringSeq& params)
00040 {
00041   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00042     DDS::RETCODE_OUT_OF_RESOURCES);
00043   params = expression_parameters_;
00044   return DDS::RETCODE_OK;
00045 }
00046 
namespace {
  /// Binary predicate for comparing two NUL-terminated C strings by content.
  ///
  /// @return true when std::strcmp reports the two strings as equal.
  bool string_equal(const char* lhs, const char* rhs)
  {
    const int ordering = std::strcmp(lhs, rhs);
    return ordering == 0;
  }
}
00052 
00053 DDS::ReturnCode_t
00054 ContentFilteredTopicImpl::set_expression_parameters(const DDS::StringSeq& p)
00055 {
00056   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00057     DDS::RETCODE_OUT_OF_RESOURCES);
00058 
00059   const CORBA::ULong len = p.length();
00060   if (len == expression_parameters_.length()) {
00061     const char* const* p_buf = p.get_buffer();
00062     char* const* e_buf = expression_parameters_.get_buffer();
00063 #ifdef _MSC_VER
00064 #pragma warning(push)
00065 #pragma warning(disable : 4996)
00066 #endif
00067     if (std::equal(&p_buf[0], &p_buf[len], &e_buf[0], string_equal)) {
00068       // no change, bail out now to avoid remote InfoRepo calls
00069       return DDS::RETCODE_OK;
00070     }
00071 #ifdef _MSC_VER
00072 #pragma warning(pop)
00073 #endif
00074   }
00075 
00076   expression_parameters_ = p;
00077 
00078   for (std::vector<DataReaderImpl*>::iterator iter = readers_.begin(),
00079        end = readers_.end(); iter != end; ++iter) {
00080     (*iter)->update_subscription_params(p);
00081   }
00082 
00083   return DDS::RETCODE_OK;
00084 }
00085 
00086 DDS::Topic_ptr
00087 ContentFilteredTopicImpl::get_related_topic()
00088 {
00089   return DDS::Topic::_duplicate(related_topic_);
00090 }
00091 
00092 void
00093 ContentFilteredTopicImpl::add_reader(DataReaderImpl& reader)
00094 {
00095   // readers_ does not own or reference-count the reader because
00096   // the readers reference this CFT and this CFT can't be removed
00097   // until all readers are gone (DomainParticipant::delete_contentfilteredtopic)
00098   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00099   readers_.push_back(&reader);
00100 }
00101 
00102 void
00103 ContentFilteredTopicImpl::remove_reader(DataReaderImpl& reader)
00104 {
00105   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00106   std::vector<DataReaderImpl*>::iterator end = readers_.end();
00107   readers_.erase(std::remove(readers_.begin(), end, &reader), end);
00108 }
00109 
00110 } // namespace DCPS
00111 } // namespace OpenDDS
00112 
00113 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7