Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #ifndef OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
7 : #define OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H
8 :
9 : #include "EntityImpl.h"
10 : #include "Definitions.h"
11 : #include "DisjointSequence.h"
12 : #include "TopicImpl.h"
13 : #include "InstanceHandle.h"
14 : #include "OwnershipManager.h"
15 : #include "GuidBuilder.h"
16 : #include "PoolAllocator.h"
17 : #include "Recorder.h"
18 : #include "Replayer.h"
19 : #include "ConditionVariable.h"
20 : #include "TimeTypes.h"
21 : #include "GuidUtils.h"
22 : #include "SporadicTask.h"
23 : #include "XTypes/TypeLookupService.h"
24 : #include "transport/framework/TransportImpl_rch.h"
25 : #include "security/framework/SecurityConfig_rch.h"
26 :
27 : #include <dds/DdsDcpsPublicationC.h>
28 : #include <dds/DdsDcpsSubscriptionExtC.h>
29 : #include <dds/DdsDcpsTopicC.h>
30 : #include <dds/DdsDcpsDomainC.h>
31 : #include <dds/DdsDcpsInfoUtilsC.h>
32 : #include <dds/DdsDcpsInfrastructureC.h>
33 : #include <dds/DdsDynamicDataC.h>
34 : #ifndef DDS_HAS_MINIMUM_BIT
35 : # include <dds/DdsDcpsCoreTypeSupportC.h>
36 : #endif
37 :
38 : #include <ace/Null_Mutex.h>
39 : #include <ace/Thread_Mutex.h>
40 : #include <ace/Recursive_Thread_Mutex.h>
41 :
42 : #ifndef ACE_LACKS_PRAGMA_ONCE
43 : # pragma once
44 : #endif
45 :
46 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
47 :
48 : namespace OpenDDS {
49 : namespace DCPS {
50 :
51 : class PublisherImpl;
52 : class SubscriberImpl;
53 : class DataWriterImpl;
54 : class DomainParticipantFactoryImpl;
55 : class Monitor;
56 : class BitSubscriber;
57 :
58 : class RecorderImpl;
59 : class ReplayerImpl;
60 :
61 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
62 : class FilterEvaluator;
63 : #endif
64 :
65 : /**
66 : * @class DomainParticipantImpl
67 : *
68 : * @brief Implements the OpenDDS::DCPS::DomainParticipant interfaces.
69 : *
70 : * This class acts as an entrypoint of the service and a factory
71 : * for publisher, subscriber and topic. It also acts as a container
72 : * for the publisher, subscriber and topic objects.
73 : *
74 : * See the DDS specification, OMG formal/2015-04-10, for a description of
75 : * the interface this class is implementing.
76 : */
77 : class OpenDDS_Dcps_Export DomainParticipantImpl
78 : : public virtual OpenDDS::DCPS::LocalObject<DDS::DomainParticipant>
79 : , public virtual OpenDDS::DCPS::EntityImpl
80 : , public virtual ACE_Event_Handler
81 : {
82 : public:
83 : typedef Objref_Servant_Pair <SubscriberImpl, DDS::Subscriber,
84 : DDS::Subscriber_ptr, DDS::Subscriber_var> Subscriber_Pair;
85 :
86 : typedef Objref_Servant_Pair <PublisherImpl, DDS::Publisher,
87 : DDS::Publisher_ptr, DDS::Publisher_var> Publisher_Pair;
88 :
89 : typedef Objref_Servant_Pair <TopicImpl, DDS::Topic,
90 : DDS::Topic_ptr, DDS::Topic_var> Topic_Pair;
91 :
92 : typedef OPENDDS_SET(Subscriber_Pair) SubscriberSet;
93 : typedef OPENDDS_SET(Publisher_Pair) PublisherSet;
94 :
95 : class OpenDDS_Dcps_Export RepoIdSequence { // Produces a series of GUID_t values derived from one base GUID.
96 : public:
97 : explicit RepoIdSequence(const GUID_t& base); // base: the GUID the sequence is derived from
98 : GUID_t next(); // NOTE(review): presumably bumps serial_ and returns base_ combined with it -- definition not visible here
99 : private:
100 : GUID_t base_; // combined with serial_ to produce the value returned by next()
101 : long serial_; // incremented each time a new value is produced
102 : GuidBuilder builder_; // used to modify base_
103 : };
104 :
105 : struct RefCounted_Topic { // A Topic_Pair together with a count of client references to it.
106 : RefCounted_Topic() // default: pair_ is default-constructed and the count starts at 0
107 : : client_refs_(0)
108 : {
109 : }
110 :
111 0 : explicit RefCounted_Topic(const Topic_Pair& pair) // copies the given pair; the count starts at 1
112 0 : : pair_(pair),
113 0 : client_refs_(1)
114 : {
115 0 : }
116 :
117 : /// The topic object reference / servant pair.
118 : Topic_Pair pair_;
119 : /// Client reference count for pair_ (0 when default-constructed, 1 when built from a pair).
120 : CORBA::ULong client_refs_;
121 : };
122 :
123 : typedef OPENDDS_MULTIMAP(OPENDDS_STRING, RefCounted_Topic) TopicMap;
124 : typedef TopicMap::iterator TopicMapIterator;
125 : typedef std::pair<TopicMapIterator, TopicMapIterator> TopicMapIteratorPair;
126 :
127 : typedef OPENDDS_MAP(OPENDDS_STRING, DDS::TopicDescription_var) TopicDescriptionMap;
128 :
129 : DomainParticipantImpl(InstanceHandleGenerator& handle_generator,
130 : const DDS::DomainId_t& domain_id,
131 : const DDS::DomainParticipantQos & qos,
132 : DDS::DomainParticipantListener_ptr a_listener,
133 : const DDS::StatusMask & mask);
134 :
135 : virtual ~DomainParticipantImpl();
136 :
137 : virtual DDS::InstanceHandle_t get_instance_handle();
138 :
139 : virtual DDS::Publisher_ptr create_publisher(
140 : const DDS::PublisherQos & qos,
141 : DDS::PublisherListener_ptr a_listener,
142 : DDS::StatusMask mask);
143 :
144 : virtual DDS::ReturnCode_t delete_publisher(
145 : DDS::Publisher_ptr p);
146 :
147 : virtual DDS::Subscriber_ptr create_subscriber(
148 : const DDS::SubscriberQos & qos,
149 : DDS::SubscriberListener_ptr a_listener,
150 : DDS::StatusMask mask);
151 :
152 : virtual DDS::ReturnCode_t delete_subscriber(
153 : DDS::Subscriber_ptr s);
154 :
155 : virtual DDS::Subscriber_ptr get_builtin_subscriber();
156 :
157 : RcHandle<DCPS::BitSubscriber> get_builtin_subscriber_proxy();
158 :
159 : virtual DDS::Topic_ptr create_topic(
160 : const char * topic_name,
161 : const char * type_name,
162 : const DDS::TopicQos & qos,
163 : DDS::TopicListener_ptr a_listener,
164 : DDS::StatusMask mask);
165 :
166 : virtual DDS::ReturnCode_t delete_topic(
167 : DDS::Topic_ptr a_topic);
168 :
169 : virtual DDS::Topic_ptr find_topic(
170 : const char * topic_name,
171 : const DDS::Duration_t & timeout);
172 :
173 : virtual DDS::TopicDescription_ptr lookup_topicdescription(
174 : const char * name);
175 :
176 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
177 :
178 : virtual DDS::ContentFilteredTopic_ptr create_contentfilteredtopic(
179 : const char * name,
180 : DDS::Topic_ptr related_topic,
181 : const char * filter_expression,
182 : const DDS::StringSeq & expression_parameters);
183 :
184 : virtual DDS::ReturnCode_t delete_contentfilteredtopic(
185 : DDS::ContentFilteredTopic_ptr a_contentfilteredtopic);
186 :
187 : #endif
188 :
189 : #ifndef OPENDDS_NO_MULTI_TOPIC
190 :
191 : virtual DDS::MultiTopic_ptr create_multitopic(
192 : const char * name,
193 : const char * type_name,
194 : const char * subscription_expression,
195 : const DDS::StringSeq & expression_parameters);
196 :
197 : virtual DDS::ReturnCode_t delete_multitopic(DDS::MultiTopic_ptr a_multitopic);
198 :
199 : #endif
200 :
201 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
202 :
203 : RcHandle<FilterEvaluator> get_filter_eval(const char* filter);
204 : void deref_filter_eval(const char* filter);
205 :
206 : #endif
207 :
208 : virtual DDS::ReturnCode_t delete_contained_entities();
209 :
210 : virtual CORBA::Boolean contains_entity(DDS::InstanceHandle_t a_handle);
211 :
212 : virtual DDS::ReturnCode_t set_qos(
213 : const DDS::DomainParticipantQos & qos);
214 :
215 : virtual DDS::ReturnCode_t get_qos(
216 : DDS::DomainParticipantQos & qos);
217 :
218 : virtual DDS::ReturnCode_t set_listener(
219 : DDS::DomainParticipantListener_ptr a_listener,
220 : DDS::StatusMask mask);
221 :
222 : virtual DDS::DomainParticipantListener_ptr get_listener();
223 :
224 : virtual DDS::ReturnCode_t ignore_participant(
225 : DDS::InstanceHandle_t handle);
226 :
227 : virtual DDS::ReturnCode_t ignore_topic(
228 : DDS::InstanceHandle_t handle);
229 :
230 : virtual DDS::ReturnCode_t ignore_publication(
231 : DDS::InstanceHandle_t handle);
232 :
233 : virtual DDS::ReturnCode_t ignore_subscription(
234 : DDS::InstanceHandle_t handle);
235 :
236 : virtual DDS::DomainId_t get_domain_id();
237 :
238 : virtual DDS::ReturnCode_t assert_liveliness();
239 :
240 : virtual DDS::ReturnCode_t set_default_publisher_qos(
241 : const DDS::PublisherQos & qos);
242 :
243 : virtual DDS::ReturnCode_t get_default_publisher_qos(
244 : DDS::PublisherQos & qos);
245 :
246 : virtual DDS::ReturnCode_t set_default_subscriber_qos(
247 : const DDS::SubscriberQos & qos);
248 :
249 : virtual DDS::ReturnCode_t get_default_subscriber_qos(
250 : DDS::SubscriberQos & qos);
251 :
252 : virtual DDS::ReturnCode_t set_default_topic_qos(
253 : const DDS::TopicQos & qos);
254 :
255 : virtual DDS::ReturnCode_t get_default_topic_qos(
256 : DDS::TopicQos & qos);
257 :
258 : /**
259 : * Set Argument to Current System Time
260 : */
261 : virtual DDS::ReturnCode_t get_current_time(DDS::Time_t& current_time);
262 :
263 : #if !defined (DDS_HAS_MINIMUM_BIT)
264 :
265 : virtual DDS::ReturnCode_t get_discovered_participants(
266 : DDS::InstanceHandleSeq & participant_handles);
267 :
268 : virtual DDS::ReturnCode_t get_discovered_participant_data(
269 : DDS::ParticipantBuiltinTopicData & participant_data,
270 : DDS::InstanceHandle_t participant_handle);
271 :
272 : virtual DDS::ReturnCode_t get_discovered_topics(
273 : DDS::InstanceHandleSeq & topic_handles);
274 :
275 : virtual DDS::ReturnCode_t get_discovered_topic_data(
276 : DDS::TopicBuiltinTopicData & topic_data,
277 : DDS::InstanceHandle_t topic_handle);
278 :
279 : #endif
280 :
281 : virtual DDS::ReturnCode_t enable();
282 :
283 : /// The following methods are not part of the IDL interface;
284 : /// they are local operations.
285 :
286 : /**
287 : * Return the id given by discovery.
288 : */
289 : GUID_t get_id() const;
290 :
291 : /**
292 : * Return a unique string based on repo ID.
293 : */
294 : OPENDDS_STRING get_unique_id();
295 :
296 : /**
297 : * Assign an instance handle, optionally representing a GUID.
298 : * If a GUID is provided (not GUID_UNKNOWN), other calls to assign_handle
299 : * for this GUID will return the same handle, as will subsequent calls to
300 : * lookup_handle.
301 : *
302 : * If this method returns a valid (non-HANDLE_NIL) handle, it must be
303 : * returned by calling return_handle.
304 : */
305 : DDS::InstanceHandle_t assign_handle(const GUID_t& id = GUID_UNKNOWN);
306 :
307 : /**
308 : * Get a handle that was previously mapped to a GUID or HANDLE_NIL if none exists.
309 : *
310 : * Handles returned from this method should not be passed to return_handle.
311 : */
312 : DDS::InstanceHandle_t lookup_handle(const GUID_t& id) const;
313 :
314 : /**
315 : * Similar to lookup_handle in that it will return a previously mapped handle,
316 : * but will coordinate with assign_handle when a desired handle has not yet
317 : * been mapped, but is expected to be. The optional max_wait argument can be
318 : * supplied to limit the time spent waiting for a handle. If the wait times out,
319 : * a value of HANDLE_NIL is returned.
320 : */
321 : DDS::InstanceHandle_t await_handle(const GUID_t& id, TimeDuration max_wait = TimeDuration::zero_value) const;
322 :
323 : /**
324 : * Return a previously-assigned handle.
325 : */
326 : void return_handle(DDS::InstanceHandle_t handle);
327 :
328 : /**
329 : * Obtain a GUID representing a local handle.
330 : * @return GUID_UNKNOWN if not found.
331 : */
332 : GUID_t get_repoid(DDS::InstanceHandle_t id) const;
333 :
334 : /**
335 : * Check to see if the Participant has any entities left in it.
336 : * leftover_entities will be set with a description of what is left.
337 : */
338 : bool is_clean(String* leftover_entities = 0) const;
339 :
340 : /**
341 : * This is used to retrieve the listener for a certain status change.
342 : * If this DomainParticipant has a registered listener and the status
343 : * kind is in the listener mask then the listener is returned.
344 : * Otherwise, return nil.
345 : */
346 : DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind);
347 :
348 : typedef OPENDDS_VECTOR(GUID_t) TopicIdVec;
349 : /**
350 : * Populates an std::vector with the GUID_t of the topics this
351 : * participant has created/found.
352 : */
353 : void get_topic_ids(TopicIdVec& topics);
354 :
355 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
356 :
357 : /** Accessor for ownership manager.
358 : */
359 : OwnershipManager* ownership_manager();
360 :
361 :
362 : /**
363 : * Called upon receiving new BIT publication data to
364 : * update the ownership strength of a publication.
365 : */
366 : void update_ownership_strength(const GUID_t& pub_id,
367 : const CORBA::Long& ownership_strength);
368 :
369 : #endif
370 :
371 : bool federated() const { // Read-only accessor for the federated_ flag.
372 : return this->federated_;
373 : }
374 :
375 :
376 : Recorder_ptr create_recorder(DDS::Topic_ptr a_topic,
377 : const DDS::SubscriberQos & subscriber_qos,
378 : const DDS::DataReaderQos & datareader_qos,
379 : const RecorderListener_rch & a_listener,
380 : DDS::StatusMask mask);
381 :
382 : Replayer_ptr create_replayer(DDS::Topic_ptr a_topic,
383 : const DDS::PublisherQos & publisher_qos,
384 : const DDS::DataWriterQos & datawriter_qos,
385 : const ReplayerListener_rch & a_listener,
386 : DDS::StatusMask mask);
387 :
388 : DDS::Topic_ptr create_typeless_topic(
389 : const char * topic_name,
390 : const char * type_name,
391 : bool type_has_keys,
392 : const DDS::TopicQos & qos,
393 : DDS::TopicListener_ptr a_listener,
394 : DDS::StatusMask mask);
395 :
396 : void delete_recorder(Recorder_ptr recorder);
397 : void delete_replayer(Replayer_ptr replayer);
398 :
399 : void add_adjust_liveliness_timers(DataWriterImpl* writer);
400 : void remove_adjust_liveliness_timers();
401 :
402 0 : XTypes::TypeLookupService_rch get_type_lookup_service() { return type_lookup_service_; } // returns a copy of the type_lookup_service_ handle
403 :
404 : #if defined(OPENDDS_SECURITY)
405 0 : Security::SecurityConfig_rch get_security_config() const // returns a copy of the security_config_ handle
406 : {
407 0 : return security_config_;
408 : }
409 0 : DDS::Security::PermissionsHandle permissions_handle() const // returns this participant's permissions handle (perm_handle_)
410 : {
411 0 : return perm_handle_;
412 : }
413 0 : DDS::Security::ParticipantCryptoHandle crypto_handle() const // returns this participant's crypto handle (part_crypto_handle_)
414 : {
415 0 : return part_crypto_handle_;
416 : }
417 : #endif
418 :
419 : bool prepare_to_delete_datawriters();
420 : bool set_wait_pending_deadline(const MonotonicTimePoint& deadline);
421 :
422 : #ifndef OPENDDS_SAFETY_PROFILE
423 : DDS::ReturnCode_t get_dynamic_type(
424 : DDS::DynamicType_var& type, const DDS::BuiltinTopicKey_t& key);
425 : #endif
426 :
427 : private:
428 : bool validate_publisher_qos(DDS::PublisherQos & publisher_qos);
429 : bool validate_subscriber_qos(DDS::SubscriberQos & subscriber_qos);
430 :
431 : /** The implementation of create_topic.
432 : */
433 :
434 : ///@{
435 : /// constants for the topic_mask argument to create_topic_i
436 : static const int TOPIC_TYPE_HAS_KEYS = 1;
437 : static const int TOPIC_TYPELESS = 2;
438 : ///@}
439 :
440 : DDS::Topic_ptr create_topic_i(
441 : const char * topic_name,
442 : const char * type_name,
443 : const DDS::TopicQos & qos,
444 : DDS::TopicListener_ptr a_listener,
445 : DDS::StatusMask mask,
446 : int topic_mask);
447 :
448 : DDS::Topic_ptr create_new_topic(
449 : const char * topic_name,
450 : const char * type_name,
451 : const DDS::TopicQos & qos,
452 : DDS::TopicListener_ptr a_listener,
453 : const DDS::StatusMask & mask,
454 : OpenDDS::DCPS::TypeSupport_ptr type_support);
455 :
456 : /** Delete the topic with option of whether the
457 : * topic object reference should be removed.
458 : */
459 : DDS::ReturnCode_t delete_topic_i(
460 : DDS::Topic_ptr a_topic,
461 : bool remove_objref);
462 :
463 : /// The default topic qos.
464 : DDS::TopicQos default_topic_qos_;
465 : /// The default publisher qos.
466 : DDS::PublisherQos default_publisher_qos_;
467 : /// The default subscriber qos.
468 : DDS::SubscriberQos default_subscriber_qos_;
469 :
470 : /// The qos of this DomainParticipant.
471 : DDS::DomainParticipantQos qos_;
472 : /// Mutex to protect listener info
473 : ACE_Thread_Mutex listener_mutex_;
474 : /// Used to notify the entity for relevant events.
475 : DDS::DomainParticipantListener_var listener_;
476 : /// The StatusKind bit mask indicates which status condition change
477 : /// can be notified by the listener of this entity.
478 : DDS::StatusMask listener_mask_;
479 :
480 : #if defined(OPENDDS_SECURITY)
481 : /// This participant id handle given by authentication.
482 : DDS::Security::IdentityHandle id_handle_;
483 : /// This participant permissions handle given by access control.
484 : DDS::Security::PermissionsHandle perm_handle_;
485 : /// This participant crypto handle given by crypto
486 : DDS::Security::ParticipantCryptoHandle part_crypto_handle_;
487 : #endif
488 :
489 : /// The id of the domain that creates this participant.
490 : const DDS::DomainId_t domain_id_;
491 : /// This participant id given by discovery.
492 : GUID_t dp_id_;
493 :
494 : /// Whether this DomainParticipant is attached to a federated
495 : /// repository.
496 : bool federated_;
497 :
498 : /// Collection of publishers.
499 : PublisherSet publishers_;
500 : /// Collection of subscribers.
501 : SubscriberSet subscribers_;
502 : /// Collection of topics.
503 : TopicMap topics_;
504 : #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC)
505 : /// Collection of TopicDescriptions which are not also Topics
506 : TopicDescriptionMap topic_descrs_;
507 : #endif
508 :
509 : typedef std::pair<DDS::InstanceHandle_t, unsigned int> HandleWithCounter;
510 : typedef OPENDDS_MAP_CMP(GUID_t, HandleWithCounter, GUID_tKeyLessThan) CountedHandleMap;
511 : typedef OPENDDS_MAP(DDS::InstanceHandle_t, GUID_t) RepoIdMap;
512 :
513 : /// Instance handles assigned which are mapped to GUIDs (use handle_protector_)
514 : CountedHandleMap handles_;
515 : /// By-handle lookup of instance handles assigned to GUIDs (use handle_protector_)
516 : RepoIdMap repoIds_;
517 :
518 : typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) HandleMap;
519 :
520 : /// Collection of ignored participants.
521 : HandleMap ignored_participants_;
522 : /// Collection of ignored topics.
523 : HandleMap ignored_topics_;
524 : /// Protect the publisher collection.
525 : ACE_Recursive_Thread_Mutex publishers_protector_;
526 : /// Protect the subscriber collection.
527 : ACE_Recursive_Thread_Mutex subscribers_protector_;
528 : /// Protect the topic collection.
529 : ACE_Recursive_Thread_Mutex topics_protector_;
530 : /// Protect the handle collection.
531 : mutable ACE_Thread_Mutex handle_protector_;
532 :
533 : mutable ConditionVariable<ACE_Thread_Mutex> handle_waiters_;
534 :
535 : /// Protect the shutdown.
536 : ACE_Thread_Mutex shutdown_mutex_;
537 : ConditionVariable<ACE_Thread_Mutex> shutdown_condition_;
538 : DDS::ReturnCode_t shutdown_result_;
539 : bool shutdown_complete_;
540 :
541 : /// The built in topic subscriber.
542 : RcHandle<BitSubscriber> bit_subscriber_;
543 :
544 : /// Get instance handles from DomainParticipantFactory (use handle_protector_)
545 : InstanceHandleGenerator& participant_handles_;
546 :
547 : /// Keep track of handles that can be reused (use handle_protector_)
548 : DisjointSequence::OrderedRanges<DDS::InstanceHandle_t> reusable_handles_;
549 :
550 : unique_ptr<Monitor> monitor_;
551 :
552 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
553 : OwnershipManager owner_man_;
554 : #endif
555 :
556 : /// Publisher ID generator.
557 : RepoIdSequence pub_id_gen_;
558 :
559 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
560 : ACE_Thread_Mutex filter_cache_lock_;
561 : OPENDDS_MAP(OPENDDS_STRING, RcHandle<FilterEvaluator> ) filter_cache_;
562 : #endif
563 :
564 : typedef OPENDDS_SET_CMP(Recorder_var, VarLess<Recorder> ) RecorderSet;
565 : typedef OPENDDS_SET_CMP(Replayer_var, VarLess<Replayer> ) ReplayerSet;
566 :
567 : RecorderSet recorders_;
568 : ReplayerSet replayers_;
569 :
570 : #if defined(OPENDDS_SECURITY)
571 : Security::SecurityConfig_rch security_config_;
572 : #endif
573 :
574 : /// Protect the recorders collection.
575 : ACE_Recursive_Thread_Mutex recorders_protector_;
576 : /// Protect the replayers collection.
577 : ACE_Recursive_Thread_Mutex replayers_protector_;
578 :
579 : class LivelinessTimer : public virtual RcObject { // Abstract base of the liveliness timers; subclasses supply dispatch(), cancel() and schedule().
580 : public:
581 : LivelinessTimer(DomainParticipantImpl& impl, DDS::LivelinessQosPolicyKind kind);
582 : virtual ~LivelinessTimer();
583 : void add_adjust(OpenDDS::DCPS::DataWriterImpl* writer);
584 : void remove_adjust();
585 : void execute(const MonotonicTimePoint& now);
586 : virtual void dispatch(const MonotonicTimePoint& tv) = 0; // pure virtual: defined by each concrete timer
587 : virtual void cancel() = 0; // pure virtual: concrete timers cancel their associated task
588 :
589 : protected:
590 : DomainParticipantImpl& impl_; // participant held by reference; presumably the constructor's impl argument
591 : const DDS::LivelinessQosPolicyKind kind_; // liveliness kind; presumably the constructor's kind argument
592 :
593 0 : TimeDuration interval () const { return interval_; } // read-only accessor for interval_ (returns a copy)
594 :
595 : virtual void schedule(const TimeDuration& interval) = 0; // pure virtual: concrete timers schedule their associated task
596 :
597 : private:
598 : TimeDuration interval_;
599 : bool recalculate_interval_;
600 : MonotonicTimePoint last_liveliness_check_;
601 : bool scheduled_;
602 : ACE_Thread_Mutex lock_; // NOTE(review): presumably guards the private state above -- confirm in the .cpp
603 : };
604 :
605 : class AutomaticLivelinessTimer : public LivelinessTimer { // Concrete LivelinessTimer driven by the participant's automatic_liveliness_task_.
606 : public:
607 : AutomaticLivelinessTimer(DomainParticipantImpl& impl);
608 : virtual void dispatch(const MonotonicTimePoint& tv);
609 :
610 0 : void cancel() // implements LivelinessTimer::cancel by forwarding to the participant's task
611 : {
612 0 : impl_.automatic_liveliness_task_->cancel(); // handle is dereferenced without a null check
613 0 : }
614 :
615 : private:
616 0 : void schedule(const TimeDuration& interval) // implements LivelinessTimer::schedule by forwarding to the participant's task
617 : {
618 0 : impl_.automatic_liveliness_task_->schedule(interval); // handle is dereferenced without a null check
619 0 : }
620 : };
621 : RcHandle<AutomaticLivelinessTimer> automatic_liveliness_timer_;
622 : typedef PmfSporadicTask<AutomaticLivelinessTimer> AutomaticLivelinessTask;
623 : RcHandle<AutomaticLivelinessTask> automatic_liveliness_task_;
624 :
625 : class ParticipantLivelinessTimer : public LivelinessTimer { // Concrete LivelinessTimer driven by the participant's participant_liveliness_task_.
626 : public:
627 : ParticipantLivelinessTimer(DomainParticipantImpl& impl);
628 : virtual void dispatch(const MonotonicTimePoint& tv);
629 :
630 0 : void cancel() // implements LivelinessTimer::cancel by forwarding to the participant's task
631 : {
632 0 : impl_.participant_liveliness_task_->cancel(); // handle is dereferenced without a null check
633 0 : }
634 :
635 : private:
636 0 : void schedule(const TimeDuration& interval) // implements LivelinessTimer::schedule by forwarding to the participant's task
637 : {
638 0 : impl_.participant_liveliness_task_->schedule(interval); // handle is dereferenced without a null check
639 0 : }
640 : };
641 : RcHandle<ParticipantLivelinessTimer> participant_liveliness_timer_;
642 : typedef PmfSporadicTask<ParticipantLivelinessTimer> ParticipantLivelinessTask;
643 : RcHandle<ParticipantLivelinessTask> participant_liveliness_task_;
644 :
645 : TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
646 : bool participant_liveliness_activity_after(const MonotonicTimePoint& tv);
647 : void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
648 :
649 : MonotonicTimePoint last_liveliness_activity_;
650 :
651 : virtual int handle_exception(ACE_HANDLE fd);
652 :
653 : XTypes::TypeLookupService_rch type_lookup_service_;
654 : };
655 :
656 : } // namespace DCPS
657 : } // namespace OpenDDS
658 :
659 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
660 :
661 : #endif /* OPENDDS_DCPS_DOMAIN_PARTICIPANT_IMPL_H */
|