00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_DATAWRITER_H
00009 #define OPENDDS_DCPS_DATAWRITER_H
00010
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "dds/DCPS/MessageTracker.h"
00017 #include "dds/DCPS/DataBlockLockPool.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019 #include "WriteDataContainer.h"
00020 #include "Definitions.h"
00021 #include "DataSampleHeader.h"
00022 #include "TopicImpl.h"
00023 #include "Qos_Helper.h"
00024 #include "CoherentChangeControl.h"
00025 #include "GuidUtils.h"
00026
00027 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00028 #include "FilterEvaluator.h"
00029 #endif
00030
00031 #include "ace/Event_Handler.h"
00032 #include "ace/OS_NS_sys_time.h"
00033 #include "ace/Condition_T.h"
00034 #include "ace/Condition_Recursive_Thread_Mutex.h"
00035
00036 #include <memory>
00037
00038 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00039 #pragma once
00040 #endif
00041
00042 class DDS_TEST;
00043
00044 namespace OpenDDS {
00045 namespace DCPS {
00046
00047 class PublisherImpl;
00048 class DomainParticipantImpl;
00049 class OfferedDeadlineWatchdog;
00050 class Monitor;
00051 class DataSampleElement;
00052 class SendStateDataSampleList;
00053 struct AssociationData;
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 /// @class DataWriterImpl
00069 ///
00070 /// Base implementation of the DDS::DataWriter interface. Per the base list,
00071 /// the class is also a DataWriterCallbacks target, an EntityImpl, a
00072 /// TransportClient, a TransportSendListener and an ACE_Event_Handler.
00073 /// It is abstract: unregistered() and enable_specific() are pure virtual
00074 /// and must be supplied by a derived class.
00075 /// NOTE(review): presumably the type-specific writers derive from this - confirm.
00076 class OpenDDS_Dcps_Export DataWriterImpl
00077 : public virtual DDS::DataWriter,
00078 public virtual DataWriterCallbacks,
00079 public virtual EntityImpl,
00080 public virtual TransportClient,
00081 public virtual TransportSendListener,
00082 public virtual ACE_Event_Handler {
00083 public:
00084 friend class WriteDataContainer;
00085 friend class PublisherImpl;
00086 /// Map from RepoId to SequenceNumber, ordered by GUID_tKeyLessThan.
00087 typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap;
00088 /// Pairs a sequence number with a creation timestamp and a maximum wait.
00089 struct AckToken {
00090 ACE_Time_Value tstamp_;
00091 DDS::Duration_t max_wait_;
00092 SequenceNumber sequence_;
00093 /// Captures the current time (ACE_OS::gettimeofday()) as tstamp_.
00094 AckToken(const DDS::Duration_t& max_wait,
00095 const SequenceNumber& sequence)
00096 : tstamp_(ACE_OS::gettimeofday()),
00097 max_wait_(max_wait),
00098 sequence_(sequence) {}
00099
00100 ~AckToken() {}
00101 /// max_wait_ combined with tstamp_ by duration_to_absolute_time_value().
00102 ACE_Time_Value deadline() const {
00103 return duration_to_absolute_time_value(this->max_wait_, this->tstamp_);
00104 }
00105 /// tstamp_ converted by time_value_to_time().
00106 DDS::Time_t timestamp() const {
00107 return time_value_to_time(this->tstamp_);
00108 }
00109 };
00110
00111 /// Constructor
00112 DataWriterImpl();
00113
00114 /// Virtual destructor
00115 virtual ~DataWriterImpl();
00116 /// NOTE(review): the virtual operations that follow presumably implement DDS::DataWriter - confirm.
00117 virtual DDS::InstanceHandle_t get_instance_handle();
00118
00119 virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos & qos);
00120
00121 virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos & qos);
00122
00123 virtual DDS::ReturnCode_t set_listener(
00124 DDS::DataWriterListener_ptr a_listener,
00125 DDS::StatusMask mask);
00126
00127 virtual DDS::DataWriterListener_ptr get_listener();
00128
00129 virtual DDS::Topic_ptr get_topic();
00130
00131 virtual DDS::ReturnCode_t wait_for_acknowledgments(
00132 const DDS::Duration_t & max_wait);
00133
00134 virtual DDS::Publisher_ptr get_publisher();
00135
00136 virtual DDS::ReturnCode_t get_liveliness_lost_status(
00137 DDS::LivelinessLostStatus & status);
00138
00139 virtual DDS::ReturnCode_t get_offered_deadline_missed_status(
00140 DDS::OfferedDeadlineMissedStatus & status);
00141
00142 virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(
00143 DDS::OfferedIncompatibleQosStatus & status);
00144
00145 virtual DDS::ReturnCode_t get_publication_matched_status(
00146 DDS::PublicationMatchedStatus & status);
00147 /// Returns an ACE_Time_Value for the given liveliness kind (computation not shown here).
00148 ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00149 /// NOTE(review): presumably true when participant-level liveliness activity happened after @a tv - confirm.
00150 bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00151
00152 virtual DDS::ReturnCode_t assert_liveliness();
00153
00154 virtual DDS::ReturnCode_t assert_liveliness_by_participant();
00155 /// Vector type taken by reference by get_instance_handles().
00156 typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00157 void get_instance_handles(InstanceHandleVec& instance_handles);
00158 /// @param readers passed by non-const reference; presumably receives the reader ids - confirm.
00159 void get_readers(RepoIdSet& readers);
00160
00161 virtual DDS::ReturnCode_t get_matched_subscriptions(
00162 DDS::InstanceHandleSeq & subscription_handles);
00163 /// Only declared when DDS_HAS_MINIMUM_BIT is not defined.
00164 #if !defined (DDS_HAS_MINIMUM_BIT)
00165 virtual DDS::ReturnCode_t get_matched_subscription_data(
00166 DDS::SubscriptionBuiltinTopicData & subscription_data,
00167 DDS::InstanceHandle_t subscription_handle);
00168 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00169
00170 virtual DDS::ReturnCode_t enable();
00171 /// NOTE(review): the virtuals down to inconsistent_topic() presumably override DataWriterCallbacks - confirm.
00172 virtual void add_association(const RepoId& yourId,
00173 const ReaderAssociation& reader,
00174 bool active);
00175
00176 virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00177
00178 virtual void association_complete(const RepoId& remote_id);
00179
00180 virtual void remove_associations(const ReaderIdSeq & readers,
00181 bool callback);
00182
00183 virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00184
00185 virtual void update_subscription_params(const RepoId& readerId,
00186 const DDS::StringSeq& params);
00187
00188 virtual void inconsistent_topic();
00189
00190
00191 /// NOTE(review): presumably tears the writer down; what is released is defined in
00192 /// the implementation file, not here.
00193 void cleanup();
00194
00195 /// Initialization entry point, separate from the default constructor. Receives the
00196 /// topic (object reference and servant), QoS, listener and mask, the participant and
00197 /// publisher servants as raw pointers, and a DDS::DataWriter_ptr named dw_local.
00198 virtual void init(
00199 DDS::Topic_ptr topic,
00200 TopicImpl* topic_servant,
00201 const DDS::DataWriterQos & qos,
00202 DDS::DataWriterListener_ptr a_listener,
00203 const DDS::StatusMask & mask,
00204 OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
00205 OpenDDS::DCPS::PublisherImpl* publisher_servant,
00206 DDS::DataWriter_ptr dw_local);
00207 /// @param guard a guard on an ACE_Recursive_Thread_Mutex; NOTE(review): which mutex is expected is not visible here.
00208 void send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard);
00209
00210
00211 /// @param handle instance handle, passed by non-const reference (presumably set on success).
00212 /// @param data the DataSample pointer for the instance.
00213 /// @param source_timestamp timestamp supplied by the caller.
00214 /// @return a DDS::ReturnCode_t.
00215 DDS::ReturnCode_t
00216 register_instance_i(
00217 DDS::InstanceHandle_t& handle,
00218 DataSample* data,
00219 const DDS::Time_t& source_timestamp);
00220
00221
00222
00223 /// Same parameter list as register_instance_i().
00224 /// NOTE(review): presumably used when instances come from durable data - confirm.
00225 DDS::ReturnCode_t
00226 register_instance_from_durable_data(
00227 DDS::InstanceHandle_t& handle,
00228 DataSample* data,
00229 const DDS::Time_t & source_timestamp);
00230
00231
00232
00233 /// @param handle the instance handle.
00234 /// @param source_timestamp timestamp supplied by the caller.
00235 DDS::ReturnCode_t
00236 unregister_instance_i(
00237 DDS::InstanceHandle_t handle,
00238 const DDS::Time_t & source_timestamp);
00239
00240
00241
00242
00243 /// NOTE(review): presumably unregisters every instance using @a source_timestamp - confirm.
00244 void unregister_instances(const DDS::Time_t& source_timestamp);
00245
00246
00247
00248
00249 /// @param sample the DataSample pointer to write.
00250 /// @param handle instance handle passed with the sample.
00251 /// @param source_timestamp timestamp supplied by the caller.
00252 /// @param filter_out pointer to a GUIDSeq; NOTE(review): presumably readers to exclude,
00253 /// and whether it may be null is not visible here.
00254 DDS::ReturnCode_t write(DataSample* sample,
00255 DDS::InstanceHandle_t handle,
00256 const DDS::Time_t& source_timestamp,
00257 GUIDSeq* filter_out);
00258
00259
00260
00261
00262 /// @param handle instance handle.
00263 /// @param source_timestamp timestamp supplied by the caller.
00264 DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle,
00265 const DDS::Time_t & source_timestamp);
00266
00267
00268
00269 /// @param size passed by non-const reference; presumably receives the sample count for @a handle.
00270 DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle,
00271 size_t& size);
00272
00273
00274 /// Forwards to data_container_->get_unsent_data(list) and returns its result.
00275 /// data_container_ is dereferenced without a null check.
00276 ACE_UINT64 get_unsent_data(SendStateDataSampleList& list) {
00277 return data_container_->get_unsent_data(list);
00278 }
00279 /// Forwards to data_container_->get_resend_data(); same null-pointer caveat as above.
00280 SendStateDataSampleList get_resend_data() {
00281 return data_container_->get_resend_data();
00282 }
00283
00284
00285
00286 /// Returns a RepoId by value. NOTE(review): presumably publication_id_ - confirm.
00287 RepoId get_publication_id();
00288
00289
00290
00291 /// Returns a RepoId by value. NOTE(review): presumably the participant's id - confirm.
00292 RepoId get_dp_id();
00293
00294
00295
00296 /// NOTE(review): relationship to unregister_instances() is not visible here.
00297 void unregister_all();
00298
00299
00300
00301
00302 /// NOTE(review): presumably the transport's notification that @a sample was delivered
00303 /// (this class is a TransportSendListener) - confirm.
00304 void data_delivered(const DataSampleElement* sample);
00305
00306
00307
00308
00309 /// NOTE(review): presumably the control-message counterpart of data_delivered() - confirm.
00310 void control_delivered(ACE_Message_Block* sample);
00311
00312 /// Const query returning bool; the criteria are defined in the implementation.
00313 bool should_ack() const;
00314
00315 /// Const; returns an AckToken by value for @a max_wait. NOTE(review): which sequence number is captured is not visible here.
00316 AckToken create_ack_token(DDS::Duration_t max_wait) const;
00317
00318 virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const;
00319
00320 virtual bool check_transport_qos(const TransportInst& inst);
00321
00322 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00323
00324 /// The coherent-change operations are only declared when OPENDDS_NO_OBJECT_MODEL_PROFILE is not defined.
00325 bool coherent_changes_pending();
00326
00327
00328 void begin_coherent_changes();
00329
00330
00331 void end_coherent_changes(const GroupCoherentSamples& group_samples);
00332
00333 #endif
00334
00335
00336
00337 /// Returns a C string. NOTE(review): presumably backed by topic_name_ - confirm lifetime.
00338 const char* get_topic_name();
00339
00340
00341
00342 /// Returns a C string. NOTE(review): presumably backed by type_name_ - confirm lifetime.
00343 char const* get_type_name() const;
00344
00345
00346
00347
00348 /// NOTE(review): presumably the transport's notification that @a element was not sent;
00349 /// @a dropped_by_transport presumably distinguishes who dropped it - confirm.
00350 void data_dropped(const DataSampleElement* element,
00351 bool dropped_by_transport);
00352
00353
00354
00355
00356 /// Same shape as data_dropped() but takes a raw ACE_Message_Block.
00357 void control_dropped(ACE_Message_Block* sample,
00358 bool dropped_by_transport);
00359
00360
00361
00362
00363 /// Returns the lock_ member of data_container_, not this class's own lock_ member.
00364 /// data_container_ is dereferenced without a null check.
00365 ACE_INLINE
00366 ACE_Recursive_Thread_Mutex& get_lock() {
00367 return data_container_->lock_;
00368 }
00369
00370
00371
00372
00373
00374
00375 /// Pure virtual: a derived class must implement it.
00376 /// NOTE(review): presumably called after @a instance_handle has been unregistered - confirm.
00377 virtual void unregistered(DDS::InstanceHandle_t instance_handle) = 0;
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 /// Returns a DDS::DataWriterListener_ptr for @a kind. NOTE(review): ownership of the returned pointer is not visible here.
00388 DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind);
00389
00390 /// NOTE(review): handle_timeout()/handle_close() presumably override ACE_Event_Handler - confirm.
00391 virtual int handle_timeout(const ACE_Time_Value &tv,
00392 const void *arg);
00393
00394 virtual int handle_close(ACE_HANDLE,
00395 ACE_Reactor_Mask);
00396
00397
00398
00399 void send_suspended_data();
00400
00401 void remove_all_associations();
00402
00403 virtual void register_for_reader(const RepoId& participant,
00404 const RepoId& writerid,
00405 const RepoId& readerid,
00406 const TransportLocatorSeq& locators,
00407 DiscoveryListener* listener);
00408
00409 virtual void unregister_for_reader(const RepoId& participant,
00410 const RepoId& writerid,
00411 const RepoId& readerid);
00412
00413 void notify_publication_disconnected(const ReaderIdSeq& subids);
00414 void notify_publication_reconnected(const ReaderIdSeq& subids);
00415 void notify_publication_lost(const ReaderIdSeq& subids);
00416
00417 virtual void notify_connection_deleted(const RepoId& peerId);
00418
00419 /// Public data members: two int counters and a MessageTracker (still in the public section).
00420 int data_dropped_count_;
00421 int data_delivered_count_;
00422
00423 MessageTracker controlTracker;
00424
00425
00426
00427 /// @param data the DataSample pointer.
00428 /// @param header_data DataSampleHeader passed by non-const reference.
00429 /// @param message reference to an ACE_Message_Block pointer; presumably receives the built message.
00430 /// @param content_filter NOTE(review): meaning of this flag is not visible here.
00431 /// @return a DDS::ReturnCode_t.
00432 DDS::ReturnCode_t
00433 create_sample_data_message(DataSample* data,
00434 DDS::InstanceHandle_t instance_handle,
00435 DataSampleHeader& header_data,
00436 ACE_Message_Block*& message,
00437 const DDS::Time_t& source_timestamp,
00438 bool content_filter);
00439
00440 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00441
00442 /// Only declared when OPENDDS_NO_PERSISTENCE_PROFILE is not defined.
00443 bool persist_data();
00444 #endif
00445
00446
00447 void reschedule_deadline();
00448
00449
00450 void wait_pending();
00451
00452
00453
00454 /// Returns a DDS::InstanceHandle_t. NOTE(review): the source of handles is not visible here.
00455 DDS::InstanceHandle_t get_next_handle();
00456
00457 virtual EntityImpl* parent() const;
00458 /// Only declared when OPENDDS_NO_CONTENT_FILTERED_TOPIC is not defined.
00459 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00460 bool filter_out(const DataSampleElement& elt,
00461 const OPENDDS_STRING& filterClassName,
00462 const FilterEvaluator& evaluator,
00463 const DDS::StringSeq& expression_params) const;
00464 #endif
00465
00466
00467
00468
00469
00470 void wait_control_pending();
00471 /// Returns db_lock_pool_->get_lock(); db_lock_pool_ is dereferenced without a null check.
00472 DataBlockLockPool::DataBlockLock* get_db_lock() {
00473 return db_lock_pool_->get_lock();
00474 }
00475
00476 protected:
00477
00478 DDS::ReturnCode_t wait_for_specific_ack(const AckToken& token);
00479
00480 void prepare_to_delete();
00481
00482 /// Pure virtual: a derived class must implement it.
00483 virtual DDS::ReturnCode_t enable_specific() = 0;
00484
00485 /// NOTE(review): presumably a chunk count used for allocator sizing - confirm.
00486 size_t n_chunks_;
00487
00488 /// NOTE(review): presumably scales chunk counts by the number of associations - confirm.
00489 size_t association_chunk_multiplier_;
00490
00491
00492
00493 /// Returns a PublicationInstance pointer for @a handle. NOTE(review): result for an unknown handle is not visible here.
00494 PublicationInstance* get_handle_instance(
00495 DDS::InstanceHandle_t handle);
00496
00497
00498 CORBA::String_var type_name_;
00499
00500
00501 DDS::DataWriterQos qos_;
00502
00503
00504 /// Raw pointer to a DomainParticipantImpl; NOTE(review): ownership is not visible here.
00505 DomainParticipantImpl* participant_servant_;
00506
00507 /// NOTE(review): presumably guards reader_info_ - confirm.
00508 ACE_Thread_Mutex reader_info_lock_;
00509 /// Per-reader record (value type of reader_info_); filter members exist only when OPENDDS_NO_CONTENT_FILTERED_TOPIC is not defined.
00510 struct ReaderInfo {
00511 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00512 DomainParticipantImpl* participant_;
00513 OPENDDS_STRING filter_class_name_;
00514 OPENDDS_STRING filter_;
00515 DDS::StringSeq expression_params_;
00516 RcHandle<FilterEvaluator> eval_;
00517 #endif
00518 SequenceNumber expected_sequence_;
00519 bool durable_;
00520 ReaderInfo(const char* filter_class_name, const char* filter, const DDS::StringSeq& params,
00521 DomainParticipantImpl* participant, bool durable);
00522 ~ReaderInfo();
00523 };
00524 /// Map from reader RepoId to ReaderInfo, ordered by GUID_tKeyLessThan.
00525 typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00526 RepoIdToReaderInfoMap reader_info_;
00527 /// Bundles a GUIDSeq with a reference to an AckToken; the token must outlive this object.
00528 struct AckCustomization {
00529 GUIDSeq customized_;
00530 AckToken& token_;
00531 explicit AckCustomization(AckToken& at) : token_(at) {}
00532 };
00533
00534 virtual SendControlStatus send_control(const DataSampleHeader& header,
00535 ACE_Message_Block* msg
00536 );
00537
00538
00539
00540
00541 bool pending_control();
00542
00543 private:
00544
00545 void track_sequence_number(GUIDSeq* filter_out);
00546 /// Private overload taking instance handles; the public overload takes a ReaderIdSeq.
00547 void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00548
00549 DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle,
00550 const DDS::Time_t& timestamp);
00551
00552
00553
00554
00555 /// @param message_id a MessageId value for the control message.
00556 /// @param header DataSampleHeader passed by non-const reference.
00557 /// @return an ACE_Message_Block pointer; NOTE(review): ownership of the result is not visible here.
00558 ACE_Message_Block*
00559 create_control_message(MessageId message_id,
00560 DataSampleHeader& header,
00561 ACE_Message_Block* data,
00562 const DDS::Time_t& source_timestamp);
00563
00564
00565 bool send_liveliness(const ACE_Time_Value& now);
00566
00567
00568 bool lookup_instance_handles(const ReaderIdSeq& ids,
00569 DDS::InstanceHandleSeq& hdls);
00570 /// Returns a reference to publication_id_.
00571 const RepoId& get_repo_id() const {
00572 return this->publication_id_;
00573 }
00574 DDS::DomainId_t domain_id() const {
00575 return this->domain_id_;
00576 }
00577 /// Returns qos_.transport_priority.value; the AssociationData argument is ignored.
00578 CORBA::Long get_priority_value(const AssociationData&) const {
00579 return this->qos_.transport_priority.value;
00580 }
00581
00582 void association_complete_i(const RepoId& remote_id);
00583 /// Grants the global-namespace DDS_TEST class access to private members.
00584 friend class ::DDS_TEST;
00585
00586
00587 CORBA::String_var topic_name_;
00588
00589 RepoId topic_id_;
00590
00591 DDS::Topic_var topic_objref_;
00592
00593 TopicImpl* topic_servant_;
00594
00595
00596
00597 DDS::StatusMask listener_mask_;
00598
00599 DDS::DataWriterListener_var listener_;
00600
00601 DDS::DomainId_t domain_id_;
00602
00603 PublisherImpl* publisher_servant_;
00604
00605 DDS::DataWriter_var dw_local_objref_;
00606
00607 PublicationId publication_id_;
00608
00609 SequenceNumber sequence_number_;
00610
00611
00612 bool coherent_;
00613
00614
00615 ACE_UINT32 coherent_samples_;
00616 /// Raw pointer to the data container used by get_unsent_data(), get_resend_data() and get_lock().
00617 WriteDataContainer* data_container_;
00618
00619 /// This class's own recursive mutex (distinct from data_container_->lock_).
00620 ACE_Recursive_Thread_Mutex lock_;
00621 /// Map from RepoId to DDS::InstanceHandle_t, ordered by GUID_tKeyLessThan.
00622 typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00623
00624 RepoIdToHandleMap id_to_handle_map_;
00625
00626 RepoIdSet readers_;
00627
00628 /// Status structures; NOTE(review): presumably returned by the get_*_status() operations - confirm.
00629 DDS::LivelinessLostStatus liveliness_lost_status_ ;
00630 DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_ ;
00631 DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_ ;
00632 DDS::PublicationMatchedStatus publication_match_status_ ;
00633
00634
00635
00636 bool liveliness_lost_;
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650 /// Raw allocator pointers; NOTE(review): who allocates and frees them is not visible here.
00651 MessageBlockAllocator* mb_allocator_;
00652
00653 DataBlockAllocator* db_allocator_;
00654
00655 DataSampleHeaderAllocator* header_allocator_;
00656
00657
00658
00659 ACE_Reactor_Timer_Interface* reactor_;
00660
00661 ACE_Time_Value liveliness_check_interval_;
00662
00663 ACE_Time_Value last_liveliness_activity_time_;
00664
00665 ACE_Time_Value last_liveliness_check_time_;
00666
00667
00668 CORBA::Long last_deadline_missed_total_count_;
00669
00670
00671 OfferedDeadlineWatchdog* watchdog_;
00672
00673
00674 bool cancel_timer_;
00675
00676
00677
00678 bool is_bit_;
00679
00680
00681 bool initialized_;
00682
00683 RepoIdSet pending_readers_, assoc_complete_readers_;
00684
00685
00686 ACE_UINT64 min_suspended_transaction_id_;
00687 ACE_UINT64 max_suspended_transaction_id_;
00688 SendStateDataSampleList available_data_list_;
00689
00690
00691 Monitor* monitor_;
00692
00693
00694 Monitor* periodic_monitor_;
00695
00696 /// Pool used by get_db_lock().
00697 DataBlockLockPool* db_lock_pool_;
00698
00699
00700
00701 bool need_sequence_repair();
00702 bool need_sequence_repair_i() const;
00703
00704 DDS::ReturnCode_t send_end_historic_samples(const RepoId& readerId);
00705
00706 bool liveliness_asserted_;
00707
00708
00709
00710 ACE_Thread_Mutex sync_unreg_rem_assocs_lock_;
00711 };
00712
00713 }
00714 }
00715
00716 #endif