Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #ifndef OPENDDS_DCPS_DATAWRITERIMPL_H
7 : #define OPENDDS_DCPS_DATAWRITERIMPL_H
8 :
9 : #include "Atomic.h"
10 : #include "Sample.h"
11 : #include "DataWriterCallbacks.h"
12 : #include "transport/framework/TransportSendListener.h"
13 : #include "transport/framework/TransportClient.h"
14 : #include "MessageTracker.h"
15 : #include "DataBlockLockPool.h"
16 : #include "PoolAllocator.h"
17 : #include "WriteDataContainer.h"
18 : #include "Definitions.h"
19 : #include "DataSampleHeader.h"
20 : #include "TopicImpl.h"
21 : #include "Time_Helper.h"
22 : #include "CoherentChangeControl.h"
23 : #include "GuidUtils.h"
24 : #include "RcEventHandler.h"
25 : #include "unique_ptr.h"
26 : #include "Message_Block_Ptr.h"
27 : #include "TimeTypes.h"
28 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
29 : # include "FilterEvaluator.h"
30 : #endif
31 :
32 : #include <dds/DdsDcpsDomainC.h>
33 : #include <dds/DdsDcpsTopicC.h>
34 :
35 : #include <ace/Event_Handler.h>
36 : #include <ace/OS_NS_sys_time.h>
37 :
38 : #include <memory>
39 :
40 : #ifndef ACE_LACKS_PRAGMA_ONCE
41 : # pragma once
42 : #endif /* ACE_LACKS_PRAGMA_ONCE */
43 :
44 : class DDS_TEST;
45 :
46 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
47 :
48 : namespace OpenDDS {
49 : namespace DCPS {
50 :
51 : class PublisherImpl;
52 : class DomainParticipantImpl;
53 : class Monitor;
54 : class DataSampleElement;
55 : class SendStateDataSampleList;
56 : struct AssociationData;
57 : class LivenessTimer;
58 :
59 : /**
60 : * @class DataWriterImpl
61 : *
62 : * @brief Implements the OpenDDS::DCPS::DataWriterRemote interfaces and
63 : * DDS::DataWriter interfaces.
64 : *
65 : * See the DDS specification, OMG formal/2015-04-10, for a description of
66 : * the interface this class is implementing.
67 : *
68 : * This class must be inherited by the type-specific datawriter which
69 : * is specific to the data-type associated with the topic.
70 : *
71 : * @note: This class is responsible for allocating memory for the
72 : * header message block
73 : * (MessageBlock + DataBlock + DataSampleHeader) and the
74 : * DataSampleElement.
75 : * The data-type datawriter is responsible for allocating
76 : * memory for the sample data message block.
77 : * (e.g. MessageBlock + DataBlock + Foo data). But it gives
78 : * up ownership to this WriteDataContainer.
79 : */
80 : class OpenDDS_Dcps_Export DataWriterImpl
81 : : public virtual LocalObject<DDS::DataWriter>
82 : , public virtual DataWriterCallbacks
83 : , public virtual EntityImpl
84 : , public virtual TransportClient
85 : , public virtual TransportSendListener
86 : {
87 : public:
88 : friend class WriteDataContainer;
89 : friend class PublisherImpl;
90 :
91 : typedef OPENDDS_MAP_CMP(GUID_t, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap;
92 : typedef Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> DataAllocator;
93 :
94 : struct AckToken { // Bundles a creation timestamp, a maximum wait and a sequence number for ack operations.
95 : MonotonicTimePoint tstamp_; // Set to MonotonicTimePoint::now() when the token is constructed.
96 : DDS::Duration_t max_wait_; // Copy of the max_wait constructor argument.
97 : SequenceNumber sequence_; // Copy of the sequence constructor argument.
98 :
99 0 : AckToken(const DDS::Duration_t& max_wait, // Stamps the token with the current monotonic time and copies both arguments.
100 : const SequenceNumber& sequence)
101 0 : : tstamp_(MonotonicTimePoint::now())
102 0 : , max_wait_(max_wait)
103 0 : , sequence_(sequence)
104 : {
105 0 : }
106 :
107 0 : ~AckToken() {}
108 :
109 0 : MonotonicTimePoint deadline() const // Construction time plus max_wait_.
110 : {
111 0 : return tstamp_ + TimeDuration(max_wait_); // NOTE(review): no special case for an infinite max_wait_ here; callers presumably check deadline_is_infinite() first - confirm.
112 : }
113 :
114 0 : bool deadline_is_infinite() const // True only when both fields of max_wait_ equal the DDS infinite-duration constants.
115 : {
116 0 : return max_wait_.sec == DDS::DURATION_INFINITE_SEC && max_wait_.nanosec == DDS::DURATION_INFINITE_NSEC;
117 : }
118 : };
119 :
120 : DataWriterImpl();
121 :
122 : virtual ~DataWriterImpl();
123 :
124 : void set_marshal_skip_serialize(bool value) // Overwrites the skip_serialize_ flag; no locking is performed here.
125 : {
126 : skip_serialize_ = value;
127 : }
128 :
129 : bool get_marshal_skip_serialize() const // Current value of the skip_serialize_ flag.
130 : {
131 : return skip_serialize_;
132 : }
133 :
134 : DataAllocator* data_allocator() const // Non-owning pointer obtained from the data_allocator_ smart pointer; ownership stays with this writer.
135 : {
136 : return data_allocator_.get();
137 : }
138 :
139 : virtual DDS::InstanceHandle_t get_instance_handle();
140 :
141 : virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos & qos);
142 :
143 : virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos & qos);
144 :
145 : virtual DDS::ReturnCode_t set_listener(
146 : DDS::DataWriterListener_ptr a_listener,
147 : DDS::StatusMask mask);
148 :
149 : virtual DDS::DataWriterListener_ptr get_listener();
150 :
151 : virtual DDS::Topic_ptr get_topic();
152 :
153 : virtual DDS::ReturnCode_t wait_for_acknowledgments(
154 : const DDS::Duration_t & max_wait);
155 :
156 : virtual DDS::Publisher_ptr get_publisher();
157 :
158 : virtual DDS::ReturnCode_t get_liveliness_lost_status(
159 : DDS::LivelinessLostStatus & status);
160 :
161 : virtual DDS::ReturnCode_t get_offered_deadline_missed_status(
162 : DDS::OfferedDeadlineMissedStatus & status);
163 :
164 : virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(
165 : DDS::OfferedIncompatibleQosStatus & status);
166 :
167 : virtual DDS::ReturnCode_t get_publication_matched_status(
168 : DDS::PublicationMatchedStatus & status);
169 :
170 : TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
171 :
172 : bool participant_liveliness_activity_after(const MonotonicTimePoint& tv);
173 :
174 : virtual DDS::ReturnCode_t assert_liveliness();
175 :
176 : DDS::ReturnCode_t assert_liveliness_by_participant();
177 :
178 : typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
179 : void get_instance_handles(InstanceHandleVec& instance_handles);
180 :
181 : void get_readers(RepoIdSet& readers);
182 :
183 : virtual DDS::ReturnCode_t get_matched_subscriptions(
184 : DDS::InstanceHandleSeq & subscription_handles);
185 :
186 : #if !defined (DDS_HAS_MINIMUM_BIT)
187 : virtual DDS::ReturnCode_t get_matched_subscription_data(
188 : DDS::SubscriptionBuiltinTopicData & subscription_data,
189 : DDS::InstanceHandle_t subscription_handle);
190 : #endif // !defined (DDS_HAS_MINIMUM_BIT)
191 :
192 : virtual DDS::ReturnCode_t enable();
193 :
194 : virtual void add_association(const GUID_t& yourId,
195 : const ReaderAssociation& reader,
196 : bool active);
197 :
198 : virtual void transport_assoc_done(int flags, const GUID_t& remote_id);
199 :
200 : virtual void remove_associations(const ReaderIdSeq & readers,
201 : bool callback);
202 :
203 : virtual void replay_durable_data_for(const GUID_t& remote_sub_id);
204 :
205 : virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
206 :
207 : virtual void update_subscription_params(const GUID_t& readerId,
208 : const DDS::StringSeq& params);
209 :
210 : /**
211 : * cleanup the DataWriter.
212 : */
213 : void cleanup();
214 :
215 : /**
216 : * Initialize the data members.
217 : */
218 : void init(
219 : TopicImpl* topic_servant,
220 : const DDS::DataWriterQos& qos,
221 : DDS::DataWriterListener_ptr a_listener,
222 : const DDS::StatusMask& mask,
223 : WeakRcHandle<DomainParticipantImpl> participant_servant,
224 : PublisherImpl* publisher_servant);
225 :
226 : void send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard);
227 :
228 : /**
229 : * Delegate to the WriteDataContainer to register the instance.
230 : * The transport must be told to broadcast the registered
231 : * instance upon returning.
232 : */
233 : DDS::ReturnCode_t
234 : register_instance_i(
235 : DDS::InstanceHandle_t& handle,
236 : Message_Block_Ptr data,
237 : const DDS::Time_t& source_timestamp);
238 :
239 : /**
240 : * Delegate to the WriteDataContainer to register and tell
241 : * the transport to broadcast the registered instance.
242 : */
243 : DDS::ReturnCode_t
244 : register_instance_from_durable_data(
245 : DDS::InstanceHandle_t& handle,
246 : Message_Block_Ptr data,
247 : const DDS::Time_t & source_timestamp);
248 :
249 : /**
250 : * Delegate to the WriteDataContainer to unregister and tell
251 : * the transport to broadcast the unregistered instance.
252 : */
253 : DDS::ReturnCode_t
254 : unregister_instance_i(
255 : DDS::InstanceHandle_t handle,
256 : const DDS::Time_t & source_timestamp);
257 :
258 : /**
259 : * Unregister all registered instances and tell the transport
260 : * to broadcast the unregistered instances.
261 : */
262 : void unregister_instances(const DDS::Time_t& source_timestamp);
263 :
264 : /**
265 : * Delegate to the WriteDataContainer to queue the instance
266 : * sample and finally tell the transport to send the sample.
267 : * \param filter_out can either be null (if the writer can't
268 : * or won't evaluate the filters), or a list of
269 : * associated reader GUID_ts that should NOT get the
270 : * data sample due to content filtering.
271 : */
272 : DDS::ReturnCode_t write(Message_Block_Ptr sample,
273 : DDS::InstanceHandle_t handle,
274 : const DDS::Time_t& source_timestamp,
275 : GUIDSeq* filter_out,
276 : const void* real_data);
277 :
278 : DDS::ReturnCode_t write_sample(
279 : const Sample& sample,
280 : DDS::InstanceHandle_t handle,
281 : const DDS::Time_t& source_timestamp,
282 : GUIDSeq* filter_out);
283 :
284 : /**
285 : * Delegate to the WriteDataContainer to dispose all data
286 : * samples for a given instance and tell the transport to
287 : * broadcast the disposed instance.
288 : */
289 : DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle,
290 : const DDS::Time_t & source_timestamp);
291 :
292 : /**
293 : * Return the number of samples for a given instance.
294 : */
295 : DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle,
296 : size_t& size);
297 :
298 : /**
299 : * Retrieve the unsent data from the WriteDataContainer into @a list and return the container's result unchanged; data_container_ is dereferenced without a null check.
300 : */
301 0 : ACE_UINT64 get_unsent_data(SendStateDataSampleList& list)
302 : {
303 0 : return data_container_->get_unsent_data(list);
304 : }
305 :
306 0 : SendStateDataSampleList get_resend_data() // Forwards to data_container_->get_resend_data() and returns the list by value.
307 : {
308 0 : return data_container_->get_resend_data(); // data_container_ is dereferenced without a null check.
309 : }
310 :
311 : /**
312 : * Accessor of the repository id of the domain participant.
313 : */
314 : GUID_t get_dp_id();
315 :
316 : /**
317 : * Delegate to WriteDataContainer to unregister all instances.
318 : */
319 : void unregister_all();
320 :
321 : /**
322 : * This is called by transport to notify that the sample is
323 : * delivered and it is delegated to WriteDataContainer
324 : * to adjust the internal data sample threads.
325 : */
326 : void data_delivered(const DataSampleElement* sample);
327 :
328 : void transport_discovery_change();
329 :
330 : /**
331 : * This is called by transport to notify that the control
332 : * message is delivered.
333 : */
334 : void control_delivered(const Message_Block_Ptr& sample);
335 :
336 : /// Does this writer have samples to be acknowledged?
337 : bool should_ack() const;
338 :
339 : /// Create an AckToken for ack operations.
340 : AckToken create_ack_token(DDS::Duration_t max_wait) const;
341 :
342 : virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const;
343 :
344 : virtual bool check_transport_qos(const TransportInst& inst);
345 :
346 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
347 :
348 : /// Are coherent changes pending?
349 : bool coherent_changes_pending();
350 :
351 : /// Starts a coherent change set; should only be called once.
352 : void begin_coherent_changes();
353 :
354 : /// Ends a coherent change set; should only be called once.
355 : void end_coherent_changes(const GroupCoherentSamples& group_samples);
356 :
357 : #endif
358 :
359 : /**
360 : * Get associated topic type name.
361 : */
362 : char const* get_type_name() const;
363 :
364 : /**
365 : * This method is called by transport to notify that the instance
366 : * sample is dropped and it delegates to WriteDataContainer
367 : * to update the internal list.
368 : */
369 : void data_dropped(const DataSampleElement* element,
370 : bool dropped_by_transport);
371 :
372 : /**
373 : * This is called by transport to notify that the control
374 : * message is dropped.
375 : */
376 : void control_dropped(const Message_Block_Ptr& sample,
377 : bool dropped_by_transport);
378 :
379 : /**
380 : * Accessor of the WriteDataContainer's lock (data_container_->lock_).
381 : */
382 0 : ACE_Recursive_Thread_Mutex& get_lock() const
383 : {
384 0 : return data_container_->lock_; // data_container_ is dereferenced without a null check.
385 : }
386 :
387 : /**
388 : * This is used to retrieve the listener for a certain status
389 : * change.
390 : *
391 : * If this datawriter has a registered listener and the status
392 : * kind is in the listener mask then the listener is returned.
393 : * Otherwise, the query for the listener is propagated up to the
394 : * factory/publisher.
395 : */
396 : DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind);
397 :
398 : /// Handle the assert liveliness timeout.
399 : virtual int handle_timeout(const ACE_Time_Value &tv,
400 : const void *arg);
401 :
402 : /// Called by the PublisherImpl to indicate that the Publisher is now
403 : /// resumed and any data collected while it was suspended should now be sent.
404 : void send_suspended_data();
405 :
406 : void remove_all_associations();
407 :
408 : virtual void register_for_reader(const GUID_t& participant,
409 : const GUID_t& writerid,
410 : const GUID_t& readerid,
411 : const TransportLocatorSeq& locators,
412 : DiscoveryListener* listener);
413 :
414 : virtual void unregister_for_reader(const GUID_t& participant,
415 : const GUID_t& writerid,
416 : const GUID_t& readerid);
417 :
418 : virtual void update_locators(const GUID_t& remote,
419 : const TransportLocatorSeq& locators);
420 :
421 : void notify_publication_disconnected(const ReaderIdSeq& subids);
422 : void notify_publication_reconnected(const ReaderIdSeq& subids);
423 : void notify_publication_lost(const ReaderIdSeq& subids);
424 :
425 : /// Statistics counter.
426 : Atomic<int> data_dropped_count_;
427 : Atomic<int> data_delivered_count_;
428 :
429 : MessageTracker controlTracker;
430 :
431 : /**
432 : * This method creates a header message block and chains it with
433 : * the sample data. The header contains the information
434 : * needed. e.g. message id, length of whole message...
435 : * The fast allocator is used to allocate the message block,
436 : * data block and header.
437 : */
438 : DDS::ReturnCode_t
439 : create_sample_data_message(Message_Block_Ptr data,
440 : DDS::InstanceHandle_t instance_handle,
441 : DataSampleHeader& header_data,
442 : Message_Block_Ptr& message,
443 : const DDS::Time_t& source_timestamp,
444 : bool content_filter);
445 :
446 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
447 : /// Make sent data available beyond the lifetime of this
448 : /// @c DataWriter.
449 : bool persist_data();
450 : #endif
451 :
452 : /// Wait for pending data and control messages to drain.
453 : void wait_pending();
454 :
455 : /**
456 : * Set deadline to complete wait_pending by. If 0, then wait_pending will
457 : * wait indefinitely if needed.
458 : */
459 : void set_wait_pending_deadline(const MonotonicTimePoint& deadline);
460 :
461 : /**
462 : * Get an instance handle for a new instance.
463 : */
464 : DDS::InstanceHandle_t get_next_handle();
465 :
466 : virtual RcHandle<EntityImpl> parent() const;
467 :
468 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
469 : bool filter_out(const DataSampleElement& elt,
470 : const OPENDDS_STRING& filterClassName,
471 : const FilterEvaluator& evaluator,
472 : const DDS::StringSeq& expression_params) const;
473 : #endif
474 :
475 0 : DataBlockLockPool::DataBlockLock* get_db_lock() // Returns whatever db_lock_pool_->get_lock() hands out.
476 : {
477 0 : return db_lock_pool_->get_lock(); // db_lock_pool_ is dereferenced without a null check.
478 : }
479 :
480 : /**
481 : * Attempt to locate an existing instance for the given handle.
482 : */
483 : PublicationInstance_rch get_handle_instance(
484 : DDS::InstanceHandle_t handle);
485 :
486 : virtual WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
487 :
488 0 : GUID_t get_guid() const // Returns a copy of publication_id_.
489 : {
490 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_); // Same lock that check_and_set_repo_id() holds while writing publication_id_.
491 0 : return publication_id_;
492 0 : }
493 :
494 0 : SequenceNumber get_max_sn() const // Returns the current sequence_number_ without advancing it.
495 : {
496 0 : ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_); // sn_lock_ is declared mutable, so it can be taken in this const method.
497 0 : return sequence_number_;
498 0 : }
499 :
500 0 : const ValueDispatcher* get_value_dispatcher() const // type_support_ viewed as a ValueDispatcher.
501 : {
502 0 : return dynamic_cast<const ValueDispatcher*>(type_support_); // Null when type_support_ is null or is not a ValueDispatcher.
503 : }
504 :
505 : DDS::ReturnCode_t get_key_value(Sample_rch& sample, DDS::InstanceHandle_t handle);
506 : DDS::InstanceHandle_t lookup_instance(const Sample& sample);
507 : DDS::InstanceHandle_t register_instance_w_timestamp(
508 : const Sample& sample, const DDS::Time_t& timestamp);
509 : DDS::ReturnCode_t unregister_instance_w_timestamp(
510 : const Sample& sample,
511 : DDS::InstanceHandle_t instance_handle,
512 : const DDS::Time_t& timestamp);
513 : DDS::ReturnCode_t dispose_w_timestamp(
514 : const Sample& sample,
515 : DDS::InstanceHandle_t instance_handle,
516 : const DDS::Time_t& source_timestamp);
517 :
518 : protected:
519 :
520 0 : void check_and_set_repo_id(const GUID_t& id) // Assigns id to publication_id_ only if no id has been set yet.
521 : {
522 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_); // Held for the whole compare-and-assign.
523 0 : if (GUID_UNKNOWN == publication_id_) { // An already-set id is left untouched.
524 0 : publication_id_ = id;
525 : }
526 0 : }
527 :
528 : SequenceNumber get_next_sn() // Locking wrapper: advances and returns sequence_number_ via get_next_sn_i().
529 : {
530 : ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_); // Held across the call to get_next_sn_i().
531 : return get_next_sn_i();
532 : }
533 :
534 0 : SequenceNumber get_next_sn_i() // Advances and returns sequence_number_; takes no lock itself (get_next_sn() calls it with sn_lock_ held).
535 : {
536 0 : if (sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { // Unknown sentinel: restart from a default-constructed SequenceNumber.
537 0 : sequence_number_ = SequenceNumber();
538 : } else {
539 0 : ++sequence_number_; // Any other value: pre-increment.
540 : }
541 0 : return sequence_number_; // The updated value is returned by copy.
542 : }
543 :
544 : // Perform cast to get extended version of listener (otherwise nil)
545 : DataWriterListener_ptr get_ext_listener();
546 :
547 : DDS::ReturnCode_t wait_for_specific_ack(const AckToken& token);
548 :
549 : void prepare_to_delete();
550 :
551 : /**
552 : * Setup CDR serialization options.
553 : */
554 : DDS::ReturnCode_t setup_serialization();
555 :
556 : ACE_Message_Block* serialize_sample(const Sample& sample);
557 :
558 : /// The number of chunks for the cached allocator.
559 : size_t n_chunks_;
560 :
561 : /// The multiplier for allocators affected by associations
562 : size_t association_chunk_multiplier_;
563 :
564 :
565 : /// The type name of associated topic.
566 : CORBA::String_var type_name_;
567 :
568 : /// The qos policy list of this datawriter.
569 : DDS::DataWriterQos qos_;
570 : /// The qos policy passed in by the user.
571 : /// Differs from qos_ because representation has been interpreted.
572 : DDS::DataWriterQos passed_qos_;
573 :
574 : /// The participant servant which creates the publisher that
575 : /// creates this datawriter.
576 : WeakRcHandle<DomainParticipantImpl> participant_servant_;
577 :
578 : //This lock should be used to protect access to reader_info_
579 : ACE_Thread_Mutex reader_info_lock_;
580 :
581 : struct ReaderInfo { // Value type of the reader_info_ map, which is keyed by GUID_t.
582 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
583 : WeakRcHandle<DomainParticipantImpl> participant_; // The five members in this #ifndef region exist only when content-filtered topics are compiled in.
584 : OPENDDS_STRING filter_class_name_;
585 : OPENDDS_STRING filter_;
586 : DDS::StringSeq expression_params_;
587 : RcHandle<FilterEvaluator> eval_;
588 : #endif
589 : SequenceNumber expected_sequence_; // Always present, regardless of the content-filter build option.
590 : bool durable_; // Always present, regardless of the content-filter build option.
591 : ReaderInfo(const char* filter_class_name, const char* filter, const DDS::StringSeq& params, // Declared here, defined outside this header.
592 : WeakRcHandle<DomainParticipantImpl> participant, bool durable);
593 : ~ReaderInfo();
594 : };
595 :
596 : typedef OPENDDS_MAP_CMP(GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
597 : RepoIdToReaderInfoMap reader_info_;
598 :
599 : struct AckCustomization { // Pairs a GUIDSeq with a reference to an existing AckToken.
600 : GUIDSeq customized_; // Default-constructed by the constructor below.
601 : AckToken& token_; // Reference member: the AckToken passed in must outlive this object.
602 : explicit AckCustomization(AckToken& at) : token_(at) {}
603 : };
604 :
605 : virtual SendControlStatus send_control(const DataSampleHeader& header,
606 : Message_Block_Ptr msg);
607 :
608 : bool skip_serialize_;
609 :
610 : /**
611 : * Used to hold the encoding and get the buffer sizes needed to store the
612 : * results of the encoding. A default-constructed object reports valid() == false.
613 : */
614 : class EncodingMode {
615 : public:
616 0 : EncodingMode() // Invalid mode: header size 0, remaining members default-constructed.
617 0 : : valid_(false)
618 0 : , header_size_(0)
619 : {
620 0 : }
621 :
622 0 : EncodingMode(const TypeSupportImpl* ts, Encoding::Kind kind, bool swap_the_bytes) // Valid mode; ts is dereferenced without a null check.
623 0 : : valid_(true)
624 0 : , encoding_(kind, swap_the_bytes)
625 0 : , header_size_(encoding_.is_encapsulated() ? EncapsulationHeader::serialized_size : 0) // Reads encoding_, which is declared (and so initialized) before header_size_.
626 0 : , bound_(ts->serialized_size_bound(encoding_)) // Bound reported by the type support for this encoding.
627 0 : , key_only_bound_(ts->key_only_serialized_size_bound(encoding_)) // Key-only bound reported by the type support for this encoding.
628 : {
629 0 : }
630 :
631 0 : bool valid() const // True only for objects built with the three-argument constructor.
632 : {
633 0 : return valid_;
634 : }
635 :
636 0 : const Encoding& encoding() const // Reference to the stored encoding; valid only as long as this object lives.
637 : {
638 0 : return encoding_;
639 : }
640 :
641 : bool bound() const // bound_ converted to bool.
642 : {
643 : return bound_;
644 : }
645 :
646 0 : SerializedSizeBound buffer_size_bound() const // Header size plus bound_ when bound_ tests true; otherwise a default-constructed SerializedSizeBound.
647 : {
648 0 : return bound_ ? SerializedSizeBound(header_size_ + bound_.get()) : SerializedSizeBound();
649 : }
650 :
651 0 : size_t buffer_size(const Sample& sample) const // Header size plus either the applicable bound or the serialized size of this sample.
652 : {
653 0 : const SerializedSizeBound bound = sample.key_only() ? key_only_bound_ : bound_; // Key-only samples use the key-only bound.
654 0 : return header_size_ + (bound ? bound.get() : sample.serialized_size(encoding_)); // Falls back to asking the sample when the chosen bound tests false.
655 : }
656 :
657 : private:
658 : bool valid_;
659 : Encoding encoding_; // Must stay declared before header_size_, bound_ and key_only_bound_: their initializers read it.
660 : size_t header_size_;
661 : SerializedSizeBound bound_;
662 : SerializedSizeBound key_only_bound_;
663 : } encoding_mode_;
664 :
665 0 : TypeSupportImpl* get_type_support() const // Raw type_support_ pointer; no lock is taken and no ownership is transferred.
666 : {
667 0 : return type_support_;
668 : }
669 :
670 : DDS::ReturnCode_t instance_must_exist(
671 : const char* method_name,
672 : const Sample& sample,
673 : DDS::InstanceHandle_t& instance_handle,
674 : bool remove = false);
675 :
676 : DDS::ReturnCode_t get_or_create_instance_handle(
677 : DDS::InstanceHandle_t& handle,
678 : const Sample& sample,
679 : const DDS::Time_t& source_timestamp);
680 :
681 : DDS::ReturnCode_t write_w_timestamp(
682 : const Sample& sample,
683 : DDS::InstanceHandle_t handle,
684 : const DDS::Time_t& source_timestamp);
685 :
686 : private:
687 :
688 : void track_sequence_number(GUIDSeq* filter_out);
689 :
690 : void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
691 :
692 : DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle,
693 : const DDS::Time_t& timestamp);
694 :
695 : /**
696 : * This method creates a header message block and chains it with
697 : * the registered sample. The header contains the information
698 : * needed. e.g. message id, length of whole message...
699 : * The fast allocator is not used for the header.
700 : */
701 : ACE_Message_Block*
702 : create_control_message(MessageId message_id,
703 : DataSampleHeader& header,
704 : Message_Block_Ptr data,
705 : const DDS::Time_t& source_timestamp);
706 :
707 : /// Send the liveliness message.
708 : bool send_liveliness(const MonotonicTimePoint& now);
709 :
710 : /// Lookup the instance handles by the subscription repo ids
711 : void lookup_instance_handles(const ReaderIdSeq& ids,
712 : DDS::InstanceHandleSeq& hdls);
713 :
714 : RcHandle<BitSubscriber> get_builtin_subscriber_proxy() const;
715 :
716 0 : DDS::DomainId_t domain_id() const // Copy of domain_id_; no lock is taken.
717 : {
718 0 : return this->domain_id_;
719 : }
720 :
721 0 : CORBA::Long get_priority_value(const AssociationData&) const // The AssociationData argument is unnamed and unused.
722 : {
723 0 : return this->qos_.transport_priority.value; // Always this writer's own transport priority from qos_.
724 : }
725 :
726 : #ifdef OPENDDS_SECURITY
727 : DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
728 : #endif
729 :
730 : void association_complete_i(const GUID_t& remote_id);
731 :
732 : void return_handle(DDS::InstanceHandle_t handle);
733 :
734 : friend class ::DDS_TEST; // allows tests to get at privates
735 :
736 :
737 : // Data block lock pool for this data writer.
738 : unique_ptr<DataBlockLockPool> db_lock_pool_;
739 :
740 : /// The name of associated topic.
741 : CORBA::String_var topic_name_;
742 : /// The associated topic repository id.
743 : GUID_t topic_id_;
744 : /// The topic servant.
745 : TopicDescriptionPtr<TopicImpl> topic_servant_;
746 : TypeSupportImpl* type_support_;
747 :
748 : /// Mutex to protect listener info
749 : ACE_Thread_Mutex listener_mutex_;
750 : /// The StatusKind bit mask indicates which status condition change
751 : /// can be notified by the listener of this entity.
752 : DDS::StatusMask listener_mask_;
753 : /// Used to notify the entity for relevant events.
754 : DDS::DataWriterListener_var listener_;
755 : /// The domain id.
756 : DDS::DomainId_t domain_id_;
757 : GUID_t dp_id_;
758 : /// The publisher servant which creates this datawriter.
759 : WeakRcHandle<PublisherImpl> publisher_servant_;
760 : /// The repository id of this datawriter/publication.
761 : GUID_t publication_id_;
762 : /// The sequence number unique in DataWriter scope.
763 : SequenceNumber sequence_number_;
764 : /// Mutex for sequence_number_
765 : mutable ACE_Thread_Mutex sn_lock_;
766 : /// Flag indicating DataWriter currently belongs to
767 : /// a coherent change set.
768 : bool coherent_;
769 : /// The number of samples belonging to the current
770 : /// coherent change set.
771 : ACE_UINT32 coherent_samples_;
772 : /// The sample data container.
773 : RcHandle<WriteDataContainer> data_container_;
774 : /// The lock to protect the activate subscriptions
775 : /// and status changes.
776 : mutable ACE_Recursive_Thread_Mutex lock_;
777 :
778 : typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
779 : RepoIdToHandleMap id_to_handle_map_;
780 :
781 : RepoIdSet readers_;
782 :
783 : /// Status conditions.
784 : DDS::LivelinessLostStatus liveliness_lost_status_ ;
785 : DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_ ;
786 : DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_ ;
787 : DDS::PublicationMatchedStatus publication_match_status_ ;
788 :
789 : /// True if the writer failed to actively signal its liveliness within
790 : /// its offered liveliness period.
791 : bool liveliness_lost_;
792 :
793 : /**
794 : * @todo The publication_lost_status_ and
795 : * publication_reconnecting_status_ are left here for
796 : * future use when we add get_publication_lost_status()
797 : * and get_publication_reconnecting_status() methods.
798 : */
799 : // Statistics of the lost publications due to lost connection.
800 : // PublicationLostStatus publication_lost_status_;
801 : // Statistics of the publications that associates with a
802 : // reconnecting datalink.
803 : // PublicationReconnectingStatus publication_reconnecting_status_;
804 :
805 : /// The message block allocator.
806 : unique_ptr<MessageBlockAllocator> mb_allocator_;
807 : /// The data block allocator.
808 : unique_ptr<DataBlockAllocator> db_allocator_;
809 : /// The header data allocator.
810 : unique_ptr<DataSampleHeaderAllocator> header_allocator_;
811 : unique_ptr<DataAllocator> data_allocator_;
812 :
813 : /// The orb's reactor to be used to register the liveliness
814 : /// timer.
815 : ACE_Reactor_Timer_Interface* reactor_;
816 : /// The time interval for sending liveliness message.
817 : TimeDuration liveliness_check_interval_;
818 : /// Timestamp of last write/dispose/assert_liveliness.
819 : MonotonicTimePoint last_liveliness_activity_time_;
820 : /// Total number of offered deadlines missed during last offered
821 : /// deadline status check.
822 : CORBA::Long last_deadline_missed_total_count_;
823 :
824 : /// Flag indicates that this datawriter is a builtin topic
825 : /// datawriter.
826 : bool is_bit_;
827 :
828 : /// The cached available data while suspending and associated transaction ids.
829 : ACE_UINT64 min_suspended_transaction_id_;
830 : ACE_UINT64 max_suspended_transaction_id_;
831 : SendStateDataSampleList available_data_list_;
832 :
833 : /// Monitor object for this entity
834 : unique_ptr<Monitor> monitor_;
835 :
836 : /// Periodic Monitor object for this entity
837 : unique_ptr<Monitor> periodic_monitor_;
838 :
839 :
840 : // Do we need to set the sequence repair header bit?
841 : // must call prior to incrementing sequence number
842 : bool need_sequence_repair();
843 : bool need_sequence_repair_i() const;
844 :
845 : DDS::ReturnCode_t send_end_historic_samples(const GUID_t& readerId);
846 : DDS::ReturnCode_t send_request_ack();
847 :
848 : bool liveliness_asserted_;
849 :
850 : // Lock used to synchronize remove_associations calls from discovery
851 : // and unregister_instances during deletion of datawriter from application
852 : ACE_Thread_Mutex sync_unreg_rem_assocs_lock_;
853 : RcHandle<LivenessTimer> liveness_timer_;
854 :
855 : MonotonicTimePoint wait_pending_deadline_;
856 :
857 : typedef OPENDDS_MAP(DDS::InstanceHandle_t, Sample_rch) InstanceHandlesToValues;
858 : InstanceHandlesToValues instance_handles_to_values_;
859 : typedef OPENDDS_MAP_CMP(Sample_rch, DDS::InstanceHandle_t, SampleRchCmp) InstanceValuesToHandles;
860 : InstanceValuesToHandles instance_values_to_handles_;
861 :
862 : bool insert_instance(DDS::InstanceHandle_t handle, Sample_rch& sample);
863 : InstanceValuesToHandles::iterator find_instance(const Sample& sample);
864 :
865 : #ifdef OPENDDS_SECURITY
866 : protected:
867 : Security::SecurityConfig_rch security_config_;
868 : DDS::Security::PermissionsHandle participant_permissions_handle_;
869 : DDS::DynamicType_var dynamic_type_;
870 : #endif
871 : };
872 :
873 : typedef RcHandle<DataWriterImpl> DataWriterImpl_rch;
874 :
875 :
876 : class LivenessTimer : public virtual RcEventHandler // Event handler that refers back to a DataWriterImpl through a weak handle.
877 : {
878 : public:
879 0 : LivenessTimer(DataWriterImpl& writer) // NOTE(review): single-argument constructor is not explicit, so DataWriterImpl& converts implicitly - confirm this is intended.
880 0 : : writer_(writer)
881 : {
882 0 : }
883 :
884 : /// Handle the assert liveliness timeout. Declared here; the definition is outside this header.
885 : virtual int handle_timeout(const ACE_Time_Value& tv, const void* arg);
886 :
887 : private:
888 : WeakRcHandle<DataWriterImpl> writer_; // Weak handle built from the writer reference passed to the constructor.
889 : };
890 :
891 : } // namespace DCPS
892 : } // namespace OpenDDS
893 :
894 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
895 :
896 : #endif
|