OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::ContentFilteredTopicImpl Class Reference

#include <ContentFilteredTopicImpl.h>

Inheritance diagram for OpenDDS::DCPS::ContentFilteredTopicImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ContentFilteredTopicImpl:
Collaboration graph
[legend]

Public Member Functions

 ContentFilteredTopicImpl (const char *name, DDS::Topic_ptr related_topic, const char *filter_expression, DomainParticipantImpl *participant)
 
virtual ~ContentFilteredTopicImpl ()
 
char * get_filter_expression ()
 
DDS::ReturnCode_t get_expression_parameters (DDS::StringSeq &parameters)
 
DDS::ReturnCode_t set_expression_parameters (const DDS::StringSeq &parameters)
 
DDS::Topic_ptr get_related_topic ()
 
template<typename Sample >
bool filter (const Sample &s, bool sample_only_has_key_fields) const
 
void add_reader (DataReaderImpl &reader)
 
void remove_reader (DataReaderImpl &reader)
 
const char * get_filter_class_name () const
 
- Public Member Functions inherited from DDS::ContentFilteredTopic
ReturnCode_t get_expression_parameters (inout StringSeq params)
 
ReturnCode_t set_expression_parameters (in StringSeq expression_parameters)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey * _key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase * _servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub * _stubobj (void) const
 
virtual TAO_Stub * _stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Core * orb_core (void) const
 
IOP::IOR * steal_ior (void)
 
const IOP::IOR & ior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::TopicDescriptionImpl
 TopicDescriptionImpl (const char *topic_name, const char *type_name, TypeSupport_ptr type_support, DomainParticipantImpl *participant)
 
virtual ~TopicDescriptionImpl ()
 
virtual char * get_type_name ()
 
virtual char * get_name ()
 
virtual DDS::DomainParticipant_ptr get_participant ()
 
OpenDDS::DCPS::TypeSupport_ptr get_type_support ()
 
bool has_entity_refs () const
 
void add_entity_ref ()
 
void remove_entity_ref ()
 

Private Member Functions

typedef OPENDDS_VECTOR (WeakRcHandle< DataReaderImpl >) Readers
 

Private Attributes

OPENDDS_STRING filter_expression_
 
FilterEvaluator filter_eval_
 
DDS::StringSeq expression_parameters_
 
DDS::Topic_var related_topic_
 
Readers readers_
 
ACE_Recursive_Thread_Mutex lock_
 Concurrent access to expression_parameters_ and readers_. More...
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::ContentFilteredTopic >
typedef DDS::ContentFilteredTopic ::_ptr_type _ptr_type
 
typedef DDS::ContentFilteredTopic ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::TopicDescription >
typedef DDS::TopicDescription ::_ptr_type _ptr_type
 
typedef DDS::TopicDescription ::_var_type _var_type
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::ContentFilteredTopic >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::TopicDescription >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Broker * proxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::TopicDescriptionImpl
ACE_CString topic_name_
 The name of the topic. More...
 
ACE_CString type_name_
 The datatype of the topic. More...
 
DomainParticipantImpl * participant_
 The participant that creates this topic. More...
 
OpenDDS::DCPS::TypeSupport_var type_support_
 The type_support for this topic. More...
 
Atomic< ACE_UINT32 > entity_refs_
 The number of entities using this topic. More...
 

Detailed Description

Definition at line 29 of file ContentFilteredTopicImpl.h.

Constructor & Destructor Documentation

◆ ContentFilteredTopicImpl()

OpenDDS::DCPS::ContentFilteredTopicImpl::ContentFilteredTopicImpl ( const char *  name,
DDS::Topic_ptr  related_topic,
const char *  filter_expression,
DomainParticipantImpl *  participant
)

Definition at line 21 of file ContentFilteredTopicImpl.cpp.

References _duplicate(), ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, filter_eval_, LM_DEBUG, and OpenDDS::DCPS::FilterEvaluator::number_parameters().

