00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpTransport.h"
00009 #include "RtpsUdpInst.h"
00010 #include "RtpsUdpInst_rch.h"
00011 #include "RtpsUdpSendStrategy.h"
00012 #include "RtpsUdpReceiveStrategy.h"
00013
00014 #include "dds/DCPS/AssociationData.h"
00015
00016 #include "dds/DCPS/transport/framework/TransportClient.h"
00017 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00018
00019 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00020 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00021
00022 #include "ace/CDR_Base.h"
00023 #include "ace/Log_Msg.h"
00024 #include "ace/Sock_Connect.h"
00025
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 namespace OpenDDS {
00029 namespace DCPS {
00030
00031 RtpsUdpTransport::RtpsUdpTransport(RtpsUdpInst& inst)
00032 : TransportImpl(inst)
00033 #if defined(OPENDDS_SECURITY)
00034 , local_crypto_handle_(DDS::HANDLE_NIL)
00035 #endif
00036 {
00037 if (! (configure_i(inst) && open())) {
00038 throw Transport::UnableToCreate();
00039 }
00040 }
00041
00042 RtpsUdpInst&
00043 RtpsUdpTransport::config() const
00044 {
00045 return static_cast<RtpsUdpInst&>(TransportImpl::config());
00046 }
00047
00048 RtpsUdpDataLink_rch
00049 RtpsUdpTransport::make_datalink(const GuidPrefix_t& local_prefix)
00050 {
00051
00052 RtpsUdpDataLink_rch link = make_rch<RtpsUdpDataLink>(ref(*this), local_prefix, config(), reactor_task());
00053
00054 #if defined(OPENDDS_SECURITY)
00055 link->local_crypto_handle(local_crypto_handle_);
00056 #endif
00057
00058 if (!link->open(unicast_socket_)) {
00059 ACE_ERROR((LM_ERROR,
00060 ACE_TEXT("(%P|%t) ERROR: ")
00061 ACE_TEXT("RtpsUdpTransport::make_datalink: ")
00062 ACE_TEXT("failed to open DataLink for socket %d\n"),
00063 unicast_socket_.get_handle()));
00064 return RtpsUdpDataLink_rch();
00065 }
00066
00067
00068 unicast_socket_.set_handle(ACE_INVALID_HANDLE);
00069
00070 return link;
00071 }
00072
00073 TransportImpl::AcceptConnectResult
00074 RtpsUdpTransport::connect_datalink(const RemoteTransport& remote,
00075 const ConnectionAttribs& attribs,
00076 const TransportClient_rch& client )
00077 {
00078 GuardThreadType guard_links(this->links_lock_);
00079
00080 if (link_.is_nil()) {
00081 link_ = make_datalink(attribs.local_id_.guidPrefix);
00082 if (link_.is_nil()) {
00083 return AcceptConnectResult();
00084 }
00085 }
00086
00087 RtpsUdpDataLink_rch link = link_;
00088
00089
00090 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00091 attribs.local_reliable_, remote.reliable_,
00092 attribs.local_durable_, remote.durable_);
00093
00094 if (0 == std::memcmp(attribs.local_id_.guidPrefix, remote.repo_id_.guidPrefix,
00095 sizeof(GuidPrefix_t))) {
00096 return AcceptConnectResult(link);
00097 }
00098
00099 if (link->check_handshake_complete(attribs.local_id_, remote.repo_id_)){
00100 return AcceptConnectResult(link);
00101 }
00102
00103 if (!link->add_on_start_callback(client, remote.repo_id_)) {
00104
00105 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink got link.\n"), 2);
00106 return AcceptConnectResult(link);
00107 }
00108
00109 GuardType guard(connections_lock_);
00110 add_pending_connection(client, link);
00111 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
00112 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00113 }
00114
00115 TransportImpl::AcceptConnectResult
00116 RtpsUdpTransport::accept_datalink(const RemoteTransport& remote,
00117 const ConnectionAttribs& attribs,
00118 const TransportClient_rch& )
00119 {
00120 GuardThreadType guard_links(this->links_lock_);
00121 if (link_.is_nil()) {
00122 link_= make_datalink(attribs.local_id_.guidPrefix);
00123 if (link_.is_nil()) {
00124 return AcceptConnectResult();
00125 }
00126 }
00127 RtpsUdpDataLink_rch link = link_;
00128
00129 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00130 attribs.local_reliable_, remote.reliable_,
00131 attribs.local_durable_, remote.durable_);
00132 return AcceptConnectResult(link);
00133 }
00134
00135
00136 void
00137 RtpsUdpTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00138 const RepoId& remote_id)
00139 {
00140 GuardType guard(connections_lock_);
00141 typedef PendConnMap::iterator iter_t;
00142 const std::pair<iter_t, iter_t> range =
00143 pending_connections_.equal_range(client);
00144 for (iter_t iter = range.first; iter != range.second; ++iter) {
00145 iter->second->remove_on_start_callback(client, remote_id);
00146 }
00147 pending_connections_.erase(range.first, range.second);
00148 }
00149
00150 void
00151 RtpsUdpTransport::use_datalink(const RepoId& local_id,
00152 const RepoId& remote_id,
00153 const TransportBLOB& remote_data,
00154 bool local_reliable, bool remote_reliable,
00155 bool local_durable, bool remote_durable)
00156 {
00157 bool requires_inline_qos;
00158 unsigned int blob_bytes_read;
00159 ACE_INET_Addr addr = get_connection_addr(remote_data, &requires_inline_qos,
00160 &blob_bytes_read);
00161 link_->add_locator(remote_id, addr, requires_inline_qos);
00162
00163 #if defined(OPENDDS_SECURITY)
00164 if (remote_data.length() > blob_bytes_read) {
00165 link_->populate_security_handles(local_id, remote_id,
00166 remote_data.get_buffer() + blob_bytes_read,
00167 remote_data.length() - blob_bytes_read);
00168 }
00169 #endif
00170
00171 link_->associated(local_id, remote_id, local_reliable, remote_reliable,
00172 local_durable, remote_durable);
00173 }
00174
00175 ACE_INET_Addr
00176 RtpsUdpTransport::get_connection_addr(const TransportBLOB& remote,
00177 bool* requires_inline_qos,
00178 unsigned int* blob_bytes_read) const
00179 {
00180 using namespace OpenDDS::RTPS;
00181 LocatorSeq locators;
00182 DDS::ReturnCode_t result =
00183 blob_to_locators(remote, locators, requires_inline_qos, blob_bytes_read);
00184 if (result != DDS::RETCODE_OK) {
00185 return ACE_INET_Addr();
00186 }
00187
00188 for (CORBA::ULong i = 0; i < locators.length(); ++i) {
00189 ACE_INET_Addr addr;
00190
00191 if (locator_to_address(addr, locators[i], map_ipv4_to_ipv6()) == 0) {
00192
00193 if (!addr.is_multicast() || config().use_multicast_) {
00194 return addr;
00195 }
00196 }
00197 }
00198
00199
00200 return ACE_INET_Addr();
00201 }
00202
00203 bool
00204 RtpsUdpTransport::connection_info_i(TransportLocator& info) const
00205 {
00206 this->config().populate_locator(info);
00207 return true;
00208 }
00209
00210 void
00211 RtpsUdpTransport::register_for_reader(const RepoId& participant,
00212 const RepoId& writerid,
00213 const RepoId& readerid,
00214 const TransportLocatorSeq& locators,
00215 OpenDDS::DCPS::DiscoveryListener* listener)
00216 {
00217 const TransportBLOB* blob = this->config().get_blob(locators);
00218 if (!blob) {
00219 return;
00220 }
00221
00222 if (!link_) {
00223 link_ = make_datalink(participant.guidPrefix);
00224 }
00225
00226 link_->register_for_reader(writerid, readerid, get_connection_addr(*blob),
00227 listener);
00228 }
00229
00230 void
00231 RtpsUdpTransport::unregister_for_reader(const RepoId& ,
00232 const RepoId& writerid,
00233 const RepoId& readerid)
00234 {
00235 if (link_) {
00236 link_->unregister_for_reader(writerid, readerid);
00237 }
00238 }
00239
00240 void
00241 RtpsUdpTransport::register_for_writer(const RepoId& participant,
00242 const RepoId& readerid,
00243 const RepoId& writerid,
00244 const TransportLocatorSeq& locators,
00245 DiscoveryListener* listener)
00246 {
00247 const TransportBLOB* blob = this->config().get_blob(locators);
00248 if (!blob) {
00249 return;
00250 }
00251
00252 if (!link_) {
00253 link_ = make_datalink(participant.guidPrefix);
00254 }
00255
00256 link_->register_for_writer(readerid, writerid, get_connection_addr(*blob),
00257 listener);
00258 }
00259
00260 void
00261 RtpsUdpTransport::unregister_for_writer(const RepoId& ,
00262 const RepoId& readerid,
00263 const RepoId& writerid)
00264 {
00265 if (link_) {
00266 link_->unregister_for_writer(readerid, writerid);
00267 }
00268 }
00269
00270 bool
00271 RtpsUdpTransport::configure_i(RtpsUdpInst& config)
00272 {
00273
00274 if (config.local_address() == ACE_INET_Addr () &&
00275 !TheServiceParticipant->default_address ().empty ()) {
00276 config.local_address(0, TheServiceParticipant->default_address ().c_str ());
00277 }
00278
00279
00280
00281
00282
00283
00284
00285 if (!open_appropriate_socket_type(unicast_socket_, config.local_address())) {
00286 ACE_ERROR_RETURN((LM_ERROR,
00287 ACE_TEXT("(%P|%t) ERROR: ")
00288 ACE_TEXT("RtpsUdpTransport::configure_i: open_appropriate_socket_type:")
00289 ACE_TEXT("%m\n")),
00290 false);
00291 }
00292
00293 if (config.local_address().get_port_number() == 0) {
00294
00295 ACE_INET_Addr address;
00296 if (unicast_socket_.get_local_addr(address) != 0) {
00297 ACE_ERROR_RETURN((LM_ERROR,
00298 ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::configure_i - %p\n"),
00299 ACE_TEXT("cannot get local addr")), false);
00300 }
00301 config.local_address_set_port(address.get_port_number());
00302 }
00303
00304 create_reactor_task();
00305
00306 if (config.opendds_discovery_default_listener_) {
00307 link_= make_datalink(config.opendds_discovery_guid_.guidPrefix);
00308 link_->default_listener(*config.opendds_discovery_default_listener_);
00309 }
00310
00311 return true;
00312 }
00313
00314 void
00315 RtpsUdpTransport::shutdown_i()
00316 {
00317 if (!link_.is_nil()) {
00318 link_->transport_shutdown();
00319 }
00320 link_.reset();
00321 }
00322
00323 void
00324 RtpsUdpTransport::release_datalink(DataLink* )
00325 {
00326
00327 }
00328
00329
00330
00331 bool
00332 RtpsUdpTransport::map_ipv4_to_ipv6() const
00333 {
00334 bool map = false;
00335 ACE_INET_Addr tmp;
00336 link_->unicast_socket().get_local_addr(tmp);
00337 if (tmp.get_type() != AF_INET) {
00338 map = true;
00339 }
00340 return map;
00341 }
00342 }
00343 }
00344
00345 OPENDDS_END_VERSIONED_NAMESPACE_DECL