WriteDataContainer.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_WRITE_DATA_CONTAINER_H
00009 #define OPENDDS_DCPS_WRITE_DATA_CONTAINER_H
00010 
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "DataSampleElement.h"
00013 #include "SendStateDataSampleList.h"
00014 #include "WriterDataSampleList.h"
00015 #include "DisjointSequence.h"
00016 #include "PoolAllocator.h"
00017 #include "PoolAllocationBase.h"
00018 
00019 #include "ace/Condition_T.h"
00020 #include "Service_Participant.h"
00021 
00022 #include "ace/Null_Mutex.h"
00023 #include "ace/Thread_Mutex.h"
00024 #include "ace/Recursive_Thread_Mutex.h"
00025 
00026 #include <memory>
00027 
00028 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00029 #pragma once
00030 #endif /* ACE_LACKS_PRAGMA_ONCE */
00031 
00032 namespace OpenDDS {
00033 namespace DCPS {
00034 
00035 class InstanceDataSampleList;
00036 class DataWriterImpl;
00037 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00038 class DataDurabilityCache;
00039 #endif
00040 class FilterEvaluator;
00041 
00042 typedef OPENDDS_MAP(DDS::InstanceHandle_t, PublicationInstance*)
00043   PublicationInstanceMapType;
00044 
00045 /**
00046  * @class WriteDataContainer
00047  *
00048  * @brief A container for instances sample data.
00049  *
00050  * This container is instantiated per DataWriter. It maintains
00051  * list of PublicationInstance objects which is internally
00052  * referenced by the instance handle.
00053  *
00054  * This container contains threaded lists of all data written to a
00055  * given DataWriter. The real data sample is represented by the
00056  * DataSampleElement.  The data_holder_ holds all
00057  * DataSampleElement in the writing order via the
00058  * next_writer_sample_/previous_writer_sample_ thread. The instance list in
00059  * PublicationInstance links samples via the next_instance_sample_
00060  * thread.
00061  *
00062  * There are three state transition lists used during write operations:
00063  * unsent, sending, and sent.  These lists are linked
00064  * via the next_send_sample_/previous_send_sample_ thread. Any
00065  * DataSampleElement should be in one of these three lists and
00066  * SHOULD NOT be shared between these three lists.
00067  * A normal transition of a DataSampleElement would be
00068  * unsent->sending->sent.  A DataSampleElement transitions from unsent
00069  * to sent naturally during a write operation when get_unsent_data is called.
00070  * A DataSampleElement transitions from sending to sent when data_delivered
00071  * is called notifying the container that the transport is finished with the
00072  * sample and it can be marked as a historical sample.
00073  * A DataSampleElement may transition back to unsent from sending if
00074  * the transport notifies that the sample was dropped and should be resent.
00075  * A DataSampleElement is removed from sent list and freed when the instance
00076  * queue or container (for another instance) needs more space.
00077  * A DataSampleElement may be removed and freed directly from sending when
00078  * and unreliable writer needs space.  In this case the transport will be
00079  * notified that the sample should be removed.
00080  * A DataSampleElement may also be removed directly from unsent if an unreliable
00081  * writer needs space for new samples.
00082  * Note: The real data sample will be freed when the reference counting goes 0.
00083  * The resend list is only used when the datawriter uses
00084  * TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements
00085  * for the data sample duplicates of the sending and sent list and
00086  * hands this list off to the transport.
00087  *
00088  *
00089  *
00090  * @note: 1) The PublicationInstance object is not removed from
00091  *           this container until the instance is
00092  *           unregistered. The same instance handle is reused for
00093  *           re-registration. The instance data is deleted when
00094  *           this container is deleted. This would simplify
00095  *           instance data memory management. An alternative way
00096  *           is to remove the handle from the instance list when
00097  *           unregister occurs and delete the instance data after
00098  *           the transport is done with the instance, but we do
00099  *           not have a way to know when the transport is done
00100  *           since we have the sending list for all instances in
00101  *           the same datawriter.
00102  *        2) This container has the ownership of the instance data
00103  *           samples when the data is written. The type-specific
00104  *           datawriter that allocates the memory for the sample
00105  *           data gives the ownership to its base class.
00106  *        3) It is the responsibility of the owner of objects of
00107  *           this class to ensure that access to the lists are
00108  *           properly locked.  This means using the same
00109  *           lock/condition to access the lists via the enqueue(),
00110  *           get*(), and data_delivered() methods.  For the case
00111  *           where these are all called from the same (client)
00112  *           thread, this should be a recursive lock so that: 1)
00113  *           we do not deadlock; and, 2) we incur the cost of
00114  *           obtaining the lock only once.
00115  */
00116 class OpenDDS_Dcps_Export WriteDataContainer : public PoolAllocationBase {
00117 public:
00118 
00119   friend class DataWriterImpl;
00120 
00121   /**
00122    * No default constructor, must be initialized.
00123    */
00124   WriteDataContainer(
00125     /// The writer which owns this container.
00126     DataWriterImpl*  writer,
00127     /// Depth of the instance sample queue.
00128     CORBA::Long      depth,
00129     /// The timeout for write.
00130     ::DDS::Duration_t max_blocking_time,
00131     /// The number of chunks that the DataSampleElementAllocator
00132     /// needs allocate.
00133     size_t           n_chunks,
00134     /// Domain ID.
00135     DDS::DomainId_t  domain_id,
00136     /// Topic name.
00137     char const *     topic_name,
00138     /// Type name.
00139     char const *     type_name,
00140 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00141     /// The data durability cache for unsent data.
00142     DataDurabilityCache * durability_cache,
00143     /// DURABILITY_SERVICE QoS specific to the DataWriter.
00144     DDS::DurabilityServiceQosPolicy const & durability_service,
00145 #endif
00146     /// maximum number of instances, 0 for unlimited
00147     CORBA::Long      max_instances,
00148     /// maximum total number of samples, 0 for unlimited
00149     CORBA::Long      max_total_samples);
00150 
00151   /**
00152    * Default destructor.
00153    */
00154   ~WriteDataContainer();
00155 
00156   DDS::ReturnCode_t
00157   enqueue_control(DataSampleElement* control_sample);
00158 
00159   /**
00160    * Enqueue the data sample in its instance thread. This method
00161    * assumes there is an available space for the sample in the
00162    * instance list.
00163   */
00164   DDS::ReturnCode_t
00165   enqueue(
00166     DataSampleElement* sample,
00167     DDS::InstanceHandle_t instance);
00168 
00169   /**
00170    * Create a resend list with the copies of all current "sending"
00171    * and "sent" samples. The samples will be sent to the
00172    *  subscriber specified.
00173    */
00174   DDS::ReturnCode_t reenqueue_all(const RepoId& reader_id,
00175                                   const DDS::LifespanQosPolicy& lifespan
00176 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00177                                   ,
00178                                   const OPENDDS_STRING& filterClassName,
00179                                   const FilterEvaluator* eval,
00180                                   const DDS::StringSeq& params
00181 #endif
00182                                   );
00183 
00184   /**
00185    * Dynamically allocate a PublicationInstance object and add to
00186    * the instances_ list.
00187    *
00188    * @note: The registered_sample is an input and output parameter.
00189    *        A shallow copy of the sample data will be given to
00190    *        datawriter as part of the control message.
00191    */
00192   DDS::ReturnCode_t
00193   register_instance(DDS::InstanceHandle_t&  instance_handle,
00194                     DataSample*&            registered_sample);
00195 
00196   /**
00197    * Remove the provided instance from the instances_ list.
00198    * The registered sample data will be released upon the deletion
00199    * of the PublicationInstance. A shallow copy of the sample data
00200    * will be given to datawriter as part of the control message if
00201    * the dup_registered_sample is true.
00202    *
00203    * This method returns error if the instance is not registered.
00204    */
00205   DDS::ReturnCode_t unregister(
00206     DDS::InstanceHandle_t handle,
00207     DataSample*& registered_sample,
00208     bool dup_registered_sample = true);
00209 
00210   /**
00211    * Delete the samples for the provided instance.
00212    * A shallow copy of the sample data will be given to datawriter
00213    * as part of the control message if the dup_registered_sample
00214    * is true.
00215    *
00216    * This method returns error if the instance is not registered.
00217    */
00218   DDS::ReturnCode_t dispose(
00219     DDS::InstanceHandle_t handle,
00220     DataSample*& registered_sample,
00221     bool dup_registered_sample = true);
00222 
00223   /**
00224    * Return the number of samples for the given instance.
00225    */
00226   DDS::ReturnCode_t num_samples(
00227     DDS::InstanceHandle_t handle,
00228     size_t& size);
00229 
00230   /**
00231    * Return the number of samples for all instances.
00232    */
00233   size_t num_all_samples();
00234 
00235   /**
00236    * Obtain a list of data that has not yet been sent.  The data
00237    * on the list returned is moved from the internal unsent_data_
00238    * list to the internal sending_data_ list as part of this call.
00239    * The entire list is linked via the
00240    * DataSampleElement.next_send_sample_ link as well.
00241    */
00242    ACE_UINT64 get_unsent_data(SendStateDataSampleList& list);
00243 
00244   /**
00245    * Obtain a list of data for resending. This is only used when
00246    * TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list
00247    * returned is not put on any SendStateDataSampleList.
00248    */
00249   SendStateDataSampleList get_resend_data() ;
00250 
00251   /**
00252    * Returns if pending data exists.  This includes
00253    * sending, and unsent data.
00254    */
00255   bool pending_data();
00256 
00257   /**
00258    * Acknowledge the delivery of data.  The sample that resides in
00259    * this container will be moved from sending_data_ list to the
00260    * internal sent_data_ list. If there are any threads waiting for
00261    * available space, it wakes up these threads.
00262    */
00263   void data_delivered(const DataSampleElement* sample);
00264 
00265   /**
00266    * This method is called by the transport to notify the sample
00267    * is dropped.  Which the transport was told to do by the
00268    * publication code by calling
00269    * TransportClient::remove_sample(). If the sample was
00270    * "sending" then it is moved to the "unsent" list. If there are any
00271    * threads waiting for available space then it needs wake up
00272    * these threads. The dropped_by_transport flag true indicates
00273    * the dropping initiated by transport when the transport send
00274    * strategy is in a MODE_TERMINATED. The dropped_by_transport
00275    * flag false indicates the dropping is initiated by the
00276    * remove_sample and data_dropped() is a result of
00277    * remove_sample().
00278    */
00279   void data_dropped(const DataSampleElement* element,
00280                     bool dropped_by_transport);
00281 
00282   DDS::ReturnCode_t obtain_buffer_for_control(DataSampleElement*& element);
00283 
00284   /**
00285    * Allocate a DataSampleElement object and check the space
00286    * availability for newly allocated element according to qos settings.
00287    * For the blocking write case, if resource limits or history qos limits
00288    * are reached, then it blocks for max blocking time for a previous sample
00289    * to be delivered or dropped by the transport. In non-blocking write
00290    * case, if resource limits or history qos limits are reached, will attempt
00291    * to remove oldest samples (forcing the transport to drop samples if necessary)
00292    * to make space.  If there are several threads waiting then
00293    * the first one in the waiting list can enqueue, others continue
00294    * waiting.
00295    */
00296   DDS::ReturnCode_t obtain_buffer(
00297     DataSampleElement*& element,
00298     DDS::InstanceHandle_t handle);
00299 
00300   /**
00301    * Release the memory previously allocated.
00302    * This method is corresponding to the obtain_buffer method. If
00303    * the memory is allocated by some allocator then the memory
00304    * needs to be released to the allocator.
00305    */
00306   void release_buffer(DataSampleElement* element);
00307 
00308   /**
00309    * Unregister all instances managed by this data containers.
00310    */
00311   void unregister_all();
00312 
00313   /**
00314    * @todo remove/document this!
00315    */
00316   PublicationInstance* get_handle_instance(
00317     DDS::InstanceHandle_t handle);
00318 
00319 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00320   /**
00321    * Copy sent data to data DURABILITY cache.
00322    */
00323   bool persist_data();
00324 #endif
00325 
00326   /// Reset time interval for each instance.
00327   void reschedule_deadline();
00328 
00329   /**
00330    * Block until pending samples have either been delivered
00331    * or dropped.
00332    */
00333   void wait_pending();
00334 
00335   /**
00336    * Returns a vector of handles for the instances registered for this
00337    * data writer.
00338    */
00339   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00340   void get_instance_handles(InstanceHandleVec& instance_handles);
00341 
00342   DDS::ReturnCode_t wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence);
00343 
00344   bool sequence_acknowledged(const SequenceNumber sequence);
00345 
00346 private:
00347 
00348   // A class, normally provided by an unit test, that needs access to
00349   // private methods/members.
00350   friend class ::DDS_TEST;
00351 
00352   // --------------------------
00353   // Preventing copying
00354   // --------------------------
00355   WriteDataContainer(WriteDataContainer const &);
00356   WriteDataContainer & operator= (WriteDataContainer const &);
00357   // --------------------------
00358 
00359   void copy_and_append(SendStateDataSampleList& list,
00360                        const SendStateDataSampleList& appended,
00361                        const RepoId& reader_id,
00362                        const DDS::LifespanQosPolicy& lifespan
00363 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00364                        ,
00365                        const OPENDDS_STRING& filterClassName,
00366                        const FilterEvaluator* eval,
00367                        const DDS::StringSeq& params
00368 #endif
00369                        );
00370   /**
00371    * Remove the sample (head) from the instance list if it is only being
00372    * used for history purposes (located on sent_data_ list).
00373    * This method also updates the internal lists to reflect
00374    * the change.
00375    * The "released" boolean value indicates whether a sample was
00376    * released.
00377    */
00378   DDS::ReturnCode_t remove_oldest_historical_sample(
00379     InstanceDataSampleList& instance_list,
00380     bool& released);
00381 
00382   /**
00383    * Remove the oldest sample (head) from the instance history list.
00384    * This method also updates the internal lists to reflect
00385    * the change.
00386    * If the sample is in the unsent_data_ or sent_data_ list then
00387    * it will be released. If the sample is in the sending_data_ list
00388    * then the transport will be notified to release the sample, then
00389    * the sample will be released. Otherwise an error
00390    * is returned.
00391    * The "released" boolean value indicates whether the sample is
00392    * released.
00393    */
00394   DDS::ReturnCode_t remove_oldest_sample(
00395     InstanceDataSampleList& instance_list,
00396     bool& released);
00397 
00398   /**
00399    * Called when data has been dropped or delivered and any
00400    * blocked writers should be notified
00401    */
00402   void wakeup_blocking_writers (DataSampleElement* stale);
00403 
00404 private:
00405 
00406   void log_send_state_lists (OPENDDS_STRING description);
00407 
00408   DisjointSequence acked_sequences_;
00409 
00410   /// List of data that has not been sent yet.
00411   SendStateDataSampleList   unsent_data_;
00412 
00413   /// Id used to keep track of which send transaction
00414   /// DataWriter is currently creating
00415   ACE_UINT64 transaction_id_;
00416 
00417   /// List of data that is currently being sent.
00418   SendStateDataSampleList   sending_data_;
00419 
00420   /// List of data that has already been sent.
00421   SendStateDataSampleList   sent_data_;
00422 
00423   /// List of data that has been released by WriteDataContainer
00424   /// but is still in process of delivery (or dropping) by transport
00425   SendStateDataSampleList  orphaned_to_transport_;
00426 
00427   /// The list of all samples written to this datawriter in
00428   /// writing order.
00429   WriterDataSampleList   data_holder_;
00430 
00431   /// List of the data reenqueued to support the
00432   /// TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the
00433   /// samples in sent and sending list. This
00434   /// list will be passed to the transport for re-sending.
00435   SendStateDataSampleList   resend_data_;
00436 
00437   /// The individual instance queue threads in the data.
00438   PublicationInstanceMapType instances_;
00439 
00440   /// The publication Id from repo.
00441   PublicationId    publication_id_;
00442 
00443   /// The writer that owns this container.
00444   DataWriterImpl*  writer_;
00445 
00446   /// The maximum size a container should allow for
00447   /// an instance sample list
00448   /// It corresponds to the QoS.HISTORY.depth value for
00449   /// QoS.HISTORY.kind==KEEP_LAST and corresponds to the
00450   /// QoS.RESOURCE_LIMITS.max_samples_per_instance for
00451   /// the case of QoS.HISTORY.kind==KEEP_ALL.
00452   CORBA::Long                     depth_;
00453 
00454   /// The maximum number of instances allowed or zero
00455   /// to indicate unlimited.
00456   /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances
00457   /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS
00458   CORBA::Long                     max_num_instances_;
00459 
00460   /// The maximum number of samples allowed or zero
00461   /// to indicate unlimited.
00462   /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances
00463   /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS
00464   /// It also covers QoS.RESOURCE_LIMITS.max_samples and
00465   /// max_instances * depth
00466   CORBA::Long                     max_num_samples_;
00467 
00468   /// The maximum time to block on write operation.
00469   /// This comes from DataWriter's QoS HISTORY.max_blocking_time
00470   ::DDS::Duration_t               max_blocking_time_;
00471 
00472   /// The block waiting flag.
00473   bool                            waiting_on_release_;
00474 
00475   /// This lock is used to protect the container and the map
00476   /// in the type-specific DataWriter.
00477   /// This lock can be accessible via the datawriter.
00478   /// This lock is made to be globally accessible for
00479   /// performance concern. The lock is acquired as the external
00480   /// call (e.g. FooDataWriterImpl::write) started and the
00481   /// same lock will be used by the transport thread to notify
00482   /// the datawriter the data is delivered. Other internal
00483   /// operations will not lock.
00484   ACE_Recursive_Thread_Mutex                lock_;
00485   ACE_Condition<ACE_Recursive_Thread_Mutex> condition_;
00486   ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_;
00487 
00488   /// Lock used for wait_for_acks() processing.
00489   ACE_Thread_Mutex wfa_lock_;
00490 
00491   /// Used to block in wait_for_acks().
00492   ACE_Condition<ACE_Thread_Mutex> wfa_condition_;
00493 
00494   /// The number of chunks that sample_list_element_allocator_
00495   /// needs initialize.
00496   size_t                                    n_chunks_;
00497 
00498   /// The cached allocator to allocate DataSampleElement
00499   /// objects.
00500   DataSampleElementAllocator sample_list_element_allocator_;
00501 
00502   /// The allocator for TransportSendElement.
00503   /// The TransportSendElement allocator is put here because it
00504   /// needs the number of chunks information that WriteDataContainer
00505   /// has.
00506   TransportSendElementAllocator  transport_send_element_allocator_;
00507 
00508   TransportCustomizedElementAllocator transport_customized_element_allocator_;
00509 
00510   /// The flag indicates the datawriter will be destroyed.
00511   bool shutdown_;
00512 
00513   /// Domain ID.
00514   DDS::DomainId_t const domain_id_;
00515 
00516   /// Topic name.
00517   char const * const topic_name_;
00518 
00519   /// Type name.
00520   char const * const type_name_;
00521 
00522 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00523 
00524   /// Pointer to the data durability cache.
00525   /**
00526    * This a pointer to the data durability cache owned by the
00527    * Service Participant singleton, which means this cache is also
00528    * a singleton.
00529    */
00530   DataDurabilityCache * const durability_cache_;
00531 
00532   /// DURABILITY_SERVICE QoS specific to the DataWriter.
00533   DDS::DurabilityServiceQosPolicy const & durability_service_;
00534 
00535 #endif
00536 };
00537 
00538 } /// namespace OpenDDS
00539 } /// namespace DCPS
00540 
00541 #endif /* OPENDDS_DCPS_WRITE_DATA_CONTAINER_H */

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7