00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DATAWRITER_H
00009 #define OPENDDS_DCPS_DATAWRITER_H
00010
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "dds/DCPS/MessageTracker.h"
00017 #include "dds/DCPS/DataBlockLockPool.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019 #include "WriteDataContainer.h"
00020 #include "Definitions.h"
00021 #include "DataSampleHeader.h"
00022 #include "TopicImpl.h"
00023 #include "Time_Helper.h"
00024 #include "CoherentChangeControl.h"
00025 #include "GuidUtils.h"
00026 #include "RcEventHandler.h"
00027 #include "unique_ptr.h"
00028 #include "Message_Block_Ptr.h"
00029
00030 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00031 #include "FilterEvaluator.h"
00032 #endif
00033
00034 #include "ace/Event_Handler.h"
00035 #include "ace/OS_NS_sys_time.h"
00036
00037 #include <memory>
00038
00039 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00040 #pragma once
00041 #endif
00042
/// Forward declaration of the global-namespace class DDS_TEST, which
/// DataWriterImpl names as a friend (friend class ::DDS_TEST).
00043 class DDS_TEST;
00044
00045 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00046
00047 namespace OpenDDS {
00048 namespace DCPS {
00049
/// Forward declarations of OpenDDS::DCPS types that are referenced by the
/// declarations further down in this header.
00050 class PublisherImpl;
00051 class DomainParticipantImpl;
00052 class OfferedDeadlineWatchdog;
00053 class Monitor;
00054 class DataSampleElement;
00055 class SendStateDataSampleList;
00056 struct AssociationData;
00057 class LivenessTimer;
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
/**
 * @class DataWriterImpl
 *
 * @brief Implementation base class for DDS::DataWriter objects.
 *
 * Derives virtually from LocalObject<DDS::DataWriter>, DataWriterCallbacks,
 * EntityImpl, TransportClient and TransportSendListener. The class is
 * abstract: enable_specific() is pure virtual and must be supplied by a
 * derived class.
 *
 * Only declarations and a handful of inline accessors are visible in this
 * header. Notes about behavior that cannot be read off a signature are
 * marked NOTE(review) / "presumably" and should be confirmed against the
 * implementation file.
 */
00083 class OpenDDS_Dcps_Export DataWriterImpl
00084 : public virtual LocalObject<DDS::DataWriter>,
00085 public virtual DataWriterCallbacks,
00086 public virtual EntityImpl,
00087 public virtual TransportClient,
00088 public virtual TransportSendListener {
00089 public:
/// WriteDataContainer and PublisherImpl are granted access to non-public members.
00090 friend class WriteDataContainer;
00091 friend class PublisherImpl;
00092
/// Map type from RepoId to SequenceNumber, declared with the
/// GUID_tKeyLessThan comparator.
00093 typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap;
00094
/**
 * Bundles the data describing one acknowledgment wait: the construction
 * time, the maximum wait duration and a sequence number.
 */
00095 struct AckToken {
/// Time captured from ACE_OS::gettimeofday() when the token was constructed.
00096 ACE_Time_Value tstamp_;
/// Maximum wait duration supplied to the constructor.
00097 DDS::Duration_t max_wait_;
/// Sequence number supplied to the constructor.
00098 SequenceNumber sequence_;
00099
/**
 * Stamps the token with the current time and stores the arguments.
 *
 * @param max_wait  Copied into max_wait_.
 * @param sequence  Copied into sequence_.
 */
00100 AckToken(const DDS::Duration_t& max_wait,
00101 const SequenceNumber& sequence)
00102 : tstamp_(ACE_OS::gettimeofday()),
00103 max_wait_(max_wait),
00104 sequence_(sequence) {}
00105
00106 ~AckToken() {}
00107
/// @return duration_to_absolute_time_value(max_wait_, tstamp_); presumably
///         the absolute time at which the wait expires (the helper is
///         defined elsewhere — confirm).
00108 ACE_Time_Value deadline() const {
00109 return duration_to_absolute_time_value(this->max_wait_, this->tstamp_);
00110 }
00111
/// @return tstamp_ converted to a DDS::Time_t by time_value_to_time().
00112 DDS::Time_t timestamp() const {
00113 return time_value_to_time(this->tstamp_);
00114 }
00115 };
00116
/// Default constructor. Topic, QoS, listener, participant and publisher are
/// passed separately through init().
00117 DataWriterImpl();
00118
00119 virtual ~DataWriterImpl();
00120
// The virtual operations from here through get_publication_matched_status()
// use DDS return/argument types. NOTE(review): they presumably implement the
// DDS::DataWriter / DDS::Entity interface inherited through
// LocalObject<DDS::DataWriter> — confirm against the IDL.

/// @return A DDS::InstanceHandle_t for this writer.
00121 virtual DDS::InstanceHandle_t get_instance_handle();
00122
/// Requests a QoS change. @param qos the requested DataWriterQos.
00123 virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos & qos);
00124
/// @param[out] qos receives a DataWriterQos (presumably a copy of qos_ — confirm).
00125 virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos & qos);
00126
/// Sets the listener together with the status mask it applies to.
00127 virtual DDS::ReturnCode_t set_listener(
00128 DDS::DataWriterListener_ptr a_listener,
00129 DDS::StatusMask mask);
00130
/// @return The listener as a DDS::DataWriterListener_ptr.
00131 virtual DDS::DataWriterListener_ptr get_listener();
00132
/// @return The topic as a DDS::Topic_ptr.
00133 virtual DDS::Topic_ptr get_topic();
00134
/// Waits for acknowledgments, bounded by @p max_wait.
/// NOTE(review): exact blocking/timeout semantics are in the implementation.
00135 virtual DDS::ReturnCode_t wait_for_acknowledgments(
00136 const DDS::Duration_t & max_wait);
00137
/// @return The publisher as a DDS::Publisher_ptr.
00138 virtual DDS::Publisher_ptr get_publisher();
00139
/// @param[out] status receives a DDS::LivelinessLostStatus.
00140 virtual DDS::ReturnCode_t get_liveliness_lost_status(
00141 DDS::LivelinessLostStatus & status);
00142
/// @param[out] status receives a DDS::OfferedDeadlineMissedStatus.
00143 virtual DDS::ReturnCode_t get_offered_deadline_missed_status(
00144 DDS::OfferedDeadlineMissedStatus & status);
00145
/// @param[out] status receives a DDS::OfferedIncompatibleQosStatus.
00146 virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(
00147 DDS::OfferedIncompatibleQosStatus & status);
00148
/// @param[out] status receives a DDS::PublicationMatchedStatus.
00149 virtual DDS::ReturnCode_t get_publication_matched_status(
00150 DDS::PublicationMatchedStatus & status);
00151
/// @return A time value for the given liveliness kind. NOTE(review):
///         presumably the interval at which liveliness has to be checked
///         for that kind; the computation is not visible here.
00152 ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00153
/// NOTE(review): presumably reports whether liveliness activity was seen
/// after @p tv — confirm against the implementation.
00154 bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00155
00156 virtual DDS::ReturnCode_t assert_liveliness();
00157
00158 virtual DDS::ReturnCode_t assert_liveliness_by_participant();
00159
/// Vector of DDS::InstanceHandle_t.
00160 typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
/// @param[out] instance_handles non-const reference; presumably populated
///             with this writer's instance handles — confirm.
00161 void get_instance_handles(InstanceHandleVec& instance_handles);
00162
/// @param[out] readers non-const reference; presumably populated from the
///             readers_ member — confirm.
00163 void get_readers(RepoIdSet& readers);
00164
/// @param[out] subscription_handles receives a sequence of instance handles.
00165 virtual DDS::ReturnCode_t get_matched_subscriptions(
00166 DDS::InstanceHandleSeq & subscription_handles);
00167
// get_matched_subscription_data() is only declared when DDS_HAS_MINIMUM_BIT
// is not defined.
00168 #if !defined (DDS_HAS_MINIMUM_BIT)
00169 virtual DDS::ReturnCode_t get_matched_subscription_data(
00170 DDS::SubscriptionBuiltinTopicData & subscription_data,
00171 DDS::InstanceHandle_t subscription_handle);
00172 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00173
00174 virtual DDS::ReturnCode_t enable();
00175
// The virtual functions from add_association() through inconsistent_topic()
// take discovery-related types (RepoId, ReaderAssociation, ReaderIdSeq,
// IncompatibleQosStatus). NOTE(review): they look like overrides of
// DataWriterCallbacks — confirm against that header.

/// @param yourId  RepoId passed by the caller (by name, this writer's id).
/// @param reader  Description of the reader being associated.
/// @param active  NOTE(review): presumably selects the active vs. passive
///                side of connection establishment — confirm.
00176 virtual void add_association(const RepoId& yourId,
00177 const ReaderAssociation& reader,
00178 bool active);
00179
00180 virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00181
00182 virtual void association_complete(const RepoId& remote_id);
00183
00184 virtual void remove_associations(const ReaderIdSeq & readers,
00185 bool callback);
00186
00187 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00188
00189 virtual void update_subscription_params(const RepoId& readerId,
00190 const DDS::StringSeq& params);
00191
00192 virtual void inconsistent_topic();
00193
00194
00195
00196
00197
/// NOTE(review): presumably releases resources held by the writer before
/// destruction — confirm what is torn down in the implementation.
00198 void cleanup();
00199
00200
00201
00202
/**
 * Supplies the writer with its collaborators after default construction.
 *
 * @param topic_servant        Topic implementation (raw pointer).
 * @param qos                  DataWriterQos to use.
 * @param a_listener           Listener pointer.
 * @param mask                 Status mask associated with the listener.
 * @param participant_servant  Weak handle to the DomainParticipantImpl.
 * @param publisher_servant    Publisher implementation (raw pointer).
 */
00203 void init(
00204 TopicImpl* topic_servant,
00205 const DDS::DataWriterQos & qos,
00206 DDS::DataWriterListener_ptr a_listener,
00207 const DDS::StatusMask & mask,
00208 WeakRcHandle<OpenDDS::DCPS::DomainParticipantImpl> participant_servant,
00209 OpenDDS::DCPS::PublisherImpl* publisher_servant);
00210
/// Takes the caller's recursive-mutex guard by reference.
/// NOTE(review): presumably so the lock can be released around the send —
/// confirm.
00211 void send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard);
00212
00213
00214
00215
00216
00217
/**
 * @param[out] handle        Non-const reference to an instance handle.
 * @param data               Message block passed by value. NOTE(review): if
 *                           Message_Block_Ptr is an owning pointer, ownership
 *                           moves to the callee — confirm.
 * @param source_timestamp   Timestamp to associate with the registration.
 */