25  CORBA::String_var(related_topic->get_type_name()),
26  dynamic_cast<TopicDescriptionImpl*>(related_topic)->get_type_support(),
27  participant)
28  , filter_expression_(filter_expression)
29  , filter_eval_(filter_expression, false /*allowOrderBy*/)
30  , related_topic_(DDS::Topic::_duplicate(related_topic))
31 {
32  if (DCPS_debug_level > 5) {
33  ACE_DEBUG((LM_DEBUG,
34  ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::ContentFilteredTopicImpl() - ")
35  ACE_TEXT("Creating cft with filter <%C> which requires <%d> parameters\n"),
36  filter_expression, filter_eval_.number_parameters()));
37  }
38 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
TopicDescriptionImpl(const char *topic_name, const char *type_name, TypeSupport_ptr type_support, DomainParticipantImpl *participant)

◆ ~ContentFilteredTopicImpl()

virtual OpenDDS::DCPS::ContentFilteredTopicImpl::~ContentFilteredTopicImpl ( )
inline virtual

Definition at line 36 of file ContentFilteredTopicImpl.h.

36 {}

Member Function Documentation

◆ add_reader()

void OpenDDS::DCPS::ContentFilteredTopicImpl::add_reader ( DataReaderImpl &  reader)

Definition at line 124 of file ContentFilteredTopicImpl.cpp.

References ACE_GUARD, lock_, and readers_.

Referenced by OpenDDS::DCPS::DataReaderImpl::enable_filtering().

125 {
126  // readers_ does not own or reference-count the reader because
127  // the readers reference this CFT and this CFT can't be removed
128  // until all readers are gone (DomainParticipant::delete_contentfilteredtopic)
130  readers_.push_back(reader);
131 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
Concurrent access to expression_parameters_ and readers_.

◆ filter()

template<typename Sample >
bool OpenDDS::DCPS::ContentFilteredTopicImpl::filter ( const Sample &  s,
bool  sample_only_has_key_fields 
) const
inline

Returns true if the sample matches the filter.

Definition at line 50 of file ContentFilteredTopicImpl.h.

References ACE_GUARD_RETURN, and lock_.

51  {
53  /*
54  * Omit the sample from results if the filter references non-key fields
55  * and the sample only has key fields.
56  */
57  TypeSupportImpl* const ts = dynamic_cast<TypeSupportImpl*>(type_support_.in());
58  if (!ts || (sample_only_has_key_fields && filter_eval_.has_non_key_fields(*ts))) {
59  return false;
60  }
62  }
bool has_non_key_fields(const TypeSupportImpl &ts) const
ACE_Recursive_Thread_Mutex lock_
Concurrent access to expression_parameters_ and readers_.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
OpenDDS::DCPS::TypeSupport_var type_support_
The type_support for this topic.
bool eval(const T &sample, const DDS::StringSeq &params) const

◆ get_expression_parameters()

DDS::ReturnCode_t OpenDDS::DCPS::ContentFilteredTopicImpl::get_expression_parameters ( DDS::StringSeq &  parameters)

Definition at line 46 of file ContentFilteredTopicImpl.cpp.

References ACE_GUARD_RETURN, expression_parameters_, lock_, DDS::RETCODE_OK, and DDS::RETCODE_OUT_OF_RESOURCES.

47 {
50  params = expression_parameters_;
51  return DDS::RETCODE_OK;
52 }
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_
Concurrent access to expression_parameters_ and readers_.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES

◆ get_filter_class_name()

const char* OpenDDS::DCPS::ContentFilteredTopicImpl::get_filter_class_name ( ) const
inline

Definition at line 67 of file ContentFilteredTopicImpl.h.

68  {
69  return filter_eval_.usesExtendedGrammar () ? "OPENDDSSQL" : "DDSSQL";
70  }

◆ get_filter_expression()

char * OpenDDS::DCPS::ContentFilteredTopicImpl::get_filter_expression ( )

Implements DDS::ContentFilteredTopic.

Definition at line 40 of file ContentFilteredTopicImpl.cpp.

References filter_expression_, and CORBA::string_dup().

41 {
42  return CORBA::string_dup(filter_expression_.c_str());
43 }
char * string_dup(const char *)

◆ get_related_topic()

DDS::Topic_ptr OpenDDS::DCPS::ContentFilteredTopicImpl::get_related_topic ( )

Implements DDS::ContentFilteredTopic.

Definition at line 118 of file ContentFilteredTopicImpl.cpp.

References related_topic_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().

119 {
120  return DDS::Topic::_duplicate(related_topic_);
121 }

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::ContentFilteredTopicImpl::OPENDDS_VECTOR ( WeakRcHandle< DataReaderImpl > )
private

◆ remove_reader()

void OpenDDS::DCPS::ContentFilteredTopicImpl::remove_reader ( DataReaderImpl &  reader)

Definition at line 134 of file ContentFilteredTopicImpl.cpp.

References ACE_GUARD, lock_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, readers_, and OpenDDS::DCPS::remove().

135 {
137  Readers::iterator end = readers_.end();
138  readers_.erase(std::remove(readers_.begin(), end, reader), end);
139 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int remove(Container &c, const ValueType &v)
Definition: Util.h:121
ACE_Recursive_Thread_Mutex lock_
Concurrent access to expression_parameters_ and readers_.

◆ set_expression_parameters()

DDS::ReturnCode_t OpenDDS::DCPS::ContentFilteredTopicImpl::set_expression_parameters ( const DDS::StringSeq &  parameters)

Definition at line 61 of file ContentFilteredTopicImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, expression_parameters_, filter_eval_, LM_ERROR, lock_, OpenDDS::DCPS::FilterEvaluator::number_parameters(), readers_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and OpenDDS::DCPS::swap().

62 {
65 
66  const CORBA::ULong len = p.length();
67 
68  // Check sequence of strings that give values to the 'parameters' (i.e., "%n" tokens)
69  // in the filter_expression matches the size of the parameter sequence.
70  // The tokens start with 0 which means that when the maximum number used is 1 we need
71  // two parameters: %0 and %1
72  if (len != filter_eval_.number_parameters()) {
73  if (DCPS_debug_level > 1) {
74  ACE_ERROR((LM_ERROR,
75  ACE_TEXT("(%P|%t) ContentFilteredTopicImpl::set_expression_parameters() - ")
76  ACE_TEXT("passed incorrect set of filter parameters, expected %d received %d\n"),
78  }
79  return DDS::RETCODE_ERROR;
80  }
81 
82  if (len == expression_parameters_.length()) {
83  const char* const* p_buf = p.get_buffer();
84  char* const* e_buf = expression_parameters_.get_buffer();
85 #ifdef _MSC_VER
86 #pragma warning(push)
87 #pragma warning(disable : 4996)
88 #endif
89  if (std::equal(&p_buf[0], &p_buf[len], &e_buf[0], string_equal)) {
90  // no change, bail out now to avoid remote InfoRepo calls
91  return DDS::RETCODE_OK;
92  }
93 #ifdef _MSC_VER
94 #pragma warning(pop)
95 #endif
96  }
97 
99 
100  Readers readers_still_alive;
101 
102  for (Readers::iterator iter = readers_.begin(),
103  end = readers_.end(); iter != end; ++iter) {
104  DataReaderImpl_rch reader = iter->lock();
105  if (reader) {
106  reader->update_subscription_params(p);
107  readers_still_alive.push_back(*iter);
108  }
109  }
110 
111  using namespace std;
112  swap(readers_, readers_still_alive);
113 
114  return DDS::RETCODE_OK;
115 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_
Concurrent access to expression_parameters_ and readers_.
STL namespace.
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
RcHandle< DataReaderImpl > DataReaderImpl_rch
void swap(MessageBlock &lhs, MessageBlock &rhs)

Member Data Documentation

◆ expression_parameters_

DDS::StringSeq OpenDDS::DCPS::ContentFilteredTopicImpl::expression_parameters_
private

◆ filter_eval_

FilterEvaluator OpenDDS::DCPS::ContentFilteredTopicImpl::filter_eval_
private

◆ filter_expression_

OPENDDS_STRING OpenDDS::DCPS::ContentFilteredTopicImpl::filter_expression_
private

Definition at line 73 of file ContentFilteredTopicImpl.h.

Referenced by get_filter_expression().

◆ lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::ContentFilteredTopicImpl::lock_
mutable private

Concurrent access to expression_parameters_ and readers_.

Definition at line 81 of file ContentFilteredTopicImpl.h.

Referenced by add_reader(), get_expression_parameters(), remove_reader(), and set_expression_parameters().

◆ readers_

Readers OpenDDS::DCPS::ContentFilteredTopicImpl::readers_
private

Definition at line 78 of file ContentFilteredTopicImpl.h.

Referenced by add_reader(), remove_reader(), and set_expression_parameters().

◆ related_topic_

DDS::Topic_var OpenDDS::DCPS::ContentFilteredTopicImpl::related_topic_
private

Definition at line 76 of file ContentFilteredTopicImpl.h.

Referenced by get_related_topic().


The documentation for this class was generated from the following files: