DataWriterImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DATAWRITER_H
00009 #define OPENDDS_DCPS_DATAWRITER_H
00010 
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "dds/DCPS/MessageTracker.h"
00017 #include "dds/DCPS/DataBlockLockPool.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019 #include "WriteDataContainer.h"
00020 #include "Definitions.h"
00021 #include "DataSampleHeader.h"
00022 #include "TopicImpl.h"
00023 #include "Time_Helper.h"
00024 #include "CoherentChangeControl.h"
00025 #include "GuidUtils.h"
00026 #include "RcEventHandler.h"
00027 #include "unique_ptr.h"
00028 #include "Message_Block_Ptr.h"
00029 
00030 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00031 #include "FilterEvaluator.h"
00032 #endif
00033 
00034 #include "ace/Event_Handler.h"
00035 #include "ace/OS_NS_sys_time.h"
00036 
00037 #include <memory>
00038 
00039 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00040 #pragma once
00041 #endif /* ACE_LACKS_PRAGMA_ONCE */
00042 
00043 class DDS_TEST;
00044 
00045 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00046 
00047 namespace OpenDDS {
00048 namespace DCPS {
00049 
00050 class PublisherImpl;
00051 class DomainParticipantImpl;
00052 class OfferedDeadlineWatchdog;
00053 class Monitor;
00054 class DataSampleElement;
00055 class SendStateDataSampleList;
00056 struct AssociationData;
00057 class LivenessTimer;
00058 
00059 
00060 
00061 
00062 /**
00063 * @class DataWriterImpl
00064 *
00065 * @brief Implements the OpenDDS::DCPS::DataWriterRemote interfaces and
00066 *        DDS::DataWriter interfaces.
00067 *
00068 * See the DDS specification, OMG formal/04-12-02, for a description of
00069 * the interface this class is implementing.
00070 *
00071 * This class must be inherited by the type-specific datawriter which
00072 * is specific to the data-type associated with the topic.
00073 *
00074 * @note: This class is responsible for allocating memory for the
00075 *        header message block
00076 *        (MessageBlock + DataBlock + DataSampleHeader) and the
00077 *        DataSampleElement.
00078 *        The data-type datawriter is responsible for allocating
00079 *        memory for the sample data message block.
00080 *        (e.g. MessageBlock + DataBlock + Foo data). But it gives
00081 *        up ownership to this WriteDataContainer.
00082 */
00083 class OpenDDS_Dcps_Export DataWriterImpl
00084   : public virtual LocalObject<DDS::DataWriter>,
00085     public virtual DataWriterCallbacks,
00086     public virtual EntityImpl,
00087     public virtual TransportClient,
00088     public virtual TransportSendListener {
00089 public:
00090   friend class WriteDataContainer;
00091   friend class PublisherImpl;
00092 
00093   typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap;
00094 
00095   struct AckToken {
          // Value bundle describing one acknowledgment wait: the time the
          // token was created, the maximum wait, and a sequence number.
          // Produced by create_ack_token() and consumed by
          // wait_for_specific_ack(), both declared in the enclosing class.

          /// Time captured by the constructor via ACE_OS::gettimeofday().
00096     ACE_Time_Value tstamp_;
          /// Copy of the max_wait argument given to the constructor.
00097     DDS::Duration_t max_wait_;
          /// Copy of the sequence argument given to the constructor.
00098     SequenceNumber sequence_;
00099 
          /// Stamps the token with the current time and stores copies of
          /// @a max_wait and @a sequence.
00100     AckToken(const DDS::Duration_t& max_wait,
00101              const SequenceNumber& sequence)
00102       : tstamp_(ACE_OS::gettimeofday()),
00103         max_wait_(max_wait),
00104         sequence_(sequence) {}
00105 
          /// Empty destructor; the members clean up after themselves.
00106     ~AckToken() {}
00107 
          /// Result of duration_to_absolute_time_value(max_wait_, tstamp_).
          /// NOTE(review): presumably "tstamp_ + max_wait_" as an absolute
          /// time; the helper is defined outside this file -- confirm how it
          /// treats infinite durations.
00108     ACE_Time_Value deadline() const {
00109       return duration_to_absolute_time_value(this->max_wait_, this->tstamp_);
00110     }
00111 
          /// The creation time tstamp_, converted to DDS::Time_t with
          /// time_value_to_time().
00112     DDS::Time_t timestamp() const {
00113       return time_value_to_time(this->tstamp_);
00114     }
00115   };
00116 
00117   DataWriterImpl();
00118 
00119   virtual ~DataWriterImpl();
00120 
00121   virtual DDS::InstanceHandle_t get_instance_handle();
00122 
00123   virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos & qos);
00124 
00125   virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos & qos);
00126 
00127   virtual DDS::ReturnCode_t set_listener(
00128     DDS::DataWriterListener_ptr a_listener,
00129     DDS::StatusMask mask);
00130 
00131   virtual DDS::DataWriterListener_ptr get_listener();
00132 
00133   virtual DDS::Topic_ptr get_topic();
00134 
00135   virtual DDS::ReturnCode_t wait_for_acknowledgments(
00136     const DDS::Duration_t & max_wait);
00137 
00138   virtual DDS::Publisher_ptr get_publisher();
00139 
00140   virtual DDS::ReturnCode_t get_liveliness_lost_status(
00141     DDS::LivelinessLostStatus & status);
00142 
00143   virtual DDS::ReturnCode_t get_offered_deadline_missed_status(
00144     DDS::OfferedDeadlineMissedStatus & status);
00145 
00146   virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(
00147     DDS::OfferedIncompatibleQosStatus & status);
00148 
00149   virtual DDS::ReturnCode_t get_publication_matched_status(
00150     DDS::PublicationMatchedStatus & status);
00151 
00152   ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00153 
00154   bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00155 
00156   virtual DDS::ReturnCode_t assert_liveliness();
00157 
00158   virtual DDS::ReturnCode_t assert_liveliness_by_participant();
00159 
00160   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00161   void get_instance_handles(InstanceHandleVec& instance_handles);
00162 
00163   void get_readers(RepoIdSet& readers);
00164 
00165   virtual DDS::ReturnCode_t get_matched_subscriptions(
00166     DDS::InstanceHandleSeq & subscription_handles);
00167 
00168 #if !defined (DDS_HAS_MINIMUM_BIT)
00169   virtual DDS::ReturnCode_t get_matched_subscription_data(
00170     DDS::SubscriptionBuiltinTopicData & subscription_data,
00171     DDS::InstanceHandle_t subscription_handle);
00172 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00173 
00174   virtual DDS::ReturnCode_t enable();
00175 
00176   virtual void add_association(const RepoId& yourId,
00177                                const ReaderAssociation& reader,
00178                                bool active);
00179 
00180   virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00181 
00182   virtual void association_complete(const RepoId& remote_id);
00183 
00184   virtual void remove_associations(const ReaderIdSeq & readers,
00185                                    bool callback);
00186 
00187   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00188 
00189   virtual void update_subscription_params(const RepoId& readerId,
00190                                           const DDS::StringSeq& params);
00191 
00192   virtual void inconsistent_topic();
00193 
00194 
00195   /**
00196    * Clean up the DataWriter.
00197    */
00198   void cleanup();
00199 
00200   /**
00201    * Initialize the data members.
00202    */
00203   void init(
00204     TopicImpl*                            topic_servant,
00205     const DDS::DataWriterQos &            qos,
00206     DDS::DataWriterListener_ptr           a_listener,
00207     const DDS::StatusMask &               mask,
00208     WeakRcHandle<OpenDDS::DCPS::DomainParticipantImpl> participant_servant,
00209     OpenDDS::DCPS::PublisherImpl*         publisher_servant);
00210 
00211   void send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard);
00212 
00213   /**
00214    * Delegate to the WriteDataContainer to register
00215    * Must tell the transport to broadcast the registered
00216    * instance upon returning.
00217    */
00218   DDS::ReturnCode_t
00219   register_instance_i(
00220     DDS::InstanceHandle_t& handle,
00221     Message_Block_Ptr data,
00222     const DDS::Time_t& source_timestamp);
00223 
00224   /**
00225    * Delegate to the WriteDataContainer to register and tell
00226    * the transport to broadcast the registered instance.
00227    */
00228   DDS::ReturnCode_t
00229   register_instance_from_durable_data(
00230     DDS::InstanceHandle_t& handle,
00231     Message_Block_Ptr data,
00232     const DDS::Time_t & source_timestamp);
00233 
00234   /**
00235    * Delegate to the WriteDataContainer to unregister and tell
00236    * the transport to broadcast the unregistered instance.
00237    */
00238   DDS::ReturnCode_t
00239   unregister_instance_i(
00240     DDS::InstanceHandle_t handle,
00241     const DDS::Time_t & source_timestamp);
00242 
00243   /**
00244    * Unregister all registered instances and tell the transport
00245    * to broadcast the unregistered instances.
00246    */
00247   void unregister_instances(const DDS::Time_t& source_timestamp);
00248 
00249   /**
00250    * Delegate to the WriteDataContainer to queue the instance
00251    * sample and finally tell the transport to send the sample.
00252    * \param filter_out can either be null (if the writer can't
00253    *        or won't evaluate the filters), or a list of
00254    *        associated reader RepoIds that should NOT get the
00255    *        data sample due to content filtering.
00256    */
00257   DDS::ReturnCode_t write(Message_Block_Ptr sample,
00258                           DDS::InstanceHandle_t handle,
00259                           const DDS::Time_t& source_timestamp,
00260                           GUIDSeq* filter_out);
00261 
00262   /**
00263    * Delegate to the WriteDataContainer to dispose all data
00264    * samples for a given instance and tell the transport to
00265    * broadcast the disposed instance.
00266    */
00267   DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle,
00268                             const DDS::Time_t & source_timestamp);
00269 
00270   /**
00271    * Return the number of samples for a given instance.
00272    */
00273   DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle,
00274                                 size_t&               size);
00275 
00276   /**
00277    * Retrieve the unsent data from the WriteDataContainer.
         *
         * Forwards @a list to data_container_->get_unsent_data() and returns
         * that call's ACE_UINT64 result unchanged. data_container_ is
         * dereferenced without a null check.
         *
         * NOTE(review): the meaning of the returned value is not visible in
         * this header -- presumably a transaction id (compare the
         * *_suspended_transaction_id_ members); confirm against
         * WriteDataContainer.
00278    */
00279    ACE_UINT64 get_unsent_data(SendStateDataSampleList& list) {
00280     return data_container_->get_unsent_data(list);
00281   }
00282 
00283   SendStateDataSampleList get_resend_data() {
          // Pure forward: returns, by value, whatever list
          // data_container_->get_resend_data() produces. data_container_ is
          // dereferenced without a null check.
00284     return data_container_->get_resend_data();
00285   }
00286 
00287   /**
00288    * Accessor of the repository id of this datawriter/publication.
00289    */
00290   RepoId get_publication_id();
00291 
00292   /**
00293    * Accessor of the repository id of the domain participant.
00294    */
00295   RepoId get_dp_id();
00296 
00297   /**
00298    * Delegate to WriteDataContainer to unregister all instances.
00299    */
00300   void unregister_all();
00301 
00302   /**
00303    * This is called by transport to notify that the sample is
00304    * delivered and it is delegated to WriteDataContainer
00305    * to adjust the internal data sample threads.
00306    */
00307   void data_delivered(const DataSampleElement* sample);
00308 
00309   /**
00310    * This is called by transport to notify that the control
00311    * message is delivered.
00312    */
00313   void control_delivered(const Message_Block_Ptr& sample);
00314 
00315   /// Does this writer have samples to be acknowledged?
00316   bool should_ack() const;
00317 
00318   /// Create an AckToken for ack operations.
00319   AckToken create_ack_token(DDS::Duration_t max_wait) const;
00320 
00321   virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const;
00322 
00323   virtual bool check_transport_qos(const TransportInst& inst);
00324 
00325 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00326 
00327   /// Are coherent changes pending?
00328   bool coherent_changes_pending();
00329 
00330   /// Starts a coherent change set; should only be called once.
00331   void begin_coherent_changes();
00332 
00333   /// Ends a coherent change set; should only be called once.
00334   void end_coherent_changes(const GroupCoherentSamples& group_samples);
00335 
00336 #endif
00337 
00338   /**
00339    * Get associated topic type name.
00340    */
00341   char const* get_type_name() const;
00342 
00343   /**
00344    * This method is called by transport to notify the instance
00345    * sample is dropped and it delegates to WriteDataContainer
00346    * to update the internal list.
00347    */
00348   void data_dropped(const DataSampleElement* element,
00349                     bool dropped_by_transport);
00350 
00351   /**
00352    * This is called by transport to notify that the control
00353    * message is dropped.
00354    */
00355   void control_dropped(const Message_Block_Ptr& sample,
00356                        bool dropped_by_transport);
00357 
00358   /**
00359    * Accessor of the WriteDataContainer's lock.
         *
         * Returns a reference to data_container_->lock_, i.e. the mutex
         * owned by the container -- not this class's own lock_ member.
         * data_container_ is dereferenced without a null check.
00360    */
00361   // ciju: Seems this is no longer being used.
00362   // Was wrong. Still required.
00363   ACE_INLINE
00364   ACE_Recursive_Thread_Mutex& get_lock() {
00365     return data_container_->lock_;
00366   }
00367 
00368   /**
00369    * This is used to retrieve the listener for a certain status
00370    * change.
00371    *
00372    * If this datawriter has a registered listener and the status
00373    * kind is in the listener mask then the listener is returned.
00374    * Otherwise, the query for the listener is propagated up to the
00375    * factory/publisher.
00376    */
00377   DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind);
00378 
00379   /// Handle the assert liveliness timeout.
00380   virtual int handle_timeout(const ACE_Time_Value &tv,
00381                              const void *arg);
00382 
00383   /// Called by the PublisherImpl to indicate that the Publisher is now
00384   /// resumed and any data collected while it was suspended should now be sent.
00385   void send_suspended_data();
00386 
00387   void remove_all_associations();
00388 
00389   virtual void register_for_reader(const RepoId& participant,
00390                                    const RepoId& writerid,
00391                                    const RepoId& readerid,
00392                                    const TransportLocatorSeq& locators,
00393                                    DiscoveryListener* listener);
00394 
00395   virtual void unregister_for_reader(const RepoId& participant,
00396                                      const RepoId& writerid,
00397                                      const RepoId& readerid);
00398 
00399   void notify_publication_disconnected(const ReaderIdSeq& subids);
00400   void notify_publication_reconnected(const ReaderIdSeq& subids);
00401   void notify_publication_lost(const ReaderIdSeq& subids);
00402 
00403   /// Statistics counter.
00404   int         data_dropped_count_;
00405   int         data_delivered_count_;
00406 
00407   MessageTracker controlTracker;
00408 
00409   /**
00410    * This method creates a header message block and chains it with
00411    * the sample data. The header contains the information
00412    * needed. e.g. message id, length of whole message...
00413    * The fast allocator is used to allocate the message block,
00414    * data block and header.
00415    */
00416   DDS::ReturnCode_t
00417   create_sample_data_message(Message_Block_Ptr data,
00418                              DDS::InstanceHandle_t instance_handle,
00419                              DataSampleHeader& header_data,
00420                              Message_Block_Ptr& message,
00421                              const DDS::Time_t& source_timestamp,
00422                              bool content_filter);
00423 
00424 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00425   /// Make sent data available beyond the lifetime of this
00426   /// @c DataWriter.
00427   bool persist_data();
00428 #endif
00429 
00430   // Reset time interval for each instance.
00431   void reschedule_deadline();
00432 
00433   /// Wait for pending samples to drain.
00434   void wait_pending();
00435 
00436   /**
00437    * Get an instance handle for a new instance.
00438    */
00439   DDS::InstanceHandle_t get_next_handle();
00440 
00441   virtual RcHandle<EntityImpl> parent() const;
00442 
00443 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00444   bool filter_out(const DataSampleElement& elt,
00445                   const OPENDDS_STRING& filterClassName,
00446                   const FilterEvaluator& evaluator,
00447                   const DDS::StringSeq& expression_params) const;
00448 #endif
00449 
00450   /**
00451    * Wait until pending control elements have either been delivered
00452    * or dropped.
00453    */
00454   void wait_control_pending();
00455 
00456   DataBlockLockPool::DataBlockLock* get_db_lock() {
          // Pure forward to db_lock_pool_->get_lock(); db_lock_pool_ is
          // dereferenced without a null check.
          // NOTE(review): ownership/lifetime of the returned pointer is
          // decided by DataBlockLockPool and is not visible here -- confirm
          // before storing it.
00457     return db_lock_pool_->get_lock();
00458   }
00459 
00460  /**
00461   *  Attempt to locate an existing instance for the given handle.
00462   */
00463  PublicationInstance_rch get_handle_instance(
00464    DDS::InstanceHandle_t handle);
00465 
00466 
00467 protected:
00468 
00469   DDS::ReturnCode_t wait_for_specific_ack(const AckToken& token);
00470 
00471   void prepare_to_delete();
00472 
00473   // type specific DataWriter's part of enable.
00474   virtual DDS::ReturnCode_t enable_specific() = 0;
00475 
00476   /// The number of chunks for the cached allocator.
00477   size_t                     n_chunks_;
00478 
00479   /// The multiplier for allocators affected by associations
00480   size_t                     association_chunk_multiplier_;
00481 
00482 
00483   /// The type name of associated topic.
00484   CORBA::String_var               type_name_;
00485 
00486   /// The qos policy list of this datawriter.
00487   DDS::DataWriterQos              qos_;
00488 
00489   /// The participant servant which creates the publisher that
00490   /// creates this datawriter.
00491   WeakRcHandle<DomainParticipantImpl>          participant_servant_;
00492 
00493   //This lock should be used to protect access to reader_info_
00494   ACE_Thread_Mutex reader_info_lock_;
00495 
00496   struct ReaderInfo {
          // Per-reader record; instances are the mapped values of
          // reader_info_ (RepoIdToReaderInfoMap, keyed by RepoId).
          // The content-filter members below exist only when
          // OPENDDS_NO_CONTENT_FILTERED_TOPIC is not defined; the
          // constructor signature is the same in both configurations.
00497 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
          // NOTE(review): presumably the participant passed to the
          // constructor -- the definition is not in this header.
00498     WeakRcHandle<DomainParticipantImpl> participant_;
          // NOTE(review): presumably filled from the constructor's
          // filter_class_name / filter / params arguments -- confirm in the
          // .cpp file.
00499     OPENDDS_STRING filter_class_name_;
00500     OPENDDS_STRING filter_;
00501     DDS::StringSeq expression_params_;
          // Handle to a FilterEvaluator; how and when it is created is not
          // visible here.
00502     RcHandle<FilterEvaluator> eval_;
00503 #endif
          // NOTE(review): presumably the next sequence number this reader is
          // expected to receive/acknowledge -- confirm against the users of
          // reader_info_.
00504     SequenceNumber expected_sequence_;
          // NOTE(review): presumably records the constructor's durable
          // argument -- confirm.
00505     bool durable_;
          // Constructor and destructor are defined out of line.
00506     ReaderInfo(const char* filter_class_name, const char* filter, const DDS::StringSeq& params,
00507                WeakRcHandle<DomainParticipantImpl> participant, bool durable);
00508     ~ReaderInfo();
00509   };
00510 
00511   typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00512   RepoIdToReaderInfoMap reader_info_;
00513 
00514   struct AckCustomization {
          // Pairs a GUID sequence with a reference to an existing AckToken.
          // customized_ starts out default-constructed.
          // NOTE(review): who fills customized_ and what its entries denote
          // is not visible in this header -- confirm against its users.
00515     GUIDSeq customized_;
          // Reference, not a copy: the AckToken passed to the constructor
          // must outlive this object.
00516     AckToken& token_;
          // Binds token_ to the caller's token; explicit, so an AckToken is
          // never converted implicitly.
00517     explicit AckCustomization(AckToken& at) : token_(at) {}
00518   };
00519 
00520   virtual SendControlStatus send_control(const DataSampleHeader& header,
00521                                          Message_Block_Ptr msg);
00522 
00523 private:
00524 
00525   void track_sequence_number(GUIDSeq* filter_out);
00526 
00527   void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00528 
00529   DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle,
00530                                            const DDS::Time_t& timestamp);
00531 
00532   /**
00533    * This method creates a header message block and chains it with
00534    * the registered sample. The header contains the information
00535    * needed. e.g. message id, length of whole message...
00536    * The fast allocator is not used for the header.
00537    */
00538   ACE_Message_Block*
00539   create_control_message(MessageId message_id,
00540                          DataSampleHeader& header,
00541                          Message_Block_Ptr data,
00542                          const DDS::Time_t& source_timestamp);
00543 
00544   /// Send the liveliness message.
00545   bool send_liveliness(const ACE_Time_Value& now);
00546 
00547   /// Lookup the instance handles by the subscription repo ids
00548   void lookup_instance_handles(const ReaderIdSeq& ids,
00549                                DDS::InstanceHandleSeq& hdls);
00550 
00551   const RepoId& get_repo_id() const {
          // Returns the publication_id_ member (declared as PublicationId)
          // as a const RepoId&.
          // NOTE(review): private and non-virtual here; likely satisfies a
          // hook declared in one of the base classes -- confirm.
00552     return this->publication_id_;
00553   }
00554 
00555   DDS::DomainId_t domain_id() const {
          // Returns the stored domain_id_ member by value.
00556     return this->domain_id_;
00557   }
00558 
00559   CORBA::Long get_priority_value(const AssociationData&) const {
          // The AssociationData argument is unnamed and ignored: the result
          // is always this writer's own qos_.transport_priority.value.
00560     return this->qos_.transport_priority.value;
00561   }
00562 
00563 #if defined(OPENDDS_SECURITY)
00564   DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
00565 #endif
00566 
00567   void association_complete_i(const RepoId& remote_id);
00568 
00569   friend class ::DDS_TEST; // allows tests to get at privates
00570 
00571 
00572   // Data block lock pool for this data writer.
00573   unique_ptr<DataBlockLockPool>  db_lock_pool_;
00574 
00575   /// The name of associated topic.
00576   CORBA::String_var               topic_name_;
00577   /// The associated topic repository id.
00578   RepoId                          topic_id_;
00579   /// The topic servant.
00580   TopicDescriptionPtr<TopicImpl>                 topic_servant_;
00581 
00582   /// The StatusKind bit mask indicates which status condition change
00583   /// can be notified by the listener of this entity.
00584   DDS::StatusMask                 listener_mask_;
00585   /// Used to notify the entity for relevant events.
00586   DDS::DataWriterListener_var     listener_;
00587   /// The domain id.
00588   DDS::DomainId_t                 domain_id_;
00589   RepoId                          dp_id_;
00590   /// The publisher servant which creates this datawriter.
00591   WeakRcHandle<PublisherImpl>     publisher_servant_;
00592   /// The repository id of this datawriter/publication.
00593   PublicationId                   publication_id_;
00594   /// The sequence number unique in DataWriter scope.
00595   SequenceNumber                  sequence_number_;
00596   /// Flag indicating DataWriter currently belongs to
00597   /// a coherent change set.
00598   bool                            coherent_;
00599   /// The number of samples belonging to the current
00600   /// coherent change set.
00601   ACE_UINT32                      coherent_samples_;
00602   /// The sample data container.
00603   unique_ptr<WriteDataContainer>  data_container_;
00604   /// The lock to protect the activate subscriptions
00605   /// and status changes.
00606   ACE_Recursive_Thread_Mutex      lock_;
00607 
00608   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00609 
00610   RepoIdToHandleMap               id_to_handle_map_;
00611 
00612   RepoIdSet readers_;
00613 
00614   /// Status conditions.
00615   DDS::LivelinessLostStatus           liveliness_lost_status_ ;
00616   DDS::OfferedDeadlineMissedStatus    offered_deadline_missed_status_ ;
00617   DDS::OfferedIncompatibleQosStatus   offered_incompatible_qos_status_ ;
00618   DDS::PublicationMatchedStatus       publication_match_status_ ;
00619 
00620   /// True if the writer failed to actively signal its liveliness within
00621   /// its offered liveliness period.
00622   bool liveliness_lost_;
00623 
00624   /**
00625    * @todo The publication_lost_status_ and
00626    *       publication_reconnecting_status_ are left here for
00627    *       future use when we add get_publication_lost_status()
00628    *       and get_publication_reconnecting_status() methods.
00629    */
00630   // Statistics of the lost publications due to lost connection.
00631   // PublicationLostStatus               publication_lost_status_;
00632   // Statistics of the publications that associates with a
00633   // reconnecting datalink.
00634   // PublicationReconnectingStatus       publication_reconnecting_status_;
00635 
00636   /// The message block allocator.
00637   unique_ptr<MessageBlockAllocator>     mb_allocator_;
00638   /// The data block allocator.
00639   unique_ptr<DataBlockAllocator>        db_allocator_;
00640   /// The header data allocator.
00641   unique_ptr<DataSampleHeaderAllocator> header_allocator_;
00642 
00643   /// The orb's reactor to be used to register the liveliness
00644   /// timer.
00645   ACE_Reactor_Timer_Interface* reactor_;
00646   /// The time interval for sending liveliness message.
00647   ACE_Time_Value             liveliness_check_interval_;
00648   /// Timestamp of last write/dispose/assert_liveliness.
00649   ACE_Time_Value             last_liveliness_activity_time_;
00650   /// Total number of offered deadlines missed during last offered
00651   /// deadline status check.
00652   CORBA::Long last_deadline_missed_total_count_;
00653   /// Watchdog responsible for reporting missed offered
00654   /// deadlines.
00655   RcHandle<OfferedDeadlineWatchdog> watchdog_;
00656 
00657   /// Flag indicates that this datawriter is a builtin topic
00658   /// datawriter.
00659   bool                       is_bit_;
00660 
00661   RepoIdSet pending_readers_, assoc_complete_readers_;
00662 
00663   /// The cached available data while suspending and associated transaction ids.
00664   ACE_UINT64 min_suspended_transaction_id_;
00665   ACE_UINT64 max_suspended_transaction_id_;
00666   SendStateDataSampleList             available_data_list_;
00667 
00668   /// Monitor object for this entity
00669   Monitor* monitor_;
00670 
00671   /// Periodic Monitor object for this entity
00672   Monitor* periodic_monitor_;
00673 
00674 
00675   // Do we need to set the sequence repair header bit?
00676   //   must call prior to incrementing sequence number
00677   bool need_sequence_repair();
00678   bool need_sequence_repair_i() const;
00679 
00680   DDS::ReturnCode_t send_end_historic_samples(const RepoId& readerId);
00681   DDS::ReturnCode_t send_request_ack();
00682 
00683   bool liveliness_asserted_;
00684 
00685   // Lock used to synchronize remove_associations calls from discovery
00686   // and unregister_instances during deletion of datawriter from application
00687   ACE_Thread_Mutex sync_unreg_rem_assocs_lock_;
00688   RcHandle<LivenessTimer> liveness_timer_;
00689 };
00690 
00691 typedef RcHandle<DataWriterImpl> DataWriterImpl_rch;
00692 
00693 
00694 class LivenessTimer : public RcEventHandler
00695 {
        // Event handler that is bound to one DataWriterImpl and receives
        // handle_timeout() callbacks on its behalf. The writer is held
        // through a WeakRcHandle rather than a plain reference or RcHandle.
        // NOTE(review): presumably so that a pending timer does not keep the
        // writer alive -- confirm against the handle_timeout() definition.
00696 public:
        /// Binds the timer to @a writer by initialising writer_ from it.
        /// Declared explicit so that a DataWriterImpl is never converted to
        /// a LivenessTimer implicitly.
00697   explicit LivenessTimer(DataWriterImpl& writer)
00698     : writer_(writer)
00699   {
00700   }
00701 
00702   /// Handle the assert liveliness timeout.
        /// Defined out of line; the parameters are the ones supplied by the
        /// event-handler framework's timeout callback.
00703   virtual int handle_timeout(const ACE_Time_Value &tv,
00704                              const void *arg);
00705 
00706 private:
        /// Weak handle to the writer this timer was created for.
00707   WeakRcHandle<DataWriterImpl> writer_;
00708 };
00709 
00710 } // namespace DCPS
00711 } // namespace OpenDDS
00712 
00713 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00714 
00715 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1