OpenDDS  Snapshot(2023/04/28-20:55)
ContentFilteredTopicImpl.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
12 #include "DataReaderImpl.h"
13 
14 #include <cstring>
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
22  DDS::Topic_ptr related_topic, const char* filter_expression,
23  DomainParticipantImpl* participant)
24  : TopicDescriptionImpl(name,
25  CORBA::String_var(related_topic->get_type_name()),
26  dynamic_cast<TopicDescriptionImpl*>(related_topic)->get_type_support(),
27  participant)
28  , filter_expression_(filter_expression)
29  , filter_eval_(filter_expression, false /*allowOrderBy*/)
30  , related_topic_(DDS::Topic::_duplicate(related_topic))
31 {
32  if (DCPS_debug_level > 5) {
34  ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::ContentFilteredTopicImpl() - ")
35  ACE_TEXT("Creating cft with filter <%C> which requires <%d> parameters\n"),
36  filter_expression, filter_eval_.number_parameters()));
37  }
38 }
39 
41 {
42  return CORBA::string_dup(filter_expression_.c_str());
43 }
44 
47 {
50  params = expression_parameters_;
51  return DDS::RETCODE_OK;
52 }
53 
namespace {
  /// Binary predicate used with std::equal over arrays of C strings.
  /// Returns true when the two NUL-terminated strings compare equal
  /// according to std::strcmp, false otherwise.
  bool string_equal(const char* a, const char* b)
  {
    const int ordering = std::strcmp(a, b);
    return ordering == 0;
  }
}
59 
62 {
65 
66  const CORBA::ULong len = p.length();
67 
68  // Check that the sequence of strings supplying values for the 'parameters'
69  // (i.e., "%n" tokens) in the filter_expression has exactly as many entries as
70  // the expression requires. The tokens start with 0, which means that when the
71  // maximum number used is 1 we need two parameters: %0 and %1.
72  if (len != filter_eval_.number_parameters()) {
73  if (DCPS_debug_level > 1) {
75  ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::set_expression_parameters() - ")
76  ACE_TEXT("passed incorrect set of filter parameters, expected %d received %d\n"),
78  }
79  return DDS::RETCODE_ERROR;
80  }
81 
82  if (len == expression_parameters_.length()) {
83  const char* const* p_buf = p.get_buffer();
84  char* const* e_buf = expression_parameters_.get_buffer();
85 #ifdef _MSC_VER
86 #pragma warning(push)
87 #pragma warning(disable : 4996)
88 #endif
89  if (std::equal(&p_buf[0], &p_buf[len], &e_buf[0], string_equal)) {
90  // no change, bail out now to avoid remote InfoRepo calls
91  return DDS::RETCODE_OK;
92  }
93 #ifdef _MSC_VER
94 #pragma warning(pop)
95 #endif
96  }
97 
99 
100  Readers readers_still_alive;
101 
102  for (Readers::iterator iter = readers_.begin(),
103  end = readers_.end(); iter != end; ++iter) {
104  DataReaderImpl_rch reader = iter->lock();
105  if (reader) {
106  reader->update_subscription_params(p);
107  readers_still_alive.push_back(*iter);
108  }
109  }
110 
111  using namespace std;
112  swap(readers_, readers_still_alive);
113 
114  return DDS::RETCODE_OK;
115 }
116 
117 DDS::Topic_ptr
119 {
120  return DDS::Topic::_duplicate(related_topic_);
121 }
122 
123 void
125 {
126  // readers_ does not own or reference-count the reader because
127  // the readers reference this CFT and this CFT can't be removed
128  // until all readers are gone (DomainParticipant::delete_contentfilteredtopic)
130  readers_.push_back(reader);
131 }
132 
133 void
135 {
137  Readers::iterator end = readers_.end();
138  readers_.erase(std::remove(readers_.begin(), end, reader), end);
139 }
140 
141 } // namespace DCPS
142 } // namespace OpenDDS
143 
145 
146 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
void swap(MessageBlock &lhs, MessageBlock &rhs)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Concurrent access to expression_parameters_ and readers_.
T * _duplicate(T *st)
DDS::ReturnCode_t get_expression_parameters(DDS::StringSeq &parameters)
char * string_dup(const char *)
LM_DEBUG
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
STL namespace.
Implements the DDS::DataReader interface.
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
The End User API.
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
ContentFilteredTopicImpl(const char *name, DDS::Topic_ptr related_topic, const char *filter_expression, DomainParticipantImpl *participant)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Implements the DDS::TopicDescription interface.
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DDS::ReturnCode_t set_expression_parameters(const DDS::StringSeq &parameters)
int remove(Container &c, const ValueType &v)
Definition: Util.h:121
const ReturnCode_t RETCODE_OK
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50