00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_TRANSPORT_CLIENT_H
00009 #define OPENDDS_DCPS_TRANSPORT_CLIENT_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "TransportConfig_rch.h"
00013 #include "TransportImpl.h"
00014 #include "DataLinkSet.h"
00015
00016 #include "dds/DCPS/AssociationData.h"
00017 #include "dds/DCPS/ReactorInterceptor.h"
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include "dds/DCPS/PoolAllocationBase.h"
00021 #include "dds/DCPS/DiscoveryListener.h"
00022 #include "dds/DCPS/RcEventHandler.h"
00023
00024 #include "ace/Time_Value.h"
00025 #include "ace/Event_Handler.h"
00026 #include "ace/Reverse_Lock_T.h"
00027
00028
00029 class DDS_TEST; // Global-namespace class; named in TransportClient's friend declaration.
00030
00031 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00032
00033 namespace OpenDDS {
00034 namespace DCPS {
00035 // Forward declarations: only names are needed by the declarations in this header.
00036 class EntityImpl;
00037 class TransportInst; // Referenced by TransportClient::check_transport_qos().
00038 class AssocationInfo; // NOTE(review): spelled "Assocation" (sic) and not referenced in this header -- confirm whether "AssociationInfo" was intended.
00039 class ReaderIdSeq;
00040 class WriterIdSeq;
00041 class SendStateDataSampleList; // Passed by value to TransportClient::send(), send_i() and send_w_control().
00042 class SendStateDataSampleListIterator;
00043
00044 /// @class TransportClient
00045 ///
00046 /// Abstract base class (virtually derived from RcObject) that declares the
00047 /// transport-facing operations of its subclasses: enabling transports,
00048 /// associating and disassociating with peers, and sending samples and control
00049 /// messages. Subclasses must supply the private pure virtuals check_transport_qos(),
00050 /// get_repo_id(), domain_id() and get_priority_value(). Only the inline members
00051 /// are defined in this header; every other member is declaration-only here.
00052 class OpenDDS_Dcps_Export TransportClient
00053 : public virtual RcObject
00054 {
00055 public:
00056 /// NOTE(review): by its name, makes `link` the data link used for `remote_id` -- confirm in the implementation file.
00057 void use_datalink(const RepoId& remote_id, const DataLink_rch& link);
00058 /// Unnamed flag constants with the distinct values 1 and 2.
00059 /// NOTE(review): presumably combined into the int passed to transport_assoc_done() -- confirm.
00060 enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
00061 TransportClient();
00062 virtual ~TransportClient();
00063
00064 /// Transport setup. Both functions take the `reliable` and `durable` flags;
00065 /// the _using_config form additionally takes an explicit TransportConfig handle.
00066 /// NOTE(review): presumably these populate reliable_, durable_ and conn_info_ -- confirm.
00067 void enable_transport(bool reliable, bool durable);
00068 void enable_transport_using_config(bool reliable, bool durable,
00069 const TransportConfig_rch& tc);
00070 /// Inline accessors returning swap_bytes_, cdr_encapsulation_ and conn_info_ respectively.
00071 bool swap_bytes() const { return swap_bytes_; }
00072 bool cdr_encapsulation() const { return cdr_encapsulation_; }
00073 const TransportLocatorSeq& connection_info() const { return conn_info_; }
00074
00075 /// Association management. associate() reports its outcome as a bool.
00076 /// NOTE(review): `active` presumably selects whether this side initiates the connection -- confirm.
00077 bool associate(const AssociationData& peer, bool active);
00078 void disassociate(const RepoId& peerId);
00079 void stop_associating();
00080 void stop_associating(const GUID_t* repos, CORBA::ULong length); // NOTE(review): presumably `repos` points at `length` GUIDs -- confirm.
00081 void send_final_acks();
00082 /// Registration functions taking a participant id, a writer/reader id pair, the
00083 /// transport locators and a DiscoveryListener pointer; each has an unregister_* twin taking the same three ids.
00084 void register_for_reader(const RepoId& participant,
00085 const RepoId& writerid,
00086 const RepoId& readerid,
00087 const TransportLocatorSeq& locators,
00088 OpenDDS::DCPS::DiscoveryListener* listener);
00089
00090 void unregister_for_reader(const RepoId& participant,
00091 const RepoId& writerid,
00092 const RepoId& readerid);
00093 /// Note the id order here: (participant, readerid, writerid), the reverse of register_for_reader().
00094 void register_for_writer(const RepoId& participant,
00095 const RepoId& readerid,
00096 const RepoId& writerid,
00097 const TransportLocatorSeq& locators,
00098 DiscoveryListener* listener);
00099
00100 void unregister_for_writer(const RepoId& participant,
00101 const RepoId& readerid,
00102 const RepoId& writerid);
00103
00104 /// Send operations. Message_Block_Ptr and SendStateDataSampleList arguments are
00105 /// taken by value; send_w_control(), send_control() and send_control_to() return a SendControlStatus.
00106 bool send_response(const RepoId& peer,
00107 const DataSampleHeader& header,
00108 Message_Block_Ptr payload);
00109 /// `transaction_id` defaults to 0 when omitted.
00110 void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0);
00111
00112 SendControlStatus send_w_control(SendStateDataSampleList send_list,
00113 const DataSampleHeader& header,
00114 Message_Block_Ptr msg,
00115 const RepoId& destination);
00116
00117 SendControlStatus send_control(const DataSampleHeader& header,
00118 Message_Block_Ptr msg);
00119
00120 SendControlStatus send_control_to(const DataSampleHeader& header,
00121 Message_Block_Ptr msg,
00122 const RepoId& destination);
00123
00124 bool remove_sample(const DataSampleElement* sample);
00125 bool remove_all_msgs();
00126 /// Public virtual function taking a DataLink handle and a RepoId; subclasses may override it.
00127 virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00128
00129 private:
00130 /// Customization points. The four pure virtuals must be implemented by subclasses;
00131 /// transport_assoc_done() has an empty default body and may be overridden.
00132 virtual bool check_transport_qos(const TransportInst& inst) = 0;
00133 virtual const RepoId& get_repo_id() const = 0;
00134 virtual DDS::DomainId_t domain_id() const = 0;
00135 virtual Priority get_priority_value(const AssociationData& data) const = 0;
00136 virtual void transport_assoc_done(int , const RepoId& ) {}
00137 /// Compiled only when OPENDDS_SECURITY is defined; this default returns DDS::HANDLE_NIL.
00138 #if defined(OPENDDS_SECURITY)
00139 virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
00140 {
00141 return DDS::HANDLE_NIL;
00142 }
00143 #endif
00144
00145 /// Guard is ACE_Guard over ACE_Thread_Mutex; the *_i helpers below receive one by non-const reference.
00146 typedef ACE_Guard<ACE_Thread_Mutex> Guard;
00147 void use_datalink_i(const RepoId& remote_id,
00148 const DataLink_rch& link,
00149 Guard& guard);
00150 TransportSendListener_rch get_send_listener();
00151 TransportReceiveListener_rch get_receive_listener();
00152
00153 /// Connection helper that fills `result` and reports its outcome as a bool.
00154 /// NOTE(review): presumably called from PendingAssoc::initiate_connect() -- confirm.
00155 /// The parameter `attribs_` carries a trailing underscore although it is not a data member.
00156 bool initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00157 TransportImpl* impl,
00158 const TransportImpl::RemoteTransport& remote,
00159 const TransportImpl::ConnectionAttribs& attribs_,
00160 Guard& guard);
00161 /// Private counterpart of send(); here `transaction_id` has no default value.
00162 void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id);
00163
00164
00165 /// Grants the global-namespace class DDS_TEST access to the private members.
00166 friend class ::DDS_TEST;
00167 /// DataLinkIndex: RepoId -> DataLink_rch map ordered by GUID_tKeyLessThan. ImplsType: vector of raw TransportImpl pointers.
00168 typedef OPENDDS_MAP_CMP(RepoId, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex;
00169 typedef OPENDDS_VECTOR(TransportImpl*) ImplsType;
00170 /// Value type held (via PendingAssoc_rch) in PendingMap. It is also the handler that ScheduleCommand registers with the reactor timer.
00171 struct PendingAssoc : RcEventHandler {
00172 bool active_, removed_;
00173 ImplsType impls_;
00174 CORBA::ULong blob_index_;
00175 AssociationData data_;
00176 TransportImpl::ConnectionAttribs attribs_;
00177 /// Both flags start false and blob_index_ starts at 0; the remaining members are default-constructed.
00178 PendingAssoc()
00179 : active_(false)
00180 , removed_(false)
00181 , blob_index_(0)
00182 {}
00183 /// NOTE(review): `arg` of handle_timeout() is presumably the TransportClient* that ScheduleCommand hands to the reactor -- confirm.
00184 bool initiate_connect(TransportClient* tc, Guard& guard);
00185 int handle_timeout(const ACE_Time_Value& time, const void* arg);
00186 };
00187 /// Handle type through which PendingAssoc objects are held.
00188 typedef RcHandle<PendingAssoc> PendingAssoc_rch;
00189 /// PendingAssoc handles keyed by RepoId, ordered by GUID_tKeyLessThan.
00190 typedef OPENDDS_MAP_CMP(RepoId, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap;
00191 /// ReactorInterceptor that wraps each schedule/cancel request in a Command and hands it to execute_or_enqueue().
00192 class PendingAssocTimer : public ReactorInterceptor {
00193 public:
00194 PendingAssocTimer(ACE_Reactor* reactor,
00195 ACE_thread_t owner)
00196 : ReactorInterceptor(reactor, owner)
00197 { }
00198 /// Builds a ScheduleCommand on the stack and passes it to execute_or_enqueue().
00199 void schedule_timer(TransportClient* transport_client, const PendingAssoc_rch& pend)
00200 {
00201 ScheduleCommand c(this, transport_client, pend);
00202 execute_or_enqueue(c);
00203 }
00204 /// Builds a CancelCommand on the stack and passes it to execute_or_enqueue().
00205 void cancel_timer(TransportClient* transport_client, const PendingAssoc_rch& pend)
00206 {
00207 CancelCommand c(this, transport_client, pend);
00208 execute_or_enqueue(c);
00209 }
00210 /// Returns the shut-down state reported by TheServiceParticipant.
00211 virtual bool reactor_is_shut_down() const
00212 {
00213 return TheServiceParticipant->is_shut_down();
00214 }
00215
00216 private:
00217 ~PendingAssocTimer() // Private, so code outside this class cannot destroy instances directly. NOTE(review): presumably released through RcHandle -- confirm.
00218 { }
00219 /// State shared by both commands: the timer object, the client pointer and the pending-association handle.
00220 class CommandBase : public Command {
00221 public:
00222 CommandBase(PendingAssocTimer* timer,
00223 TransportClient* transport_client,
00224 const PendingAssoc_rch& assoc)
00225 : timer_ (timer)
00226 , transport_client_ (transport_client)
00227 , assoc_ (assoc)
00228 { }
00229 protected:
00230 PendingAssocTimer* timer_; // Raw pointer; lifetime is managed by the caller.
00231 TransportClient* transport_client_; // Raw pointer; lifetime is managed by the caller.
00232 PendingAssoc_rch assoc_; // Handle copy held for the lifetime of the command.
00233 };
00234 struct ScheduleCommand : public CommandBase { // Schedules the PendingAssoc timer on the interceptor's reactor.
00235 ScheduleCommand(PendingAssocTimer* timer,
00236 TransportClient* transport_client,
00237 const PendingAssoc_rch& assoc)
00238 : CommandBase (timer, transport_client, assoc)
00239 { }
00240 virtual void execute()
00241 {
00242 if (timer_->reactor()) { // Do nothing when the interceptor has no reactor.
00243 timer_->reactor()->schedule_timer(assoc_.in(), // handler: the PendingAssoc itself
00244 transport_client_, // second argument: the client pointer
00245 transport_client_->passive_connect_duration_); // delay
00246 }
00247 }
00248 };
00249 struct CancelCommand : public CommandBase { // Cancels the timer registered for the PendingAssoc handler.
00250 CancelCommand(PendingAssocTimer* timer,
00251 TransportClient* transport_client,
00252 const PendingAssoc_rch& assoc)
00253 : CommandBase (timer, transport_client, assoc)
00254 { }
00255 virtual void execute()
00256 {
00257 if (timer_->reactor()) { // Do nothing when the interceptor has no reactor.
00258 timer_->reactor()->cancel_timer(assoc_.in());
00259 }
00260 }
00261 };
00262 };
00263 RcHandle<PendingAssocTimer> pending_assoc_timer_;
00264
00265 /// impls_: TransportImpl pointers; pending_: PendingAssoc handles keyed by RepoId; links_: a DataLinkSet.
00266 /// NOTE(review): the exact role of each container is defined by the implementation file.
00267 ImplsType impls_;
00268 PendingMap pending_;
00269 DataLinkSet links_;
00270
00271
00272 /// NOTE(review): a second DataLinkSet; by its name presumably the links used for sending -- confirm.
00273 DataLinkSet send_links_;
00274 /// RepoId -> DataLink_rch lookup (DataLinkIndex).
00275 DataLinkIndex data_link_index_;
00276
00277
00278 /// Transaction bookkeeping: a dedicated mutex plus two 64-bit ids.
00279 /// NOTE(review): presumably used to order send() calls by `transaction_id` -- confirm.
00280 ACE_Thread_Mutex send_transaction_lock_;
00281 ACE_UINT64 expected_transaction_id_;
00282 ACE_UINT64 max_transaction_id_seen_;
00283
00284
00285
00286
00287 /// Raw pointer to a DataSampleElement; ownership is not visible from this header.
00288 /// NOTE(review): presumably the list tail associated with max_transaction_id_seen_ -- confirm.
00289 DataSampleElement* max_transaction_tail_;
00290
00291
00292 /// swap_bytes_ and cdr_encapsulation_ back the public accessors; reliable_ and durable_ share names with the enable_transport() parameters.
00293 bool swap_bytes_, cdr_encapsulation_, reliable_, durable_;
00294 /// Read by ScheduleCommand as the delay passed to the reactor's schedule_timer().
00295 ACE_Time_Value passive_connect_duration_;
00296 /// Returned by connection_info().
00297 TransportLocatorSeq conn_info_;
00298
00299 /// Mutex of the type wrapped by Guard. NOTE(review): presumably the main lock for this object's state -- confirm.
00300 ACE_Thread_Mutex lock_;
00301 /// ACE_Reverse_Lock over ACE_Thread_Mutex. NOTE(review): presumably bound to lock_ in the constructor -- confirm.
00302 typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
00303 Reverse_Lock_t reverse_lock_;
00304 /// NOTE(review): a stored RepoId; its relationship to get_repo_id() is not visible in this header.
00305 RepoId repo_id_;
00306 };
00307 /// RcHandle alias through which TransportClient instances are held.
00308 typedef RcHandle<TransportClient> TransportClient_rch;
00309
00310 } // namespace DCPS
00311 } // namespace OpenDDS
00312
00313 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00314
00315 #endif // OPENDDS_DCPS_TRANSPORT_CLIENT_H