00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00011 #include "ContentFilteredTopicImpl.h"
00012 #include "DataReaderImpl.h"
00013
00014 #include <cstring>
00015
00016 namespace OpenDDS {
00017 namespace DCPS {
00018
00019 ContentFilteredTopicImpl::ContentFilteredTopicImpl(const char* name,
00020 DDS::Topic_ptr related_topic, const char* filter_expression,
00021 const DDS::StringSeq& expression_parameters,
00022 DomainParticipantImpl* participant)
00023 : TopicDescriptionImpl(name,
00024 CORBA::String_var(related_topic->get_type_name()),
00025 dynamic_cast<TopicDescriptionImpl*>(related_topic)->get_type_support(),
00026 participant)
00027 , filter_expression_(filter_expression)
00028 , filter_eval_(filter_expression, false )
00029 , expression_parameters_(expression_parameters)
00030 , related_topic_(DDS::Topic::_duplicate(related_topic))
00031 {}
00032
00033 char* ContentFilteredTopicImpl::get_filter_expression()
00034 {
00035 return CORBA::string_dup(filter_expression_.c_str());
00036 }
00037
00038 DDS::ReturnCode_t
00039 ContentFilteredTopicImpl::get_expression_parameters(DDS::StringSeq& params)
00040 {
00041 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00042 DDS::RETCODE_OUT_OF_RESOURCES);
00043 params = expression_parameters_;
00044 return DDS::RETCODE_OK;
00045 }
00046
namespace {

  // Binary predicate for comparing two NUL-terminated C strings by content.
  // Both pointers must be non-null; they are passed straight to std::strcmp.
  // Returns true exactly when the two strings hold the same characters.
  bool string_equal(const char* a, const char* b)
  {
    const int ordering = std::strcmp(a, b);
    return ordering == 0;
  }

}
00052
00053 DDS::ReturnCode_t
00054 ContentFilteredTopicImpl::set_expression_parameters(const DDS::StringSeq& p)
00055 {
00056 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00057 DDS::RETCODE_OUT_OF_RESOURCES);
00058
00059 const CORBA::ULong len = p.length();
00060 if (len == expression_parameters_.length()) {
00061 const char* const* p_buf = p.get_buffer();
00062 char* const* e_buf = expression_parameters_.get_buffer();
00063 #ifdef _MSC_VER
00064 #pragma warning(push)
00065 #pragma warning(disable : 4996)
00066 #endif
00067 if (std::equal(&p_buf[0], &p_buf[len], &e_buf[0], string_equal)) {
00068
00069 return DDS::RETCODE_OK;
00070 }
00071 #ifdef _MSC_VER
00072 #pragma warning(pop)
00073 #endif
00074 }
00075
00076 expression_parameters_ = p;
00077
00078 for (std::vector<DataReaderImpl*>::iterator iter = readers_.begin(),
00079 end = readers_.end(); iter != end; ++iter) {
00080 (*iter)->update_subscription_params(p);
00081 }
00082
00083 return DDS::RETCODE_OK;
00084 }
00085
00086 DDS::Topic_ptr
00087 ContentFilteredTopicImpl::get_related_topic()
00088 {
00089 return DDS::Topic::_duplicate(related_topic_);
00090 }
00091
00092 void
00093 ContentFilteredTopicImpl::add_reader(DataReaderImpl& reader)
00094 {
00095
00096
00097
00098 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00099 readers_.push_back(&reader);
00100 }
00101
00102 void
00103 ContentFilteredTopicImpl::remove_reader(DataReaderImpl& reader)
00104 {
00105 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00106 std::vector<DataReaderImpl*>::iterator end = readers_.end();
00107 readers_.erase(std::remove(readers_.begin(), end, &reader), end);
00108 }
00109
00110 }
00111 }
00112
00113 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC