ContentFilteredTopicImpl.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00011 #include "ContentFilteredTopicImpl.h"
00012 #include "DataReaderImpl.h"
00013
00014 #include <cstring>
00015
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 ContentFilteredTopicImpl::ContentFilteredTopicImpl(const char* name,
00022 DDS::Topic_ptr related_topic, const char* filter_expression,
00023 DomainParticipantImpl* participant)
00024 : TopicDescriptionImpl(name,
00025 CORBA::String_var(related_topic->get_type_name()),
00026 dynamic_cast<TopicDescriptionImpl*>(related_topic)->get_type_support(),
00027 participant)
00028 , filter_expression_(filter_expression)
00029 , filter_eval_(filter_expression, false )
00030 , related_topic_(DDS::Topic::_duplicate(related_topic))
00031 {
00032 if (DCPS_debug_level > 5) {
00033 ACE_DEBUG((LM_DEBUG,
00034 ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::ContentFilteredTopicImpl() - ")
00035 ACE_TEXT("Creating cft with filter <%C> which requires <%d> parameters\n"),
00036 filter_expression, filter_eval_.number_parameters()));
00037 }
00038 }
00039
00040 char* ContentFilteredTopicImpl::get_filter_expression()
00041 {
00042 return CORBA::string_dup(filter_expression_.c_str());
00043 }
00044
00045 DDS::ReturnCode_t
00046 ContentFilteredTopicImpl::get_expression_parameters(DDS::StringSeq& params)
00047 {
00048 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00049 DDS::RETCODE_OUT_OF_RESOURCES);
00050 params = expression_parameters_;
00051 return DDS::RETCODE_OK;
00052 }
00053
00054 namespace {
00055 bool string_equal(const char* a, const char* b) {
00056 return std::strcmp(a, b) == 0;
00057 }
00058 }
00059
00060 DDS::ReturnCode_t
00061 ContentFilteredTopicImpl::set_expression_parameters(const DDS::StringSeq& p)
00062 {
00063 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00064 DDS::RETCODE_OUT_OF_RESOURCES);
00065
00066 const CORBA::ULong len = p.length();
00067
00068
00069
00070
00071
00072 if (len != filter_eval_.number_parameters()) {
00073 if (DCPS_debug_level > 1) {
00074 ACE_ERROR((LM_ERROR,
00075 ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::set_expression_parameters() - ")
00076 ACE_TEXT("passed incorrect set of filter parameters, expected %d received %d\n"),
00077 filter_eval_.number_parameters(), len));
00078 }
00079 return DDS::RETCODE_ERROR;
00080 }
00081
00082 if (len == expression_parameters_.length()) {
00083 const char* const* p_buf = p.get_buffer();
00084 char* const* e_buf = expression_parameters_.get_buffer();
00085 #ifdef _MSC_VER
00086 #pragma warning(push)
00087 #pragma warning(disable : 4996)
00088 #endif
00089 if (std::equal(&p_buf[0], &p_buf[len], &e_buf[0], string_equal)) {
00090
00091 return DDS::RETCODE_OK;
00092 }
00093 #ifdef _MSC_VER
00094 #pragma warning(pop)
00095 #endif
00096 }
00097
00098 expression_parameters_ = p;
00099
00100 Readers readers_still_alive;
00101
00102 for (Readers::iterator iter = readers_.begin(),
00103 end = readers_.end(); iter != end; ++iter) {
00104 DataReaderImpl_rch reader = iter->lock();
00105 if (reader) {
00106 reader->update_subscription_params(p);
00107 readers_still_alive.push_back(*iter);
00108 }
00109 }
00110
00111 using namespace std;
00112 swap(readers_, readers_still_alive);
00113
00114 return DDS::RETCODE_OK;
00115 }
00116
00117 DDS::Topic_ptr
00118 ContentFilteredTopicImpl::get_related_topic()
00119 {
00120 return DDS::Topic::_duplicate(related_topic_);
00121 }
00122
00123 void
00124 ContentFilteredTopicImpl::add_reader(DataReaderImpl& reader)
00125 {
00126
00127
00128
00129 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00130 readers_.push_back(reader);
00131 }
00132
00133 void
00134 ContentFilteredTopicImpl::remove_reader(DataReaderImpl& reader)
00135 {
00136 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00137 Readers::iterator end = readers_.end();
00138 readers_.erase(std::remove(readers_.begin(), end, reader), end);
00139 }
00140
00141 }
00142 }
00143
00144 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00145
00146 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00147