00001 /* 00002 * Distributed under the OpenDDS License. 00003 * See: http://www.opendds.org/license.html 00004 */ 00005 #ifndef OPENDDS_DCPS_DATASAMPLEELEMENT_H 00006 #define OPENDDS_DCPS_DATASAMPLEELEMENT_H 00007 00008 #include "dds/DdsDcpsInfoUtilsC.h" 00009 #include "Definitions.h" 00010 #include "transport/framework/TransportDefs.h" 00011 #include "Dynamic_Cached_Allocator_With_Overflow_T.h" 00012 #include "DataSampleHeader.h" 00013 #include "dds/DCPS/PoolAllocator.h" 00014 #include "dds/DCPS/PoolAllocationBase.h" 00015 #include "dds/DCPS/RcHandle_T.h" 00016 #include "dds/DCPS/Message_Block_Ptr.h" 00017 00018 class DDS_TEST; 00019 00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00021 00022 namespace OpenDDS { 00023 namespace DCPS { 00024 00025 const CORBA::ULong MAX_READERS_PER_ELEM = 5; 00026 00027 class DataSampleElement; 00028 typedef Cached_Allocator_With_Overflow<DataSampleElement, ACE_Null_Mutex> 00029 DataSampleElementAllocator; 00030 00031 class TransportSendListener; 00032 struct PublicationInstance; 00033 typedef RcHandle<PublicationInstance> PublicationInstance_rch; 00034 00035 /** 00036 * Currently we contain entire messages in a single ACE_Message_Block 00037 * chain. 00038 */ 00039 typedef ACE_Message_Block DataSample; 00040 00041 /** 00042 * Wraps the marshaled message sample to be published, along with 00043 * the publication Id and Instance handle for downstream processing. 00044 * 00045 * Internally there are next/previous pointers that used for lists 00046 * InstanceDataSampleList, SendStateDataSampleList, and WriterDataSampleList. 00047 * These pointers are kept in this single element rather than having multiple smaller 00048 * lists in order to allow us to allocate once which will minimize locking. 00049 * Note that because the list pointers are stored within the element, 00050 * the element can simultaneously be in at most one InstanceDataSampleList list, one 00051 * SendStateDataSampleList list, and one WriterDataSampleList list. 00052 */ 00053 class OpenDDS_Dcps_Export DataSampleElement : public PoolAllocationBase { 00054 00055 00056 public: 00057 DataSampleElement(PublicationId publication_id, 00058 TransportSendListener* send_listener, 00059 PublicationInstance_rch handle); 00060 00061 DataSampleElement(const DataSampleElement& elem); 00062 DataSampleElement& operator=(const DataSampleElement& elem); 00063 00064 ~DataSampleElement(); 00065 00066 const DataSampleHeader& get_header() const; 00067 DataSampleHeader& get_header(); 00068 00069 DataSample* get_sample() const; 00070 DataSample* get_sample(); 00071 00072 void set_sample(Message_Block_Ptr sample); 00073 00074 PublicationId get_pub_id() const; 00075 00076 CORBA::ULong get_num_subs() const; 00077 00078 void set_num_subs(int num_subs); 00079 00080 const OpenDDS::DCPS::RepoId* get_sub_ids() const; 00081 OpenDDS::DCPS::RepoId get_sub_id(int index) const; 00082 00083 void set_sub_id(int index, OpenDDS::DCPS::RepoId id); 00084 00085 TransportSendListener* get_send_listener() const; 00086 TransportSendListener* get_send_listener(); 00087 00088 PublicationInstance_rch get_handle() const; 00089 00090 typedef OPENDDS_MAP(DataLinkIdType, GUIDSeq_var) DataLinkIdTypeGUIDMap; 00091 DataLinkIdTypeGUIDMap& get_filter_per_link(); 00092 00093 void set_filter_out(GUIDSeq *filter_out); 00094 00095 void set_transaction_id(ACE_UINT64 transaction_id); 00096 00097 ACE_UINT64 transaction_id() const; 00098 00099 private: 00100 00101 ACE_UINT64 transaction_id_; 00102 00103 /// The OpenDDS DCPS header for this sample 00104 DataSampleHeader header_; 00105 00106 /// Message being sent which includes the DataSampleHeader message block 00107 /// and DataSample message block. 00108 Message_Block_Ptr sample_; 00109 00110 /// Publication Id used downstream. 00111 PublicationId publication_id_; 00112 00113 CORBA::ULong num_subs_; 00114 00115 OpenDDS::DCPS::RepoId subscription_ids_[OpenDDS::DCPS::MAX_READERS_PER_ELEM]; 00116 00117 /// Pointer to object that will be informed when the data has 00118 /// been delivered. This needs to be set prior to using the 00119 /// TransportClient to send(). 00120 TransportSendListener* send_listener_; 00121 00122 /// The pointer to the object that contains the instance information 00123 /// and data sample list. 00124 /// The client holds this as an InstanceHandle_t. 00125 PublicationInstance_rch handle_; 00126 00127 //{@ 00128 /// tracking for Content-Filtering data 00129 GUIDSeq_var filter_out_; 00130 DataLinkIdTypeGUIDMap filter_per_link_; 00131 //@} 00132 00133 DataSampleElement* get_next_send_sample() const; 00134 00135 void set_next_send_sample(DataSampleElement* next_send_sample); 00136 00137 /// *DataSampleList(s) is in charge of managing list placement therefore 00138 /// needs access to pointers 00139 friend class SendStateDataSampleList; 00140 friend class WriterDataSampleList; 00141 friend class InstanceDataSampleList; 00142 friend class TransportClient; 00143 friend class ::DDS_TEST; 00144 /// Iterators needs access to prev/next pointers for iteration 00145 friend class SendStateDataSampleListIterator; 00146 friend class SendStateDataSampleListConstIterator; 00147 00148 00149 /// Used to make removal from the 00150 /// container _much_ more efficient. 00151 00152 /// Thread of all data within a DataWriter. 00153 mutable DataSampleElement* previous_writer_sample_; 00154 mutable DataSampleElement* next_writer_sample_; 00155 00156 /// Thread of data within the instance. 00157 mutable DataSampleElement* next_instance_sample_; 00158 mutable DataSampleElement* previous_instance_sample_; 00159 00160 /// Thread of data being unsent/sending/sent/released. 00161 mutable DataSampleElement* next_send_sample_; 00162 mutable DataSampleElement* previous_send_sample_; 00163 }; 00164 00165 00166 } // namespace DCPS 00167 } // namespace OpenDDS 00168 00169 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00170 00171 #if defined(__ACE_INLINE__) 00172 #include "DataSampleElement.inl" 00173 #endif /* __ACE_INLINE__ */ 00174 00175 #endif /* OPENDDS_DCPS_DATASAMPLEELEMENT_H */