ContentFilteredTopicImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00011 #include "ContentFilteredTopicImpl.h"
00012 #include "DataReaderImpl.h"
00013 
00014 #include <cstring>
00015 
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 namespace OpenDDS {
00019 namespace DCPS {
00020 
00021 ContentFilteredTopicImpl::ContentFilteredTopicImpl(const char* name,
00022   DDS::Topic_ptr related_topic, const char* filter_expression,
00023   DomainParticipantImpl* participant)
00024   : TopicDescriptionImpl(name,
00025       CORBA::String_var(related_topic->get_type_name()),
00026       dynamic_cast<TopicDescriptionImpl*>(related_topic)->get_type_support(),
00027       participant)
00028   , filter_expression_(filter_expression)
00029   , filter_eval_(filter_expression, false /*allowOrderBy*/)
00030   , related_topic_(DDS::Topic::_duplicate(related_topic))
00031 {
00032   if (DCPS_debug_level > 5) {
00033     ACE_DEBUG((LM_DEBUG,
00034       ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::ContentFilteredTopicImpl() - ")
00035       ACE_TEXT("Creating cft with filter <%C> which requires <%d> parameters\n"),
00036       filter_expression, filter_eval_.number_parameters()));
00037   }
00038 }
00039 
00040 char* ContentFilteredTopicImpl::get_filter_expression()
00041 {
00042   return CORBA::string_dup(filter_expression_.c_str());
00043 }
00044 
00045 DDS::ReturnCode_t
00046 ContentFilteredTopicImpl::get_expression_parameters(DDS::StringSeq& params)
00047 {
00048   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00049     DDS::RETCODE_OUT_OF_RESOURCES);
00050   params = expression_parameters_;
00051   return DDS::RETCODE_OK;
00052 }
00053 
namespace {
  /**
   * Compare two C strings for equality of content.
   *
   * std::strcmp has undefined behavior when handed a null pointer, so null
   * arguments are handled here before delegating to it.
   *
   * @param a first string, may be null
   * @param b second string, may be null
   * @return true when both pointers are identical (including both null) or
   *         when both are non-null and hold the same characters; false
   *         otherwise (in particular, null never equals a non-null string).
   */
  bool string_equal(const char* a, const char* b) {
    if (a == b) {
      // Same buffer, or both null: equal without reading any characters.
      return true;
    }
    if (!a || !b) {
      // Exactly one side is null.
      return false;
    }
    return std::strcmp(a, b) == 0;
  }
}
00059 
00060 DDS::ReturnCode_t
00061 ContentFilteredTopicImpl::set_expression_parameters(const DDS::StringSeq& p)
00062 {
00063   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_,
00064     DDS::RETCODE_OUT_OF_RESOURCES);
00065 
00066   const CORBA::ULong len = p.length();
00067 
00068   // Check sequence of strings that give values to the 'parameters' (i.e., "%n" tokens)
00069   // in the filter_expression matches the size of the parameter sequence.
00070   // The tokens start with 0 which means that when the maximum number used is 1 we need
00071   // two parameters: %0 and %1
00072   if (len != filter_eval_.number_parameters()) {
00073     if (DCPS_debug_level > 1) {
00074         ACE_ERROR((LM_ERROR,
00075           ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::set_expression_parameters() - ")
00076           ACE_TEXT("passed incorrect set of filter parameters, expected %d received %d\n"),
00077           filter_eval_.number_parameters(), len));
00078     }
00079     return DDS::RETCODE_ERROR;
00080   }
00081 
00082   if (len == expression_parameters_.length()) {
00083     const char* const* p_buf = p.get_buffer();
00084     char* const* e_buf = expression_parameters_.get_buffer();
00085 #ifdef _MSC_VER
00086 #pragma warning(push)
00087 #pragma warning(disable : 4996)
00088 #endif
00089     if (std::equal(&p_buf[0], &p_buf[len], &e_buf[0], string_equal)) {
00090       // no change, bail out now to avoid remote InfoRepo calls
00091       return DDS::RETCODE_OK;
00092     }
00093 #ifdef _MSC_VER
00094 #pragma warning(pop)
00095 #endif
00096   }
00097 
00098   expression_parameters_ = p;
00099 
00100   Readers readers_still_alive;
00101 
00102   for (Readers::iterator iter = readers_.begin(),
00103        end = readers_.end(); iter != end; ++iter) {
00104     DataReaderImpl_rch reader = iter->lock();
00105     if (reader) {
00106       reader->update_subscription_params(p);
00107       readers_still_alive.push_back(*iter);
00108     }
00109   }
00110 
00111   using namespace std;
00112   swap(readers_, readers_still_alive);
00113 
00114   return DDS::RETCODE_OK;
00115 }
00116 
00117 DDS::Topic_ptr
00118 ContentFilteredTopicImpl::get_related_topic()
00119 {
00120   return DDS::Topic::_duplicate(related_topic_);
00121 }
00122 
00123 void
00124 ContentFilteredTopicImpl::add_reader(DataReaderImpl& reader)
00125 {
00126   // readers_ does not own or reference-count the reader because
00127   // the readers reference this CFT and this CFT can't be removed
00128   // until all readers are gone (DomainParticipant::delete_contentfilteredtopic)
00129   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00130   readers_.push_back(reader);
00131 }
00132 
00133 void
00134 ContentFilteredTopicImpl::remove_reader(DataReaderImpl& reader)
00135 {
00136   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
00137   Readers::iterator end = readers_.end();
00138   readers_.erase(std::remove(readers_.begin(), end, reader), end);
00139 }
00140 
00141 } // namespace DCPS
00142 } // namespace OpenDDS
00143 
00144 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00145 
00146 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
00147 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1