00218 DDS::ReturnCode_t
00219 register_instance_i(
00220 DDS::InstanceHandle_t& handle,
00221 Message_Block_Ptr data,
00222 const DDS::Time_t& source_timestamp);
00223
00224
00225
00226
00227
/// Same parameter list as register_instance_i(). NOTE(review): by name,
/// used when instances are restored from durable data — confirm.
00228 DDS::ReturnCode_t
00229 register_instance_from_durable_data(
00230 DDS::InstanceHandle_t& handle,
00231 Message_Block_Ptr data,
00232 const DDS::Time_t & source_timestamp);
00233
00234
00235
00236
00237
/// Unregisters the instance identified by @p handle.
00238 DDS::ReturnCode_t
00239 unregister_instance_i(
00240 DDS::InstanceHandle_t handle,
00241 const DDS::Time_t & source_timestamp);
00242
00243
00244
00245
00246
/// NOTE(review): by name, unregisters every instance using the given
/// timestamp — confirm.
00247 void unregister_instances(const DDS::Time_t& source_timestamp);
00248
00249
00250
00251
00252
00253
00254
00255
00256
/**
 * @param sample             Message block passed by value.
 * @param handle             Instance the sample belongs to.
 * @param source_timestamp   Timestamp for the sample.
 * @param filter_out         Pointer to a GUIDSeq. NOTE(review): presumably
 *                           readers to exclude, and may be null — confirm.
 */
