WriteDataContainer.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_WRITE_DATA_CONTAINER_H
00009 #define OPENDDS_DCPS_WRITE_DATA_CONTAINER_H
00010 
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsCoreC.h"
00013 #include "DataSampleElement.h"
00014 #include "SendStateDataSampleList.h"
00015 #include "WriterDataSampleList.h"
00016 #include "DisjointSequence.h"
00017 #include "PoolAllocator.h"
00018 #include "PoolAllocationBase.h"
00019 #include "Message_Block_Ptr.h"
00020 
00021 #include "ace/Synch_Traits.h"
00022 #include "ace/Condition_T.h"
00023 #include "ace/Condition_Thread_Mutex.h"
00024 #include "ace/Condition_Recursive_Thread_Mutex.h"
00025 
00026 #include <memory>
00027 
00028 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00029 #pragma once
00030 #endif /* ACE_LACKS_PRAGMA_ONCE */
00031 
00032 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00033 
00034 namespace OpenDDS {
00035 namespace DCPS {
00036 
00037 class InstanceDataSampleList;
00038 class DataWriterImpl;
00039 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00040 class DataDurabilityCache;
00041 #endif
00042 class FilterEvaluator;
00043 
00044 typedef OPENDDS_MAP(DDS::InstanceHandle_t, PublicationInstance_rch)
00045   PublicationInstanceMapType;
00046 
00047 /**
00048  * @class WriteDataContainer
00049  *
00050  * @brief A container for instances sample data.
00051  *
00052  * This container is instantiated per DataWriter. It maintains
00053  * list of PublicationInstance objects which is internally
00054  * referenced by the instance handle.
00055  *
00056  * This container contains threaded lists of all data written to a
00057  * given DataWriter. The real data sample is represented by the
00058  * DataSampleElement.  The data_holder_ holds all
00059  * DataSampleElement in the writing order via the
00060  * next_writer_sample_/previous_writer_sample_ thread. The instance list in
00061  * PublicationInstance links samples via the next_instance_sample_
00062  * thread.
00063  *
00064  * There are three state transition lists used during write operations:
00065  * unsent, sending, and sent.  These lists are linked
00066  * via the next_send_sample_/previous_send_sample_ thread. Any
00067  * DataSampleElement should be in one of these three lists and
00068  * SHOULD NOT be shared between these three lists.
00069  * A normal transition of a DataSampleElement would be
00070  * unsent->sending->sent.  A DataSampleElement transitions from unsent
00071  * to sent naturally during a write operation when get_unsent_data is called.
00072  * A DataSampleElement transitions from sending to sent when data_delivered
00073  * is called notifying the container that the transport is finished with the
00074  * sample and it can be marked as a historical sample.
00075  * A DataSampleElement may transition back to unsent from sending if
00076  * the transport notifies that the sample was dropped and should be resent.
00077  * A DataSampleElement is removed from sent list and freed when the instance
00078  * queue or container (for another instance) needs more space.
00079  * A DataSampleElement may be removed and freed directly from sending when
00080  * and unreliable writer needs space.  In this case the transport will be
00081  * notified that the sample should be removed.
00082  * A DataSampleElement may also be removed directly from unsent if an unreliable
00083  * writer needs space for new samples.
00084  * Note: The real data sample will be freed when the reference counting goes 0.
00085  * The resend list is only used when the datawriter uses
00086  * TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements
00087  * for the data sample duplicates of the sending and sent list and
00088  * hands this list off to the transport.
00089  *
00090  *
00091  *
00092  * @note: 1) The PublicationInstance object is not removed from
00093  *           this container until the instance is
00094  *           unregistered. The same instance handle is reused for
00095  *           re-registration. The instance data is deleted when
00096  *           this container is deleted. This would simplify
00097  *           instance data memory management. An alternative way
00098  *           is to remove the handle from the instance list when
00099  *           unregister occurs and delete the instance data after
00100  *           the transport is done with the instance, but we do
00101  *           not have a way to know when the transport is done
00102  *           since we have the sending list for all instances in
00103  *           the same datawriter.
00104  *        2) This container has the ownership of the instance data
00105  *           samples when the data is written. The type-specific
00106  *           datawriter that allocates the memory for the sample
00107  *           data gives the ownership to its base class.
00108  *        3) It is the responsibility of the owner of objects of
00109  *           this class to ensure that access to the lists are
00110  *           properly locked.  This means using the same
00111  *           lock/condition to access the lists via the enqueue(),
00112  *           get*(), and data_delivered() methods.  For the case
00113  *           where these are all called from the same (client)
00114  *           thread, this should be a recursive lock so that: 1)
00115  *           we do not deadlock; and, 2) we incur the cost of
00116  *           obtaining the lock only once.
00117  */
00118 class OpenDDS_Dcps_Export WriteDataContainer : public PoolAllocationBase {
00119 public:
00120 
00121   friend class DataWriterImpl;
00122 
00123   /**
00124    * No default constructor, must be initialized.
00125    */
00126   WriteDataContainer(
00127     /// The writer which owns this container.
00128     DataWriterImpl*  writer,
00129     /// Max samples kept within each instance
00130     CORBA::Long      max_samples_per_instance,
00131     CORBA::Long history_depth,
00132     /// Max durable samples sent for each instance
00133     CORBA::Long      max_durable_per_instance,
00134     /// The timeout for write.
00135     DDS::Duration_t max_blocking_time,
00136     /// The number of chunks that the DataSampleElementAllocator
00137     /// needs allocate.
00138     size_t           n_chunks,
00139     /// Domain ID.
00140     DDS::DomainId_t  domain_id,
00141     /// Topic name.
00142     char const *     topic_name,
00143     /// Type name.
00144     char const *     type_name,
00145 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00146     /// The data durability cache for unsent data.
00147     DataDurabilityCache * durability_cache,
00148     /// DURABILITY_SERVICE QoS specific to the DataWriter.
00149     DDS::DurabilityServiceQosPolicy const & durability_service,
00150 #endif
00151     /// maximum number of instances, 0 for unlimited
00152     CORBA::Long      max_instances,
00153     /// maximum total number of samples, 0 for unlimited
00154     CORBA::Long      max_total_samples);
00155 
00156   ~WriteDataContainer();
00157 
00158   DDS::ReturnCode_t
00159   enqueue_control(DataSampleElement* control_sample);
00160 
00161   /**
00162    * Enqueue the data sample in its instance thread. This method
00163    * assumes there is an available space for the sample in the
00164    * instance list.
00165   */
00166   DDS::ReturnCode_t
00167   enqueue(
00168     DataSampleElement* sample,
00169     DDS::InstanceHandle_t instance);
00170 
00171   /**
00172    * Create a resend list with the copies of all current "sending"
00173    * and "sent" samples. The samples will be sent to the
00174    *  subscriber specified.
00175    */
00176   DDS::ReturnCode_t reenqueue_all(const RepoId& reader_id,
00177                                   const DDS::LifespanQosPolicy& lifespan
00178 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00179                                   ,
00180                                   const OPENDDS_STRING& filterClassName,
00181                                   const FilterEvaluator* eval,
00182                                   const DDS::StringSeq& params
00183 #endif
00184                                   );
00185 
00186   /**
00187    * Dynamically allocate a PublicationInstance object and add to
00188    * the instances_ list.
00189    *
00190    * @note: The registered_sample is an input and output parameter.
00191    *        A shallow copy of the sample data will be given to
00192    *        datawriter as part of the control message.
00193    */
00194   DDS::ReturnCode_t
00195   register_instance(DDS::InstanceHandle_t&  instance_handle,
00196                     Message_Block_Ptr&      registered_sample);
00197 
00198   /**
00199    * Remove the provided instance from the instances_ list.
00200    * The registered sample data will be released upon the deletion
00201    * of the PublicationInstance. A shallow copy of the sample data
00202    * will be given to datawriter as part of the control message if
00203    * the dup_registered_sample is true.
00204    *
00205    * This method returns error if the instance is not registered.
00206    */
00207   DDS::ReturnCode_t unregister(
00208     DDS::InstanceHandle_t handle,
00209     Message_Block_Ptr&    registered_sample,
00210     bool dup_registered_sample = true);
00211 
00212   /**
00213    * Delete the samples for the provided instance.
00214    * A shallow copy of the sample data will be given to datawriter
00215    * as part of the control message if the dup_registered_sample
00216    * is true.
00217    *
00218    * This method returns error if the instance is not registered.
00219    */
00220   DDS::ReturnCode_t dispose(
00221     DDS::InstanceHandle_t handle,
00222     Message_Block_Ptr& registered_sample,
00223     bool dup_registered_sample = true);
00224 
00225   /**
00226    * Return the number of samples for the given instance.
00227    */
00228   DDS::ReturnCode_t num_samples(
00229     DDS::InstanceHandle_t handle,
00230     size_t& size);
00231 
00232   /**
00233    * Return the number of samples for all instances.
00234    */
00235   size_t num_all_samples();
00236 
00237   /**
00238    * Obtain a list of data that has not yet been sent.  The data
00239    * on the list returned is moved from the internal unsent_data_
00240    * list to the internal sending_data_ list as part of this call.
00241    * The entire list is linked via the
00242    * DataSampleElement.next_send_sample_ link as well.
00243    */
00244    ACE_UINT64 get_unsent_data(SendStateDataSampleList& list);
00245 
00246   /**
00247    * Obtain a list of data for resending. This is only used when
00248    * TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list
00249    * returned is not put on any SendStateDataSampleList.
00250    */
00251   SendStateDataSampleList get_resend_data() ;
00252 
00253   /**
00254    * Returns if pending data exists.  This includes
00255    * sending, and unsent data.
00256    */
00257   bool pending_data();
00258 
00259   /**
00260    * Acknowledge the delivery of data.  The sample that resides in
00261    * this container will be moved from sending_data_ list to the
00262    * internal sent_data_ list. If there are any threads waiting for
00263    * available space, it wakes up these threads.
00264    */
00265   void data_delivered(const DataSampleElement* sample);
00266 
00267   /**
00268    * This method is called by the transport to notify the sample
00269    * is dropped.  Which the transport was told to do by the
00270    * publication code by calling
00271    * TransportClient::remove_sample(). If the sample was
00272    * "sending" then it is moved to the "unsent" list. If there are any
00273    * threads waiting for available space then it needs wake up
00274    * these threads. The dropped_by_transport flag true indicates
00275    * the dropping initiated by transport when the transport send
00276    * strategy is in a MODE_TERMINATED. The dropped_by_transport
00277    * flag false indicates the dropping is initiated by the
00278    * remove_sample and data_dropped() is a result of
00279    * remove_sample().
00280    */
00281   void data_dropped(const DataSampleElement* element,
00282                     bool dropped_by_transport);
00283 
00284   DDS::ReturnCode_t obtain_buffer_for_control(DataSampleElement*& element);
00285 
00286   /**
00287    * Allocate a DataSampleElement object and check the space
00288    * availability for newly allocated element according to qos settings.
00289    * For the blocking write case, if resource limits or history qos limits
00290    * are reached, then it blocks for max blocking time for a previous sample
00291    * to be delivered or dropped by the transport. In non-blocking write
00292    * case, if resource limits or history qos limits are reached, will attempt
00293    * to remove oldest samples (forcing the transport to drop samples if necessary)
00294    * to make space.  If there are several threads waiting then
00295    * the first one in the waiting list can enqueue, others continue
00296    * waiting.
00297    */
00298   DDS::ReturnCode_t obtain_buffer(
00299     DataSampleElement*& element,
00300     DDS::InstanceHandle_t handle);
00301 
00302   /**
00303    * Release the memory previously allocated.
00304    * This method is corresponding to the obtain_buffer method. If
00305    * the memory is allocated by some allocator then the memory
00306    * needs to be released to the allocator.
00307    */
00308   void release_buffer(DataSampleElement* element);
00309 
00310   /**
00311    * Unregister all instances managed by this data containers.
00312    */
00313   void unregister_all();
00314 
00315   /**
00316    * @todo remove/document this!
00317    */
00318   PublicationInstance_rch get_handle_instance(
00319     DDS::InstanceHandle_t handle);
00320 
00321 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00322   /**
00323    * Copy sent data to data DURABILITY cache.
00324    */
00325   bool persist_data();
00326 #endif
00327 
00328   /// Reset time interval for each instance.
00329   void reschedule_deadline();
00330 
00331   /**
00332    * Block until pending samples have either been delivered
00333    * or dropped.
00334    */
00335   void wait_pending();
00336 
00337   /**
00338    * Returns a vector of handles for the instances registered for this
00339    * data writer.
00340    */
00341   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00342   void get_instance_handles(InstanceHandleVec& instance_handles);
00343 
00344   DDS::ReturnCode_t wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence);
00345 
00346   bool sequence_acknowledged(const SequenceNumber sequence);
00347 
00348 private:
00349 
00350   // A class, normally provided by an unit test, that needs access to
00351   // private methods/members.
00352   friend class ::DDS_TEST;
00353 
00354   // --------------------------
00355   // Preventing copying
00356   // --------------------------
00357   WriteDataContainer(WriteDataContainer const &);
00358   WriteDataContainer & operator= (WriteDataContainer const &);
00359   // --------------------------
00360 
00361   void copy_and_prepend(SendStateDataSampleList& list,
00362                         const SendStateDataSampleList& appended,
00363                         const RepoId& reader_id,
00364                         const DDS::LifespanQosPolicy& lifespan,
00365 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00366                         const OPENDDS_STRING& filterClassName,
00367                         const FilterEvaluator* eval,
00368                         const DDS::StringSeq& params,
00369 #endif
00370                         ssize_t& max_resend_samples);
00371 
00372   /**
00373    * Remove the oldest "n" samples from each instance list that are
00374    * in a state such that they could only be used for durability
00375    * purposes (see reenqueue_all).
00376    * "n" is determined by max_durable_per_instance_, so these samples
00377    * are truly unneeded -- there are max_durable_per_instance_ newer
00378    * samples available in the instance.
00379    */
00380   void remove_excess_durable();
00381 
00382   /**
00383    * Remove the oldest sample (head) from the instance history list.
00384    * This method also updates the internal lists to reflect
00385    * the change.
00386    * If the sample is in the unsent_data_ or sent_data_ list then
00387    * it will be released. If the sample is in the sending_data_ list
00388    * then the transport will be notified to release the sample, then
00389    * the sample will be released. Otherwise an error
00390    * is returned.
00391    * The "released" boolean value indicates whether the sample is
00392    * released.
00393    */
00394   DDS::ReturnCode_t remove_oldest_sample(
00395     InstanceDataSampleList& instance_list,
00396     bool& released);
00397 
00398   /**
00399    * Called when data has been dropped or delivered and any
00400    * blocked writers should be notified
00401    */
00402   void wakeup_blocking_writers (DataSampleElement* stale);
00403 
00404 private:
00405 
00406   void log_send_state_lists (OPENDDS_STRING description);
00407 
00408   DisjointSequence acked_sequences_;
00409 
00410   /// List of data that has not been sent yet.
00411   SendStateDataSampleList   unsent_data_;
00412 
00413   /// Id used to keep track of which send transaction
00414   /// DataWriter is currently creating
00415   ACE_UINT64 transaction_id_;
00416 
00417   /// List of data that is currently being sent.
00418   SendStateDataSampleList   sending_data_;
00419 
00420   /// List of data that has already been sent.
00421   SendStateDataSampleList   sent_data_;
00422 
00423   /// List of data that has been released by WriteDataContainer
00424   /// but is still in process of delivery (or dropping) by transport
00425   SendStateDataSampleList  orphaned_to_transport_;
00426 
00427   /// The list of all samples written to this datawriter in
00428   /// writing order.
00429   WriterDataSampleList   data_holder_;
00430 
00431   /// List of the data reenqueued to support the
00432   /// TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the
00433   /// samples in sent and sending list. This
00434   /// list will be passed to the transport for re-sending.
00435   SendStateDataSampleList   resend_data_;
00436 
00437   /// The individual instance queue threads in the data.
00438   PublicationInstanceMapType instances_;
00439 
00440   /// The publication Id from repo.
00441   PublicationId    publication_id_;
00442 
00443   /// The writer that owns this container.
00444   DataWriterImpl*  writer_;
00445 
00446   /// The maximum size a container should allow for
00447   /// an instance sample list
00448   CORBA::Long                     max_samples_per_instance_;
00449 
00450   CORBA::Long history_depth_;
00451 
00452   /// The maximum number of samples from each instance that
00453   /// can be added to the resend_data_ for durability.
00454   CORBA::Long                     max_durable_per_instance_;
00455 
00456   /// The maximum number of instances allowed or zero
00457   /// to indicate unlimited.
00458   /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances
00459   /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS
00460   CORBA::Long                     max_num_instances_;
00461 
00462   /// The maximum number of samples allowed or zero
00463   /// to indicate unlimited.
00464   /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances
00465   /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS
00466   /// It also covers QoS.RESOURCE_LIMITS.max_samples and
00467   /// max_instances * max_samples_per_instance
00468   CORBA::Long                     max_num_samples_;
00469 
00470   /// The maximum time to block on write operation.
00471   /// This comes from DataWriter's QoS HISTORY.max_blocking_time
00472   DDS::Duration_t               max_blocking_time_;
00473 
00474   /// The block waiting flag.
00475   bool                            waiting_on_release_;
00476 
00477   /// This lock is used to protect the container and the map
00478   /// in the type-specific DataWriter.
00479   /// This lock can be accessible via the datawriter.
00480   /// This lock is made to be globally accessible for
00481   /// performance concern. The lock is acquired as the external
00482   /// call (e.g. FooDataWriterImpl::write) started and the
00483   /// same lock will be used by the transport thread to notify
00484   /// the datawriter the data is delivered. Other internal
00485   /// operations will not lock.
00486   ACE_Recursive_Thread_Mutex                lock_;
00487   ACE_Condition<ACE_Recursive_Thread_Mutex> condition_;
00488   ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_;
00489 
00490   /// Lock used for wait_for_acks() processing.
00491   ACE_Thread_Mutex wfa_lock_;
00492 
00493   /// Used to block in wait_for_acks().
00494   ACE_Condition<ACE_Thread_Mutex> wfa_condition_;
00495 
00496   /// The number of chunks that sample_list_element_allocator_
00497   /// needs initialize.
00498   size_t                                    n_chunks_;
00499 
00500   /// The cached allocator to allocate DataSampleElement
00501   /// objects.
00502   DataSampleElementAllocator sample_list_element_allocator_;
00503 
00504   /// The flag indicates the datawriter will be destroyed.
00505   bool shutdown_;
00506 
00507   /// Domain ID.
00508   DDS::DomainId_t const domain_id_;
00509 
00510   /// Topic name.
00511   char const * const topic_name_;
00512 
00513   /// Type name.
00514   char const * const type_name_;
00515 
00516 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00517 
00518   /// Pointer to the data durability cache.
00519   /**
00520    * This a pointer to the data durability cache owned by the
00521    * Service Participant singleton, which means this cache is also
00522    * a singleton.
00523    */
00524   DataDurabilityCache * const durability_cache_;
00525 
00526   /// DURABILITY_SERVICE QoS specific to the DataWriter.
00527   DDS::DurabilityServiceQosPolicy const & durability_service_;
00528 
00529 #endif
00530 };
00531 
00532 } /// namespace OpenDDS
00533 } /// namespace DCPS
00534 
00535 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00536 
00537 #endif /* OPENDDS_DCPS_WRITE_DATA_CONTAINER_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1