58 if (link->
open(remote_address)) {
64 ACE_TEXT(
"UdpTransport::make_datalink: ")
65 ACE_TEXT(
"failed to open DataLink!\n")));
80 const bool active =
true;
84 "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
95 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::connect_datalink found\n"));
106 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::connect_datalink connected\n"));
124 attribs.
priority_, cfg->local_address(),
false );
127 "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
132 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::accept_datalink found\n"));
137 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::accept_datalink completed\n"));
142 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::accept_datalink pending\n"));
154 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::stop_accepting_or_connecting\n"));
160 for (
size_t i = 0; i < it->second.size(); ++i) {
161 if (it->second[i].first == client && it->second[i].second == remote_id) {
162 it->second.erase(it->second.begin() + i);
166 if (it->second.empty()) {
201 it->second->transport_shutdown();
216 cfg->populate_locator(info, flags);
228 size_t len = data.length();
229 const char* buffer =
reinterpret_cast<const char*
>(data.get_buffer());
232 if (cdr >> network_resource) {
233 network_resource.
to_addr(local_address);
236 return local_address;
247 if (link == static_cast<DataLink*>(it->second.in())) {
262 ACE_InputCDR cdr((
const char*)remote.get_buffer(), remote.length());
264 if (!(cdr >> network_resource)) {
266 ACE_TEXT(
"(%P|%t) ERROR: UdpTransport::blob_to_key")
267 ACE_TEXT(
" failed to de-serialize the NetworkResource\n")));
271 network_resource.
to_addr(remote_address);
272 const bool is_loopback = remote_address == local_addr;
274 return PriorityKey(priority, remote_address, is_loopback, active);
288 Serializer serializer(payload.get(), encoding_kind);
289 serializer >> priority;
291 blob.length(blob.maximum());
292 serializer.read_octet_array(blob.get_buffer(), blob.length());
299 const char ack_data = 23;
301 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::passive_connection failed to send ack\n"));
315 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::passive_connection completing\n"));
332 for (
size_t i = 0; i < tmp.size(); ++i) {
335 const Callbacks::iterator tmp_iter =
find(pend->second.begin(),
338 if (tmp_iter != pend->second.end()) {
340 GUID_t remote_repo = tmp.at(i).second;
344 client->use_datalink(remote_repo, link);
353 VDBG((
LM_DEBUG,
"(%P|%t) UdpTransport::passive_connection pending\n"));
RcHandle< T > rchandle_from(T *pointer)
virtual void release_datalink(DataLink *link)
std::pair< TransportClient_wrch, GUID_t > OnStartCallback
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
void transport_shutdown()
bool open(const ACE_INET_Addr &remote_address)
Encapsulate a priority value and internet address as a key.
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
UdpTransport(const UdpInst_rch &inst)
void passive_connection(const ACE_INET_Addr &remote_address, const ReceivedDataSample &data)
RcHandle< UdpDataLink > UdpDataLink_rch
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
ACE_INET_Addr & address()
std::vector< DataLink::OnStartCallback > Callbacks
Class to serialize and deserialize data for DDS.
ACE_INET_Addr get_connection_addr(const TransportBLOB &data) const
Holds a data sample received by the transport.
std::set< PriorityKey > pending_server_link_keys_
bool is_shut_down() const
Defines a wrapper around address info which is used for advertise.
UdpDataLink_rch server_link_
The single datalink for the passive side. No locking required.
bool configure_i(const UdpInst_rch &config)
PendConnMap pending_connections_
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
ACE_SOCK_Dgram & socket()
ReactorTask_rch reactor_task()
ACE_Recursive_Thread_Mutex connections_lock_
std::set< PriorityKey > server_link_keys_
PriorityKey blob_to_key(const TransportBLOB &remote, Priority priority, ACE_INET_Addr local_addr, bool active)
size_t data_length() const
total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks
void stop()
The stop method is used to stop the DataLink prior to shutdown.
UdpDataLink_rch make_datalink(const ACE_INET_Addr &remote_address, Priority priority, bool active)
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
LockType client_links_lock_
This lock is used to protect the client_links_ data member.
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
UdpDataLinkMap client_links_
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
RcHandle< T > lock() const
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
virtual void shutdown_i()
void to_addr(ACE_INET_Addr &addr) const
#define TheServiceParticipant
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
DDS::OctetSeq TransportBLOB
The Internal API and Implementation of OpenDDS.
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
UdpInst_rch config() const
size_t ConnectionInfoFlags
TransportInst_rch config() const
int find(Container &c, const Key &key, typename Container::mapped_type *&value)