00257 DDS::ReturnCode_t write(Message_Block_Ptr sample,
00258 DDS::InstanceHandle_t handle,
00259 const DDS::Time_t& source_timestamp,
00260 GUIDSeq* filter_out);
00261
00262
00263
00264
00265
00266
/// Disposes the instance identified by @p handle.
00267 DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle,
00268 const DDS::Time_t & source_timestamp);
00269
00270
00271
00272
/// @param[out] size non-const reference; presumably receives the number of
///             samples held for @p handle — confirm.
00273 DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle,
00274 size_t& size);
00275
00276
00277
00278
/// Forwards to data_container_->get_unsent_data(list) and returns its result.
/// data_container_ is dereferenced without a null check.
00279 ACE_UINT64 get_unsent_data(SendStateDataSampleList& list) {
00280 return data_container_->get_unsent_data(list);
00281 }
00282
/// Forwards to data_container_->get_resend_data() and returns the list by
/// value. data_container_ is dereferenced without a null check.
00283 SendStateDataSampleList get_resend_data() {
00284 return data_container_->get_resend_data();
00285 }
00286
00287
00288
00289
/// @return A RepoId by value (presumably publication_id_ — confirm).
00290 RepoId get_publication_id();
00291
00292
00293
00294
/// @return A RepoId by value (presumably dp_id_ — confirm).
00295 RepoId get_dp_id();
00296
00297
00298
00299
00300 void unregister_all();
00301
00302
00303
00304
00305
00306
// data_delivered(), control_delivered(), data_dropped() and
// control_dropped() report the outcome of a send. NOTE(review): they look
// like TransportSendListener callbacks — confirm against that header.
00307 void data_delivered(const DataSampleElement* sample);
00308
00309
00310
00311
00312
00313 void control_delivered(const Message_Block_Ptr& sample);
00314
00315
00316 bool should_ack() const;
00317
00318
/// @return A new AckToken by value for the given maximum wait.
00319 AckToken create_ack_token(DDS::Duration_t max_wait) const;
00320
/// @param[out] qos_data non-const reference to be filled with inline QoS data.
00321 virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const;
00322
00323 virtual bool check_transport_qos(const TransportInst& inst);
00324
// The coherent-change operations are only declared when
// OPENDDS_NO_OBJECT_MODEL_PROFILE is not defined.
00325 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00326
00327
00328 bool coherent_changes_pending();
00329
00330
00331 void begin_coherent_changes();
00332
00333
00334 void end_coherent_changes(const GroupCoherentSamples& group_samples);
00335
00336 #endif
00337
00338
00339
00340
/// @return The type name as a C string (presumably backed by type_name_ —
///         confirm lifetime against the implementation).
00341 char const* get_type_name() const;
00342
00343
00344
00345
00346
00347
00348 void data_dropped(const DataSampleElement* element,
00349 bool dropped_by_transport);
00350
00351
00352
00353
00354
00355 void control_dropped(const Message_Block_Ptr& sample,
00356 bool dropped_by_transport);
00357
00358
00359
00360
00361
00362
/// @return The lock_ member of the object data_container_ points to. This
///         is not this class's own lock_ member. data_container_ is
///         dereferenced without a null check.
00363 ACE_INLINE
00364 ACE_Recursive_Thread_Mutex& get_lock() {
00365 return data_container_->lock_;
00366 }
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
/// @return A listener pointer for the given status kind. NOTE(review):
///         presumably consults listener_mask_ and may defer to a parent
///         entity's listener — confirm.
00377 DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind);
00378
00379
/// Timer callback; the signature matches ACE_Event_Handler::handle_timeout.
00380 virtual int handle_timeout(const ACE_Time_Value &tv,
00381 const void *arg);
00382
00383
00384
00385 void send_suspended_data();
00386
00387 void remove_all_associations();
00388
00389 virtual void register_for_reader(const RepoId& participant,
00390 const RepoId& writerid,
00391 const RepoId& readerid,
00392 const TransportLocatorSeq& locators,
00393 DiscoveryListener* listener);
00394
00395 virtual void unregister_for_reader(const RepoId& participant,
00396 const RepoId& writerid,
00397 const RepoId& readerid);
00398
// Connection-state notifications keyed by reader ids. A private overload of
// notify_publication_lost() taking DDS::InstanceHandleSeq is declared below.
00399 void notify_publication_disconnected(const ReaderIdSeq& subids);
00400 void notify_publication_reconnected(const ReaderIdSeq& subids);
00401 void notify_publication_lost(const ReaderIdSeq& subids);
00402
00403
// Public counters. NOTE(review): presumably incremented by data_dropped()
// and data_delivered() — confirm.
00404 int data_dropped_count_;
00405 int data_delivered_count_;
00406
/// Public MessageTracker instance. NOTE(review): by name, tracks control
/// messages — confirm.
00407 MessageTracker controlTracker;
00408
00409
00410
00411
00412
00413
00414
00415
/**
 * @param data               Message block passed by value.
 * @param instance_handle    Instance the sample belongs to.
 * @param[out] header_data   Non-const reference to the sample header.
 * @param[out] message       Non-const reference to the resulting message.
 * @param source_timestamp   Timestamp for the sample.
 * @param content_filter     NOTE(review): presumably marks the sample as
 *                           subject to content filtering — confirm.
 */
