#include <RtpsUdpTransport.h>
Inheritance diagram for OpenDDS::DCPS::RtpsUdpTransport:
Public Member Functions | |
RtpsUdpTransport (const TransportInst_rch &inst) | |
Private Types | |
typedef ACE_Thread_Mutex | ThreadLockType |
typedef ACE_Guard< ThreadLockType > | GuardThreadType |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
Private Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual void | stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id) |
virtual bool | configure_i (TransportInst *config) |
virtual void | shutdown_i () |
virtual void | register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener) |
virtual void | unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid) |
virtual void | register_for_writer (const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *) |
virtual void | unregister_for_writer (const RepoId &, const RepoId &, const RepoId &) |
virtual bool | connection_info_i (TransportLocator &info) const |
ACE_INET_Addr | get_connection_addr (const TransportBLOB &data, bool &requires_inline_qos) const |
virtual void | release_datalink (DataLink *link) |
void | pre_detach (TransportClient *client) |
virtual OPENDDS_STRING | transport_type () const |
RtpsUdpDataLink * | make_datalink (const GuidPrefix_t &local_prefix) |
void | use_datalink (const RepoId &local_id, const RepoId &remote_id, const TransportBLOB &remote_data, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable) |
bool | map_ipv4_to_ipv6 () const |
Private Attributes | |
RcHandle< RtpsUdpInst > | config_i_ |
ThreadLockType | links_lock_ |
LockType | connections_lock_ |
RtpsUdpDataLink_rch | link_ |
ACE_SOCK_Dgram | unicast_socket_ |
TransportClient * | default_listener_ |
Definition at line 26 of file RtpsUdpTransport.h.
typedef ACE_Guard<ThreadLockType> OpenDDS::DCPS::RtpsUdpTransport::GuardThreadType [private] |
Definition at line 87 of file RtpsUdpTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::RtpsUdpTransport::GuardType [private] |
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::RtpsUdpTransport::LockType [private] |
This protects the connections_ and the pending_connections_ data members.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 92 of file RtpsUdpTransport.h.
typedef ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpTransport::ThreadLockType [private] |
Definition at line 86 of file RtpsUdpTransport.h.
OpenDDS::DCPS::RtpsUdpTransport::RtpsUdpTransport | ( | const TransportInst_rch & | inst | ) | [explicit] |
Definition at line 29 of file RtpsUdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
00030 : default_listener_(0) 00031 { 00032 if (!inst.is_nil()) { 00033 if (!configure(inst.in())) { 00034 throw Transport::UnableToCreate(); 00035 } 00036 } 00037 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [private, virtual] |
Definition at line 110 of file RtpsUdpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, make_datalink(), and use_datalink().
00113 { 00114 GuardThreadType guard_links(this->links_lock_); 00115 RtpsUdpDataLink_rch link = link_; 00116 if (link_.is_nil()) { 00117 link = make_datalink(attribs.local_id_.guidPrefix); 00118 if (link.is_nil()) { 00119 return AcceptConnectResult(); 00120 } 00121 } 00122 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, 00123 attribs.local_reliable_, remote.reliable_, 00124 attribs.local_durable_, remote.durable_); 00125 return AcceptConnectResult(link._retn()); 00126 }
bool OpenDDS::DCPS::RtpsUdpTransport::configure_i | ( | TransportInst * | config | ) | [private, virtual] |
The concrete subclass is given the opportunity to process the config object. The subclass will likely downcast the TransportInst object to the subclass type that it expects/requires.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 246 of file RtpsUdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::config(), config_i_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), default_listener_, make_datalink(), OpenDDS::DCPS::open_appropriate_socket_type(), TheServiceParticipant, and unicast_socket_.
00247 { 00248 config_i_ = RtpsUdpInst_rch(dynamic_cast<RtpsUdpInst*>(config), false); 00249 00250 if (config_i_.is_nil()) { 00251 ACE_ERROR_RETURN((LM_ERROR, 00252 ACE_TEXT("(%P|%t) ERROR: ") 00253 ACE_TEXT("RtpsUdpTransport::configure_i: ") 00254 ACE_TEXT("invalid configuration!\n")), 00255 false); 00256 } 00257 00258 // Override with DCPSDefaultAddress. 00259 if (this->config_i_->local_address() == ACE_INET_Addr () && 00260 !TheServiceParticipant->default_address ().empty ()) { 00261 this->config_i_->local_address(0, TheServiceParticipant->default_address ().c_str ()); 00262 } 00263 00264 // Open the socket here so that any addresses/ports left 00265 // unspecified in the RtpsUdpInst are known by the time we get to 00266 // connection_info_i(). Opening the sockets here also allows us to 00267 // detect and report errors during DataReader/Writer setup instead 00268 // of during association. 00269 00270 if (!open_appropriate_socket_type(unicast_socket_, config_i_->local_address())) { 00271 ACE_ERROR_RETURN((LM_ERROR, 00272 ACE_TEXT("(%P|%t) ERROR: ") 00273 ACE_TEXT("RtpsUdpTransport::configure_i: open_appropriate_socket_type:") 00274 ACE_TEXT("%m\n")), 00275 false); 00276 } 00277 00278 if (config_i_->local_address().get_port_number() == 0) { 00279 00280 ACE_INET_Addr address; 00281 if (unicast_socket_.get_local_addr(address) != 0) { 00282 ACE_ERROR_RETURN((LM_ERROR, 00283 ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::configure_i - %p\n"), 00284 ACE_TEXT("cannot get local addr")), false); 00285 } 00286 config_i_->local_address_set_port(address.get_port_number()); 00287 } 00288 00289 create_reactor_task(); 00290 00291 if (config_i_->opendds_discovery_default_listener_) { 00292 RtpsUdpDataLink_rch link = 00293 make_datalink(config_i_->opendds_discovery_guid_.guidPrefix); 00294 link->default_listener(config_i_->opendds_discovery_default_listener_); 00295 default_listener_ = 00296 dynamic_cast<TransportClient*>(config_i_->opendds_discovery_default_listener_); 00297 } 00298 
00299 return true; 00300 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [private, virtual] |
Definition at line 71 of file RtpsUdpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), connections_lock_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, make_datalink(), OpenDDS::DCPS::TransportClient::repo_id_, use_datalink(), and VDBG_LVL.
00074 { 00075 GuardThreadType guard_links(this->links_lock_); 00076 RtpsUdpDataLink_rch link = link_; 00077 if (link_.is_nil()) { 00078 link = make_datalink(attribs.local_id_.guidPrefix); 00079 if (link.is_nil()) { 00080 return AcceptConnectResult(); 00081 } 00082 } 00083 00084 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, 00085 attribs.local_reliable_, remote.reliable_, 00086 attribs.local_durable_, remote.durable_); 00087 00088 if (0 == std::memcmp(attribs.local_id_.guidPrefix, remote.repo_id_.guidPrefix, 00089 sizeof(GuidPrefix_t))) { 00090 return AcceptConnectResult(link._retn()); // "loopback" connection return link right away 00091 } 00092 00093 if (link->check_handshake_complete(attribs.local_id_, remote.repo_id_)){ 00094 return AcceptConnectResult(link._retn()); 00095 } 00096 00097 if (!link->add_on_start_callback(client, remote.repo_id_)) { 00098 // link was started by the reactor thread before we could add a callback 00099 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink got link.\n"), 2); 00100 return AcceptConnectResult(link._retn()); 00101 } 00102 00103 GuardType guard(connections_lock_); 00104 add_pending_connection(client, link.in()); 00105 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2); 00106 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00107 }
bool OpenDDS::DCPS::RtpsUdpTransport::connection_info_i | ( | TransportLocator & | info | ) | const [private, virtual] |
Called by our connection_info() method so that the concrete TransportImpl subclass, which is the one that knows how to do it, can populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 185 of file RtpsUdpTransport.cpp.
References config_i_.
00186 { 00187 this->config_i_->populate_locator(info); 00188 return true; 00189 }
ACE_INET_Addr OpenDDS::DCPS::RtpsUdpTransport::get_connection_addr | ( | const TransportBLOB & | data, | |
bool & | requires_inline_qos | |||
) | const [private] |
Definition at line 158 of file RtpsUdpTransport.cpp.
References OpenDDS::RTPS::blob_to_locators(), config_i_, OpenDDS::RTPS::locator_to_address(), map_ipv4_to_ipv6(), and DDS::RETCODE_OK.
Referenced by register_for_reader(), register_for_writer(), and use_datalink().
00160 { 00161 using namespace OpenDDS::RTPS; 00162 LocatorSeq locators; 00163 DDS::ReturnCode_t result = 00164 blob_to_locators(remote, locators, requires_inline_qos); 00165 if (result != DDS::RETCODE_OK) { 00166 return ACE_INET_Addr(); 00167 } 00168 00169 for (CORBA::ULong i = 0; i < locators.length(); ++i) { 00170 ACE_INET_Addr addr; 00171 // If conversion was successful 00172 if (locator_to_address(addr, locators[i], map_ipv4_to_ipv6()) == 0) { 00173 // if this is a unicast address, or if we are allowing multicast 00174 if (!addr.is_multicast() || config_i_->use_multicast_) { 00175 return addr; 00176 } 00177 } 00178 } 00179 00180 // Return default address 00181 return ACE_INET_Addr(); 00182 }
RtpsUdpDataLink * OpenDDS::DCPS::RtpsUdpTransport::make_datalink | ( | const GuidPrefix_t & | local_prefix | ) | [private] |
Definition at line 40 of file RtpsUdpTransport.cpp.
References config_i_, OpenDDS::DCPS::RcHandle< T >::in(), link_, OpenDDS::DCPS::TransportImpl::reactor_task(), and unicast_socket_.
Referenced by accept_datalink(), configure_i(), connect_datalink(), register_for_reader(), and register_for_writer().
00041 { 00042 TransportReactorTask_rch rt = reactor_task(); 00043 ACE_NEW_RETURN(link_, 00044 RtpsUdpDataLink(this, local_prefix, config_i_.in(), rt.in()), 00045 0); 00046 00047 RtpsUdpSendStrategy* send_strategy; 00048 ACE_NEW_RETURN(send_strategy, RtpsUdpSendStrategy(link_.in()), 0); 00049 link_->send_strategy(send_strategy); 00050 00051 RtpsUdpReceiveStrategy* recv_strategy; 00052 ACE_NEW_RETURN(recv_strategy, RtpsUdpReceiveStrategy(link_.in()), 0); 00053 link_->receive_strategy(recv_strategy); 00054 00055 if (!link_->open(unicast_socket_)) { 00056 ACE_ERROR_RETURN((LM_ERROR, 00057 ACE_TEXT("(%P|%t) ERROR: ") 00058 ACE_TEXT("RtpsUdpTransport::make_datalink: ") 00059 ACE_TEXT("failed to open DataLink for socket %d\n"), 00060 unicast_socket_.get_handle()), 00061 0); 00062 } 00063 00064 // RtpsUdpDataLink now owns the socket 00065 unicast_socket_.set_handle(ACE_INVALID_HANDLE); 00066 00067 return RtpsUdpDataLink_rch(link_)._retn(); 00068 }
bool OpenDDS::DCPS::RtpsUdpTransport::map_ipv4_to_ipv6 | ( | ) | const [private] |
Definition at line 328 of file RtpsUdpTransport.cpp.
References link_.
Referenced by get_connection_addr().
00329 { 00330 bool map = false; 00331 ACE_INET_Addr tmp; 00332 link_->unicast_socket().get_local_addr(tmp); 00333 if (tmp.get_type() != AF_INET) { 00334 map = true; 00335 } 00336 return map; 00337 }
void OpenDDS::DCPS::RtpsUdpTransport::pre_detach | ( | TransportClient * | client | ) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 319 of file RtpsUdpTransport.cpp.
References default_listener_, OpenDDS::DCPS::RcHandle< T >::is_nil(), and link_.
00320 { 00321 if (default_listener_ && !link_.is_nil() && c == default_listener_) { 00322 link_->default_listener(0); 00323 default_listener_ = 0; 00324 } 00325 }
void OpenDDS::DCPS::RtpsUdpTransport::register_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid, | |||
const TransportLocatorSeq & | locators, | |||
OpenDDS::DCPS::DiscoveryListener * | listener | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 192 of file RtpsUdpTransport.cpp.
References config_i_, get_connection_addr(), OpenDDS::DCPS::GUID_t::guidPrefix, link_, and make_datalink().
00197 { 00198 const TransportBLOB* blob = this->config_i_->get_blob(locators); 00199 if (!blob) 00200 return; 00201 if (link_ == 0) { 00202 make_datalink(participant.guidPrefix); 00203 } 00204 bool requires_inline_qos; 00205 link_->register_for_reader(writerid, readerid, get_connection_addr(*blob, requires_inline_qos), listener); 00206 }
void OpenDDS::DCPS::RtpsUdpTransport::register_for_writer | ( | const RepoId & | , | |
const RepoId & | , | |||
const RepoId & | , | |||
const TransportLocatorSeq & | , | |||
DiscoveryListener * | ||||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 219 of file RtpsUdpTransport.cpp.
References config_i_, get_connection_addr(), OpenDDS::DCPS::GUID_t::guidPrefix, link_, and make_datalink().
00224 { 00225 const TransportBLOB* blob = this->config_i_->get_blob(locators); 00226 if (!blob) 00227 return; 00228 if (link_ == 0) { 00229 make_datalink(participant.guidPrefix); 00230 } 00231 bool requires_inline_qos; 00232 link_->register_for_writer(readerid, writerid, get_connection_addr(*blob, requires_inline_qos), listener); 00233 }
void OpenDDS::DCPS::RtpsUdpTransport::release_datalink | ( | DataLink * | link | ) | [private, virtual] |
The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 313 of file RtpsUdpTransport.cpp.
00314 { 00315 // No-op for rtps_udp: keep the link_ around until the transport is shut down. 00316 }
void OpenDDS::DCPS::RtpsUdpTransport::shutdown_i | ( | ) | [private, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 303 of file RtpsUdpTransport.cpp.
References config_i_, OpenDDS::DCPS::RcHandle< T >::is_nil(), and link_.
00304 { 00305 if (!link_.is_nil()) { 00306 link_->transport_shutdown(); 00307 } 00308 link_ = 0; 00309 config_i_ = 0; 00310 }
void OpenDDS::DCPS::RtpsUdpTransport::stop_accepting_or_connecting | ( | TransportClient * | client, | |
const RepoId & | remote_id | |||
) | [private, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 130 of file RtpsUdpTransport.cpp.
References connections_lock_, and OpenDDS::DCPS::TransportImpl::OPENDDS_MULTIMAP().
00132 { 00133 GuardType guard(connections_lock_); 00134 typedef OPENDDS_MULTIMAP(TransportClient*, DataLink_rch)::iterator iter_t; 00135 const std::pair<iter_t, iter_t> range = 00136 pending_connections_.equal_range(client); 00137 for (iter_t iter = range.first; iter != range.second; ++iter) { 00138 iter->second->remove_on_start_callback(client, remote_id); 00139 } 00140 pending_connections_.erase(range.first, range.second); 00141 }
virtual OPENDDS_STRING OpenDDS::DCPS::RtpsUdpTransport::transport_type | ( | ) | const [inline, private, virtual] |
void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 209 of file RtpsUdpTransport.cpp.
References link_.
00212 { 00213 if (link_ != 0) { 00214 link_->unregister_for_reader(writerid, readerid); 00215 } 00216 }
void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_writer | ( | const RepoId & | , | |
const RepoId & | , | |||
const RepoId & | ||||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 236 of file RtpsUdpTransport.cpp.
References link_.
00239 { 00240 if (link_ != 0) { 00241 link_->unregister_for_writer(readerid, writerid); 00242 } 00243 }
void OpenDDS::DCPS::RtpsUdpTransport::use_datalink | ( | const RepoId & | local_id, | |
const RepoId & | remote_id, | |||
const TransportBLOB & | remote_data, | |||
bool | local_reliable, | |||
bool | remote_reliable, | |||
bool | local_durable, | |||
bool | remote_durable | |||
) | [private] |
Definition at line 144 of file RtpsUdpTransport.cpp.
References get_connection_addr(), and link_.
Referenced by accept_datalink(), and connect_datalink().
00149 { 00150 bool requires_inline_qos; 00151 ACE_INET_Addr addr = get_connection_addr(remote_data, requires_inline_qos); 00152 link_->add_locator(remote_id, addr, requires_inline_qos); 00153 link_->associated(local_id, remote_id, local_reliable, remote_reliable, 00154 local_durable, remote_durable); 00155 }
RcHandle<RtpsUdpInst> OpenDDS::DCPS::RtpsUdpTransport::config_i_ [private] |
Definition at line 83 of file RtpsUdpTransport.h.
Referenced by configure_i(), connection_info_i(), get_connection_addr(), make_datalink(), register_for_reader(), register_for_writer(), and shutdown_i().
LockType OpenDDS::DCPS::RtpsUdpTransport::connections_lock_ [private] |
Definition at line 94 of file RtpsUdpTransport.h.
Referenced by connect_datalink(), and stop_accepting_or_connecting().
RtpsUdpDataLink_rch OpenDDS::DCPS::RtpsUdpTransport::link_ [private] |
RTPS uses only one link per transport. This link can be safely reused by any clients that belong to the same domain participant (same GUID prefix). Use by a second participant is not possible because the network location returned by connection_info_i() can't be shared among participants.
Definition at line 101 of file RtpsUdpTransport.h.
Referenced by accept_datalink(), connect_datalink(), make_datalink(), map_ipv4_to_ipv6(), pre_detach(), register_for_reader(), register_for_writer(), shutdown_i(), unregister_for_reader(), unregister_for_writer(), and use_datalink().
ThreadLockType OpenDDS::DCPS::RtpsUdpTransport::links_lock_ [private] |
Definition at line 88 of file RtpsUdpTransport.h.
ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpTransport::unicast_socket_ [private] |
Definition at line 104 of file RtpsUdpTransport.h.
Referenced by configure_i(), and make_datalink().