00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpTransport.h"
00009 #include "RtpsUdpInst.h"
00010 #include "RtpsUdpInst_rch.h"
00011 #include "RtpsUdpSendStrategy.h"
00012 #include "RtpsUdpReceiveStrategy.h"
00013
00014 #include "dds/DCPS/AssociationData.h"
00015
00016 #include "dds/DCPS/transport/framework/TransportClient.h"
00017 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00018
00019 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00020 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00021
00022 #include "ace/CDR_Base.h"
00023 #include "ace/Log_Msg.h"
00024 #include "ace/Sock_Connect.h"
00025
00026 namespace OpenDDS {
00027 namespace DCPS {
00028
00029 RtpsUdpTransport::RtpsUdpTransport(const TransportInst_rch& inst)
00030 : default_listener_(0)
00031 {
00032 if (!inst.is_nil()) {
00033 if (!configure(inst.in())) {
00034 throw Transport::UnableToCreate();
00035 }
00036 }
00037 }
00038
00039 RtpsUdpDataLink*
00040 RtpsUdpTransport::make_datalink(const GuidPrefix_t& local_prefix)
00041 {
00042 TransportReactorTask_rch rt = reactor_task();
00043 ACE_NEW_RETURN(link_,
00044 RtpsUdpDataLink(this, local_prefix, config_i_.in(), rt.in()),
00045 0);
00046
00047 RtpsUdpSendStrategy* send_strategy;
00048 ACE_NEW_RETURN(send_strategy, RtpsUdpSendStrategy(link_.in()), 0);
00049 link_->send_strategy(send_strategy);
00050
00051 RtpsUdpReceiveStrategy* recv_strategy;
00052 ACE_NEW_RETURN(recv_strategy, RtpsUdpReceiveStrategy(link_.in()), 0);
00053 link_->receive_strategy(recv_strategy);
00054
00055 if (!link_->open(unicast_socket_)) {
00056 ACE_ERROR_RETURN((LM_ERROR,
00057 ACE_TEXT("(%P|%t) ERROR: ")
00058 ACE_TEXT("RtpsUdpTransport::make_datalink: ")
00059 ACE_TEXT("failed to open DataLink for socket %d\n"),
00060 unicast_socket_.get_handle()),
00061 0);
00062 }
00063
00064
00065 unicast_socket_.set_handle(ACE_INVALID_HANDLE);
00066
00067 return RtpsUdpDataLink_rch(link_)._retn();
00068 }
00069
00070 TransportImpl::AcceptConnectResult
00071 RtpsUdpTransport::connect_datalink(const RemoteTransport& remote,
00072 const ConnectionAttribs& attribs,
00073 TransportClient* client )
00074 {
00075 GuardThreadType guard_links(this->links_lock_);
00076 RtpsUdpDataLink_rch link = link_;
00077 if (link_.is_nil()) {
00078 link = make_datalink(attribs.local_id_.guidPrefix);
00079 if (link.is_nil()) {
00080 return AcceptConnectResult();
00081 }
00082 }
00083
00084 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00085 attribs.local_reliable_, remote.reliable_,
00086 attribs.local_durable_, remote.durable_);
00087
00088 if (0 == std::memcmp(attribs.local_id_.guidPrefix, remote.repo_id_.guidPrefix,
00089 sizeof(GuidPrefix_t))) {
00090 return AcceptConnectResult(link._retn());
00091 }
00092
00093 if (link->check_handshake_complete(attribs.local_id_, remote.repo_id_)){
00094 return AcceptConnectResult(link._retn());
00095 }
00096
00097 if (!link->add_on_start_callback(client, remote.repo_id_)) {
00098
00099 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink got link.\n"), 2);
00100 return AcceptConnectResult(link._retn());
00101 }
00102
00103 GuardType guard(connections_lock_);
00104 add_pending_connection(client, link.in());
00105 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
00106 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00107 }
00108
00109 TransportImpl::AcceptConnectResult
00110 RtpsUdpTransport::accept_datalink(const RemoteTransport& remote,
00111 const ConnectionAttribs& attribs,
00112 TransportClient* )
00113 {
00114 GuardThreadType guard_links(this->links_lock_);
00115 RtpsUdpDataLink_rch link = link_;
00116 if (link_.is_nil()) {
00117 link = make_datalink(attribs.local_id_.guidPrefix);
00118 if (link.is_nil()) {
00119 return AcceptConnectResult();
00120 }
00121 }
00122 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_,
00123 attribs.local_reliable_, remote.reliable_,
00124 attribs.local_durable_, remote.durable_);
00125 return AcceptConnectResult(link._retn());
00126 }
00127
00128
00129 void
00130 RtpsUdpTransport::stop_accepting_or_connecting(TransportClient* client,
00131 const RepoId& remote_id)
00132 {
00133 GuardType guard(connections_lock_);
00134 typedef OPENDDS_MULTIMAP(TransportClient*, DataLink_rch)::iterator iter_t;
00135 const std::pair<iter_t, iter_t> range =
00136 pending_connections_.equal_range(client);
00137 for (iter_t iter = range.first; iter != range.second; ++iter) {
00138 iter->second->remove_on_start_callback(client, remote_id);
00139 }
00140 pending_connections_.erase(range.first, range.second);
00141 }
00142
00143 void
00144 RtpsUdpTransport::use_datalink(const RepoId& local_id,
00145 const RepoId& remote_id,
00146 const TransportBLOB& remote_data,
00147 bool local_reliable, bool remote_reliable,
00148 bool local_durable, bool remote_durable)
00149 {
00150 bool requires_inline_qos;
00151 ACE_INET_Addr addr = get_connection_addr(remote_data, requires_inline_qos);
00152 link_->add_locator(remote_id, addr, requires_inline_qos);
00153 link_->associated(local_id, remote_id, local_reliable, remote_reliable,
00154 local_durable, remote_durable);
00155 }
00156
00157 ACE_INET_Addr
00158 RtpsUdpTransport::get_connection_addr(const TransportBLOB& remote,
00159 bool& requires_inline_qos) const
00160 {
00161 using namespace OpenDDS::RTPS;
00162 LocatorSeq locators;
00163 DDS::ReturnCode_t result =
00164 blob_to_locators(remote, locators, requires_inline_qos);
00165 if (result != DDS::RETCODE_OK) {
00166 return ACE_INET_Addr();
00167 }
00168
00169 for (CORBA::ULong i = 0; i < locators.length(); ++i) {
00170 ACE_INET_Addr addr;
00171
00172 if (locator_to_address(addr, locators[i], map_ipv4_to_ipv6()) == 0) {
00173
00174 if (!addr.is_multicast() || config_i_->use_multicast_) {
00175 return addr;
00176 }
00177 }
00178 }
00179
00180
00181 return ACE_INET_Addr();
00182 }
00183
00184 bool
00185 RtpsUdpTransport::connection_info_i(TransportLocator& info) const
00186 {
00187 this->config_i_->populate_locator(info);
00188 return true;
00189 }
00190
00191 void
00192 RtpsUdpTransport::register_for_reader(const RepoId& participant,
00193 const RepoId& writerid,
00194 const RepoId& readerid,
00195 const TransportLocatorSeq& locators,
00196 OpenDDS::DCPS::DiscoveryListener* listener)
00197 {
00198 const TransportBLOB* blob = this->config_i_->get_blob(locators);
00199 if (!blob)
00200 return;
00201 if (link_ == 0) {
00202 make_datalink(participant.guidPrefix);
00203 }
00204 bool requires_inline_qos;
00205 link_->register_for_reader(writerid, readerid, get_connection_addr(*blob, requires_inline_qos), listener);
00206 }
00207
00208 void
00209 RtpsUdpTransport::unregister_for_reader(const RepoId& ,
00210 const RepoId& writerid,
00211 const RepoId& readerid)
00212 {
00213 if (link_ != 0) {
00214 link_->unregister_for_reader(writerid, readerid);
00215 }
00216 }
00217
00218 void
00219 RtpsUdpTransport::register_for_writer(const RepoId& participant,
00220 const RepoId& readerid,
00221 const RepoId& writerid,
00222 const TransportLocatorSeq& locators,
00223 DiscoveryListener* listener)
00224 {
00225 const TransportBLOB* blob = this->config_i_->get_blob(locators);
00226 if (!blob)
00227 return;
00228 if (link_ == 0) {
00229 make_datalink(participant.guidPrefix);
00230 }
00231 bool requires_inline_qos;
00232 link_->register_for_writer(readerid, writerid, get_connection_addr(*blob, requires_inline_qos), listener);
00233 }
00234
00235 void
00236 RtpsUdpTransport::unregister_for_writer(const RepoId& ,
00237 const RepoId& readerid,
00238 const RepoId& writerid)
00239 {
00240 if (link_ != 0) {
00241 link_->unregister_for_writer(readerid, writerid);
00242 }
00243 }
00244
00245 bool
00246 RtpsUdpTransport::configure_i(TransportInst* config)
00247 {
00248 config_i_ = RtpsUdpInst_rch(dynamic_cast<RtpsUdpInst*>(config), false);
00249
00250 if (config_i_.is_nil()) {
00251 ACE_ERROR_RETURN((LM_ERROR,
00252 ACE_TEXT("(%P|%t) ERROR: ")
00253 ACE_TEXT("RtpsUdpTransport::configure_i: ")
00254 ACE_TEXT("invalid configuration!\n")),
00255 false);
00256 }
00257
00258
00259 if (this->config_i_->local_address() == ACE_INET_Addr () &&
00260 !TheServiceParticipant->default_address ().empty ()) {
00261 this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00262 }
00263
00264
00265
00266
00267
00268
00269
00270 if (!open_appropriate_socket_type(unicast_socket_, config_i_->local_address())) {
00271 ACE_ERROR_RETURN((LM_ERROR,
00272 ACE_TEXT("(%P|%t) ERROR: ")
00273 ACE_TEXT("RtpsUdpTransport::configure_i: open_appropriate_socket_type:")
00274 ACE_TEXT("%m\n")),
00275 false);
00276 }
00277
00278 if (config_i_->local_address().get_port_number() == 0) {
00279
00280 ACE_INET_Addr address;
00281 if (unicast_socket_.get_local_addr(address) != 0) {
00282 ACE_ERROR_RETURN((LM_ERROR,
00283 ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::configure_i - %p\n"),
00284 ACE_TEXT("cannot get local addr")), false);
00285 }
00286 config_i_->local_address_set_port(address.get_port_number());
00287 }
00288
00289 create_reactor_task();
00290
00291 if (config_i_->opendds_discovery_default_listener_) {
00292 RtpsUdpDataLink_rch link =
00293 make_datalink(config_i_->opendds_discovery_guid_.guidPrefix);
00294 link->default_listener(config_i_->opendds_discovery_default_listener_);
00295 default_listener_ =
00296 dynamic_cast<TransportClient*>(config_i_->opendds_discovery_default_listener_);
00297 }
00298
00299 return true;
00300 }
00301
00302 void
00303 RtpsUdpTransport::shutdown_i()
00304 {
00305 if (!link_.is_nil()) {
00306 link_->transport_shutdown();
00307 }
00308 link_ = 0;
00309 config_i_ = 0;
00310 }
00311
00312 void
00313 RtpsUdpTransport::release_datalink(DataLink* )
00314 {
00315
00316 }
00317
00318 void
00319 RtpsUdpTransport::pre_detach(TransportClient* c)
00320 {
00321 if (default_listener_ && !link_.is_nil() && c == default_listener_) {
00322 link_->default_listener(0);
00323 default_listener_ = 0;
00324 }
00325 }
00326
00327 bool
00328 RtpsUdpTransport::map_ipv4_to_ipv6() const
00329 {
00330 bool map = false;
00331 ACE_INET_Addr tmp;
00332 link_->unicast_socket().get_local_addr(tmp);
00333 if (tmp.get_type() != AF_INET) {
00334 map = true;
00335 }
00336 return map;
00337 }
00338 }
00339 }