00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_TRANSPORT_CLIENT_H
00009 #define OPENDDS_DCPS_TRANSPORT_CLIENT_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "TransportConfig_rch.h"
00013 #include "TransportImpl.h"
00014 #include "DataLinkSet.h"
00015
00016 #include "dds/DCPS/AssociationData.h"
00017 #include "dds/DCPS/ReactorInterceptor.h"
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include "dds/DCPS/PoolAllocationBase.h"
00021 #include "dds/DCPS/DiscoveryListener.h"
00022
00023 #include "ace/Time_Value.h"
00024 #include "ace/Event_Handler.h"
00025 #include "ace/Reverse_Lock_T.h"
00026
00027
// Forward declaration of the global-namespace DDS_TEST class.  It is named
// here only so that TransportClient can declare it a friend
// ("friend class ::DDS_TEST;") and thereby expose its private members to it.
00028 class DDS_TEST;
00029
00030 namespace OpenDDS {
00031 namespace DCPS {
00032
// Forward declarations of types this header refers to by name only.
// Within this header, TransportInst is used by reference in
// check_transport_qos() and SendStateDataSampleList as a by-value parameter
// in the send*() declarations; the remaining names are not referenced
// anywhere else in this header.
// NOTE(review): "AssocationInfo" looks like a misspelling of
// "AssociationInfo" and nothing in this header uses it -- confirm whether any
// other file depends on the name before renaming or removing it.
00033 class EntityImpl;
00034 class TransportInst;
00035 class AssocationInfo;
00036 class ReaderIdSeq;
00037 class WriterIdSeq;
00038 class SendStateDataSampleList;
00039 class SendStateDataSampleListIterator;
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 class OpenDDS_Dcps_Export TransportClient {
00050 public:
00051
00052 void use_datalink(const RepoId& remote_id, const DataLink_rch& link);
00053
00054
00055 enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
00056
00057 protected:
00058 TransportClient();
00059 virtual ~TransportClient();
00060
00061
00062
00063
00064 void enable_transport(bool reliable, bool durable);
00065 void enable_transport_using_config(bool reliable, bool durable,
00066 const TransportConfig_rch& tc);
00067
00068 bool swap_bytes() const { return swap_bytes_; }
00069 bool cdr_encapsulation() const { return cdr_encapsulation_; }
00070 const TransportLocatorSeq& connection_info() const { return conn_info_; }
00071
00072
00073
00074 bool associate(const AssociationData& peer, bool active);
00075 void disassociate(const RepoId& peerId);
00076 void stop_associating();
00077 void stop_associating(const GUID_t* repos, CORBA::ULong length);
00078 void send_final_acks();
00079
00080
00081 void register_for_reader(const RepoId& participant,
00082 const RepoId& writerid,
00083 const RepoId& readerid,
00084 const TransportLocatorSeq& locators,
00085 OpenDDS::DCPS::DiscoveryListener* listener);
00086
00087 void unregister_for_reader(const RepoId& participant,
00088 const RepoId& writerid,
00089 const RepoId& readerid);
00090
00091 void register_for_writer(const RepoId& participant,
00092 const RepoId& readerid,
00093 const RepoId& writerid,
00094 const TransportLocatorSeq& locators,
00095 DiscoveryListener* listener);
00096
00097 void unregister_for_writer(const RepoId& participant,
00098 const RepoId& readerid,
00099 const RepoId& writerid);
00100
00101
00102
00103 bool send_response(const RepoId& peer,
00104 const DataSampleHeader& header,
00105 ACE_Message_Block* payload);
00106
00107 void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0);
00108
00109 SendControlStatus send_w_control(SendStateDataSampleList send_list,
00110 const DataSampleHeader& header,
00111 ACE_Message_Block* msg,
00112 const RepoId& destination);
00113
00114 SendControlStatus send_control(const DataSampleHeader& header,
00115 ACE_Message_Block* msg);
00116
00117 SendControlStatus send_control_to(const DataSampleHeader& header,
00118 ACE_Message_Block* msg,
00119 const RepoId& destination);
00120
00121 bool remove_sample(const DataSampleElement* sample);
00122 bool remove_all_msgs();
00123
00124 virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00125
00126 void on_notification_of_connection_deletion(const RepoId& peerId);
00127
00128 private:
00129
00130
00131 virtual bool check_transport_qos(const TransportInst& inst) = 0;
00132 virtual const RepoId& get_repo_id() const = 0;
00133 virtual DDS::DomainId_t domain_id() const = 0;
00134 virtual Priority get_priority_value(const AssociationData& data) const = 0;
00135 virtual void transport_assoc_done(int , const RepoId& ) {}
00136
00137
00138 friend class TransportImpl;
00139 void transport_detached(TransportImpl* which);
00140
00141
00142 typedef ACE_Guard<ACE_Thread_Mutex> Guard;
00143 void use_datalink_i(const RepoId& remote_id,
00144 const DataLink_rch& link,
00145 Guard& guard);
00146 TransportSendListener* get_send_listener();
00147 TransportReceiveListener* get_receive_listener();
00148
00149
00150
00151
00152 bool initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00153 const TransportImpl_rch impl,
00154 const TransportImpl::RemoteTransport& remote,
00155 const TransportImpl::ConnectionAttribs& attribs_,
00156 Guard& guard);
00157
00158 void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id);
00159
00160
00161
00162 friend class ::DDS_TEST;
00163
00164 typedef OPENDDS_MAP_CMP(RepoId, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex;
00165 typedef OPENDDS_VECTOR(TransportImpl_rch) ImplsType;
00166
00167 struct PendingAssoc : ACE_Event_Handler, public PoolAllocationBase {
00168 bool active_, removed_;
00169 ImplsType impls_;
00170 CORBA::ULong blob_index_;
00171 AssociationData data_;
00172 TransportImpl::ConnectionAttribs attribs_;
00173
00174 PendingAssoc()
00175 : active_(false), removed_(false), blob_index_(0)
00176 {}
00177
00178 bool initiate_connect(TransportClient* tc, Guard& guard);
00179 int handle_timeout(const ACE_Time_Value& time, const void* arg);
00180 };
00181
00182 typedef OPENDDS_MAP_CMP(RepoId, PendingAssoc*, GUID_tKeyLessThan) PendingMap;
00183
00184 class PendingAssocTimer : public ReactorInterceptor {
00185 public:
00186 PendingAssocTimer(ACE_Reactor* reactor,
00187 ACE_thread_t owner)
00188 : ReactorInterceptor(reactor, owner)
00189 { }
00190
00191 void schedule_timer(TransportClient* transport_client, PendingAssoc* pend)
00192 {
00193 ScheduleCommand c(this, transport_client, pend);
00194 execute_or_enqueue(c);
00195 }
00196
00197 void cancel_timer(TransportClient* transport_client, PendingAssoc* pend)
00198 {
00199 CancelCommand c(this, transport_client, pend);
00200 execute_or_enqueue(c);
00201 }
00202
00203 void delete_pending_assoc(PendingAssoc* pend)
00204 {
00205 DeleteCommand c(pend);
00206
00207 enqueue(c);
00208 }
00209
00210 virtual bool reactor_is_shut_down() const
00211 {
00212 return TheServiceParticipant->is_shut_down();
00213 }
00214
00215 private:
00216 ~PendingAssocTimer()
00217 { }
00218
00219 class CommandBase : public Command {
00220 public:
00221 CommandBase(PendingAssocTimer* timer,
00222 TransportClient* transport_client,
00223 PendingAssoc* assoc)
00224 : timer_ (timer)
00225 , transport_client_ (transport_client)
00226 , assoc_ (assoc)
00227 { }
00228 protected:
00229 PendingAssocTimer* timer_;
00230 TransportClient* transport_client_;
00231 PendingAssoc* assoc_;
00232 };
00233 struct ScheduleCommand : public CommandBase {
00234 ScheduleCommand(PendingAssocTimer* timer,
00235 TransportClient* transport_client,
00236 PendingAssoc* assoc)
00237 : CommandBase (timer, transport_client, assoc)
00238 { }
00239 virtual void execute()
00240 {
00241 if (timer_->reactor()) {
00242 timer_->reactor()->schedule_timer(assoc_, transport_client_, transport_client_->passive_connect_duration_);
00243 }
00244 }
00245 };
00246 struct CancelCommand : public CommandBase {
00247 CancelCommand(PendingAssocTimer* timer,
00248 TransportClient* transport_client,
00249 PendingAssoc* assoc)
00250 : CommandBase (timer, transport_client, assoc)
00251 { }
00252 virtual void execute()
00253 {
00254 if (timer_->reactor()) {
00255 timer_->reactor()->cancel_timer(assoc_);
00256 }
00257 }
00258 };
00259 struct DeleteCommand : public CommandBase {
00260 DeleteCommand(PendingAssoc* assoc)
00261 : CommandBase (0, 0, assoc)
00262 { }
00263 virtual void execute()
00264 {
00265 delete assoc_;
00266 }
00267 };
00268 };
00269 PendingAssocTimer* pending_assoc_timer_;
00270
00271
00272
00273 ImplsType impls_;
00274 PendingMap pending_;
00275 DataLinkSet links_;
00276 DataLinkIndex links_waiting_for_on_deleted_callback_;
00277
00278
00279
00280 DataLinkSet send_links_;
00281
00282 DataLinkIndex data_link_index_;
00283
00284
00285
00286
00287 ACE_Thread_Mutex send_transaction_lock_;
00288 ACE_UINT64 expected_transaction_id_;
00289 ACE_UINT64 max_transaction_id_seen_;
00290
00291
00292
00293
00294
00295
00296 DataSampleElement* max_transaction_tail_;
00297
00298
00299
00300 bool swap_bytes_, cdr_encapsulation_, reliable_, durable_;
00301
00302 ACE_Time_Value passive_connect_duration_;
00303
00304 TransportLocatorSeq conn_info_;
00305
00306
00307 ACE_Thread_Mutex lock_;
00308
00309 typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
00310 Reverse_Lock_t reverse_lock_;
00311
00312 RepoId repo_id_;
00313 };
00314
00315 }
00316 }
00317
00318 #endif