00001 /* 00002 * Distributed under the OpenDDS License. 00003 * See: http://www.opendds.org/license.html 00004 */ 00005 #ifndef OPENDDS_DCPS_DATASAMPLEELEMENT_H 00006 #define OPENDDS_DCPS_DATASAMPLEELEMENT_H 00007 00008 #include "dds/DdsDcpsInfoUtilsC.h" 00009 #include "Definitions.h" 00010 #include "transport/framework/TransportDefs.h" 00011 #include "Dynamic_Cached_Allocator_With_Overflow_T.h" 00012 #include "DataSampleHeader.h" 00013 #include "dds/DCPS/PoolAllocator.h" 00014 #include "dds/DCPS/PoolAllocationBase.h" 00015 00016 class DDS_TEST; 00017 00018 namespace OpenDDS { 00019 namespace DCPS { 00020 00021 const CORBA::ULong MAX_READERS_PER_ELEM = 5; 00022 00023 class DataSampleElement; 00024 typedef Cached_Allocator_With_Overflow<DataSampleElement, ACE_Null_Mutex> 00025 DataSampleElementAllocator; 00026 00027 typedef Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> 00028 TransportSendElementAllocator; 00029 00030 class TransportCustomizedElement; 00031 typedef Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> 00032 TransportCustomizedElementAllocator; 00033 00034 class TransportSendListener; 00035 struct PublicationInstance; 00036 00037 /** 00038 * Currently we contain entire messages in a single ACE_Message_Block 00039 * chain. 00040 */ 00041 typedef ACE_Message_Block DataSample; 00042 00043 /** 00044 * Wraps the marshaled message sample to be published, along with 00045 * the publication Id and Instance handle for downstream processing. 00046 * 00047 * Internally there are next/previous pointers that used for lists 00048 * InstanceDataSampleList, SendStateDataSampleList, and WriterDataSampleList. 00049 * These pointers are kept in this single element rather than having multiple smaller 00050 * lists in order to allow us to allocate once which will minimize locking. 00051 * Note that because the list pointers are stored within the element, 00052 * the element can simultaneously be in at most one InstanceDataSampleList list, one 00053 * SendStateDataSampleList list, and one WriterDataSampleList list. 00054 */ 00055 class OpenDDS_Dcps_Export DataSampleElement : public PoolAllocationBase { 00056 00057 00058 public: 00059 DataSampleElement(PublicationId publication_id, 00060 TransportSendListener* send_listener, 00061 PublicationInstance* handle, 00062 TransportSendElementAllocator* tse_allocator, 00063 TransportCustomizedElementAllocator* tce_allocator); 00064 00065 DataSampleElement(const DataSampleElement& elem); 00066 DataSampleElement& operator=(const DataSampleElement& elem); 00067 00068 ~DataSampleElement(); 00069 00070 const DataSampleHeader& get_header() const; 00071 DataSampleHeader& get_header(); 00072 00073 DataSample* get_sample() const; 00074 DataSample* get_sample(); 00075 00076 void set_sample(DataSample* sample); 00077 00078 PublicationId get_pub_id() const; 00079 00080 CORBA::ULong get_num_subs() const; 00081 00082 void set_num_subs(int num_subs); 00083 00084 const OpenDDS::DCPS::RepoId* get_sub_ids() const; 00085 OpenDDS::DCPS::RepoId get_sub_id(int index) const; 00086 00087 void set_sub_id(int index, OpenDDS::DCPS::RepoId id); 00088 00089 TransportSendListener* get_send_listener() const; 00090 TransportSendListener* get_send_listener(); 00091 00092 PublicationInstance* get_handle() const; 00093 00094 TransportSendElementAllocator* get_transport_send_element_allocator() const; 00095 00096 TransportCustomizedElementAllocator* get_transport_customized_element_allocator() const; 00097 00098 typedef OPENDDS_MAP(DataLinkIdType, GUIDSeq_var) DataLinkIdTypeGUIDMap; 00099 DataLinkIdTypeGUIDMap& get_filter_per_link(); 00100 00101 void set_filter_out(GUIDSeq *filter_out); 00102 00103 void set_transaction_id(ACE_UINT64 transaction_id); 00104 00105 ACE_UINT64 transaction_id() const; 00106 00107 private: 00108 00109 ACE_UINT64 transaction_id_; 00110 00111 /// The OpenDDS DCPS header for this sample 00112 DataSampleHeader header_; 00113 00114 /// Message being sent which includes the DataSampleHeader message block 00115 /// and DataSample message block. 00116 DataSample* sample_; 00117 00118 /// Publication Id used downstream. 00119 PublicationId publication_id_; 00120 00121 CORBA::ULong num_subs_; 00122 00123 OpenDDS::DCPS::RepoId subscription_ids_[OpenDDS::DCPS::MAX_READERS_PER_ELEM]; 00124 00125 /// Pointer to object that will be informed when the data has 00126 /// been delivered. This needs to be set prior to using the 00127 /// TransportClient to send(). 00128 TransportSendListener* send_listener_; 00129 00130 /// The pointer to the object that contains the instance information 00131 /// and data sample list. 00132 /// The client holds this as an InstanceHandle_t. 00133 PublicationInstance* handle_; 00134 00135 /// Allocator for the TransportSendElement. 00136 TransportSendElementAllocator* transport_send_element_allocator_; 00137 00138 /// Allocator for TransportCustomizedElement 00139 TransportCustomizedElementAllocator* transport_customized_element_allocator_; 00140 00141 //{@ 00142 /// tracking for Content-Filtering data 00143 GUIDSeq_var filter_out_; 00144 DataLinkIdTypeGUIDMap filter_per_link_; 00145 //@} 00146 00147 DataSampleElement* get_next_send_sample() const; 00148 00149 void set_next_send_sample(DataSampleElement* next_send_sample); 00150 00151 /// *DataSampleList(s) is in charge of managing list placement therefore 00152 /// needs access to pointers 00153 friend class SendStateDataSampleList; 00154 friend class WriterDataSampleList; 00155 friend class InstanceDataSampleList; 00156 friend class TransportClient; 00157 friend class ::DDS_TEST; 00158 /// Iterators needs access to prev/next pointers for iteration 00159 friend class SendStateDataSampleListIterator; 00160 friend class SendStateDataSampleListConstIterator; 00161 00162 00163 /// Used to make removal from the 00164 /// container _much_ more efficient. 00165 00166 /// Thread of all data within a DataWriter. 00167 mutable DataSampleElement* previous_writer_sample_; 00168 mutable DataSampleElement* next_writer_sample_; 00169 00170 /// Thread of data within the instance. 00171 mutable DataSampleElement* next_instance_sample_; 00172 00173 /// Thread of data being unsent/sending/sent/released. 00174 mutable DataSampleElement* next_send_sample_; 00175 mutable DataSampleElement* previous_send_sample_; 00176 }; 00177 00178 00179 } // namespace DCPS 00180 } // namespace OpenDDS 00181 00182 #if defined(__ACE_INLINE__) 00183 #include "DataSampleElement.inl" 00184 #endif /* __ACE_INLINE__ */ 00185 00186 #endif /* OPENDDS_DCPS_DATASAMPLEELEMENT_H */