00416 DDS::ReturnCode_t
00417 create_sample_data_message(Message_Block_Ptr data,
00418 DDS::InstanceHandle_t instance_handle,
00419 DataSampleHeader& header_data,
00420 Message_Block_Ptr& message,
00421 const DDS::Time_t& source_timestamp,
00422 bool content_filter);
00423
// persist_data() is only declared when OPENDDS_NO_PERSISTENCE_PROFILE is not
// defined.
00424 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00425
00426
00427 bool persist_data();
00428 #endif
00429
00430
00431 void reschedule_deadline();
00432
00433
00434 void wait_pending();
00435
00436
00437
00438
00439 DDS::InstanceHandle_t get_next_handle();
00440
/// @return The parent entity as an RcHandle<EntityImpl>.
00441 virtual RcHandle<EntityImpl> parent() const;
00442
// filter_out() is only declared when OPENDDS_NO_CONTENT_FILTERED_TOPIC is not
// defined. NOTE(review): presumably returns true when the sample should not
// be sent according to the evaluator — confirm.
00443 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00444 bool filter_out(const DataSampleElement& elt,
00445 const OPENDDS_STRING& filterClassName,
00446 const FilterEvaluator& evaluator,
00447 const DDS::StringSeq& expression_params) const;
00448 #endif
00449
00450
00451
00452
00453
00454 void wait_control_pending();
00455
/// @return The result of db_lock_pool_->get_lock(). db_lock_pool_ is
///         dereferenced without a null check.
00456 DataBlockLockPool::DataBlockLock* get_db_lock() {
00457 return db_lock_pool_->get_lock();
00458 }
00459
00460
00461
00462
/// @return A PublicationInstance_rch for @p handle.
00463 PublicationInstance_rch get_handle_instance(
00464 DDS::InstanceHandle_t handle);
00465
00466
00467 protected:
00468
/// Waits for the acknowledgment described by @p token.
00469 DDS::ReturnCode_t wait_for_specific_ack(const AckToken& token);
00470
00471 void prepare_to_delete();
00472
00473
/// Pure virtual hook; derived classes must implement it. NOTE(review):
/// presumably invoked from enable() — confirm.
00474 virtual DDS::ReturnCode_t enable_specific() = 0;
00475
00476
/// NOTE(review): by name, a chunk count used for allocator sizing — confirm.
00477 size_t n_chunks_;
00478
00479
/// NOTE(review): by name, a multiplier applied per association when sizing
/// chunks — confirm.
00480 size_t association_chunk_multiplier_;
00481
00482
00483
/// Type name string.
00484 CORBA::String_var type_name_;
00485
00486
/// The writer's QoS; get_priority_value() reads transport_priority.value
/// from it.
00487 DDS::DataWriterQos qos_;
00488
00489
00490
/// Weak handle to the DomainParticipantImpl.
00491 WeakRcHandle<DomainParticipantImpl> participant_servant_;
00492
00493
/// NOTE(review): presumably guards reader_info_ — confirm.
00494 ACE_Thread_Mutex reader_info_lock_;
00495
/**
 * Per-reader bookkeeping. The filter-related members exist only when
 * OPENDDS_NO_CONTENT_FILTERED_TOPIC is not defined; the constructor takes
 * the filter arguments in either configuration.
 */
