#include <RtpsUdpTransport.h>
Public Member Functions | |
RtpsUdpTransport (RtpsUdpInst &inst) | |
RtpsUdpInst & | config () const |
Private Types | |
typedef ACE_Thread_Mutex | ThreadLockType |
typedef ACE_Guard< ThreadLockType > | GuardThreadType |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
Private Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client) |
virtual void | stop_accepting_or_connecting (const TransportClient_wrch &client, const RepoId &remote_id) |
bool | configure_i (RtpsUdpInst &config) |
virtual void | shutdown_i () |
virtual void | register_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener) |
virtual void | unregister_for_reader (const RepoId &participant, const RepoId &writerid, const RepoId &readerid) |
virtual void | register_for_writer (const RepoId &, const RepoId &, const RepoId &, const TransportLocatorSeq &, DiscoveryListener *) |
virtual void | unregister_for_writer (const RepoId &, const RepoId &, const RepoId &) |
virtual bool | connection_info_i (TransportLocator &info) const |
ACE_INET_Addr | get_connection_addr (const TransportBLOB &data, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const |
virtual void | release_datalink (DataLink *link) |
virtual OPENDDS_STRING | transport_type () const |
RtpsUdpDataLink_rch | make_datalink (const GuidPrefix_t &local_prefix) |
void | use_datalink (const RepoId &local_id, const RepoId &remote_id, const TransportBLOB &remote_data, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable) |
bool | map_ipv4_to_ipv6 () const |
Private Attributes | |
ThreadLockType | links_lock_ |
LockType | connections_lock_ |
RtpsUdpDataLink_rch | link_ |
ACE_SOCK_Dgram | unicast_socket_ |
TransportClient_wrch | default_listener_ |
Definition at line 30 of file RtpsUdpTransport.h.
typedef ACE_Guard<ThreadLockType> OpenDDS::DCPS::RtpsUdpTransport::GuardThreadType [private] |
Definition at line 99 of file RtpsUdpTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::RtpsUdpTransport::GuardType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 105 of file RtpsUdpTransport.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::RtpsUdpTransport::LockType [private] |
This protects the connections_ and the pending_connections_ data members.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 104 of file RtpsUdpTransport.h.
typedef ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpTransport::ThreadLockType [private] |
Definition at line 98 of file RtpsUdpTransport.h.
OpenDDS::DCPS::RtpsUdpTransport::RtpsUdpTransport | ( | RtpsUdpInst & | inst | ) |
Definition at line 31 of file RtpsUdpTransport.cpp.
References configure_i(), and OpenDDS::DCPS::TransportImpl::open().
00032 : TransportImpl(inst) 00033 #if defined(OPENDDS_SECURITY) 00034 , local_crypto_handle_(DDS::HANDLE_NIL) 00035 #endif 00036 { 00037 if (! (configure_i(inst) && open())) { 00038 throw Transport::UnableToCreate(); 00039 } 00040 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [private, virtual] |
accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 116 of file RtpsUdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, OpenDDS::DCPS::TransportImpl::RemoteTransport::durable_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, links_lock_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, make_datalink(), OpenDDS::DCPS::TransportImpl::RemoteTransport::reliable_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, and use_datalink().
00119 { 00120 GuardThreadType guard_links(this->links_lock_); 00121 if (link_.is_nil()) { 00122 link_= make_datalink(attribs.local_id_.guidPrefix); 00123 if (link_.is_nil()) { 00124 return AcceptConnectResult(); 00125 } 00126 } 00127 RtpsUdpDataLink_rch link = link_; 00128 00129 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, 00130 attribs.local_reliable_, remote.reliable_, 00131 attribs.local_durable_, remote.durable_); 00132 return AcceptConnectResult(link); 00133 }
RtpsUdpInst & OpenDDS::DCPS::RtpsUdpTransport::config | ( | ) | const |
Expose the configuration information so others can see what we can do.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 43 of file RtpsUdpTransport.cpp.
Referenced by connection_info_i(), get_connection_addr(), make_datalink(), register_for_reader(), and register_for_writer().
00044 { 00045 return static_cast<RtpsUdpInst&>(TransportImpl::config()); 00046 }
bool OpenDDS::DCPS::RtpsUdpTransport::configure_i | ( | RtpsUdpInst & | config | ) | [private] |
Definition at line 271 of file RtpsUdpTransport.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportImpl::create_reactor_task(), ACE_SOCK::get_local_addr(), ACE_INET_Addr::get_port_number(), OpenDDS::DCPS::GUID_t::guidPrefix, link_, LM_ERROR, OpenDDS::DCPS::RtpsUdpInst::local_address(), OpenDDS::DCPS::RtpsUdpInst::local_address_set_port(), make_datalink(), OpenDDS::DCPS::open_appropriate_socket_type(), OpenDDS::DCPS::RtpsUdpInst::opendds_discovery_default_listener_, OpenDDS::DCPS::RtpsUdpInst::opendds_discovery_guid_, TheServiceParticipant, and unicast_socket_.
Referenced by RtpsUdpTransport().
00272 { 00273 // Override with DCPSDefaultAddress. 00274 if (config.local_address() == ACE_INET_Addr () && 00275 !TheServiceParticipant->default_address ().empty ()) { 00276 config.local_address(0, TheServiceParticipant->default_address ().c_str ()); 00277 } 00278 00279 // Open the socket here so that any addresses/ports left 00280 // unspecified in the RtpsUdpInst are known by the time we get to 00281 // connection_info_i(). Opening the sockets here also allows us to 00282 // detect and report errors during DataReader/Writer setup instead 00283 // of during association. 00284 00285 if (!open_appropriate_socket_type(unicast_socket_, config.local_address())) { 00286 ACE_ERROR_RETURN((LM_ERROR, 00287 ACE_TEXT("(%P|%t) ERROR: ") 00288 ACE_TEXT("RtpsUdpTransport::configure_i: open_appropriate_socket_type:") 00289 ACE_TEXT("%m\n")), 00290 false); 00291 } 00292 00293 if (config.local_address().get_port_number() == 0) { 00294 00295 ACE_INET_Addr address; 00296 if (unicast_socket_.get_local_addr(address) != 0) { 00297 ACE_ERROR_RETURN((LM_ERROR, 00298 ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::configure_i - %p\n"), 00299 ACE_TEXT("cannot get local addr")), false); 00300 } 00301 config.local_address_set_port(address.get_port_number()); 00302 } 00303 00304 create_reactor_task(); 00305 00306 if (config.opendds_discovery_default_listener_) { 00307 link_= make_datalink(config.opendds_discovery_guid_.guidPrefix); 00308 link_->default_listener(*config.opendds_discovery_default_listener_); 00309 } 00310 00311 return true; 00312 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [private, virtual] |
connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 74 of file RtpsUdpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, connections_lock_, OpenDDS::DCPS::TransportImpl::RemoteTransport::durable_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, links_lock_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, make_datalink(), OpenDDS::DCPS::TransportImpl::RemoteTransport::reliable_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, use_datalink(), and VDBG_LVL.
00077 { 00078 GuardThreadType guard_links(this->links_lock_); 00079 00080 if (link_.is_nil()) { 00081 link_ = make_datalink(attribs.local_id_.guidPrefix); 00082 if (link_.is_nil()) { 00083 return AcceptConnectResult(); 00084 } 00085 } 00086 00087 RtpsUdpDataLink_rch link = link_; 00088 00089 00090 use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, 00091 attribs.local_reliable_, remote.reliable_, 00092 attribs.local_durable_, remote.durable_); 00093 00094 if (0 == std::memcmp(attribs.local_id_.guidPrefix, remote.repo_id_.guidPrefix, 00095 sizeof(GuidPrefix_t))) { 00096 return AcceptConnectResult(link); // "loopback" connection return link right away 00097 } 00098 00099 if (link->check_handshake_complete(attribs.local_id_, remote.repo_id_)){ 00100 return AcceptConnectResult(link); 00101 } 00102 00103 if (!link->add_on_start_callback(client, remote.repo_id_)) { 00104 // link was started by the reactor thread before we could add a callback 00105 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink got link.\n"), 2); 00106 return AcceptConnectResult(link); 00107 } 00108 00109 GuardType guard(connections_lock_); 00110 add_pending_connection(client, link); 00111 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2); 00112 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00113 }
bool OpenDDS::DCPS::RtpsUdpTransport::connection_info_i | ( | TransportLocator & | info | ) | const [private, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 204 of file RtpsUdpTransport.cpp.
References config(), and OpenDDS::DCPS::RtpsUdpInst::populate_locator().
00205 { 00206 this->config().populate_locator(info); 00207 return true; 00208 }
ACE_INET_Addr OpenDDS::DCPS::RtpsUdpTransport::get_connection_addr | ( | const TransportBLOB & | data, | |
bool * | requires_inline_qos = 0 , |
|||
unsigned int * | blob_bytes_read = 0 | |||
) | const [private] |
Definition at line 176 of file RtpsUdpTransport.cpp.
References OpenDDS::RTPS::blob_to_locators(), config(), ACE_INET_Addr::is_multicast(), OpenDDS::RTPS::locator_to_address(), map_ipv4_to_ipv6(), DDS::RETCODE_OK, and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.
Referenced by register_for_reader(), register_for_writer(), and use_datalink().
00179 { 00180 using namespace OpenDDS::RTPS; 00181 LocatorSeq locators; 00182 DDS::ReturnCode_t result = 00183 blob_to_locators(remote, locators, requires_inline_qos, blob_bytes_read); 00184 if (result != DDS::RETCODE_OK) { 00185 return ACE_INET_Addr(); 00186 } 00187 00188 for (CORBA::ULong i = 0; i < locators.length(); ++i) { 00189 ACE_INET_Addr addr; 00190 // If conversion was successful 00191 if (locator_to_address(addr, locators[i], map_ipv4_to_ipv6()) == 0) { 00192 // if this is a unicast address, or if we are allowing multicast 00193 if (!addr.is_multicast() || config().use_multicast_) { 00194 return addr; 00195 } 00196 } 00197 } 00198 00199 // Return default address 00200 return ACE_INET_Addr(); 00201 }
RtpsUdpDataLink_rch OpenDDS::DCPS::RtpsUdpTransport::make_datalink | ( | const GuidPrefix_t & | local_prefix | ) | [private] |
Definition at line 49 of file RtpsUdpTransport.cpp.
References ACE_TEXT(), config(), ACE_IPC_SAP::get_handle(), LM_ERROR, OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::ref(), ACE_IPC_SAP::set_handle(), and unicast_socket_.
Referenced by accept_datalink(), configure_i(), connect_datalink(), register_for_reader(), and register_for_writer().
00050 { 00051 00052 RtpsUdpDataLink_rch link = make_rch<RtpsUdpDataLink>(ref(*this), local_prefix, config(), reactor_task()); 00053 00054 #if defined(OPENDDS_SECURITY) 00055 link->local_crypto_handle(local_crypto_handle_); 00056 #endif 00057 00058 if (!link->open(unicast_socket_)) { 00059 ACE_ERROR((LM_ERROR, 00060 ACE_TEXT("(%P|%t) ERROR: ") 00061 ACE_TEXT("RtpsUdpTransport::make_datalink: ") 00062 ACE_TEXT("failed to open DataLink for socket %d\n"), 00063 unicast_socket_.get_handle())); 00064 return RtpsUdpDataLink_rch(); 00065 } 00066 00067 // RtpsUdpDataLink now owns the socket 00068 unicast_socket_.set_handle(ACE_INVALID_HANDLE); 00069 00070 return link; 00071 }
bool OpenDDS::DCPS::RtpsUdpTransport::map_ipv4_to_ipv6 | ( | ) | const [private] |
Definition at line 332 of file RtpsUdpTransport.cpp.
References ACE_Addr::get_type(), and link_.
Referenced by get_connection_addr().
00333 { 00334 bool map = false; 00335 ACE_INET_Addr tmp; 00336 link_->unicast_socket().get_local_addr(tmp); 00337 if (tmp.get_type() != AF_INET) { 00338 map = true; 00339 } 00340 return map; 00341 }
void OpenDDS::DCPS::RtpsUdpTransport::register_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid, | |||
const TransportLocatorSeq & | locators, | |||
OpenDDS::DCPS::DiscoveryListener * | listener | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 211 of file RtpsUdpTransport.cpp.
References config(), OpenDDS::DCPS::RtpsUdpInst::get_blob(), get_connection_addr(), OpenDDS::DCPS::GUID_t::guidPrefix, link_, and make_datalink().
00216 { 00217 const TransportBLOB* blob = this->config().get_blob(locators); 00218 if (!blob) { 00219 return; 00220 } 00221 00222 if (!link_) { 00223 link_ = make_datalink(participant.guidPrefix); 00224 } 00225 00226 link_->register_for_reader(writerid, readerid, get_connection_addr(*blob), 00227 listener); 00228 }
void OpenDDS::DCPS::RtpsUdpTransport::register_for_writer | ( | const RepoId & | participant, | |
const RepoId & | readerid, | |||
const RepoId & | writerid, | |||
const TransportLocatorSeq & | locators, | |||
DiscoveryListener * | listener | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 241 of file RtpsUdpTransport.cpp.
References config(), OpenDDS::DCPS::RtpsUdpInst::get_blob(), get_connection_addr(), OpenDDS::DCPS::GUID_t::guidPrefix, link_, and make_datalink().
00246 { 00247 const TransportBLOB* blob = this->config().get_blob(locators); 00248 if (!blob) { 00249 return; 00250 } 00251 00252 if (!link_) { 00253 link_ = make_datalink(participant.guidPrefix); 00254 } 00255 00256 link_->register_for_writer(readerid, writerid, get_connection_addr(*blob), 00257 listener); 00258 }
void OpenDDS::DCPS::RtpsUdpTransport::release_datalink | ( | DataLink * | link | ) | [private, virtual] |
Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 324 of file RtpsUdpTransport.cpp.
void OpenDDS::DCPS::RtpsUdpTransport::shutdown_i | ( | ) | [private, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 315 of file RtpsUdpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, and OpenDDS::DCPS::RcHandle< T >::reset().
00316 { 00317 if (!link_.is_nil()) { 00318 link_->transport_shutdown(); 00319 } 00320 link_.reset(); 00321 }
void OpenDDS::DCPS::RtpsUdpTransport::stop_accepting_or_connecting | ( | const TransportClient_wrch & | client, | |
const RepoId & | remote_id | |||
) | [private, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 137 of file RtpsUdpTransport.cpp.
References connections_lock_, and OpenDDS::DCPS::TransportImpl::pending_connections_.
00139 { 00140 GuardType guard(connections_lock_); 00141 typedef PendConnMap::iterator iter_t; 00142 const std::pair<iter_t, iter_t> range = 00143 pending_connections_.equal_range(client); 00144 for (iter_t iter = range.first; iter != range.second; ++iter) { 00145 iter->second->remove_on_start_callback(client, remote_id); 00146 } 00147 pending_connections_.erase(range.first, range.second); 00148 }
virtual OPENDDS_STRING OpenDDS::DCPS::RtpsUdpTransport::transport_type | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 77 of file RtpsUdpTransport.h.
void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_reader | ( | const RepoId & | participant, | |
const RepoId & | writerid, | |||
const RepoId & | readerid | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 231 of file RtpsUdpTransport.cpp.
References link_.
void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_writer | ( | const RepoId & | , | |
const RepoId & | readerid, | |||
const RepoId & | writerid | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 261 of file RtpsUdpTransport.cpp.
References link_.
void OpenDDS::DCPS::RtpsUdpTransport::use_datalink | ( | const RepoId & | local_id, | |
const RepoId & | remote_id, | |||
const TransportBLOB & | remote_data, | |||
bool | local_reliable, | |||
bool | remote_reliable, | |||
bool | local_durable, | |||
bool | remote_durable | |||
) | [private] |
Definition at line 151 of file RtpsUdpTransport.cpp.
References get_connection_addr(), and link_.
Referenced by accept_datalink(), and connect_datalink().
00156 { 00157 bool requires_inline_qos; 00158 unsigned int blob_bytes_read; 00159 ACE_INET_Addr addr = get_connection_addr(remote_data, &requires_inline_qos, 00160 &blob_bytes_read); 00161 link_->add_locator(remote_id, addr, requires_inline_qos); 00162 00163 #if defined(OPENDDS_SECURITY) 00164 if (remote_data.length() > blob_bytes_read) { 00165 link_->populate_security_handles(local_id, remote_id, 00166 remote_data.get_buffer() + blob_bytes_read, 00167 remote_data.length() - blob_bytes_read); 00168 } 00169 #endif 00170 00171 link_->associated(local_id, remote_id, local_reliable, remote_reliable, 00172 local_durable, remote_durable); 00173 }
LockType OpenDDS::DCPS::RtpsUdpTransport::connections_lock_ [private] |
Definition at line 106 of file RtpsUdpTransport.h.
Referenced by connect_datalink(), and stop_accepting_or_connecting().
TransportClient_wrch OpenDDS::DCPS::RtpsUdpTransport::default_listener_ [private] |
Definition at line 118 of file RtpsUdpTransport.h.
RtpsUdpDataLink_rch OpenDDS::DCPS::RtpsUdpTransport::link_ [private] |
RTPS uses only one link per transport. This link can be safely reused by any clients that belong to the same domain participant (same GUID prefix). Use by a second participant is not possible because the network location returned by connection_info_i() can't be shared among participants.
Definition at line 113 of file RtpsUdpTransport.h.
Referenced by accept_datalink(), configure_i(), connect_datalink(), map_ipv4_to_ipv6(), register_for_reader(), register_for_writer(), shutdown_i(), unregister_for_reader(), unregister_for_writer(), and use_datalink().
ThreadLockType OpenDDS::DCPS::RtpsUdpTransport::links_lock_ [private] |
Definition at line 100 of file RtpsUdpTransport.h.
Referenced by accept_datalink(), and connect_datalink().
ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpTransport::unicast_socket_ [private] |
Definition at line 116 of file RtpsUdpTransport.h.
Referenced by configure_i(), and make_datalink().