00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_WRITE_DATA_CONTAINER_H 00009 #define OPENDDS_DCPS_WRITE_DATA_CONTAINER_H 00010 00011 #include "dds/DdsDcpsInfrastructureC.h" 00012 #include "dds/DdsDcpsCoreC.h" 00013 #include "DataSampleElement.h" 00014 #include "SendStateDataSampleList.h" 00015 #include "WriterDataSampleList.h" 00016 #include "DisjointSequence.h" 00017 #include "PoolAllocator.h" 00018 #include "PoolAllocationBase.h" 00019 #include "Message_Block_Ptr.h" 00020 00021 #include "ace/Synch_Traits.h" 00022 #include "ace/Condition_T.h" 00023 #include "ace/Condition_Thread_Mutex.h" 00024 #include "ace/Condition_Recursive_Thread_Mutex.h" 00025 00026 #include <memory> 00027 00028 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00029 #pragma once 00030 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00031 00032 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00033 00034 namespace OpenDDS { 00035 namespace DCPS { 00036 00037 class InstanceDataSampleList; 00038 class DataWriterImpl; 00039 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00040 class DataDurabilityCache; 00041 #endif 00042 class FilterEvaluator; 00043 00044 typedef OPENDDS_MAP(DDS::InstanceHandle_t, PublicationInstance_rch) 00045 PublicationInstanceMapType; 00046 00047 /** 00048 * @class WriteDataContainer 00049 * 00050 * @brief A container for instances sample data. 00051 * 00052 * This container is instantiated per DataWriter. It maintains 00053 * list of PublicationInstance objects which is internally 00054 * referenced by the instance handle. 00055 * 00056 * This container contains threaded lists of all data written to a 00057 * given DataWriter. The real data sample is represented by the 00058 * DataSampleElement. The data_holder_ holds all 00059 * DataSampleElement in the writing order via the 00060 * next_writer_sample_/previous_writer_sample_ thread. The instance list in 00061 * PublicationInstance links samples via the next_instance_sample_ 00062 * thread. 00063 * 00064 * There are three state transition lists used during write operations: 00065 * unsent, sending, and sent. These lists are linked 00066 * via the next_send_sample_/previous_send_sample_ thread. Any 00067 * DataSampleElement should be in one of these three lists and 00068 * SHOULD NOT be shared between these three lists. 00069 * A normal transition of a DataSampleElement would be 00070 * unsent->sending->sent. A DataSampleElement transitions from unsent 00071 * to sent naturally during a write operation when get_unsent_data is called. 00072 * A DataSampleElement transitions from sending to sent when data_delivered 00073 * is called notifying the container that the transport is finished with the 00074 * sample and it can be marked as a historical sample. 00075 * A DataSampleElement may transition back to unsent from sending if 00076 * the transport notifies that the sample was dropped and should be resent. 00077 * A DataSampleElement is removed from sent list and freed when the instance 00078 * queue or container (for another instance) needs more space. 00079 * A DataSampleElement may be removed and freed directly from sending when 00080 * and unreliable writer needs space. In this case the transport will be 00081 * notified that the sample should be removed. 00082 * A DataSampleElement may also be removed directly from unsent if an unreliable 00083 * writer needs space for new samples. 00084 * Note: The real data sample will be freed when the reference counting goes 0. 00085 * The resend list is only used when the datawriter uses 00086 * TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements 00087 * for the data sample duplicates of the sending and sent list and 00088 * hands this list off to the transport. 00089 * 00090 * 00091 * 00092 * @note: 1) The PublicationInstance object is not removed from 00093 * this container until the instance is 00094 * unregistered. The same instance handle is reused for 00095 * re-registration. The instance data is deleted when 00096 * this container is deleted. This would simplify 00097 * instance data memory management. An alternative way 00098 * is to remove the handle from the instance list when 00099 * unregister occurs and delete the instance data after 00100 * the transport is done with the instance, but we do 00101 * not have a way to know when the transport is done 00102 * since we have the sending list for all instances in 00103 * the same datawriter. 00104 * 2) This container has the ownership of the instance data 00105 * samples when the data is written. The type-specific 00106 * datawriter that allocates the memory for the sample 00107 * data gives the ownership to its base class. 00108 * 3) It is the responsibility of the owner of objects of 00109 * this class to ensure that access to the lists are 00110 * properly locked. This means using the same 00111 * lock/condition to access the lists via the enqueue(), 00112 * get*(), and data_delivered() methods. For the case 00113 * where these are all called from the same (client) 00114 * thread, this should be a recursive lock so that: 1) 00115 * we do not deadlock; and, 2) we incur the cost of 00116 * obtaining the lock only once. 00117 */ 00118 class OpenDDS_Dcps_Export WriteDataContainer : public PoolAllocationBase { 00119 public: 00120 00121 friend class DataWriterImpl; 00122 00123 /** 00124 * No default constructor, must be initialized. 00125 */ 00126 WriteDataContainer( 00127 /// The writer which owns this container. 00128 DataWriterImpl* writer, 00129 /// Max samples kept within each instance 00130 CORBA::Long max_samples_per_instance, 00131 CORBA::Long history_depth, 00132 /// Max durable samples sent for each instance 00133 CORBA::Long max_durable_per_instance, 00134 /// The timeout for write. 00135 DDS::Duration_t max_blocking_time, 00136 /// The number of chunks that the DataSampleElementAllocator 00137 /// needs allocate. 00138 size_t n_chunks, 00139 /// Domain ID. 00140 DDS::DomainId_t domain_id, 00141 /// Topic name. 00142 char const * topic_name, 00143 /// Type name. 00144 char const * type_name, 00145 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00146 /// The data durability cache for unsent data. 00147 DataDurabilityCache * durability_cache, 00148 /// DURABILITY_SERVICE QoS specific to the DataWriter. 00149 DDS::DurabilityServiceQosPolicy const & durability_service, 00150 #endif 00151 /// maximum number of instances, 0 for unlimited 00152 CORBA::Long max_instances, 00153 /// maximum total number of samples, 0 for unlimited 00154 CORBA::Long max_total_samples); 00155 00156 ~WriteDataContainer(); 00157 00158 DDS::ReturnCode_t 00159 enqueue_control(DataSampleElement* control_sample); 00160 00161 /** 00162 * Enqueue the data sample in its instance thread. This method 00163 * assumes there is an available space for the sample in the 00164 * instance list. 00165 */ 00166 DDS::ReturnCode_t 00167 enqueue( 00168 DataSampleElement* sample, 00169 DDS::InstanceHandle_t instance); 00170 00171 /** 00172 * Create a resend list with the copies of all current "sending" 00173 * and "sent" samples. The samples will be sent to the 00174 * subscriber specified. 00175 */ 00176 DDS::ReturnCode_t reenqueue_all(const RepoId& reader_id, 00177 const DDS::LifespanQosPolicy& lifespan 00178 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00179 , 00180 const OPENDDS_STRING& filterClassName, 00181 const FilterEvaluator* eval, 00182 const DDS::StringSeq& params 00183 #endif 00184 ); 00185 00186 /** 00187 * Dynamically allocate a PublicationInstance object and add to 00188 * the instances_ list. 00189 * 00190 * @note: The registered_sample is an input and output parameter. 00191 * A shallow copy of the sample data will be given to 00192 * datawriter as part of the control message. 00193 */ 00194 DDS::ReturnCode_t 00195 register_instance(DDS::InstanceHandle_t& instance_handle, 00196 Message_Block_Ptr& registered_sample); 00197 00198 /** 00199 * Remove the provided instance from the instances_ list. 00200 * The registered sample data will be released upon the deletion 00201 * of the PublicationInstance. A shallow copy of the sample data 00202 * will be given to datawriter as part of the control message if 00203 * the dup_registered_sample is true. 00204 * 00205 * This method returns error if the instance is not registered. 00206 */ 00207 DDS::ReturnCode_t unregister( 00208 DDS::InstanceHandle_t handle, 00209 Message_Block_Ptr& registered_sample, 00210 bool dup_registered_sample = true); 00211 00212 /** 00213 * Delete the samples for the provided instance. 00214 * A shallow copy of the sample data will be given to datawriter 00215 * as part of the control message if the dup_registered_sample 00216 * is true. 00217 * 00218 * This method returns error if the instance is not registered. 00219 */ 00220 DDS::ReturnCode_t dispose( 00221 DDS::InstanceHandle_t handle, 00222 Message_Block_Ptr& registered_sample, 00223 bool dup_registered_sample = true); 00224 00225 /** 00226 * Return the number of samples for the given instance. 00227 */ 00228 DDS::ReturnCode_t num_samples( 00229 DDS::InstanceHandle_t handle, 00230 size_t& size); 00231 00232 /** 00233 * Return the number of samples for all instances. 00234 */ 00235 size_t num_all_samples(); 00236 00237 /** 00238 * Obtain a list of data that has not yet been sent. The data 00239 * on the list returned is moved from the internal unsent_data_ 00240 * list to the internal sending_data_ list as part of this call. 00241 * The entire list is linked via the 00242 * DataSampleElement.next_send_sample_ link as well. 00243 */ 00244 ACE_UINT64 get_unsent_data(SendStateDataSampleList& list); 00245 00246 /** 00247 * Obtain a list of data for resending. This is only used when 00248 * TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list 00249 * returned is not put on any SendStateDataSampleList. 00250 */ 00251 SendStateDataSampleList get_resend_data() ; 00252 00253 /** 00254 * Returns if pending data exists. This includes 00255 * sending, and unsent data. 00256 */ 00257 bool pending_data(); 00258 00259 /** 00260 * Acknowledge the delivery of data. The sample that resides in 00261 * this container will be moved from sending_data_ list to the 00262 * internal sent_data_ list. If there are any threads waiting for 00263 * available space, it wakes up these threads. 00264 */ 00265 void data_delivered(const DataSampleElement* sample); 00266 00267 /** 00268 * This method is called by the transport to notify the sample 00269 * is dropped. Which the transport was told to do by the 00270 * publication code by calling 00271 * TransportClient::remove_sample(). If the sample was 00272 * "sending" then it is moved to the "unsent" list. If there are any 00273 * threads waiting for available space then it needs wake up 00274 * these threads. The dropped_by_transport flag true indicates 00275 * the dropping initiated by transport when the transport send 00276 * strategy is in a MODE_TERMINATED. The dropped_by_transport 00277 * flag false indicates the dropping is initiated by the 00278 * remove_sample and data_dropped() is a result of 00279 * remove_sample(). 00280 */ 00281 void data_dropped(const DataSampleElement* element, 00282 bool dropped_by_transport); 00283 00284 DDS::ReturnCode_t obtain_buffer_for_control(DataSampleElement*& element); 00285 00286 /** 00287 * Allocate a DataSampleElement object and check the space 00288 * availability for newly allocated element according to qos settings. 00289 * For the blocking write case, if resource limits or history qos limits 00290 * are reached, then it blocks for max blocking time for a previous sample 00291 * to be delivered or dropped by the transport. In non-blocking write 00292 * case, if resource limits or history qos limits are reached, will attempt 00293 * to remove oldest samples (forcing the transport to drop samples if necessary) 00294 * to make space. If there are several threads waiting then 00295 * the first one in the waiting list can enqueue, others continue 00296 * waiting. 00297 */ 00298 DDS::ReturnCode_t obtain_buffer( 00299 DataSampleElement*& element, 00300 DDS::InstanceHandle_t handle); 00301 00302 /** 00303 * Release the memory previously allocated. 00304 * This method is corresponding to the obtain_buffer method. If 00305 * the memory is allocated by some allocator then the memory 00306 * needs to be released to the allocator. 00307 */ 00308 void release_buffer(DataSampleElement* element); 00309 00310 /** 00311 * Unregister all instances managed by this data containers. 00312 */ 00313 void unregister_all(); 00314 00315 /** 00316 * @todo remove/document this! 00317 */ 00318 PublicationInstance_rch get_handle_instance( 00319 DDS::InstanceHandle_t handle); 00320 00321 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00322 /** 00323 * Copy sent data to data DURABILITY cache. 00324 */ 00325 bool persist_data(); 00326 #endif 00327 00328 /// Reset time interval for each instance. 00329 void reschedule_deadline(); 00330 00331 /** 00332 * Block until pending samples have either been delivered 00333 * or dropped. 00334 */ 00335 void wait_pending(); 00336 00337 /** 00338 * Returns a vector of handles for the instances registered for this 00339 * data writer. 00340 */ 00341 typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec; 00342 void get_instance_handles(InstanceHandleVec& instance_handles); 00343 00344 DDS::ReturnCode_t wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence); 00345 00346 bool sequence_acknowledged(const SequenceNumber sequence); 00347 00348 private: 00349 00350 // A class, normally provided by an unit test, that needs access to 00351 // private methods/members. 00352 friend class ::DDS_TEST; 00353 00354 // -------------------------- 00355 // Preventing copying 00356 // -------------------------- 00357 WriteDataContainer(WriteDataContainer const &); 00358 WriteDataContainer & operator= (WriteDataContainer const &); 00359 // -------------------------- 00360 00361 void copy_and_prepend(SendStateDataSampleList& list, 00362 const SendStateDataSampleList& appended, 00363 const RepoId& reader_id, 00364 const DDS::LifespanQosPolicy& lifespan, 00365 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00366 const OPENDDS_STRING& filterClassName, 00367 const FilterEvaluator* eval, 00368 const DDS::StringSeq& params, 00369 #endif 00370 ssize_t& max_resend_samples); 00371 00372 /** 00373 * Remove the oldest "n" samples from each instance list that are 00374 * in a state such that they could only be used for durability 00375 * purposes (see reenqueue_all). 00376 * "n" is determined by max_durable_per_instance_, so these samples 00377 * are truly unneeded -- there are max_durable_per_instance_ newer 00378 * samples available in the instance. 00379 */ 00380 void remove_excess_durable(); 00381 00382 /** 00383 * Remove the oldest sample (head) from the instance history list. 00384 * This method also updates the internal lists to reflect 00385 * the change. 00386 * If the sample is in the unsent_data_ or sent_data_ list then 00387 * it will be released. If the sample is in the sending_data_ list 00388 * then the transport will be notified to release the sample, then 00389 * the sample will be released. Otherwise an error 00390 * is returned. 00391 * The "released" boolean value indicates whether the sample is 00392 * released. 00393 */ 00394 DDS::ReturnCode_t remove_oldest_sample( 00395 InstanceDataSampleList& instance_list, 00396 bool& released); 00397 00398 /** 00399 * Called when data has been dropped or delivered and any 00400 * blocked writers should be notified 00401 */ 00402 void wakeup_blocking_writers (DataSampleElement* stale); 00403 00404 private: 00405 00406 void log_send_state_lists (OPENDDS_STRING description); 00407 00408 DisjointSequence acked_sequences_; 00409 00410 /// List of data that has not been sent yet. 00411 SendStateDataSampleList unsent_data_; 00412 00413 /// Id used to keep track of which send transaction 00414 /// DataWriter is currently creating 00415 ACE_UINT64 transaction_id_; 00416 00417 /// List of data that is currently being sent. 00418 SendStateDataSampleList sending_data_; 00419 00420 /// List of data that has already been sent. 00421 SendStateDataSampleList sent_data_; 00422 00423 /// List of data that has been released by WriteDataContainer 00424 /// but is still in process of delivery (or dropping) by transport 00425 SendStateDataSampleList orphaned_to_transport_; 00426 00427 /// The list of all samples written to this datawriter in 00428 /// writing order. 00429 WriterDataSampleList data_holder_; 00430 00431 /// List of the data reenqueued to support the 00432 /// TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the 00433 /// samples in sent and sending list. This 00434 /// list will be passed to the transport for re-sending. 00435 SendStateDataSampleList resend_data_; 00436 00437 /// The individual instance queue threads in the data. 00438 PublicationInstanceMapType instances_; 00439 00440 /// The publication Id from repo. 00441 PublicationId publication_id_; 00442 00443 /// The writer that owns this container. 00444 DataWriterImpl* writer_; 00445 00446 /// The maximum size a container should allow for 00447 /// an instance sample list 00448 CORBA::Long max_samples_per_instance_; 00449 00450 CORBA::Long history_depth_; 00451 00452 /// The maximum number of samples from each instance that 00453 /// can be added to the resend_data_ for durability. 00454 CORBA::Long max_durable_per_instance_; 00455 00456 /// The maximum number of instances allowed or zero 00457 /// to indicate unlimited. 00458 /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances 00459 /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS 00460 CORBA::Long max_num_instances_; 00461 00462 /// The maximum number of samples allowed or zero 00463 /// to indicate unlimited. 00464 /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances 00465 /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS 00466 /// It also covers QoS.RESOURCE_LIMITS.max_samples and 00467 /// max_instances * max_samples_per_instance 00468 CORBA::Long max_num_samples_; 00469 00470 /// The maximum time to block on write operation. 00471 /// This comes from DataWriter's QoS HISTORY.max_blocking_time 00472 DDS::Duration_t max_blocking_time_; 00473 00474 /// The block waiting flag. 00475 bool waiting_on_release_; 00476 00477 /// This lock is used to protect the container and the map 00478 /// in the type-specific DataWriter. 00479 /// This lock can be accessible via the datawriter. 00480 /// This lock is made to be globally accessible for 00481 /// performance concern. The lock is acquired as the external 00482 /// call (e.g. FooDataWriterImpl::write) started and the 00483 /// same lock will be used by the transport thread to notify 00484 /// the datawriter the data is delivered. Other internal 00485 /// operations will not lock. 00486 ACE_Recursive_Thread_Mutex lock_; 00487 ACE_Condition<ACE_Recursive_Thread_Mutex> condition_; 00488 ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_; 00489 00490 /// Lock used for wait_for_acks() processing. 00491 ACE_Thread_Mutex wfa_lock_; 00492 00493 /// Used to block in wait_for_acks(). 00494 ACE_Condition<ACE_Thread_Mutex> wfa_condition_; 00495 00496 /// The number of chunks that sample_list_element_allocator_ 00497 /// needs initialize. 00498 size_t n_chunks_; 00499 00500 /// The cached allocator to allocate DataSampleElement 00501 /// objects. 00502 DataSampleElementAllocator sample_list_element_allocator_; 00503 00504 /// The flag indicates the datawriter will be destroyed. 00505 bool shutdown_; 00506 00507 /// Domain ID. 00508 DDS::DomainId_t const domain_id_; 00509 00510 /// Topic name. 00511 char const * const topic_name_; 00512 00513 /// Type name. 00514 char const * const type_name_; 00515 00516 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00517 00518 /// Pointer to the data durability cache. 00519 /** 00520 * This a pointer to the data durability cache owned by the 00521 * Service Participant singleton, which means this cache is also 00522 * a singleton. 00523 */ 00524 DataDurabilityCache * const durability_cache_; 00525 00526 /// DURABILITY_SERVICE QoS specific to the DataWriter. 00527 DDS::DurabilityServiceQosPolicy const & durability_service_; 00528 00529 #endif 00530 }; 00531 00532 } /// namespace OpenDDS 00533 } /// namespace DCPS 00534 00535 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00536 00537 #endif /* OPENDDS_DCPS_WRITE_DATA_CONTAINER_H */