00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_WRITE_DATA_CONTAINER_H 00009 #define OPENDDS_DCPS_WRITE_DATA_CONTAINER_H 00010 00011 #include "dds/DdsDcpsInfrastructureC.h" 00012 #include "DataSampleElement.h" 00013 #include "SendStateDataSampleList.h" 00014 #include "WriterDataSampleList.h" 00015 #include "DisjointSequence.h" 00016 #include "PoolAllocator.h" 00017 #include "PoolAllocationBase.h" 00018 00019 #include "ace/Condition_T.h" 00020 #include "Service_Participant.h" 00021 00022 #include "ace/Null_Mutex.h" 00023 #include "ace/Thread_Mutex.h" 00024 #include "ace/Recursive_Thread_Mutex.h" 00025 00026 #include <memory> 00027 00028 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00029 #pragma once 00030 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00031 00032 namespace OpenDDS { 00033 namespace DCPS { 00034 00035 class InstanceDataSampleList; 00036 class DataWriterImpl; 00037 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00038 class DataDurabilityCache; 00039 #endif 00040 class FilterEvaluator; 00041 00042 typedef OPENDDS_MAP(DDS::InstanceHandle_t, PublicationInstance*) 00043 PublicationInstanceMapType; 00044 00045 /** 00046 * @class WriteDataContainer 00047 * 00048 * @brief A container for instances sample data. 00049 * 00050 * This container is instantiated per DataWriter. It maintains 00051 * list of PublicationInstance objects which is internally 00052 * referenced by the instance handle. 00053 * 00054 * This container contains threaded lists of all data written to a 00055 * given DataWriter. The real data sample is represented by the 00056 * DataSampleElement. The data_holder_ holds all 00057 * DataSampleElement in the writing order via the 00058 * next_writer_sample_/previous_writer_sample_ thread. The instance list in 00059 * PublicationInstance links samples via the next_instance_sample_ 00060 * thread. 00061 * 00062 * There are three state transition lists used during write operations: 00063 * unsent, sending, and sent. These lists are linked 00064 * via the next_send_sample_/previous_send_sample_ thread. Any 00065 * DataSampleElement should be in one of these three lists and 00066 * SHOULD NOT be shared between these three lists. 00067 * A normal transition of a DataSampleElement would be 00068 * unsent->sending->sent. A DataSampleElement transitions from unsent 00069 * to sent naturally during a write operation when get_unsent_data is called. 00070 * A DataSampleElement transitions from sending to sent when data_delivered 00071 * is called notifying the container that the transport is finished with the 00072 * sample and it can be marked as a historical sample. 00073 * A DataSampleElement may transition back to unsent from sending if 00074 * the transport notifies that the sample was dropped and should be resent. 00075 * A DataSampleElement is removed from sent list and freed when the instance 00076 * queue or container (for another instance) needs more space. 00077 * A DataSampleElement may be removed and freed directly from sending when 00078 * and unreliable writer needs space. In this case the transport will be 00079 * notified that the sample should be removed. 00080 * A DataSampleElement may also be removed directly from unsent if an unreliable 00081 * writer needs space for new samples. 00082 * Note: The real data sample will be freed when the reference counting goes 0. 00083 * The resend list is only used when the datawriter uses 00084 * TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements 00085 * for the data sample duplicates of the sending and sent list and 00086 * hands this list off to the transport. 00087 * 00088 * 00089 * 00090 * @note: 1) The PublicationInstance object is not removed from 00091 * this container until the instance is 00092 * unregistered. The same instance handle is reused for 00093 * re-registration. The instance data is deleted when 00094 * this container is deleted. This would simplify 00095 * instance data memory management. An alternative way 00096 * is to remove the handle from the instance list when 00097 * unregister occurs and delete the instance data after 00098 * the transport is done with the instance, but we do 00099 * not have a way to know when the transport is done 00100 * since we have the sending list for all instances in 00101 * the same datawriter. 00102 * 2) This container has the ownership of the instance data 00103 * samples when the data is written. The type-specific 00104 * datawriter that allocates the memory for the sample 00105 * data gives the ownership to its base class. 00106 * 3) It is the responsibility of the owner of objects of 00107 * this class to ensure that access to the lists are 00108 * properly locked. This means using the same 00109 * lock/condition to access the lists via the enqueue(), 00110 * get*(), and data_delivered() methods. For the case 00111 * where these are all called from the same (client) 00112 * thread, this should be a recursive lock so that: 1) 00113 * we do not deadlock; and, 2) we incur the cost of 00114 * obtaining the lock only once. 00115 */ 00116 class OpenDDS_Dcps_Export WriteDataContainer : public PoolAllocationBase { 00117 public: 00118 00119 friend class DataWriterImpl; 00120 00121 /** 00122 * No default constructor, must be initialized. 00123 */ 00124 WriteDataContainer( 00125 /// The writer which owns this container. 00126 DataWriterImpl* writer, 00127 /// Depth of the instance sample queue. 00128 CORBA::Long depth, 00129 /// The timeout for write. 00130 ::DDS::Duration_t max_blocking_time, 00131 /// The number of chunks that the DataSampleElementAllocator 00132 /// needs allocate. 00133 size_t n_chunks, 00134 /// Domain ID. 00135 DDS::DomainId_t domain_id, 00136 /// Topic name. 00137 char const * topic_name, 00138 /// Type name. 00139 char const * type_name, 00140 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00141 /// The data durability cache for unsent data. 00142 DataDurabilityCache * durability_cache, 00143 /// DURABILITY_SERVICE QoS specific to the DataWriter. 00144 DDS::DurabilityServiceQosPolicy const & durability_service, 00145 #endif 00146 /// maximum number of instances, 0 for unlimited 00147 CORBA::Long max_instances, 00148 /// maximum total number of samples, 0 for unlimited 00149 CORBA::Long max_total_samples); 00150 00151 /** 00152 * Default destructor. 00153 */ 00154 ~WriteDataContainer(); 00155 00156 DDS::ReturnCode_t 00157 enqueue_control(DataSampleElement* control_sample); 00158 00159 /** 00160 * Enqueue the data sample in its instance thread. This method 00161 * assumes there is an available space for the sample in the 00162 * instance list. 00163 */ 00164 DDS::ReturnCode_t 00165 enqueue( 00166 DataSampleElement* sample, 00167 DDS::InstanceHandle_t instance); 00168 00169 /** 00170 * Create a resend list with the copies of all current "sending" 00171 * and "sent" samples. The samples will be sent to the 00172 * subscriber specified. 00173 */ 00174 DDS::ReturnCode_t reenqueue_all(const RepoId& reader_id, 00175 const DDS::LifespanQosPolicy& lifespan 00176 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00177 , 00178 const OPENDDS_STRING& filterClassName, 00179 const FilterEvaluator* eval, 00180 const DDS::StringSeq& params 00181 #endif 00182 ); 00183 00184 /** 00185 * Dynamically allocate a PublicationInstance object and add to 00186 * the instances_ list. 00187 * 00188 * @note: The registered_sample is an input and output parameter. 00189 * A shallow copy of the sample data will be given to 00190 * datawriter as part of the control message. 00191 */ 00192 DDS::ReturnCode_t 00193 register_instance(DDS::InstanceHandle_t& instance_handle, 00194 DataSample*& registered_sample); 00195 00196 /** 00197 * Remove the provided instance from the instances_ list. 00198 * The registered sample data will be released upon the deletion 00199 * of the PublicationInstance. A shallow copy of the sample data 00200 * will be given to datawriter as part of the control message if 00201 * the dup_registered_sample is true. 00202 * 00203 * This method returns error if the instance is not registered. 00204 */ 00205 DDS::ReturnCode_t unregister( 00206 DDS::InstanceHandle_t handle, 00207 DataSample*& registered_sample, 00208 bool dup_registered_sample = true); 00209 00210 /** 00211 * Delete the samples for the provided instance. 00212 * A shallow copy of the sample data will be given to datawriter 00213 * as part of the control message if the dup_registered_sample 00214 * is true. 00215 * 00216 * This method returns error if the instance is not registered. 00217 */ 00218 DDS::ReturnCode_t dispose( 00219 DDS::InstanceHandle_t handle, 00220 DataSample*& registered_sample, 00221 bool dup_registered_sample = true); 00222 00223 /** 00224 * Return the number of samples for the given instance. 00225 */ 00226 DDS::ReturnCode_t num_samples( 00227 DDS::InstanceHandle_t handle, 00228 size_t& size); 00229 00230 /** 00231 * Return the number of samples for all instances. 00232 */ 00233 size_t num_all_samples(); 00234 00235 /** 00236 * Obtain a list of data that has not yet been sent. The data 00237 * on the list returned is moved from the internal unsent_data_ 00238 * list to the internal sending_data_ list as part of this call. 00239 * The entire list is linked via the 00240 * DataSampleElement.next_send_sample_ link as well. 00241 */ 00242 ACE_UINT64 get_unsent_data(SendStateDataSampleList& list); 00243 00244 /** 00245 * Obtain a list of data for resending. This is only used when 00246 * TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list 00247 * returned is not put on any SendStateDataSampleList. 00248 */ 00249 SendStateDataSampleList get_resend_data() ; 00250 00251 /** 00252 * Returns if pending data exists. This includes 00253 * sending, and unsent data. 00254 */ 00255 bool pending_data(); 00256 00257 /** 00258 * Acknowledge the delivery of data. The sample that resides in 00259 * this container will be moved from sending_data_ list to the 00260 * internal sent_data_ list. If there are any threads waiting for 00261 * available space, it wakes up these threads. 00262 */ 00263 void data_delivered(const DataSampleElement* sample); 00264 00265 /** 00266 * This method is called by the transport to notify the sample 00267 * is dropped. Which the transport was told to do by the 00268 * publication code by calling 00269 * TransportClient::remove_sample(). If the sample was 00270 * "sending" then it is moved to the "unsent" list. If there are any 00271 * threads waiting for available space then it needs wake up 00272 * these threads. The dropped_by_transport flag true indicates 00273 * the dropping initiated by transport when the transport send 00274 * strategy is in a MODE_TERMINATED. The dropped_by_transport 00275 * flag false indicates the dropping is initiated by the 00276 * remove_sample and data_dropped() is a result of 00277 * remove_sample(). 00278 */ 00279 void data_dropped(const DataSampleElement* element, 00280 bool dropped_by_transport); 00281 00282 DDS::ReturnCode_t obtain_buffer_for_control(DataSampleElement*& element); 00283 00284 /** 00285 * Allocate a DataSampleElement object and check the space 00286 * availability for newly allocated element according to qos settings. 00287 * For the blocking write case, if resource limits or history qos limits 00288 * are reached, then it blocks for max blocking time for a previous sample 00289 * to be delivered or dropped by the transport. In non-blocking write 00290 * case, if resource limits or history qos limits are reached, will attempt 00291 * to remove oldest samples (forcing the transport to drop samples if necessary) 00292 * to make space. If there are several threads waiting then 00293 * the first one in the waiting list can enqueue, others continue 00294 * waiting. 00295 */ 00296 DDS::ReturnCode_t obtain_buffer( 00297 DataSampleElement*& element, 00298 DDS::InstanceHandle_t handle); 00299 00300 /** 00301 * Release the memory previously allocated. 00302 * This method is corresponding to the obtain_buffer method. If 00303 * the memory is allocated by some allocator then the memory 00304 * needs to be released to the allocator. 00305 */ 00306 void release_buffer(DataSampleElement* element); 00307 00308 /** 00309 * Unregister all instances managed by this data containers. 00310 */ 00311 void unregister_all(); 00312 00313 /** 00314 * @todo remove/document this! 00315 */ 00316 PublicationInstance* get_handle_instance( 00317 DDS::InstanceHandle_t handle); 00318 00319 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00320 /** 00321 * Copy sent data to data DURABILITY cache. 00322 */ 00323 bool persist_data(); 00324 #endif 00325 00326 /// Reset time interval for each instance. 00327 void reschedule_deadline(); 00328 00329 /** 00330 * Block until pending samples have either been delivered 00331 * or dropped. 00332 */ 00333 void wait_pending(); 00334 00335 /** 00336 * Returns a vector of handles for the instances registered for this 00337 * data writer. 00338 */ 00339 typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec; 00340 void get_instance_handles(InstanceHandleVec& instance_handles); 00341 00342 DDS::ReturnCode_t wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence); 00343 00344 bool sequence_acknowledged(const SequenceNumber sequence); 00345 00346 private: 00347 00348 // A class, normally provided by an unit test, that needs access to 00349 // private methods/members. 00350 friend class ::DDS_TEST; 00351 00352 // -------------------------- 00353 // Preventing copying 00354 // -------------------------- 00355 WriteDataContainer(WriteDataContainer const &); 00356 WriteDataContainer & operator= (WriteDataContainer const &); 00357 // -------------------------- 00358 00359 void copy_and_append(SendStateDataSampleList& list, 00360 const SendStateDataSampleList& appended, 00361 const RepoId& reader_id, 00362 const DDS::LifespanQosPolicy& lifespan 00363 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00364 , 00365 const OPENDDS_STRING& filterClassName, 00366 const FilterEvaluator* eval, 00367 const DDS::StringSeq& params 00368 #endif 00369 ); 00370 /** 00371 * Remove the sample (head) from the instance list if it is only being 00372 * used for history purposes (located on sent_data_ list). 00373 * This method also updates the internal lists to reflect 00374 * the change. 00375 * The "released" boolean value indicates whether a sample was 00376 * released. 00377 */ 00378 DDS::ReturnCode_t remove_oldest_historical_sample( 00379 InstanceDataSampleList& instance_list, 00380 bool& released); 00381 00382 /** 00383 * Remove the oldest sample (head) from the instance history list. 00384 * This method also updates the internal lists to reflect 00385 * the change. 00386 * If the sample is in the unsent_data_ or sent_data_ list then 00387 * it will be released. If the sample is in the sending_data_ list 00388 * then the transport will be notified to release the sample, then 00389 * the sample will be released. Otherwise an error 00390 * is returned. 00391 * The "released" boolean value indicates whether the sample is 00392 * released. 00393 */ 00394 DDS::ReturnCode_t remove_oldest_sample( 00395 InstanceDataSampleList& instance_list, 00396 bool& released); 00397 00398 /** 00399 * Called when data has been dropped or delivered and any 00400 * blocked writers should be notified 00401 */ 00402 void wakeup_blocking_writers (DataSampleElement* stale); 00403 00404 private: 00405 00406 void log_send_state_lists (OPENDDS_STRING description); 00407 00408 DisjointSequence acked_sequences_; 00409 00410 /// List of data that has not been sent yet. 00411 SendStateDataSampleList unsent_data_; 00412 00413 /// Id used to keep track of which send transaction 00414 /// DataWriter is currently creating 00415 ACE_UINT64 transaction_id_; 00416 00417 /// List of data that is currently being sent. 00418 SendStateDataSampleList sending_data_; 00419 00420 /// List of data that has already been sent. 00421 SendStateDataSampleList sent_data_; 00422 00423 /// List of data that has been released by WriteDataContainer 00424 /// but is still in process of delivery (or dropping) by transport 00425 SendStateDataSampleList orphaned_to_transport_; 00426 00427 /// The list of all samples written to this datawriter in 00428 /// writing order. 00429 WriterDataSampleList data_holder_; 00430 00431 /// List of the data reenqueued to support the 00432 /// TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the 00433 /// samples in sent and sending list. This 00434 /// list will be passed to the transport for re-sending. 00435 SendStateDataSampleList resend_data_; 00436 00437 /// The individual instance queue threads in the data. 00438 PublicationInstanceMapType instances_; 00439 00440 /// The publication Id from repo. 00441 PublicationId publication_id_; 00442 00443 /// The writer that owns this container. 00444 DataWriterImpl* writer_; 00445 00446 /// The maximum size a container should allow for 00447 /// an instance sample list 00448 /// It corresponds to the QoS.HISTORY.depth value for 00449 /// QoS.HISTORY.kind==KEEP_LAST and corresponds to the 00450 /// QoS.RESOURCE_LIMITS.max_samples_per_instance for 00451 /// the case of QoS.HISTORY.kind==KEEP_ALL. 00452 CORBA::Long depth_; 00453 00454 /// The maximum number of instances allowed or zero 00455 /// to indicate unlimited. 00456 /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances 00457 /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS 00458 CORBA::Long max_num_instances_; 00459 00460 /// The maximum number of samples allowed or zero 00461 /// to indicate unlimited. 00462 /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances 00463 /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS 00464 /// It also covers QoS.RESOURCE_LIMITS.max_samples and 00465 /// max_instances * depth 00466 CORBA::Long max_num_samples_; 00467 00468 /// The maximum time to block on write operation. 00469 /// This comes from DataWriter's QoS HISTORY.max_blocking_time 00470 ::DDS::Duration_t max_blocking_time_; 00471 00472 /// The block waiting flag. 00473 bool waiting_on_release_; 00474 00475 /// This lock is used to protect the container and the map 00476 /// in the type-specific DataWriter. 00477 /// This lock can be accessible via the datawriter. 00478 /// This lock is made to be globally accessible for 00479 /// performance concern. The lock is acquired as the external 00480 /// call (e.g. FooDataWriterImpl::write) started and the 00481 /// same lock will be used by the transport thread to notify 00482 /// the datawriter the data is delivered. Other internal 00483 /// operations will not lock. 00484 ACE_Recursive_Thread_Mutex lock_; 00485 ACE_Condition<ACE_Recursive_Thread_Mutex> condition_; 00486 ACE_Condition<ACE_Recursive_Thread_Mutex> empty_condition_; 00487 00488 /// Lock used for wait_for_acks() processing. 00489 ACE_Thread_Mutex wfa_lock_; 00490 00491 /// Used to block in wait_for_acks(). 00492 ACE_Condition<ACE_Thread_Mutex> wfa_condition_; 00493 00494 /// The number of chunks that sample_list_element_allocator_ 00495 /// needs initialize. 00496 size_t n_chunks_; 00497 00498 /// The cached allocator to allocate DataSampleElement 00499 /// objects. 00500 DataSampleElementAllocator sample_list_element_allocator_; 00501 00502 /// The allocator for TransportSendElement. 00503 /// The TransportSendElement allocator is put here because it 00504 /// needs the number of chunks information that WriteDataContainer 00505 /// has. 00506 TransportSendElementAllocator transport_send_element_allocator_; 00507 00508 TransportCustomizedElementAllocator transport_customized_element_allocator_; 00509 00510 /// The flag indicates the datawriter will be destroyed. 00511 bool shutdown_; 00512 00513 /// Domain ID. 00514 DDS::DomainId_t const domain_id_; 00515 00516 /// Topic name. 00517 char const * const topic_name_; 00518 00519 /// Type name. 00520 char const * const type_name_; 00521 00522 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00523 00524 /// Pointer to the data durability cache. 00525 /** 00526 * This a pointer to the data durability cache owned by the 00527 * Service Participant singleton, which means this cache is also 00528 * a singleton. 00529 */ 00530 DataDurabilityCache * const durability_cache_; 00531 00532 /// DURABILITY_SERVICE QoS specific to the DataWriter. 00533 DDS::DurabilityServiceQosPolicy const & durability_service_; 00534 00535 #endif 00536 }; 00537 00538 } /// namespace OpenDDS 00539 } /// namespace DCPS 00540 00541 #endif /* OPENDDS_DCPS_WRITE_DATA_CONTAINER_H */