00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H
00009 #define OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/GuidUtils.h"
00014 #include "ace/Synch.h"
00015 #include "dds/DCPS/PoolAllocationBase.h"
00016
00017 #include <utility>
00018
00019 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00020 class ACE_Message_Block;
00021 ACE_END_VERSIONED_NAMESPACE_DECL
00022
00023 namespace OpenDDS {
00024 namespace DCPS {
00025
00026 class DataSampleElement;
00027
00028 class TransportQueueElement;
00029 typedef std::pair<TransportQueueElement*, TransportQueueElement*> ElementPair;
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048 class OpenDDS_Dcps_Export TransportQueueElement : public PoolAllocationBase {
00049 public:
00050
00051 virtual ~TransportQueueElement();
00052
00053 class OpenDDS_Dcps_Export MatchCriteria {
00054 protected:
00055 virtual ~MatchCriteria();
00056 MatchCriteria() {}
00057 public:
00058 virtual bool matches(const TransportQueueElement& candidate) const = 0;
00059 virtual bool unique() const = 0;
00060 private:
00061 MatchCriteria(const MatchCriteria&);
00062 MatchCriteria& operator=(const MatchCriteria&);
00063 };
00064
00065 class OpenDDS_Dcps_Export MatchOnPubId : public MatchCriteria {
00066 public:
00067 explicit MatchOnPubId(const RepoId& id) : pub_id_(id) {}
00068 virtual ~MatchOnPubId();
00069 virtual bool matches(const TransportQueueElement& candidate) const;
00070 virtual bool unique() const { return false; }
00071 private:
00072 RepoId pub_id_;
00073 };
00074
00075 class OpenDDS_Dcps_Export MatchOnDataPayload : public MatchCriteria {
00076 public:
00077 explicit MatchOnDataPayload(const char* data) : data_(data) {}
00078 virtual ~MatchOnDataPayload();
00079 virtual bool matches(const TransportQueueElement& candidate) const;
00080 virtual bool unique() const { return true; }
00081 private:
00082 const char* data_;
00083 };
00084
00085
00086
00087
00088
00089
00090
00091
00092 bool data_dropped(bool dropped_by_transport = false);
00093
00094
00095
00096 bool data_delivered();
00097
00098
00099 virtual bool requires_exclusive_packet() const;
00100
00101
00102 virtual RepoId publication_id() const = 0;
00103
00104
00105 virtual RepoId subscription_id() const {
00106 return GUID_UNKNOWN;
00107 }
00108
00109 virtual SequenceNumber sequence() const {
00110 return SequenceNumber::SEQUENCENUMBER_UNKNOWN();
00111 }
00112
00113
00114 virtual const ACE_Message_Block* msg() const = 0;
00115
00116
00117 virtual const ACE_Message_Block* msg_payload() const = 0;
00118
00119
00120 virtual bool is_control(RepoId pub_id) const;
00121
00122
00123 bool released() const;
00124 void released(bool flag);
00125
00126
00127
00128 static ACE_Message_Block* clone_mb(const ACE_Message_Block* msg,
00129 MessageBlockAllocator* mb_allocator,
00130 DataBlockAllocator* db_allocator);
00131
00132
00133 virtual bool owned_by_transport() = 0;
00134
00135
00136
00137
00138
00139
00140
00141
00142 virtual ElementPair fragment(size_t size);
00143
00144
00145 virtual bool is_fragment() const { return false; }
00146
00147 protected:
00148
00149
00150
00151 explicit TransportQueueElement(unsigned long initial_count);
00152
00153
00154 virtual void release_element(bool dropped_by_transport) = 0;
00155
00156
00157
00158
00159 bool was_dropped() const;
00160
00161 private:
00162
00163
00164 bool decision_made(bool dropped_by_transport);
00165 friend class TransportCustomizedElement;
00166
00167
00168 ACE_Atomic_Op<ACE_Thread_Mutex, unsigned long> sub_loan_count_;
00169
00170
00171 bool dropped_;
00172
00173
00174 bool released_;
00175 };
00176
00177 }
00178 }
00179
00180 #if defined(__ACE_INLINE__)
00181 #include "TransportQueueElement.inl"
00182 #endif
00183
00184 #endif