00496 struct ReaderInfo {
00497 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00498 WeakRcHandle<DomainParticipantImpl> participant_;
00499 OPENDDS_STRING filter_class_name_;
00500 OPENDDS_STRING filter_;
00501 DDS::StringSeq expression_params_;
00502 RcHandle<FilterEvaluator> eval_;
00503 #endif
00504 SequenceNumber expected_sequence_;
00505 bool durable_;
00506 ReaderInfo(const char* filter_class_name, const char* filter, const DDS::StringSeq& params,
00507 WeakRcHandle<DomainParticipantImpl> participant, bool durable);
00508 ~ReaderInfo();
00509 };
00510
/// Map from reader RepoId to its ReaderInfo.
00511 typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00512 RepoIdToReaderInfoMap reader_info_;
00513
/**
 * Pairs a GUIDSeq with a reference to an AckToken. Because token_ is a
 * reference, an AckCustomization must not outlive the AckToken it was
 * constructed from.
 */
00514 struct AckCustomization {
00515 GUIDSeq customized_;
00516 AckToken& token_;
00517 explicit AckCustomization(AckToken& at) : token_(at) {}
00518 };
00519
/// Sends a control message. @param header header describing the message.
/// @param msg message block passed by value.
00520 virtual SendControlStatus send_control(const DataSampleHeader& header,
00521 Message_Block_Ptr msg);
00522
00523 private:
00524
00525 void track_sequence_number(GUIDSeq* filter_out);
00526
/// Private overload taking instance handles; the public overload takes
/// ReaderIdSeq.
00527 void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00528
00529 DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle,
00530 const DDS::Time_t& timestamp);
00531
00532
00533
00534
00535
00536
00537
/// @return A raw ACE_Message_Block pointer. NOTE(review): ownership of the
///         returned block is not expressed by the type — confirm who
///         releases it.
00538 ACE_Message_Block*
00539 create_control_message(MessageId message_id,
00540 DataSampleHeader& header,
00541 Message_Block_Ptr data,
00542 const DDS::Time_t& source_timestamp);
00543
00544
00545 bool send_liveliness(const ACE_Time_Value& now);
00546
00547
/// @param ids reader ids to look up. @param[out] hdls receives the handles.
00548 void lookup_instance_handles(const ReaderIdSeq& ids,
00549 DDS::InstanceHandleSeq& hdls);
00550
/// @return publication_id_ by const reference.
00551 const RepoId& get_repo_id() const {
00552 return this->publication_id_;
00553 }
00554
/// @return domain_id_.
00555 DDS::DomainId_t domain_id() const {
00556 return this->domain_id_;
00557 }
00558
/// Ignores its (unnamed) AssociationData argument.
/// @return qos_.transport_priority.value.
00559 CORBA::Long get_priority_value(const AssociationData&) const {
00560 return this->qos_.transport_priority.value;
00561 }
00562
// get_crypto_handle() is only declared when OPENDDS_SECURITY is defined.
00563 #if defined(OPENDDS_SECURITY)
00564 DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
00565 #endif
00566
00567 void association_complete_i(const RepoId& remote_id);
00568
/// The global-namespace test class is granted access to non-public members.
00569 friend class ::DDS_TEST;
00570
00571
00572
/// Pool used by get_db_lock().
00573 unique_ptr<DataBlockLockPool> db_lock_pool_;
00574
00575
00576 CORBA::String_var topic_name_;
00577
00578 RepoId topic_id_;
00579
00580 TopicDescriptionPtr<TopicImpl> topic_servant_;
00581
00582
00583
/// Status mask stored alongside listener_.
00584 DDS::StatusMask listener_mask_;
00585
00586 DDS::DataWriterListener_var listener_;
00587
/// Returned by domain_id().
00588 DDS::DomainId_t domain_id_;
00589 RepoId dp_id_;
00590
/// Weak handle to the PublisherImpl.
00591 WeakRcHandle<PublisherImpl> publisher_servant_;
00592
/// Returned by get_repo_id().
00593 PublicationId publication_id_;
00594
00595 SequenceNumber sequence_number_;
00596
00597
// NOTE(review): coherent_ and coherent_samples_ presumably track an open
// coherent change set and the number of samples in it — confirm.
00598 bool coherent_;
00599
00600
00601 ACE_UINT32 coherent_samples_;
00602
/// Container used by get_unsent_data(), get_resend_data() and get_lock().
00603 unique_ptr<WriteDataContainer> data_container_;
00604
00605
/// This class's own recursive mutex; distinct from the lock returned by
/// get_lock().
00606 ACE_Recursive_Thread_Mutex lock_;
00607
/// Map from RepoId to DDS::InstanceHandle_t.
00608 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00609
00610 RepoIdToHandleMap id_to_handle_map_;
00611
00612 RepoIdSet readers_;
00613
00614
// Status structures, one per get_*_status() accessor type declared above.
00615 DDS::LivelinessLostStatus liveliness_lost_status_ ;
00616 DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_ ;
00617 DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_ ;
00618 DDS::PublicationMatchedStatus publication_match_status_ ;
00619
00620
00621
00622 bool liveliness_lost_;
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
// Allocators for message blocks, data blocks and sample headers.
// NOTE(review): presumably sized from n_chunks_ — confirm.
00637 unique_ptr<MessageBlockAllocator> mb_allocator_;
00638
00639 unique_ptr<DataBlockAllocator> db_allocator_;
00640
00641 unique_ptr<DataSampleHeaderAllocator> header_allocator_;
00642
00643
00644
/// Raw pointer to a reactor timer interface. NOTE(review): presumably not
/// owned by this object — confirm.
00645 ACE_Reactor_Timer_Interface* reactor_;
00646
00647 ACE_Time_Value liveliness_check_interval_;
00648
00649 ACE_Time_Value last_liveliness_activity_time_;
00650
00651
00652 CORBA::Long last_deadline_missed_total_count_;
00653
00654
00655 RcHandle<OfferedDeadlineWatchdog> watchdog_;
00656
00657
00658
/// NOTE(review): by name, true when this writer serves a built-in topic —
/// confirm.
00659 bool is_bit_;
00660
00661 RepoIdSet pending_readers_, assoc_complete_readers_;
00662
00663
// NOTE(review): presumably bookkeeping for data held while publications are
// suspended (see send_suspended_data()) — confirm.
00664 ACE_UINT64 min_suspended_transaction_id_;
00665 ACE_UINT64 max_suspended_transaction_id_;
00666 SendStateDataSampleList available_data_list_;
00667
00668
// Raw Monitor pointers. NOTE(review): ownership is not expressed by the
// type — confirm who deletes them.
00669 Monitor* monitor_;
00670
00671
00672 Monitor* periodic_monitor_;
00673
00674
00675
00676
// NOTE(review): the const `_i` variant is presumably the implementation
// called with the relevant lock already held — confirm.
00677 bool need_sequence_repair();
00678 bool need_sequence_repair_i() const;
00679
00680 DDS::ReturnCode_t send_end_historic_samples(const RepoId& readerId);
00681 DDS::ReturnCode_t send_request_ack();
00682
00683 bool liveliness_asserted_;
00684
00685
00686
/// NOTE(review): by name, serializes unregistration against association
/// removal — confirm.
00687 ACE_Thread_Mutex sync_unreg_rem_assocs_lock_;
/// Handle to the LivenessTimer associated with this writer.
00688 RcHandle<LivenessTimer> liveness_timer_;
00689 };
00690
/// RcHandle to a DataWriterImpl.
00691 typedef RcHandle<DataWriterImpl> DataWriterImpl_rch;
00692
00693
/**
 * @class LivenessTimer
 *
 * @brief Timer event handler that refers back to a DataWriterImpl.
 *
 * Derives from RcEventHandler and stores the writer as a
 * WeakRcHandle<DataWriterImpl>. handle_timeout() is only declared here; its
 * behavior lives in the implementation file. NOTE(review): because the
 * handle is weak, the timer presumably does not keep the writer alive —
 * confirm against WeakRcHandle.
 */
00694 class LivenessTimer : public RcEventHandler
00695 {
00696 public:
/// Binds the timer to @p writer. Declared explicit so that a DataWriterImpl
/// reference is never silently converted into a LivenessTimer.
00697 explicit LivenessTimer(DataWriterImpl& writer)
00698 : writer_(writer)
00699 {
00700 }
00701
00702
/// Timer callback; the signature matches ACE_Event_Handler::handle_timeout.
/// @param tv   Time value passed by the timer dispatcher.
/// @param arg  Opaque argument registered with the timer.
00703 virtual int handle_timeout(const ACE_Time_Value &tv,
00704 const void *arg);
00705
00706 private:
/// Weak handle to the writer given to the constructor.
00707 WeakRcHandle<DataWriterImpl> writer_;
00708 };
00709
00710 }
00711 }
00712
00713 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00714
00715 #endif