Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #ifndef OPENDDS_DCPS_BUILTINTOPICUTILS_H
7 : #define OPENDDS_DCPS_BUILTINTOPICUTILS_H
8 :
9 : #include "dcps_export.h"
10 : #include "Service_Participant.h"
11 : #include "DomainParticipantImpl.h"
12 : #include "debug.h"
13 : #include "DCPS_Utils.h"
14 :
15 : #include <dds/DdsDcpsInfrastructureC.h>
16 : #include <dds/DdsDcpsInfoUtilsC.h>
17 : #include <dds/DdsDcpsSubscriptionC.h>
18 : #include <dds/DdsDcpsCoreC.h>
19 :
20 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
21 :
22 : namespace OpenDDS {
23 : namespace DCPS {
24 :
25 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_PARTICIPANT_TOPIC;
26 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_PARTICIPANT_TOPIC_TYPE;
27 :
28 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_TOPIC_TOPIC;
29 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_TOPIC_TOPIC_TYPE;
30 :
31 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_SUBSCRIPTION_TOPIC;
32 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE;
33 :
34 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_PUBLICATION_TOPIC;
35 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_PUBLICATION_TOPIC_TYPE;
36 :
37 : // TODO: When the ParticipantLocationTopic is retired, then it may be
38 : // possible to disable the secure participant writer in the RtpsRelay.
39 : // If it is disabled, then the is_ps_writer_ flag in the RTPS
40 : // transport can be removed.
41 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC;
42 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE;
43 :
44 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_CONNECTION_RECORD_TOPIC;
45 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE;
46 :
47 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_INTERNAL_THREAD_TOPIC;
48 : OpenDDS_Dcps_Export extern const char* const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE;
49 :
50 : const size_t NUMBER_OF_BUILT_IN_TOPICS = 7;
51 :
52 : /**
53 : * Returns true if the topic name and type pair matches one of the built-in
54 : * topic name and type pairs.
55 : */
56 : inline bool
57 0 : topicIsBIT(const char* name, const char* type)
58 : {
59 : return (
60 0 : !ACE_OS::strcmp(name, BUILT_IN_PARTICIPANT_TOPIC) &&
61 0 : !ACE_OS::strcmp(type, BUILT_IN_PARTICIPANT_TOPIC_TYPE)
62 0 : ) || (
63 0 : !ACE_OS::strcmp(name, BUILT_IN_TOPIC_TOPIC) &&
64 0 : !ACE_OS::strcmp(type, BUILT_IN_TOPIC_TOPIC_TYPE)
65 0 : ) || (
66 0 : !ACE_OS::strcmp(name, BUILT_IN_SUBSCRIPTION_TOPIC) &&
67 0 : !ACE_OS::strcmp(type, BUILT_IN_SUBSCRIPTION_TOPIC_TYPE)
68 0 : ) || (
69 0 : !ACE_OS::strcmp(name, BUILT_IN_PUBLICATION_TOPIC) &&
70 0 : !ACE_OS::strcmp(type, BUILT_IN_PUBLICATION_TOPIC_TYPE)
71 0 : ) || (
72 0 : !ACE_OS::strcmp(name, BUILT_IN_PARTICIPANT_LOCATION_TOPIC) &&
73 0 : !ACE_OS::strcmp(type, BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE)
74 0 : ) || (
75 0 : !ACE_OS::strcmp(name, BUILT_IN_CONNECTION_RECORD_TOPIC) &&
76 0 : !ACE_OS::strcmp(type, BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE)
77 0 : ) || (
78 0 : !ACE_OS::strcmp(name, BUILT_IN_INTERNAL_THREAD_TOPIC) &&
79 0 : !ACE_OS::strcmp(type, BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE)
80 0 : );
81 : }
82 :
83 : class DomainParticipantImpl;
84 :
85 : const DDS::BuiltinTopicKey_t BUILTIN_TOPIC_KEY_UNKNOWN = { { 0 } };
86 :
87 : /**
88 : * Functor for ordering BuiltinKey_t.
89 : *
90 : * Use this like this:
91 : * std::map<DDS::BuiltinTopicKey_t, int, OpenDDS::DCPS::BuiltinTopicKeyLess> MapType;
92 : */
93 : class BuiltinTopicKeyLess {
94 : public:
95 : bool operator()(
96 : const DDS::BuiltinTopicKey_t& lhs,
97 : const DDS::BuiltinTopicKey_t& rhs) const;
98 : };
99 :
100 : template<typename TopicType>
101 : DDS::BuiltinTopicKey_t keyFromSample(TopicType* sample);
102 :
103 : #if !defined (DDS_HAS_MINIMUM_BIT)
104 :
105 : template<class BIT_Reader_var, class BIT_DataSeq>
106 0 : DDS::ReturnCode_t instance_handle_to_bit_data(
107 : DomainParticipantImpl* dp,
108 : const char* bit_name,
109 : const DDS::InstanceHandle_t& handle,
110 : BIT_DataSeq& data)
111 : {
112 0 : DDS::Subscriber_var bit_subscriber = dp->get_builtin_subscriber();
113 0 : if (!bit_subscriber) {
114 0 : if (log_level >= LogLevel::Notice) {
115 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
116 : "Could not get BIT subscriber, might be in middle of shutdown\n"));
117 : }
118 0 : return DDS::RETCODE_BAD_PARAMETER;
119 : }
120 :
121 0 : DDS::DataReader_var reader = bit_subscriber->lookup_datareader(bit_name);
122 : typedef typename BIT_Reader_var::_obj_type BIT_Reader;
123 0 : BIT_Reader_var bit_reader = BIT_Reader::_narrow(reader.in());
124 0 : if (!bit_reader) {
125 0 : if (log_level >= LogLevel::Notice) {
126 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
127 : "Could not get BIT reader \"%C\", might be in middle of shutdown\n", bit_name));
128 : }
129 0 : return DDS::RETCODE_BAD_PARAMETER;
130 : }
131 :
132 0 : const MonotonicTimePoint due(MonotonicTimePoint::now() +
133 0 : TimeDuration::from_msec(TheServiceParticipant->bit_lookup_duration_msec()));
134 :
135 : // Look for the data from builtin topic datareader until we get results or
136 : // timeout.
137 : // This is to resolve the problem of lookup return nothing. This could happen
138 : // when the add_association is called before the builtin topic datareader got
139 : // the published data.
140 0 : while (true) {
141 0 : DDS::SampleInfoSeq the_info;
142 0 : BIT_DataSeq the_data;
143 : const DDS::ReturnCode_t ret =
144 0 : bit_reader->read_instance(the_data,
145 : the_info,
146 : DDS::LENGTH_UNLIMITED,
147 : handle,
148 : DDS::ANY_SAMPLE_STATE,
149 : DDS::ANY_VIEW_STATE,
150 : DDS::ANY_INSTANCE_STATE);
151 :
152 0 : if (ret == DDS::RETCODE_OK) {
153 0 : data.length(1);
154 0 : data[0] = the_data[0];
155 0 : return ret;
156 : }
157 :
158 0 : if (ret != DDS::RETCODE_BAD_PARAMETER && ret != DDS::RETCODE_NO_DATA) {
159 0 : if (log_level >= LogLevel::Notice) {
160 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
161 : "read_instance %C 0x%x error: %C\n",
162 : bit_name, handle, retcode_to_string(ret)));
163 : }
164 0 : return ret;
165 : }
166 :
167 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
168 0 : if (now < due) {
169 0 : if (DCPS_debug_level >= 10) {
170 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) instance_handle_to_bit_data: "
171 : "BIT reader read_instance returned \"%C\" - trying again...\n",
172 : retcode_to_string(ret)));
173 : }
174 0 : ACE_OS::sleep(std::min(due - now, TimeDuration(0, 100000)).value());
175 :
176 : } else {
177 0 : if (log_level >= LogLevel::Notice) {
178 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: instance_handle_to_bit_data: "
179 : "read_instance %C 0x%x timed out\n",
180 : bit_name, handle));
181 : }
182 0 : return DDS::RETCODE_TIMEOUT;
183 : }
184 : }
185 0 : }
186 : #endif
187 :
188 : inline
189 : bool
190 : BuiltinTopicKeyLess::operator()(const DDS::BuiltinTopicKey_t& lhs,
191 : const DDS::BuiltinTopicKey_t& rhs) const
192 : {
193 : return std::memcmp(lhs.value, rhs.value, sizeof(lhs.value)) < 0;
194 : }
195 :
196 : #if !defined (DDS_HAS_MINIMUM_BIT)
197 :
198 : template<>
199 : inline
200 : DDS::BuiltinTopicKey_t
201 0 : keyFromSample<DDS::ParticipantBuiltinTopicData>(
202 : DDS::ParticipantBuiltinTopicData* sample)
203 : {
204 0 : return sample->key;
205 : }
206 :
207 : template<>
208 : inline
209 : DDS::BuiltinTopicKey_t
210 0 : keyFromSample<DDS::TopicBuiltinTopicData>(
211 : DDS::TopicBuiltinTopicData* sample)
212 : {
213 0 : return sample->key;
214 : }
215 :
216 : template<>
217 : inline
218 : DDS::BuiltinTopicKey_t
219 0 : keyFromSample<DDS::SubscriptionBuiltinTopicData>(
220 : DDS::SubscriptionBuiltinTopicData* sample)
221 : {
222 0 : return sample->key;
223 : }
224 :
225 : template<>
226 : inline
227 : DDS::BuiltinTopicKey_t
228 0 : keyFromSample<DDS::PublicationBuiltinTopicData>(
229 : DDS::PublicationBuiltinTopicData* sample)
230 : {
231 0 : return sample->key;
232 : }
233 :
234 : #endif
235 :
236 : template<typename TopicType>
237 0 : DDS::BuiltinTopicKey_t keyFromSample(TopicType*)
238 : {
239 : DDS::BuiltinTopicKey_t key;
240 0 : std::memset(key.value, 0, sizeof(key.value));
241 0 : return key;
242 : }
243 :
244 : class OpenDDS_Dcps_Export BitSubscriber : public RcObject {
245 : public:
246 : BitSubscriber()
247 : {}
248 :
249 0 : explicit BitSubscriber(const DDS::Subscriber_var& bit_subscriber)
250 0 : : bit_subscriber_(bit_subscriber)
251 0 : {}
252 :
253 0 : DDS::Subscriber_ptr get() const
254 : {
255 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, 0);
256 0 : return DDS::Subscriber::_duplicate(bit_subscriber_.in());
257 0 : }
258 :
259 : void clear()
260 : {
261 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
262 : bit_subscriber_ = 0;
263 : }
264 :
265 : DDS::InstanceHandle_t add_participant(const DDS::ParticipantBuiltinTopicData& part,
266 : DDS::ViewStateKind view_state);
267 : void remove_participant(DDS::InstanceHandle_t part_ih,
268 : DDS::InstanceHandle_t loc_ih);
269 :
270 : DDS::ReturnCode_t get_discovered_participant_data(DDS::ParticipantBuiltinTopicData& participant_data,
271 : DDS::InstanceHandle_t participant_handle);
272 :
273 : DDS::ReturnCode_t get_discovered_topic_data(DDS::TopicBuiltinTopicData& topic_data,
274 : DDS::InstanceHandle_t topic_handle);
275 :
276 : DDS::InstanceHandle_t add_publication(const DDS::PublicationBuiltinTopicData& pub,
277 : DDS::ViewStateKind view_state);
278 : void remove_publication(DDS::InstanceHandle_t pub_ih);
279 :
280 : DDS::InstanceHandle_t add_subscription(const DDS::SubscriptionBuiltinTopicData& sub,
281 : DDS::ViewStateKind view_state);
282 : void remove_subscription(DDS::InstanceHandle_t sub_ih);
283 :
284 : DDS::InstanceHandle_t add_participant_location(const ParticipantLocationBuiltinTopicData& loc,
285 : DDS::ViewStateKind view_state);
286 :
287 : DDS::InstanceHandle_t add_connection_record(const ConnectionRecord& cr,
288 : DDS::ViewStateKind view_state);
289 : void remove_connection_record(const ConnectionRecord& cr);
290 :
291 : DDS::InstanceHandle_t add_thread_status(const InternalThreadBuiltinTopicData& ts,
292 : DDS::ViewStateKind view_state,
293 : const SystemTimePoint& timestamp);
294 : void remove_thread_status(const InternalThreadBuiltinTopicData& ts);
295 :
296 : /*
297 : The Ownership QoS is implemented by creating a listener for the
298 : Publication BIT that reads the ownership strength and makes
299 : adjustments. This is bad (a hack) because it prevents the user
300 : from installing a listener for the built-in topics.
301 : */
302 : void bit_pub_listener_hack(DomainParticipantImpl* participant);
303 :
304 : private:
305 : template <typename DataReaderImpl, typename Sample>
306 : DDS::InstanceHandle_t add_i(const char* topic_name,
307 : const Sample& sample,
308 : DDS::ViewStateKind view_state);
309 :
310 : void remove_i(const char* topic_name,
311 : DDS::InstanceHandle_t ih);
312 :
313 : DDS::Subscriber_var bit_subscriber_;
314 : mutable ACE_Thread_Mutex mutex_;
315 : };
316 :
317 : } // namespace DCPS
318 : } // namespace OpenDDS
319 :
320 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
321 :
322 : #endif /* OPENDDS_DCPS_BUILTINTOPICUTILS_H */
|