Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTCLIENT_H
9 : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTCLIENT_H
10 :
11 : #include "TransportConfig_rch.h"
12 : #include "TransportImpl.h"
13 : #include "DataLinkSet.h"
14 :
15 : #include <dds/DCPS/dcps_export.h>
16 : #include <dds/DCPS/AssociationData.h>
17 : #include <dds/DCPS/ReactorInterceptor.h>
18 : #include <dds/DCPS/Service_Participant.h>
19 : #include <dds/DCPS/PoolAllocator.h>
20 : #include <dds/DCPS/PoolAllocationBase.h>
21 : #include <dds/DCPS/DiscoveryListener.h>
22 : #include <dds/DCPS/RcEventHandler.h>
23 : #include <dds/DCPS/BuiltInTopicUtils.h>
24 :
25 : #include <ace/Time_Value.h>
26 : #include <ace/Event_Handler.h>
27 : #include <ace/Reverse_Lock_T.h>
28 :
29 : // Forward definition of a test-friendly class in the global name space
30 : class DDS_TEST;
31 :
32 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
33 :
34 : namespace OpenDDS {
35 : namespace DCPS {
36 :
37 : class SendStateDataSampleList;
38 :
39 : /**
40 : * @brief Mix-in class for DDS entities which directly use the transport layer.
41 : *
42 : * DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient
43 : * class manages the TransportImpl objects that represent the available
44 : * communication mechanisms and the DataLink objects that represent the
45 : * currently active communication channels to peers.
46 : */
47 : class OpenDDS_Dcps_Export TransportClient
48 : : public virtual RcObject
49 : {
50 : public:
51 : // Used by TransportImpl to complete associate() processing:
52 : void use_datalink(const GUID_t& remote_id, const DataLink_rch& link);
53 :
54 : // values for flags parameter of transport_assoc_done():
55 : enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
56 : TransportClient();
57 : virtual ~TransportClient();
58 :
59 :
60 : // Local setup:
61 :
62 : void enable_transport(bool reliable, bool durable);
63 : void enable_transport_using_config(bool reliable, bool durable,
64 : const TransportConfig_rch& tc);
65 :
66 0 : bool swap_bytes() const { return swap_bytes_; }
67 0 : bool cdr_encapsulation() const { return cdr_encapsulation_; }
68 0 : const TransportLocatorSeq& connection_info() const { return conn_info_; }
69 : void populate_connection_info();
70 : bool is_reliable() const { return reliable_; }
71 :
72 : // Managing associations to remote peers:
73 :
74 : bool associate(const AssociationData& peer, bool active);
75 : void disassociate(const GUID_t& peerId);
76 : void stop_associating();
77 : void stop_associating(const GUID_t* repos, CORBA::ULong length);
78 : void send_final_acks();
79 : void transport_stop();
80 :
81 : // Discovery:
82 : void register_for_reader(const GUID_t& participant,
83 : const GUID_t& writerid,
84 : const GUID_t& readerid,
85 : const TransportLocatorSeq& locators,
86 : OpenDDS::DCPS::DiscoveryListener* listener);
87 :
88 : void unregister_for_reader(const GUID_t& participant,
89 : const GUID_t& writerid,
90 : const GUID_t& readerid);
91 :
92 : void register_for_writer(const GUID_t& participant,
93 : const GUID_t& readerid,
94 : const GUID_t& writerid,
95 : const TransportLocatorSeq& locators,
96 : DiscoveryListener* listener);
97 :
98 : void unregister_for_writer(const GUID_t& participant,
99 : const GUID_t& readerid,
100 : const GUID_t& writerid);
101 :
102 : void update_locators(const GUID_t& remote,
103 : const TransportLocatorSeq& locators);
104 :
105 : WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
106 :
107 : // Data transfer:
108 :
109 : bool send_response(const GUID_t& peer,
110 : const DataSampleHeader& header,
111 : Message_Block_Ptr payload); // [DR]
112 :
113 : void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0);
114 :
115 : SendControlStatus send_w_control(SendStateDataSampleList send_list,
116 : const DataSampleHeader& header,
117 : Message_Block_Ptr msg,
118 : const GUID_t& destination);
119 :
120 : SendControlStatus send_control(const DataSampleHeader& header,
121 : Message_Block_Ptr msg);
122 :
123 : SendControlStatus send_control_to(const DataSampleHeader& header,
124 : Message_Block_Ptr msg,
125 : const GUID_t& destination);
126 :
127 : bool remove_sample(const DataSampleElement* sample);
128 : bool remove_all_msgs();
129 :
130 : virtual void add_link(const DataLink_rch& link, const GUID_t& peer);
131 : virtual GUID_t get_guid() const = 0;
132 0 : virtual RcHandle<BitSubscriber> get_builtin_subscriber_proxy() const { return RcHandle<BitSubscriber>(); }
133 :
134 : void terminate_send_if_suspended();
135 :
136 : bool associated_with(const GUID_t& remote) const;
137 : bool pending_association_with(const GUID_t& remote) const;
138 :
139 0 : GUID_t repo_id() const
140 : {
141 0 : ACE_Guard<ACE_Thread_Mutex> guard(lock_);
142 0 : return repo_id_;
143 0 : }
144 :
145 : void data_acked(const GUID_t& remote);
146 :
147 : bool is_leading(const GUID_t& reader_id) const;
148 :
149 : protected:
150 0 : void cdr_encapsulation(bool encap)
151 : {
152 0 : cdr_encapsulation_ = encap;
153 0 : }
154 :
155 : private:
156 :
157 : // Implemented by derived classes (DataReaderImpl/DataWriterImpl)
158 : virtual bool check_transport_qos(const TransportInst& inst) = 0;
159 : virtual DDS::DomainId_t domain_id() const = 0;
160 : virtual Priority get_priority_value(const AssociationData& data) const = 0;
161 0 : virtual void transport_assoc_done(int /*flags*/, const GUID_t& /*remote*/) {}
162 0 : virtual SequenceNumber get_max_sn() const { return SequenceNumber::SEQUENCENUMBER_UNKNOWN(); };
163 :
164 :
165 :
166 : #if defined(OPENDDS_SECURITY)
167 0 : virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
168 : {
169 0 : return DDS::HANDLE_NIL;
170 : }
171 : #endif
172 :
173 : // helpers
174 : typedef ACE_Guard<ACE_Thread_Mutex> Guard;
175 : void use_datalink_i(const GUID_t& remote_id,
176 : const DataLink_rch& link,
177 : Guard& guard);
178 : TransportSendListener_rch get_send_listener();
179 : TransportReceiveListener_rch get_receive_listener();
180 :
181 : //helper for initiating connection, called by PendingAssoc objects
182 : //allows PendingAssoc to temporarily release lock_ to allow
183 : //TransportImpl to access Reactor if needed
184 : bool initiate_connect_i(TransportImpl::AcceptConnectResult& result,
185 : TransportImpl_rch impl,
186 : const TransportImpl::RemoteTransport& remote,
187 : const TransportImpl::ConnectionAttribs& attribs_,
188 : Guard& guard);
189 :
190 : void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id);
191 :
192 : // A class, normally provided by an unit test, who needs access to a client's
193 : // privates.
194 : friend class ::DDS_TEST;
195 :
196 : typedef OPENDDS_MAP_CMP(GUID_t, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex;
197 : typedef OPENDDS_VECTOR(WeakRcHandle<TransportImpl>) ImplsType;
198 :
199 : typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
200 : struct PendingAssoc : RcEventHandler {
201 : ACE_Thread_Mutex mutex_;
202 : bool active_, scheduled_;
203 : ImplsType impls_;
204 : CORBA::ULong blob_index_;
205 : AssociationData data_;
206 : TransportImpl::ConnectionAttribs attribs_;
207 : WeakRcHandle<TransportClient> client_;
208 :
209 0 : explicit PendingAssoc(RcHandle<TransportClient> tc_rch)
210 0 : : active_(false)
211 0 : , scheduled_(false)
212 0 : , blob_index_(0)
213 0 : , client_(tc_rch)
214 0 : {}
215 :
216 : void reset_client();
217 : bool safe_to_remove();
218 : bool initiate_connect(TransportClient* tc, Guard& guard);
219 : int handle_timeout(const ACE_Time_Value& time, const void* arg);
220 : };
221 :
222 : typedef RcHandle<PendingAssoc> PendingAssoc_rch;
223 :
224 : typedef OPENDDS_MAP_CMP(GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap;
225 : typedef OPENDDS_MULTIMAP_CMP(GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PrevPendingMap;
226 :
227 : void clean_prev_pending();
228 :
229 : class PendingAssocTimer : public ReactorInterceptor {
230 : public:
231 3 : PendingAssocTimer(ACE_Reactor* reactor,
232 : ACE_thread_t owner)
233 3 : : ReactorInterceptor(reactor, owner)
234 3 : , timer_id_(-1)
235 3 : { }
236 :
237 0 : void schedule_timer(TransportClient_rch transport_client, const PendingAssoc_rch& pend)
238 : {
239 0 : execute_or_enqueue(make_rch<ScheduleCommand>(this, transport_client, pend));
240 0 : }
241 :
242 0 : ReactorInterceptor::CommandPtr cancel_timer(const PendingAssoc_rch& pend)
243 : {
244 0 : return execute_or_enqueue(make_rch<CancelCommand>(this, pend));
245 : }
246 :
247 0 : void set_id(long id) { timer_id_ = id; }
248 0 : long get_id() const { return timer_id_; }
249 :
250 0 : virtual bool reactor_is_shut_down() const
251 : {
252 0 : return TheServiceParticipant->is_shut_down();
253 : }
254 :
255 : private:
256 6 : ~PendingAssocTimer()
257 6 : { }
258 :
259 : class CommandBase : public Command {
260 : public:
261 0 : CommandBase(PendingAssocTimer* timer,
262 : const PendingAssoc_rch& assoc)
263 0 : : timer_ (timer)
264 0 : , assoc_ (assoc)
265 0 : { }
266 : protected:
267 : PendingAssocTimer* timer_;
268 : PendingAssoc_rch assoc_;
269 : };
270 : struct ScheduleCommand : public CommandBase {
271 0 : ScheduleCommand(PendingAssocTimer* timer,
272 : TransportClient_rch transport_client,
273 : const PendingAssoc_rch& assoc)
274 0 : : CommandBase (timer, assoc)
275 0 : , transport_client_ (transport_client)
276 0 : { }
277 0 : virtual void execute()
278 : {
279 0 : if (timer_->reactor()) {
280 0 : TransportClient_rch client = transport_client_.lock();
281 0 : if (client) {
282 0 : ACE_Guard<ACE_Thread_Mutex> guard(assoc_->mutex_);
283 0 : assoc_->scheduled_ = true;
284 0 : long id = timer_->reactor()->schedule_timer(assoc_.in(),
285 0 : client.in(),
286 0 : client->passive_connect_duration_.value());
287 0 : if (id != -1) {
288 0 : timer_->set_id(id);
289 : }
290 0 : }
291 0 : }
292 0 : }
293 : WeakRcHandle<TransportClient> transport_client_;
294 : };
295 : struct CancelCommand : public CommandBase {
296 0 : CancelCommand(PendingAssocTimer* timer,
297 : const PendingAssoc_rch& assoc)
298 0 : : CommandBase (timer, assoc)
299 0 : { }
300 0 : virtual void execute()
301 : {
302 0 : if (timer_->reactor() && timer_->get_id()) {
303 0 : ACE_Guard<ACE_Thread_Mutex> guard(assoc_->mutex_);
304 0 : timer_->reactor()->cancel_timer(timer_->get_id());
305 0 : timer_->set_id(-1);
306 0 : assoc_->scheduled_ = false;
307 0 : }
308 0 : }
309 : };
310 : long timer_id_;
311 : };
312 : RcHandle<PendingAssocTimer> pending_assoc_timer_;
313 :
314 : // Associated Impls and DataLinks:
315 :
316 : TransportConfig_rch config_;
317 : ImplsType impls_;
318 : PendingMap pending_;
319 : PrevPendingMap prev_pending_;
320 : DataLinkSet links_;
321 :
322 : DataLinkIndex data_link_index_;
323 :
324 : // Used to allow sends to completed as a transaction and block
325 : // multi-threaded writers from proceeding to send data
326 : // on two thread simultaneously, which could cause out-of-order data.
327 : ACE_Thread_Mutex send_transaction_lock_;
328 : ACE_UINT64 expected_transaction_id_;
329 : ACE_UINT64 max_transaction_id_seen_;
330 :
331 : //max_transaction_tail_ will always be the tail of the
332 : //max transaction that has been observed or 0 if this is
333 : //the first transaction or a transaction after the expected
334 : //value was met and thus reset to 0 indicating the samples were
335 : //sent up to max_transaction_id_
336 : DataSampleElement* max_transaction_tail_;
337 :
338 : // Configuration details:
339 :
340 : bool swap_bytes_, cdr_encapsulation_, reliable_, durable_;
341 :
342 : TimeDuration passive_connect_duration_;
343 :
344 : TransportLocatorSeq conn_info_;
345 :
346 : /// Seems to protect accesses to impls_, pending_, links_, data_link_index_
347 : mutable ACE_Thread_Mutex lock_;
348 :
349 : Reverse_Lock_t reverse_lock_;
350 :
351 : GUID_t repo_id_;
352 : };
353 :
354 : typedef RcHandle<TransportClient> TransportClient_rch;
355 :
356 : }
357 : }
358 :
359 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
360 :
361 : #endif
|