Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 11 : #include "ContentFilteredTopicImpl.h" 12 : #include "DataReaderImpl.h" 13 : 14 : #include <cstring> 15 : 16 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 17 : 18 : namespace OpenDDS { 19 : namespace DCPS { 20 : 21 0 : ContentFilteredTopicImpl::ContentFilteredTopicImpl(const char* name, 22 : DDS::Topic_ptr related_topic, const char* filter_expression, 23 0 : DomainParticipantImpl* participant) 24 : : TopicDescriptionImpl(name, 25 0 : CORBA::String_var(related_topic->get_type_name()), 26 0 : dynamic_cast<TopicDescriptionImpl*>(related_topic)->get_type_support(), 27 : participant) 28 0 : , filter_expression_(filter_expression) 29 0 : , filter_eval_(filter_expression, false /*allowOrderBy*/) 30 0 : , related_topic_(DDS::Topic::_duplicate(related_topic)) 31 : { 32 0 : if (DCPS_debug_level > 5) { 33 0 : ACE_DEBUG((LM_DEBUG, 34 : ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::ContentFilteredTopicImpl() - ") 35 : ACE_TEXT("Creating cft with filter <%C> which requires <%d> parameters\n"), 36 : filter_expression, filter_eval_.number_parameters())); 37 : } 38 0 : } 39 : 40 0 : char* ContentFilteredTopicImpl::get_filter_expression() 41 : { 42 0 : return CORBA::string_dup(filter_expression_.c_str()); 43 : } 44 : 45 : DDS::ReturnCode_t 46 0 : ContentFilteredTopicImpl::get_expression_parameters(DDS::StringSeq& params) 47 : { 48 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_, 49 : DDS::RETCODE_OUT_OF_RESOURCES); 50 0 : params = expression_parameters_; 51 0 : return DDS::RETCODE_OK; 52 0 : } 53 : 54 : namespace { 55 0 : bool string_equal(const char* a, const char* b) { 56 0 : return std::strcmp(a, b) == 0; 57 : } 58 : } 59 : 60 : DDS::ReturnCode_t 61 0 : ContentFilteredTopicImpl::set_expression_parameters(const DDS::StringSeq& p) 62 : { 63 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_, 64 : DDS::RETCODE_OUT_OF_RESOURCES); 65 : 66 0 : const CORBA::ULong len = p.length(); 67 : 68 : // Check sequence of strings that give values to the 'parameters' (i.e., "%n" tokens) 69 : // in the filter_expression matches the size of the parameter sequence. 70 : // The tokens start with 0 which means that when the maximum number used is 1 we need 71 : // two parameters: %0 and %1 72 0 : if (len != filter_eval_.number_parameters()) { 73 0 : if (DCPS_debug_level > 1) { 74 0 : ACE_ERROR((LM_ERROR, 75 : ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::set_expression_parameters() - ") 76 : ACE_TEXT("passed incorrect set of filter parameters, expected %d received %d\n"), 77 : filter_eval_.number_parameters(), len)); 78 : } 79 0 : return DDS::RETCODE_ERROR; 80 : } 81 : 82 0 : if (len == expression_parameters_.length()) { 83 0 : const char* const* p_buf = p.get_buffer(); 84 0 : char* const* e_buf = expression_parameters_.get_buffer(); 85 : #ifdef _MSC_VER 86 : #pragma warning(push) 87 : #pragma warning(disable : 4996) 88 : #endif 89 0 : if (std::equal(&p_buf[0], &p_buf[len], &e_buf[0], string_equal)) { 90 : // no change, bail out now to avoid remote InfoRepo calls 91 0 : return DDS::RETCODE_OK; 92 : } 93 : #ifdef _MSC_VER 94 : #pragma warning(pop) 95 : #endif 96 : } 97 : 98 0 : expression_parameters_ = p; 99 : 100 0 : Readers readers_still_alive; 101 : 102 0 : for (Readers::iterator iter = readers_.begin(), 103 0 : end = readers_.end(); iter != end; ++iter) { 104 0 : DataReaderImpl_rch reader = iter->lock(); 105 0 : if (reader) { 106 0 : reader->update_subscription_params(p); 107 0 : readers_still_alive.push_back(*iter); 108 : } 109 0 : } 110 : 111 : using namespace std; 112 0 : swap(readers_, readers_still_alive); 113 : 114 0 : return DDS::RETCODE_OK; 115 0 : } 116 : 117 : DDS::Topic_ptr 118 0 : ContentFilteredTopicImpl::get_related_topic() 119 : { 120 0 : return DDS::Topic::_duplicate(related_topic_); 121 : } 122 : 123 : void 124 0 : ContentFilteredTopicImpl::add_reader(DataReaderImpl& reader) 125 : { 126 : // readers_ does not own or reference-count the reader because 127 : // the readers reference this CFT and this CFT can't be removed 128 : // until all readers are gone (DomainParticipant::delete_contentfilteredtopic) 129 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 130 0 : readers_.push_back(reader); 131 0 : } 132 : 133 : void 134 0 : ContentFilteredTopicImpl::remove_reader(DataReaderImpl& reader) 135 : { 136 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_); 137 0 : Readers::iterator end = readers_.end(); 138 0 : readers_.erase(std::remove(readers_.begin(), end, reader), end); 139 0 : } 140 : 141 : } // namespace DCPS 142 : } // namespace OpenDDS 143 : 144 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 145 : 146 : #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC