DataWriterImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DATAWRITER_H
00009 #define OPENDDS_DCPS_DATAWRITER_H
00010 
00011 #include "dds/DdsDcpsDomainC.h"
00012 #include "dds/DdsDcpsTopicC.h"
00013 #include "dds/DCPS/DataWriterCallbacks.h"
00014 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00015 #include "dds/DCPS/transport/framework/TransportClient.h"
00016 #include "dds/DCPS/MessageTracker.h"
00017 #include "dds/DCPS/DataBlockLockPool.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019 #include "WriteDataContainer.h"
00020 #include "Definitions.h"
00021 #include "DataSampleHeader.h"
00022 #include "TopicImpl.h"
00023 #include "Qos_Helper.h"
00024 #include "CoherentChangeControl.h"
00025 #include "GuidUtils.h"
00026 
00027 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00028 #include "FilterEvaluator.h"
00029 #endif
00030 
00031 #include "ace/Event_Handler.h"
00032 #include "ace/OS_NS_sys_time.h"
00033 #include "ace/Condition_T.h"
00034 #include "ace/Condition_Recursive_Thread_Mutex.h"
00035 
00036 #include <memory>
00037 
00038 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00039 #pragma once
00040 #endif /* ACE_LACKS_PRAGMA_ONCE */
00041 
00042 class DDS_TEST;
00043 
00044 namespace OpenDDS {
00045 namespace DCPS {
00046 
00047 class PublisherImpl;
00048 class DomainParticipantImpl;
00049 class OfferedDeadlineWatchdog;
00050 class Monitor;
00051 class DataSampleElement;
00052 class SendStateDataSampleList;
00053 struct AssociationData;
00054 
00055 /**
00056 * @class DataWriterImpl
00057 *
00058 * @brief Implements the OpenDDS::DCPS::DataWriterRemote interfaces and
00059 *        DDS::DataWriter interfaces.
00060 *
00061 * See the DDS specification, OMG formal/04-12-02, for a description of
00062 * the interface this class is implementing.
00063 *
00064 * This class must be inherited by the type-specific datawriter which
00065 * is specific to the data-type associated with the topic.
00066 *
00067 * @note: This class is responsible for allocating memory for the
00068 *        header message block
00069 *        (MessageBlock + DataBlock + DataSampleHeader) and the
00070 *        DataSampleElement.
00071 *        The data-type datawriter is responsible for allocating
00072 *        memory for the sample data message block.
00073 *        (e.g. MessageBlock + DataBlock + Foo data). But it gives
00074 *        up ownership to this WriteDataContainer.
00075 */
00076 class OpenDDS_Dcps_Export DataWriterImpl
00077   : public virtual DDS::DataWriter,
00078     public virtual DataWriterCallbacks,
00079     public virtual EntityImpl,
00080     public virtual TransportClient,
00081     public virtual TransportSendListener,
00082     public virtual ACE_Event_Handler {
00083 public:
00084   friend class WriteDataContainer;
00085   friend class PublisherImpl;
00086 
00087   typedef OPENDDS_MAP_CMP(RepoId, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap;
00088 
00089   struct AckToken {
00090     ACE_Time_Value tstamp_;
00091     DDS::Duration_t max_wait_;
00092     SequenceNumber sequence_;
00093 
00094     AckToken(const DDS::Duration_t& max_wait,
00095              const SequenceNumber& sequence)
00096       : tstamp_(ACE_OS::gettimeofday()),
00097         max_wait_(max_wait),
00098         sequence_(sequence) {}
00099 
00100     ~AckToken() {}
00101 
00102     ACE_Time_Value deadline() const {
00103       return duration_to_absolute_time_value(this->max_wait_, this->tstamp_);
00104     }
00105 
00106     DDS::Time_t timestamp() const {
00107       return time_value_to_time(this->tstamp_);
00108     }
00109   };
00110 
00111   ///Constructor
00112   DataWriterImpl();
00113 
00114   ///Destructor
00115   virtual ~DataWriterImpl();
00116 
00117   virtual DDS::InstanceHandle_t get_instance_handle();
00118 
00119   virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos & qos);
00120 
00121   virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos & qos);
00122 
00123   virtual DDS::ReturnCode_t set_listener(
00124     DDS::DataWriterListener_ptr a_listener,
00125     DDS::StatusMask mask);
00126 
00127   virtual DDS::DataWriterListener_ptr get_listener();
00128 
00129   virtual DDS::Topic_ptr get_topic();
00130 
00131   virtual DDS::ReturnCode_t wait_for_acknowledgments(
00132     const DDS::Duration_t & max_wait);
00133 
00134   virtual DDS::Publisher_ptr get_publisher();
00135 
00136   virtual DDS::ReturnCode_t get_liveliness_lost_status(
00137     DDS::LivelinessLostStatus & status);
00138 
00139   virtual DDS::ReturnCode_t get_offered_deadline_missed_status(
00140     DDS::OfferedDeadlineMissedStatus & status);
00141 
00142   virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(
00143     DDS::OfferedIncompatibleQosStatus & status);
00144 
00145   virtual DDS::ReturnCode_t get_publication_matched_status(
00146     DDS::PublicationMatchedStatus & status);
00147 
00148   ACE_Time_Value liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
00149 
00150   bool participant_liveliness_activity_after(const ACE_Time_Value& tv);
00151 
00152   virtual DDS::ReturnCode_t assert_liveliness();
00153 
00154   virtual DDS::ReturnCode_t assert_liveliness_by_participant();
00155 
00156   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00157   void get_instance_handles(InstanceHandleVec& instance_handles);
00158 
00159   void get_readers(RepoIdSet& readers);
00160 
00161   virtual DDS::ReturnCode_t get_matched_subscriptions(
00162     DDS::InstanceHandleSeq & subscription_handles);
00163 
00164 #if !defined (DDS_HAS_MINIMUM_BIT)
00165   virtual DDS::ReturnCode_t get_matched_subscription_data(
00166     DDS::SubscriptionBuiltinTopicData & subscription_data,
00167     DDS::InstanceHandle_t subscription_handle);
00168 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00169 
00170   virtual DDS::ReturnCode_t enable();
00171 
00172   virtual void add_association(const RepoId& yourId,
00173                                const ReaderAssociation& reader,
00174                                bool active);
00175 
00176   virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00177 
00178   virtual void association_complete(const RepoId& remote_id);
00179 
00180   virtual void remove_associations(const ReaderIdSeq & readers,
00181                                    bool callback);
00182 
00183   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00184 
00185   virtual void update_subscription_params(const RepoId& readerId,
00186                                           const DDS::StringSeq& params);
00187 
00188   virtual void inconsistent_topic();
00189 
00190   /**
00191    * cleanup the DataWriter.
00192    */
00193   void cleanup();
00194 
00195   /**
00196    * Initialize the data members.
00197    */
00198   virtual void init(
00199     DDS::Topic_ptr                        topic,
00200     TopicImpl*                            topic_servant,
00201     const DDS::DataWriterQos &            qos,
00202     DDS::DataWriterListener_ptr           a_listener,
00203     const DDS::StatusMask &               mask,
00204     OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
00205     OpenDDS::DCPS::PublisherImpl*         publisher_servant,
00206     DDS::DataWriter_ptr                   dw_local);
00207 
00208   void send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard);
00209 
00210   /**
00211    * Delegate to the WriteDataContainer to register
00212    * Must tell the transport to broadcast the registered
00213    * instance upon returning.
00214    */
00215   DDS::ReturnCode_t
00216   register_instance_i(
00217     DDS::InstanceHandle_t& handle,
00218     DataSample* data,
00219     const DDS::Time_t& source_timestamp);
00220 
00221   /**
00222    * Delegate to the WriteDataContainer to register and tell
00223    * the transport to broadcast the registered instance.
00224    */
00225   DDS::ReturnCode_t
00226   register_instance_from_durable_data(
00227     DDS::InstanceHandle_t& handle,
00228     DataSample* data,
00229     const DDS::Time_t & source_timestamp);
00230 
00231   /**
00232    * Delegate to the WriteDataContainer to unregister and tell
00233    * the transport to broadcast the unregistered instance.
00234    */
00235   DDS::ReturnCode_t
00236   unregister_instance_i(
00237     DDS::InstanceHandle_t handle,
00238     const DDS::Time_t & source_timestamp);
00239 
00240   /**
00241    * Unregister all registered instances and tell the transport
00242    * to broadcast the unregistered instances.
00243    */
00244   void unregister_instances(const DDS::Time_t& source_timestamp);
00245 
00246   /**
00247    * Delegate to the WriteDataContainer to queue the instance
00248    * sample and finally tell the transport to send the sample.
00249    * \param filter_out can either be null (if the writer can't
00250    *        or won't evaluate the filters), or a list of
00251    *        associated reader RepoIds that should NOT get the
00252    *        data sample due to content filtering.
00253    */
00254   DDS::ReturnCode_t write(DataSample* sample,
00255                           DDS::InstanceHandle_t handle,
00256                           const DDS::Time_t& source_timestamp,
00257                           GUIDSeq* filter_out);
00258 
00259   /**
00260    * Delegate to the WriteDataContainer to dispose all data
00261    * samples for a given instance and tell the transport to
00262    * broadcast the disposed instance.
00263    */
00264   DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle,
00265                             const DDS::Time_t & source_timestamp);
00266 
00267   /**
00268    * Return the number of samples for a given instance.
00269    */
00270   DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle,
00271                                 size_t&               size);
00272 
00273   /**
00274    * Retrieve the unsent data from the WriteDataContainer.
00275    */
00276    ACE_UINT64 get_unsent_data(SendStateDataSampleList& list) {
00277     return data_container_->get_unsent_data(list);
00278   }
00279 
00280   SendStateDataSampleList get_resend_data() {
00281     return data_container_->get_resend_data();
00282   }
00283 
00284   /**
00285    * Accessor of the repository id of this datawriter/publication.
00286    */
00287   RepoId get_publication_id();
00288 
00289   /**
00290    * Accessor of the repository id of the domain participant.
00291    */
00292   RepoId get_dp_id();
00293 
00294   /**
00295    * Delegate to WriteDataContainer to unregister all instances.
00296    */
00297   void unregister_all();
00298 
00299   /**
00300    * This is called by transport to notify that the sample is
00301    * delivered and it is delegated to WriteDataContainer
00302    * to adjust the internal data sample threads.
00303    */
00304   void data_delivered(const DataSampleElement* sample);
00305 
00306   /**
00307    * This is called by transport to notify that the control
00308    * message is delivered.
00309    */
00310   void control_delivered(ACE_Message_Block* sample);
00311 
00312   /// Does this writer have samples to be acknowledged?
00313   bool should_ack() const;
00314 
00315   /// Create an AckToken for ack operations.
00316   AckToken create_ack_token(DDS::Duration_t max_wait) const;
00317 
00318   virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const;
00319 
00320   virtual bool check_transport_qos(const TransportInst& inst);
00321 
00322 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00323 
00324   /// Are coherent changes pending?
00325   bool coherent_changes_pending();
00326 
00327   /// Starts a coherent change set; should only be called once.
00328   void begin_coherent_changes();
00329 
00330   /// Ends a coherent change set; should only be called once.
00331   void end_coherent_changes(const GroupCoherentSamples& group_samples);
00332 
00333 #endif
00334 
00335   /**
00336    * Accessor of the associated topic name.
00337    */
00338   const char* get_topic_name();
00339 
00340   /**
00341    * Get associated topic type name.
00342    */
00343   char const* get_type_name() const;
00344 
00345   /**
00346    * This mothod is called by transport to notify the instance
00347    * sample is dropped and it delegates to WriteDataContainer
00348    * to update the internal list.
00349    */
00350   void data_dropped(const DataSampleElement* element,
00351                     bool dropped_by_transport);
00352 
00353   /**
00354    * This is called by transport to notify that the control
00355    * message is dropped.
00356    */
00357   void control_dropped(ACE_Message_Block* sample,
00358                        bool dropped_by_transport);
00359 
00360   /**
00361    * Accessor of the WriterDataContainer's lock.
00362    */
00363   // ciju: Seems this is no longer being used.
00364   // Was wrong. Still required.
00365   ACE_INLINE
00366   ACE_Recursive_Thread_Mutex& get_lock() {
00367     return data_container_->lock_;
00368   }
00369 
00370   /**
00371    * This method is called when an instance is unregistered from
00372    * the WriteDataContainer.
00373    *
00374    * The subclass must provide the implementation to unregister
00375    * the instance from its own map.
00376    */
00377   virtual void unregistered(DDS::InstanceHandle_t instance_handle) = 0;
00378 
00379   /**
00380    * This is used to retrieve the listener for a certain status
00381    * change.
00382    *
00383    * If this datawriter has a registered listener and the status
00384    * kind is in the listener mask then the listener is returned.
00385    * Otherwise, the query for the listener is propagated up to the
00386    * factory/publisher.
00387    */
00388   DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind);
00389 
00390   /// Handle the assert liveliness timeout.
00391   virtual int handle_timeout(const ACE_Time_Value &tv,
00392                              const void *arg);
00393 
00394   virtual int handle_close(ACE_HANDLE,
00395                            ACE_Reactor_Mask);
00396 
00397   /// Called by the PublisherImpl to indicate that the Publisher is now
00398   /// resumed and any data collected while it was suspended should now be sent.
00399   void send_suspended_data();
00400 
00401   void remove_all_associations();
00402 
00403   virtual void register_for_reader(const RepoId& participant,
00404                                    const RepoId& writerid,
00405                                    const RepoId& readerid,
00406                                    const TransportLocatorSeq& locators,
00407                                    DiscoveryListener* listener);
00408 
00409   virtual void unregister_for_reader(const RepoId& participant,
00410                                      const RepoId& writerid,
00411                                      const RepoId& readerid);
00412 
00413   void notify_publication_disconnected(const ReaderIdSeq& subids);
00414   void notify_publication_reconnected(const ReaderIdSeq& subids);
00415   void notify_publication_lost(const ReaderIdSeq& subids);
00416 
00417   virtual void notify_connection_deleted(const RepoId& peerId);
00418 
00419   /// Statistics counter.
00420   int         data_dropped_count_;
00421   int         data_delivered_count_;
00422 
00423   MessageTracker controlTracker;
00424 
00425   /**
00426    * This method create a header message block and chain with
00427    * the sample data. The header contains the information
00428    * needed. e.g. message id, length of whole message...
00429    * The fast allocator is used to allocate the message block,
00430    * data block and header.
00431    */
00432   DDS::ReturnCode_t
00433   create_sample_data_message(DataSample* data,
00434                              DDS::InstanceHandle_t instance_handle,
00435                              DataSampleHeader& header_data,
00436                              ACE_Message_Block*& message,
00437                              const DDS::Time_t& source_timestamp,
00438                              bool content_filter);
00439 
00440 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00441   /// Make sent data available beyond the lifetime of this
00442   /// @c DataWriter.
00443   bool persist_data();
00444 #endif
00445 
00446   // Reset time interval for each instance.
00447   void reschedule_deadline();
00448 
00449   /// Wait for pending samples to drain.
00450   void wait_pending();
00451 
00452   /**
00453    * Get an instance handle for a new instance.
00454    */
00455   DDS::InstanceHandle_t get_next_handle();
00456 
00457   virtual EntityImpl* parent() const;
00458 
00459 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00460   bool filter_out(const DataSampleElement& elt,
00461                   const OPENDDS_STRING& filterClassName,
00462                   const FilterEvaluator& evaluator,
00463                   const DDS::StringSeq& expression_params) const;
00464 #endif
00465 
00466   /**
00467    * Wait until pending control elements have either been delivered
00468    * or dropped.
00469    */
00470   void wait_control_pending();
00471 
00472   DataBlockLockPool::DataBlockLock* get_db_lock() {
00473     return db_lock_pool_->get_lock();
00474   }
00475 
00476 protected:
00477 
00478   DDS::ReturnCode_t wait_for_specific_ack(const AckToken& token);
00479 
00480   void prepare_to_delete();
00481 
00482   // type specific DataWriter's part of enable.
00483   virtual DDS::ReturnCode_t enable_specific() = 0;
00484 
00485   /// The number of chunks for the cached allocator.
00486   size_t                     n_chunks_;
00487 
00488   /// The multiplier for allocators affected by associations
00489   size_t                     association_chunk_multiplier_;
00490 
00491   /**
00492    *  Attempt to locate an existing instance for the given handle.
00493    */
00494   PublicationInstance* get_handle_instance(
00495     DDS::InstanceHandle_t handle);
00496 
00497   /// The type name of associated topic.
00498   CORBA::String_var               type_name_;
00499 
00500   /// The qos policy list of this datawriter.
00501   DDS::DataWriterQos              qos_;
00502 
00503   /// The participant servant which creats the publisher that
00504   /// creates this datawriter.
00505   DomainParticipantImpl*          participant_servant_;
00506 
00507   //This lock should be used to protect access to reader_info_
00508   ACE_Thread_Mutex reader_info_lock_;
00509 
00510   struct ReaderInfo {
00511 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00512     DomainParticipantImpl* participant_;
00513     OPENDDS_STRING filter_class_name_;
00514     OPENDDS_STRING filter_;
00515     DDS::StringSeq expression_params_;
00516     RcHandle<FilterEvaluator> eval_;
00517 #endif
00518     SequenceNumber expected_sequence_;
00519     bool durable_;
00520     ReaderInfo(const char* filter_class_name, const char* filter, const DDS::StringSeq& params,
00521                DomainParticipantImpl* participant, bool durable);
00522     ~ReaderInfo();
00523   };
00524 
00525   typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
00526   RepoIdToReaderInfoMap reader_info_;
00527 
00528   struct AckCustomization {
00529     GUIDSeq customized_;
00530     AckToken& token_;
00531     explicit AckCustomization(AckToken& at) : token_(at) {}
00532   };
00533 
00534   virtual SendControlStatus send_control(const DataSampleHeader& header,
00535                                          ACE_Message_Block* msg/*,
00536                                          void* extra = 0*/);
00537 
00538   /**
00539    * Answer if transport of all control messages is pending.
00540    */
00541   bool pending_control();
00542 
00543 private:
00544 
00545   void track_sequence_number(GUIDSeq* filter_out);
00546 
00547   void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
00548 
00549   DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle,
00550                                            const DDS::Time_t& timestamp);
00551 
00552   /**
00553    * This method create a header message block and chain with
00554    * the registered sample. The header contains the information
00555    * needed. e.g. message id, length of whole message...
00556    * The fast allocator is not used for the header.
00557    */
00558   ACE_Message_Block*
00559   create_control_message(MessageId message_id,
00560                          DataSampleHeader& header,
00561                          ACE_Message_Block* data,
00562                          const DDS::Time_t& source_timestamp);
00563 
00564   /// Send the liveliness message.
00565   bool send_liveliness(const ACE_Time_Value& now);
00566 
00567   /// Lookup the instance handles by the subscription repo ids
00568   bool lookup_instance_handles(const ReaderIdSeq& ids,
00569                                DDS::InstanceHandleSeq& hdls);
00570 
00571   const RepoId& get_repo_id() const {
00572     return this->publication_id_;
00573   }
00574   DDS::DomainId_t domain_id() const {
00575     return this->domain_id_;
00576   }
00577 
00578   CORBA::Long get_priority_value(const AssociationData&) const {
00579     return this->qos_.transport_priority.value;
00580   }
00581 
00582   void association_complete_i(const RepoId& remote_id);
00583 
00584   friend class ::DDS_TEST; // allows tests to get at privates
00585 
00586   /// The name of associated topic.
00587   CORBA::String_var               topic_name_;
00588   /// The associated topic repository id.
00589   RepoId                          topic_id_;
00590   /// The object reference of the associated topic.
00591   DDS::Topic_var                  topic_objref_;
00592   /// The topic servant.
00593   TopicImpl*                      topic_servant_;
00594 
00595   /// The StatusKind bit mask indicates which status condition change
00596   /// can be notified by the listener of this entity.
00597   DDS::StatusMask                 listener_mask_;
00598   /// Used to notify the entity for relevant events.
00599   DDS::DataWriterListener_var     listener_;
00600   /// The domain id.
00601   DDS::DomainId_t                 domain_id_;
00602   /// The publisher servant which creates this datawriter.
00603   PublisherImpl*                  publisher_servant_;
00604   /// the object reference of the local datawriter
00605   DDS::DataWriter_var             dw_local_objref_;
00606   /// The repository id of this datawriter/publication.
00607   PublicationId                   publication_id_;
00608   /// The sequence number unique in DataWriter scope.
00609   SequenceNumber                  sequence_number_;
00610   /// Flag indicating DataWriter current belongs to
00611   /// a coherent change set.
00612   bool                            coherent_;
00613   /// The number of samples belonging to the current
00614   /// coherent change set.
00615   ACE_UINT32                      coherent_samples_;
00616   /// The sample data container.
00617   WriteDataContainer*             data_container_;
00618   /// The lock to protect the activate subscriptions
00619   /// and status changes.
00620   ACE_Recursive_Thread_Mutex      lock_;
00621 
00622   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00623 
00624   RepoIdToHandleMap               id_to_handle_map_;
00625 
00626   RepoIdSet readers_;
00627 
00628   /// Status conditions.
00629   DDS::LivelinessLostStatus           liveliness_lost_status_ ;
00630   DDS::OfferedDeadlineMissedStatus    offered_deadline_missed_status_ ;
00631   DDS::OfferedIncompatibleQosStatus   offered_incompatible_qos_status_ ;
00632   DDS::PublicationMatchedStatus       publication_match_status_ ;
00633 
00634   /// True if the writer failed to actively signal its liveliness within
00635   /// its offered liveliness period.
00636   bool liveliness_lost_;
00637 
00638   /**
00639    * @todo The publication_lost_status_ and
00640    *       publication_reconnecting_status_ are left here for
00641    *       future use when we add get_publication_lost_status()
00642    *       and get_publication_reconnecting_status() methods.
00643    */
00644   // Statistics of the lost publications due to lost connection.
00645   // PublicationLostStatus               publication_lost_status_;
00646   // Statistics of the publications that associates with a
00647   // reconnecting datalink.
00648   // PublicationReconnectingStatus       publication_reconnecting_status_;
00649 
00650   /// The message block allocator.
00651   MessageBlockAllocator*     mb_allocator_;
00652   /// The data block allocator.
00653   DataBlockAllocator*        db_allocator_;
00654   /// The header data allocator.
00655   DataSampleHeaderAllocator* header_allocator_;
00656 
00657   /// The orb's reactor to be used to register the liveliness
00658   /// timer.
00659   ACE_Reactor_Timer_Interface* reactor_;
00660   /// The time interval for sending liveliness message.
00661   ACE_Time_Value             liveliness_check_interval_;
00662   /// Timestamp of last write/dispose/assert_liveliness.
00663   ACE_Time_Value             last_liveliness_activity_time_;
00664   /// Timestamp of the last time liveliness was checked.
00665   ACE_Time_Value             last_liveliness_check_time_;
00666   /// Total number of offered deadlines missed during last offered
00667   /// deadline status check.
00668   CORBA::Long last_deadline_missed_total_count_;
00669   /// Watchdog responsible for reporting missed offered
00670   /// deadlines.
00671   OfferedDeadlineWatchdog* watchdog_;
00672   /// The flag indicates whether the liveliness timer is scheduled and
00673   /// needs be cancelled.
00674   bool                       cancel_timer_;
00675 
00676   /// Flag indicates that this datawriter is a builtin topic
00677   /// datawriter.
00678   bool                       is_bit_;
00679 
00680   /// Flag indicates that the init() is called.
00681   bool                       initialized_;
00682 
00683   RepoIdSet pending_readers_, assoc_complete_readers_;
00684 
00685   /// The cached available data while suspending and associated transaction ids.
00686   ACE_UINT64 min_suspended_transaction_id_;
00687   ACE_UINT64 max_suspended_transaction_id_;
00688   SendStateDataSampleList             available_data_list_;
00689 
00690   /// Monitor object for this entity
00691   Monitor* monitor_;
00692 
00693   /// Periodic Monitor object for this entity
00694   Monitor* periodic_monitor_;
00695 
00696   // Data block local pool for this data writer.
00697   DataBlockLockPool*  db_lock_pool_;
00698 
00699   // Do we need to set the sequence repair header bit?
00700   //   must call prior to incrementing sequence number
00701   bool need_sequence_repair();
00702   bool need_sequence_repair_i() const;
00703 
00704   DDS::ReturnCode_t send_end_historic_samples(const RepoId& readerId);
00705 
00706   bool liveliness_asserted_;
00707 
00708   // Lock used to synchronize remove_associations calls from discovery
00709   // and unregister_instances during deletion of datawriter from application
00710   ACE_Thread_Mutex sync_unreg_rem_assocs_lock_;
00711 };
00712 
00713 } // namespace DCPS
00714 } // namespace OpenDDS
00715 
00716 #endif

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7