OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Protected Attributes | Private Attributes | List of all members
OpenDDS::DCPS::UdpDataLink Class Reference

#include <UdpDataLink.h>

Inheritance diagram for OpenDDS::DCPS::UdpDataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::UdpDataLink:
Collaboration graph
[legend]

Public Member Functions

 UdpDataLink (const UdpTransport_rch &transport, Priority priority, const ReactorTask_rch &reactor_task, bool active)
 
bool active () const
 
ReactorTask_rch reactor_task ()
 
ACE_Reactorget_reactor ()
 
ACE_INET_Addrremote_address ()
 
ACE_SOCK_Dgramsocket ()
 
bool open (const ACE_INET_Addr &remote_address)
 
void control_received (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 
- Public Member Functions inherited from OpenDDS::DCPS::DataLink
 DataLink (const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object. More...
 
virtual ~DataLink ()
 
int handle_exception (ACE_HANDLE)
 Reactor invokes this after being notified in schedule_stop or cancel_release. More...
 
void schedule_stop (const MonotonicTimePoint &schedule_to_stop_at)
 
void stop ()
 The stop method is used to stop the DataLink prior to shutdown. More...
 
void resume_send ()
 
virtual int make_reservation (const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
 
virtual int make_reservation (const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
 
void release_reservations (GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
 
void schedule_delayed_release ()
 
const TimeDurationdatalink_release_delay () const
 
void remove_listener (const GUID_t &local_id)
 
void send_start ()
 
void send (TransportQueueElement *element)
 
void send_stop (GUID_t repoId)
 
virtual RemoveResult remove_sample (const DataSampleElement *sample)
 
virtual void remove_all_msgs (const GUID_t &pub_id)
 
int data_received (ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
 
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
 
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object. More...
 
void transport_shutdown ()
 
void notify (ConnectionNotice notice)
 
virtual void pre_stop_i ()
 
void release_resources ()
 
void terminate_send ()
 
void terminate_send_if_suspended ()
 
bool is_target (const GUID_t &remote_id)
 
void clear_associations ()
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
 
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
 
Prioritytransport_priority ()
 
Priority transport_priority () const
 
bool & is_loopback ()
 
bool is_loopback () const
 
bool & is_active ()
 
bool is_active () const
 
bool cancel_release ()
 
ACE_Message_Blockcreate_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
 
GUIDSeqtarget_intersection (const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
 
TransportImpl_rch impl () const
 
void default_listener (const TransportReceiveListener_wrch &trl)
 
TransportReceiveListener_wrch default_listener () const
 
bool add_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void remove_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void invoke_on_start_callbacks (bool success)
 
bool invoke_on_start_callbacks (const GUID_t &local, const GUID_t &remote, bool success)
 
void remove_startup_callbacks (const GUID_t &local, const GUID_t &remote)
 
void set_scheduling_release (bool scheduling_release)
 
virtual void send_final_acks (const GUID_t &readerid)
 
virtual WeakRcHandle< ICE::Endpointget_ice_endpoint () const
 
virtual bool is_leading (const GUID_t &, const GUID_t &) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Protected Member Functions

virtual void stop_i ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataLink
int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
 
void send_start_i ()
 
virtual void send_i (TransportQueueElement *element, bool relink=true)
 
void send_stop_i (GUID_t repoId)
 
GUIDSeqpeer_ids (const GUID_t &local_id) const
 
void network_change () const
 
void replay_durable_data (const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
 
TransportSendStrategy_rch get_send_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Protected Attributes

bool active_
 
ReactorTask_rch reactor_task_
 
UdpSendStrategy_rch send_strategy_
 
UdpReceiveStrategy_rch recv_strategy_
 
- Protected Attributes inherited from OpenDDS::DCPS::DataLink
TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink. More...
 
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink. More...
 
LockType strategy_lock_
 
OnStartCallbackMap on_start_callbacks_
 
PendingOnStartsMap pending_on_starts_
 
TimeDuration datalink_release_delay_
 
unique_ptr< MessageBlockAllocatormb_allocator_
 
unique_ptr< DataBlockAllocatordb_allocator_
 
bool is_loopback_
 Is remote attached to same transport ? More...
 
bool is_active_
 Is pub or sub ? More...
 
bool started_
 
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control. More...
 
Interceptor interceptor_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Private Attributes

ACE_INET_Addr remote_address_
 
ACE_SOCK_Dgram socket_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::DataLink
enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
 
typedef WeakRcHandle< TransportClientTransportClient_wrch
 
typedef std::pair< TransportClient_wrch, GUID_tOnStartCallback
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::DataLink
typedef ACE_Guard< LockTypeGuardType
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataLink
static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods. More...
 

Detailed Description

Definition at line 35 of file UdpDataLink.h.

Constructor & Destructor Documentation

◆ UdpDataLink()

OpenDDS::DCPS::UdpDataLink::UdpDataLink ( const UdpTransport_rch transport,
Priority  priority,
const ReactorTask_rch reactor_task,
bool  active 
)

Definition at line 30 of file UdpDataLink.cpp.

34  : DataLink(transport,
35  priority,
36  false, // is_loopback,
37  active),// is_active
38  active_(active),
40  send_strategy_(make_rch<UdpSendStrategy>(this)),
41  recv_strategy_(make_rch<UdpReceiveStrategy>(this))
42 {
43 }
ReactorTask_rch reactor_task()
Definition: UdpDataLink.inl:21
UdpReceiveStrategy_rch recv_strategy_
Definition: UdpDataLink.h:64
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
Definition: DataLink.cpp:42
virtual int priority(void) const
UdpSendStrategy_rch send_strategy_
Definition: UdpDataLink.h:63
ReactorTask_rch reactor_task_
Definition: UdpDataLink.h:61

Member Function Documentation

◆ active()

ACE_INLINE bool OpenDDS::DCPS::UdpDataLink::active ( void  ) const

Definition at line 14 of file UdpDataLink.inl.

References ACE_INLINE, and active_.

15 {
16  return this->active_;
17 }

◆ control_received()

void OpenDDS::DCPS::UdpDataLink::control_received ( ReceivedDataSample sample,
const ACE_INET_Addr remote_address 
)

Definition at line 276 of file UdpDataLink.cpp.

References OpenDDS::DCPS::DataLink::impl(), and OPENDDS_TEST_AND_CALL.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample().

278 {
279  // At this time, the TRANSPORT_CONTROL messages in Udp are only used for
280  // the connection handshaking, so receiving one is an indication of the
281  // passive_connection event. In the future the submessage_id_ could be used
282  // to allow different types of messages here.
283  OPENDDS_TEST_AND_CALL(UdpTransport_rch, dynamic_rchandle_cast<UdpTransport>(impl()), passive_connection(remote_address, sample));
284 }
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
#define OPENDDS_TEST_AND_CALL(TYPE, TEST, CALL)
Definition: Definitions.h:69
UdpTransport_rch UdpTransport_rch
Definition: UdpDataLink.h:32

◆ get_reactor()

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::UdpDataLink::get_reactor ( void  )

Definition at line 27 of file UdpDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::get_reactor(), and reactor_task_.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::start_i(), and OpenDDS::DCPS::UdpReceiveStrategy::stop_i().

28 {
29  if (this->reactor_task_ == 0) return 0;
30  return this->reactor_task_->get_reactor();
31 }
ReactorTask_rch reactor_task_
Definition: UdpDataLink.h:61
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ open()

bool OpenDDS::DCPS::UdpDataLink::open ( const ACE_INET_Addr remote_address)

Definition at line 46 of file UdpDataLink.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_ERROR_RETURN, ACE_NEW_RETURN, ACE_TEXT(), active_, OpenDDS::DCPS::DirectPriorityMapper::codepoint(), OpenDDS::DCPS::CONNINFO_ALL, ACE_Message_Block::cont(), ACE_IPC_SAP::control(), OpenDDS::DCPS::TransportLocator::data, OpenDDS::DCPS::dynamic_rchandle_cast(), ENOTSUP, OpenDDS::DCPS::get_fully_qualified_hostname(), ACE_SOCK::get_local_addr(), OpenDDS::DCPS::TransportHeader::get_max_serialized_size(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), ACE_INET_Addr::get_port_number(), ACE_Addr::get_type(), hostname(), OpenDDS::DCPS::DataLink::impl(), OpenDDS::DCPS::DataLink::is_loopback_, OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, ACE_Message_Block::length(), OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::MAX_SEND_BLOCKS, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::open_appropriate_socket_type(), OPENDDS_TEST_AND_CALL, ACE_SOCK_Dgram::recv(), recv_strategy_, ACE_Message_Block::release(), remote_address(), remote_address_, ACE_SOCK_Dgram::send(), send_strategy_, ACE_INET_Addr::set(), OpenDDS::DCPS::DataLink::set_dscp_codepoint(), ACE_SOCK::set_option(), SO_RCVBUF, SO_SNDBUF, socket(), socket_, SOL_SOCKET, OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::TRANSPORT_CONTROL, OpenDDS::DCPS::DataLink::transport_priority(), OpenDDS::DCPS::TimeDuration::value(), VDBG, VDBG_LVL, and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::UdpTransport::make_datalink().

47 {
49 
50  UdpInst_rch cfg = dynamic_rchandle_cast<UdpTransport>(impl())->config();
51  if (!cfg) {
52  return false;
53  }
54 
55  this->is_loopback_ = this->remote_address_ == cfg->local_address();
56 
57  ACE_INET_Addr local_address;
58  if (this->active_) {
59  if (local_address.get_type() != remote_address.get_type()) {
60  local_address.set(0, "", 0, remote_address.get_type());
61  }
62  } else {
63  local_address = cfg->local_address();
64  }
65 
66  if (!open_appropriate_socket_type(this->socket_, local_address)) {
67  ACE_ERROR_RETURN((LM_ERROR,
68  ACE_TEXT("(%P|%t) ERROR: ")
69  ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
70  false);
71  }
72 
73  VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C\n",
74  LogAddr(local_address).c_str()));
75 
76  // If listening on "any" host/port, need to record the actual port number
77  // selected by the OS, as well as our actual hostname, into the config_
78  // object's local_address_ for use in UdpTransport::connection_info_i().
79  if (!this->active_ && cfg->local_address().is_any()) {
80  ACE_INET_Addr address;
81  if (this->socket_.get_local_addr(address) != 0) {
82  ACE_ERROR_RETURN((LM_ERROR,
83  ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
84  ACE_TEXT("cannot get local addr\n")), false);
85  }
86  const unsigned short port = address.get_port_number();
87  const std::string hostname = get_fully_qualified_hostname();
88  VDBG_LVL((LM_DEBUG,
89  ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
90  hostname.c_str(), port), 2);
91 
92  cfg->local_address(port, hostname.c_str());
93 
94  // Similar case to the "if" case above, but with a bound host/IP but no port
95  } else if (!this->active_ &&
96  0 == cfg->local_address().get_port_number()) {
97  ACE_INET_Addr address;
98  if (this->socket_.get_local_addr(address) != 0) {
99  ACE_ERROR_RETURN((LM_ERROR,
100  ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
101  ACE_TEXT("cannot get local addr\n")), false);
102  }
103  const unsigned short port = address.get_port_number();
104  VDBG_LVL((LM_DEBUG,
105  ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
106  port), 2);
107  cfg->local_address_set_port(port);
108  }
109 
110  if (cfg->send_buffer_size_ > 0) {
111  int snd_size = cfg->send_buffer_size_;
112  if (this->socket_.set_option(SOL_SOCKET,
113  SO_SNDBUF,
114  (void *) &snd_size,
115  sizeof(snd_size)) < 0
116  && errno != ENOTSUP) {
117  ACE_ERROR_RETURN((LM_ERROR,
118  ACE_TEXT("(%P|%t) ERROR: ")
119  ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
120  snd_size),
121  false);
122  }
123  }
124 
125  if (cfg->rcv_buffer_size_ > 0) {
126  int rcv_size = cfg->rcv_buffer_size_;
127  if (this->socket_.set_option(SOL_SOCKET,
128  SO_RCVBUF,
129  (void *) &rcv_size,
130  sizeof(int)) < 0
131  && errno != ENOTSUP) {
132  ACE_ERROR_RETURN((LM_ERROR,
133  ACE_TEXT("(%P|%t) ERROR: ")
134  ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
135  rcv_size),
136  false);
137  }
138  }
139 
140 #ifdef ACE_WIN32
141  // By default Winsock will cause reads to fail with "connection reset"
142  // when UDP sends result in ICMP "port unreachable" messages.
143  // The transport framework is not set up for this since returning <= 0
144  // from our receive_bytes causes the framework to close down the datalink
145  // which in this case is used to receive from multiple peers.
146  BOOL recv_udp_connreset = FALSE;
147  socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
148 #endif
149 
150  if (this->active_) {
151  // Set the DiffServ codepoint according to the priority value.
152  DirectPriorityMapper mapper(this->transport_priority());
153  this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
154 
155 
156  // For the active side, send the blob and wait for a 1 byte ack.
157  VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C\n",
158  LogAddr(remote_address).c_str()));
159 
160  TransportLocator info = TransportLocator();
161  OPENDDS_TEST_AND_CALL(TransportImpl_rch, impl(), connection_info_i(info, CONNINFO_ALL));
162  ACE_Message_Block* data_block;
163  ACE_NEW_RETURN(data_block,
164  ACE_Message_Block(info.data.length()+sizeof(Priority),
166  0, //cont
167  0, //data
168  0, //allocator_strategy
169  0, //locking_strategy
170  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
173  0,
174  0),
175  0);
176 
177  Serializer serializer(data_block, Encoding::KIND_UNALIGNED_CDR);
178  serializer << this->transport_priority();
179  serializer.write_octet_array(info.data.get_buffer(),
180  info.data.length());
181 
182  DataSampleHeader sample_header;
183  sample_header.message_id_ = TRANSPORT_CONTROL;
184  sample_header.message_length_ =
185  static_cast<ACE_UINT32>(data_block->length());
186  ACE_Message_Block* sample_header_block;
187  ACE_NEW_RETURN(sample_header_block,
190  0, //cont
191  0, //data
192  0, //allocator_strategy
193  0, //locking_strategy
194  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
197  0,
198  0),
199  0);
200  *sample_header_block << sample_header;
201  sample_header_block->cont(data_block);
202 
203  ACE_Message_Block* transport_header_block;
204  TransportHeader transport_header;
205  ACE_NEW_RETURN(transport_header_block,
208  0,
209  0,
210  0,
211  0,
212  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
215  0,
216  0),
217  0);
218 
219  transport_header.length_ =
220  static_cast<ACE_UINT32>(data_block->length() +
221  sample_header_block->length());
222  *transport_header_block << transport_header;
223  transport_header_block->cont(sample_header_block);
224 
225  iovec iov[MAX_SEND_BLOCKS];
226  const int num_blocks =
227  TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
228  const ssize_t sent = socket().send(iov, num_blocks, remote_address);
229  transport_header_block->release();
230  if (sent < 0) {
231  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
232  ACE_TEXT("failed to send handshake %m\n")),
233  false);
234  }
235 
236  // Need to wait for the 1 byte ack from the passive side before returning
237  // the link (and indicating success).
238  const size_t size = 32;
239  char buff[size];
240  // Default this timeout to 30. We may want to make this settable
241  // or use another settable timeout value here.
242  const TimeDuration tv(30);
243  const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv.value());
244  if (recvd == 1) {
245  // Expected value
246  VDBG_LVL((LM_DEBUG,
247  ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
248  2);
249  } else if (recvd < 0) {
250  // Not a handshake ack, something is wrong
251  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
252  ACE_TEXT("failed to receive handshake ack %p\n"),
253  ACE_TEXT("recv")), false);
254  } else {
255  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
256  ACE_TEXT("failed to receive handshake ack ")
257  ACE_TEXT("recv returned %b\n"), recvd),
258  false);
259  }
260  }
261 
262  if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
263  static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
264  != 0) {
265  stop_i();
266  ACE_ERROR_RETURN((LM_ERROR,
267  ACE_TEXT("(%P|%t) ERROR: ")
268  ACE_TEXT("UdpDataLink::open: start failed!\n")),
269  false);
270  }
271 
272  return true;
273 }
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
static const ACE_Time_Value max_time
#define ENOTSUP
ACE_SOCK_Dgram & socket()
Definition: UdpDataLink.inl:40
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
size_t length(void) const
bool is_loopback_
Is remote attached to same transport ?
Definition: DataLink.h:461
void set_dscp_codepoint(int cp, ACE_SOCK &socket)
Definition: DataLink.cpp:1115
int ssize_t
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
int set_option(int level, int option, void *optval, int optlen) const
int get_type(void) const
ACE_INET_Addr remote_address_
Definition: UdpDataLink.h:69
UdpReceiveStrategy_rch recv_strategy_
Definition: UdpDataLink.h:64
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
ACE_SOCK_Dgram socket_
Definition: UdpDataLink.h:71
Priority & transport_priority()
Definition: DataLink.inl:21
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
#define VDBG(DBG_ARGS)
int hostname(char name[], size_t maxnamelen)
virtual ACE_Message_Block * release(void)
int control(int cmd, void *) const
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
ACE_Message_Block * cont(void) const
int get_local_addr(ACE_Addr &) const
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
ACE_CDR::Long Priority
int set(const ACE_INET_Addr &)
ACE_TEXT("TCP_Factory")
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
#define OPENDDS_TEST_AND_CALL(TYPE, TEST, CALL)
Definition: Definitions.h:69
u_short get_port_number(void) const
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
UdpSendStrategy_rch send_strategy_
Definition: UdpDataLink.h:63
ACE_INET_Addr & remote_address()
Definition: UdpDataLink.inl:34
RcHandle< UdpInst > UdpInst_rch
Definition: UdpInst_rch.h:18
#define ACE_ERROR_RETURN(X, Y)
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
String get_fully_qualified_hostname(ACE_INET_Addr *addr)
static const ConnectionInfoFlags CONNINFO_ALL
bool open_appropriate_socket_type(ACE_SOCK_Dgram &socket, const ACE_INET_Addr &local_address, int *proto_family)

