OpenDDS  Snapshot(2023/04/28-20:55)
BuiltInTopicUtils.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_BUILTINTOPICUTILS_H
7 #define OPENDDS_DCPS_BUILTINTOPICUTILS_H
8 
9 #include "dcps_export.h"
10 #include "Service_Participant.h"
11 #include "DomainParticipantImpl.h"
12 #include "debug.h"
13 #include "DCPS_Utils.h"
14 
15 #include <dds/DdsDcpsInfrastructureC.h>
16 #include <dds/DdsDcpsInfoUtilsC.h>
17 #include <dds/DdsDcpsSubscriptionC.h>
18 #include <dds/DdsDcpsCoreC.h>
19 
21 
22 namespace OpenDDS {
23 namespace DCPS {
24 
25 OpenDDS_Dcps_Export extern const char* const BUILT_IN_PARTICIPANT_TOPIC;
27 
28 OpenDDS_Dcps_Export extern const char* const BUILT_IN_TOPIC_TOPIC;
29 OpenDDS_Dcps_Export extern const char* const BUILT_IN_TOPIC_TOPIC_TYPE;
30 
31 OpenDDS_Dcps_Export extern const char* const BUILT_IN_SUBSCRIPTION_TOPIC;
33 
34 OpenDDS_Dcps_Export extern const char* const BUILT_IN_PUBLICATION_TOPIC;
36 
37 // TODO: When the ParticipantLocationTopic is retired, then it may be
38 // possible to disable the secure participant writer in the RtpsRelay.
39 // If it is disabled, then the is_ps_writer_ flag in the RTPS
40 // transport can be removed.
43 
46 
49 
// Presumably the number of built-in topics; it equals the seven name/type
// pairs that topicIsBIT() tests -- TODO confirm the two are meant to track.
50 const size_t NUMBER_OF_BUILT_IN_TOPICS = 7;
51 
52 /**
53  * Returns true if the topic name and type pair matches one of the built-in
54  * topic name and type pairs.
55  */
56 inline bool
57 topicIsBIT(const char* name, const char* type)
58 {
59  return (
60  !ACE_OS::strcmp(name, BUILT_IN_PARTICIPANT_TOPIC) &&
61  !ACE_OS::strcmp(type, BUILT_IN_PARTICIPANT_TOPIC_TYPE)
62  ) || (
63  !ACE_OS::strcmp(name, BUILT_IN_TOPIC_TOPIC) &&
64  !ACE_OS::strcmp(type, BUILT_IN_TOPIC_TOPIC_TYPE)
65  ) || (
66  !ACE_OS::strcmp(name, BUILT_IN_SUBSCRIPTION_TOPIC) &&
67  !ACE_OS::strcmp(type, BUILT_IN_SUBSCRIPTION_TOPIC_TYPE)
68  ) || (
69  !ACE_OS::strcmp(name, BUILT_IN_PUBLICATION_TOPIC) &&
70  !ACE_OS::strcmp(type, BUILT_IN_PUBLICATION_TOPIC_TYPE)
71  ) || (
72  !ACE_OS::strcmp(name, BUILT_IN_PARTICIPANT_LOCATION_TOPIC) &&
73  !ACE_OS::strcmp(type, BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE)
74  ) || (
75  !ACE_OS::strcmp(name, BUILT_IN_CONNECTION_RECORD_TOPIC) &&
76  !ACE_OS::strcmp(type, BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE)
77  ) || (
78  !ACE_OS::strcmp(name, BUILT_IN_INTERNAL_THREAD_TOPIC) &&
79  !ACE_OS::strcmp(type, BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE)
80  );
81 }
82 
84 
86 
87 /**
88  * Functor for ordering DDS::BuiltinTopicKey_t values.
89  *
90  * Example usage:
91  * typedef std::map<DDS::BuiltinTopicKey_t, int, OpenDDS::DCPS::BuiltinTopicKeyLess> MapType;
92  */
94 public:
95  bool operator()(
96  const DDS::BuiltinTopicKey_t& lhs,
97  const DDS::BuiltinTopicKey_t& rhs) const;
98 };
99 
100 template<typename TopicType>
101 DDS::BuiltinTopicKey_t keyFromSample(TopicType* sample);
102 
103 #if !defined (DDS_HAS_MINIMUM_BIT)
104 
105 template<class BIT_Reader_var, class BIT_DataSeq>
108  const char* bit_name,
109  const DDS::InstanceHandle_t& handle,
110  BIT_DataSeq& data)
111 {
112  DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber();
113  if (!bit_subscriber) {
114  if (log_level >= LogLevel::Notice) {
115  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
116  "Could not get BIT subscriber, might be in middle of shutdown\n"));
117  }
119  }
120 
121  DDS::DataReader_var reader = bit_subscriber->lookup_datareader(bit_name);
122  typedef typename BIT_Reader_var::_obj_type BIT_Reader;
123  BIT_Reader_var bit_reader = BIT_Reader::_narrow(reader.in());
124  if (!bit_reader) {
125  if (log_level >= LogLevel::Notice) {
126  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
127  "Could not get BIT reader \"%C\", might be in middle of shutdown\n", bit_name));
128  }
130  }
131 
133  TimeDuration::from_msec(TheServiceParticipant->bit_lookup_duration_msec()));
134 
135  // Keep reading from the built-in topic data reader until it returns
136  // data or the lookup times out.
137  // This works around lookups that return nothing, which can happen
138  // when add_association is called before the built-in topic data reader
139  // has received the published data.
140  while (true) {
141  DDS::SampleInfoSeq the_info;
142  BIT_DataSeq the_data;
143  const DDS::ReturnCode_t ret =
144  bit_reader->read_instance(the_data,
145  the_info,
147  handle,
151 
152  if (ret == DDS::RETCODE_OK) {
153  data.length(1);
154  data[0] = the_data[0];
155  return ret;
156  }
157 
158  if (ret != DDS::RETCODE_BAD_PARAMETER && ret != DDS::RETCODE_NO_DATA) {
159  if (log_level >= LogLevel::Notice) {
160  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
161  "read_instance %C 0x%x error: %C\n",
162  bit_name, handle, retcode_to_string(ret)));
163  }
164  return ret;
165  }
166 
168  if (now < due) {
169  if (DCPS_debug_level >= 10) {
170  ACE_DEBUG((LM_DEBUG, "(%P|%t) instance_handle_to_bit_data: "
171  "BIT reader read_instance returned \"%C\" - trying again...\n",
172  retcode_to_string(ret)));
173  }
174  ACE_OS::sleep(std::min(due - now, TimeDuration(0, 100000)).value());
175 
176  } else {
177  if (log_level >= LogLevel::Notice) {
178  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
179  "read_instance %C 0x%x timed out\n",
180  bit_name, handle));
181  }
182  return DDS::RETCODE_TIMEOUT;
183  }
184  }
185 }
186 #endif
187 
188 inline
189 bool
191  const DDS::BuiltinTopicKey_t& rhs) const
192 {
193  return std::memcmp(lhs.value, rhs.value, sizeof(lhs.value)) < 0;
194 }
195 
196 #if !defined (DDS_HAS_MINIMUM_BIT)
197 
198 template<>
199 inline
201 keyFromSample<DDS::ParticipantBuiltinTopicData>(
203 {
204  return sample->key;
205 }
206 
207 template<>
208 inline
210 keyFromSample<DDS::TopicBuiltinTopicData>(
212 {
213  return sample->key;
214 }
215 
216 template<>
217 inline
219 keyFromSample<DDS::SubscriptionBuiltinTopicData>(
221 {
222  return sample->key;
223 }
224 
225 template<>
226 inline
228 keyFromSample<DDS::PublicationBuiltinTopicData>(
230 {
231  return sample->key;
232 }
233 
234 #endif
235 
236 template<typename TopicType>
238 {
240  std::memset(key.value, 0, sizeof(key.value));
241  return key;
242 }
243 
245 public:
247  {}
248 
249  explicit BitSubscriber(const DDS::Subscriber_var& bit_subscriber)
250  : bit_subscriber_(bit_subscriber)
251  {}
252 
253  DDS::Subscriber_ptr get() const
254  {
255  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, 0);
256  return DDS::Subscriber::_duplicate(bit_subscriber_.in());
257  }
258 
259  void clear()
260  {
261  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
262  bit_subscriber_ = 0;
263  }
264 
265  DDS::InstanceHandle_t add_participant(const DDS::ParticipantBuiltinTopicData& part,
266  DDS::ViewStateKind view_state);
267  void remove_participant(DDS::InstanceHandle_t part_ih,
268  DDS::InstanceHandle_t loc_ih);
269 
270  DDS::ReturnCode_t get_discovered_participant_data(DDS::ParticipantBuiltinTopicData& participant_data,
271  DDS::InstanceHandle_t participant_handle);
272 
273  DDS::ReturnCode_t get_discovered_topic_data(DDS::TopicBuiltinTopicData& topic_data,
274  DDS::InstanceHandle_t topic_handle);
275 
276  DDS::InstanceHandle_t add_publication(const DDS::PublicationBuiltinTopicData& pub,
277  DDS::ViewStateKind view_state);
278  void remove_publication(DDS::InstanceHandle_t pub_ih);
279 
280  DDS::InstanceHandle_t add_subscription(const DDS::SubscriptionBuiltinTopicData& sub,
281  DDS::ViewStateKind view_state);
282  void remove_subscription(DDS::InstanceHandle_t sub_ih);
283 
284  DDS::InstanceHandle_t add_participant_location(const ParticipantLocationBuiltinTopicData& loc,
285  DDS::ViewStateKind view_state);
286 
287  DDS::InstanceHandle_t add_connection_record(const ConnectionRecord& cr,
288  DDS::ViewStateKind view_state);
289  void remove_connection_record(const ConnectionRecord& cr);
290 
291  DDS::InstanceHandle_t add_thread_status(const InternalThreadBuiltinTopicData& ts,
292  DDS::ViewStateKind view_state,
293  const SystemTimePoint& timestamp);
294  void remove_thread_status(const InternalThreadBuiltinTopicData& ts);
295 
296  /*
297  The Ownership QoS is implemented by creating a listener for the
298  Publication BIT that reads the ownership strength and makes
299  adjustments. This is bad (a hack) because it prevents the user
300  from installing a listener for the built-in topics.
301  */
302  void bit_pub_listener_hack(DomainParticipantImpl* participant);
303 
304 private:
305  template <typename DataReaderImpl, typename Sample>
306  DDS::InstanceHandle_t add_i(const char* topic_name,
307  const Sample& sample,
308  DDS::ViewStateKind view_state);
309 
310  void remove_i(const char* topic_name,
312 
313  DDS::Subscriber_var bit_subscriber_;
315 };
316 
317 } // namespace DCPS
318 } // namespace OpenDDS
319 
321 
322 #endif /* OPENDDS_DCPS_BUILTINTOPICUTILS_H */
const char *const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE
BitSubscriber(const DDS::Subscriber_var &bit_subscriber)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const LogLevel::Value value
Definition: debug.cpp:61
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
sequence< octet > key
sequence< SampleInfo > SampleInfoSeq
const SampleStateMask ANY_SAMPLE_STATE
int sleep(u_int seconds)
DDS::Subscriber_var bit_subscriber_
const DDS::BuiltinTopicKey_t BUILTIN_TOPIC_KEY_UNKNOWN
bool topicIsBIT(const char *name, const char *type)
DDS::ReturnCode_t instance_handle_to_bit_data(DomainParticipantImpl *dp, const char *bit_name, const DDS::InstanceHandle_t &handle, BIT_DataSeq &data)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
const char *const BUILT_IN_PUBLICATION_TOPIC
LM_DEBUG
static TimeDuration from_msec(const ACE_UINT64 &ms)
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE
const ReturnCode_t RETCODE_TIMEOUT
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual DDS::Subscriber_ptr get_builtin_subscriber()
const InstanceStateMask ANY_INSTANCE_STATE
LM_NOTICE
const ViewStateMask ANY_VIEW_STATE
const char *const BUILT_IN_PARTICIPANT_TOPIC
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC
int strcmp(const char *s, const char *t)
const char *const name
Definition: debug.cpp:60
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const char *const BUILT_IN_PARTICIPANT_TOPIC_TYPE
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
DDS::BuiltinTopicKey_t keyFromSample(TopicType *sample)
const ReturnCode_t RETCODE_NO_DATA
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS_Dcps_Export LogLevel log_level
const char *const BUILT_IN_TOPIC_TOPIC
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool operator()(const DDS::BuiltinTopicKey_t &lhs, const DDS::BuiltinTopicKey_t &rhs) const
const size_t NUMBER_OF_BUILT_IN_TOPICS
const ReturnCode_t RETCODE_OK
const long LENGTH_UNLIMITED
unsigned long ViewStateKind
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const char *const BUILT_IN_TOPIC_TOPIC_TYPE
const char *const BUILT_IN_PUBLICATION_TOPIC_TYPE
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)
const ReturnCode_t RETCODE_BAD_PARAMETER