◆ reactor_task()

ACE_INLINE ReactorTask_rch OpenDDS::DCPS::UdpDataLink::reactor_task ( )

Definition at line 21 of file UdpDataLink.inl.

References ACE_INLINE, and reactor_task_.

22 {
23  return this->reactor_task_;
24 }
ReactorTask_rch reactor_task_
Definition: UdpDataLink.h:61

◆ remote_address()

ACE_INLINE ACE_INET_Addr & OpenDDS::DCPS::UdpDataLink::remote_address ( )

Definition at line 34 of file UdpDataLink.inl.

References ACE_INLINE, and remote_address_.

Referenced by open(), and OpenDDS::DCPS::UdpSendStrategy::send_bytes_i().

35 {
36  return this->remote_address_;
37 }
ACE_INET_Addr remote_address_
Definition: UdpDataLink.h:69

◆ socket()

ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::UdpDataLink::socket ( void  )

◆ stop_i()

void OpenDDS::DCPS::UdpDataLink::stop_i ( )
protectedvirtual

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 287 of file UdpDataLink.cpp.

References ACE_SOCK::close(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, and socket_.

Referenced by open().

288 {
289  this->socket_.close();
290 }
ACE_SOCK_Dgram socket_
Definition: UdpDataLink.h:71
int close(void)

Member Data Documentation

◆ active_

bool OpenDDS::DCPS::UdpDataLink::active_
protected

Definition at line 59 of file UdpDataLink.h.

Referenced by active(), and open().

◆ reactor_task_

ReactorTask_rch OpenDDS::DCPS::UdpDataLink::reactor_task_
protected

Definition at line 61 of file UdpDataLink.h.

Referenced by get_reactor(), and reactor_task().

◆ recv_strategy_

UdpReceiveStrategy_rch OpenDDS::DCPS::UdpDataLink::recv_strategy_
protected

Definition at line 64 of file UdpDataLink.h.

Referenced by open().

◆ remote_address_

ACE_INET_Addr OpenDDS::DCPS::UdpDataLink::remote_address_
private

Definition at line 69 of file UdpDataLink.h.

Referenced by open(), and remote_address().

◆ send_strategy_

UdpSendStrategy_rch OpenDDS::DCPS::UdpDataLink::send_strategy_
protected

Definition at line 63 of file UdpDataLink.h.

Referenced by open().

◆ socket_

ACE_SOCK_Dgram OpenDDS::DCPS::UdpDataLink::socket_
private

Definition at line 71 of file UdpDataLink.h.

Referenced by open(), socket(), and stop_i().


The documentation for this class was generated from the following files: