OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::MulticastDataLink Class Reference

#include <MulticastDataLink.h>

Inheritance diagram for OpenDDS::DCPS::MulticastDataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MulticastDataLink:
Collaboration graph
[legend]

Public Member Functions

 MulticastDataLink (const MulticastTransport_rch &transport, const MulticastSessionFactory_rch &session_factory, MulticastPeer local_peer, const MulticastInst_rch &config, const ReactorTask_rch &reactor_task, bool is_active)
 
virtual ~MulticastDataLink ()
 
MulticastTransport_rch transport ()
 
MulticastPeer local_peer () const
 
MulticastSendStrategy * send_strategy ()
 
MulticastReceiveStrategy * receive_strategy ()
 
SingleSendBuffer * send_buffer ()
 
MulticastInst_rch config ()
 
ReactorTask_rch reactor_task ()
 
ACE_Reactor * get_reactor ()
 
ACE_Proactor * get_proactor ()
 
ACE_SOCK_Dgram_Mcast & socket ()
 
bool join (const ACE_INET_Addr &group_address)
 
MulticastSession_rch find_or_create_session (MulticastPeer remote_peer)
 
MulticastSession_rch find_session (MulticastPeer remote_peer)
 
bool check_header (const TransportHeader &header)
 
bool check_header (const DataSampleHeader &header)
 
void sample_received (ReceivedDataSample &sample)
 
bool reassemble (ReceivedDataSample &data, const TransportHeader &header)
 
int make_reservation (const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
 
void release_reservations_i (const GUID_t &remote_id, const GUID_t &local_id)
 
void client_stop (const GUID_t &localId)
 
- Public Member Functions inherited from OpenDDS::DCPS::DataLink
 DataLink (const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object. More...
 
virtual ~DataLink ()
 
int handle_exception (ACE_HANDLE)
 Reactor invokes this after being notified in schedule_stop or cancel_release. More...
 
void schedule_stop (const MonotonicTimePoint &schedule_to_stop_at)
 
void stop ()
 The stop method is used to stop the DataLink prior to shutdown. More...
 
void resume_send ()
 
virtual int make_reservation (const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
 
void release_reservations (GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
 
void schedule_delayed_release ()
 
const TimeDuration & datalink_release_delay () const
 
void remove_listener (const GUID_t &local_id)
 
void send_start ()
 
void send (TransportQueueElement *element)
 
void send_stop (GUID_t repoId)
 
virtual RemoveResult remove_sample (const DataSampleElement *sample)
 
virtual void remove_all_msgs (const GUID_t &pub_id)
 
int data_received (ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
 
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
 
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object. More...
 
void transport_shutdown ()
 
void notify (ConnectionNotice notice)
 
virtual void pre_stop_i ()
 
void release_resources ()
 
void terminate_send ()
 
void terminate_send_if_suspended ()
 
bool is_target (const GUID_t &remote_id)
 
void clear_associations ()
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
 
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
 
Priority & transport_priority ()
 
Priority transport_priority () const
 
bool & is_loopback ()
 
bool is_loopback () const
 
bool & is_active ()
 
bool is_active () const
 
bool cancel_release ()
 
ACE_Message_Block * create_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
 
GUIDSeq * target_intersection (const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
 
TransportImpl_rch impl () const
 
void default_listener (const TransportReceiveListener_wrch &trl)
 
TransportReceiveListener_wrch default_listener () const
 
bool add_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void remove_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void invoke_on_start_callbacks (bool success)
 
bool invoke_on_start_callbacks (const GUID_t &local, const GUID_t &remote, bool success)
 
void remove_startup_callbacks (const GUID_t &local, const GUID_t &remote)
 
void set_scheduling_release (bool scheduling_release)
 
virtual void send_final_acks (const GUID_t &readerid)
 
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint () const
 
virtual bool is_leading (const GUID_t &, const GUID_t &) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Member Functions

typedef OPENDDS_MAP (MulticastPeer, MulticastSession_rch) MulticastSessionMap
 
virtual void stop_i ()
 
void syn_received_no_session (MulticastPeer source, const Message_Block_Ptr &data, bool swap_bytes)
 
void release_remote_i (const GUID_t &remote)
 
bool ready_to_deliver (const ReceivedDataSample &data)
 

Private Attributes

MulticastSessionFactory_rch session_factory_
 
MulticastPeer local_peer_
 
ReactorTask_rch reactor_task_
 
MulticastSendStrategy_rch send_strategy_
 
MulticastReceiveStrategy_rch recv_strategy_
 
unique_ptr< SingleSendBuffer > send_buffer_
 
ACE_SOCK_Dgram_Mcast socket_
 
ACE_SYNCH_RECURSIVE_MUTEX session_lock_
 
MulticastSessionMap sessions_
 
RepoIdSet readers_selected_
 
RepoIdSet readers_withheld_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::DataLink
enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
 
typedef WeakRcHandle< TransportClient > TransportClient_wrch
 
typedef std::pair< TransportClient_wrch, GUID_t > OnStartCallback
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::DataLink
typedef ACE_Guard< LockType > GuardType
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataLink
int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
 
void send_start_i ()
 
virtual void send_i (TransportQueueElement *element, bool relink=true)
 
void send_stop_i (GUID_t repoId)
 
GUIDSeq * peer_ids (const GUID_t &local_id) const
 
void network_change () const
 
void replay_durable_data (const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
 
TransportSendStrategy_rch get_send_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataLink
static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods. More...
 
- Protected Attributes inherited from OpenDDS::DCPS::DataLink
TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink. More...
 
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink. More...
 
LockType strategy_lock_
 
OnStartCallbackMap on_start_callbacks_
 
PendingOnStartsMap pending_on_starts_
 
TimeDuration datalink_release_delay_
 
unique_ptr< MessageBlockAllocator > mb_allocator_
 
unique_ptr< DataBlockAllocator > db_allocator_
 
bool is_loopback_
 Is remote attached to same transport ? More...
 
bool is_active_
 Is pub or sub ? More...
 
bool started_
 
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control. More...
 
Interceptor interceptor_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 34 of file MulticastDataLink.h.

Constructor & Destructor Documentation

◆ MulticastDataLink()

OpenDDS::DCPS::MulticastDataLink::MulticastDataLink ( const MulticastTransport_rch & transport,
const MulticastSessionFactory_rch & session_factory,
MulticastPeer  local_peer,
const MulticastInst_rch & config,
const ReactorTask_rch & reactor_task,
bool  is_active 
)

Definition at line 41 of file MulticastDataLink.cpp.

References OpenDDS::DCPS::DEFAULT_CONFIG_MAX_SAMPLES_PER_PACKET, OpenDDS::DCPS::MulticastInst::DEFAULT_NAK_DEPTH, OpenDDS::DCPS::MulticastSessionFactory::requires_send_buffer(), OpenDDS::DCPS::TransportSendStrategy::send_buffer(), send_buffer_, send_strategy_, and session_factory_.

47 : DataLink(transport, 0 /*priority*/, false /*loopback*/, is_active),
48  session_factory_(session_factory),
51  send_strategy_(make_rch<MulticastSendStrategy>(this)),
52  recv_strategy_(make_rch<MulticastReceiveStrategy>(this))
53 {
54  // A send buffer may be bound to the send strategy to ensure a
55  // configured number of most-recent datagrams are retained:
57  const size_t nak_depth = config ? config->nak_depth_ : MulticastInst::DEFAULT_NAK_DEPTH;
58  const size_t default_max_samples = DEFAULT_CONFIG_MAX_SAMPLES_PER_PACKET;
59  const size_t max_samples_per_packet = config ? config->max_samples_per_packet_ : default_max_samples;
60  send_buffer_.reset(new SingleSendBuffer(nak_depth, max_samples_per_packet));
62  }
63 }
void send_buffer(TransportSendBuffer *send_buffer)
Assigns an optional send buffer.
MulticastTransport_rch transport()
MulticastSessionFactory_rch session_factory_
MulticastSendStrategy_rch send_strategy_
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
Definition: DataLink.cpp:42
MulticastReceiveStrategy_rch recv_strategy_
unique_ptr< SingleSendBuffer > send_buffer_
static const size_t DEFAULT_NAK_DEPTH
Definition: MulticastInst.h:30
virtual int requires_send_buffer() const =0

◆ ~MulticastDataLink()

OpenDDS::DCPS::MulticastDataLink::~MulticastDataLink ( )
virtual

Definition at line 65 of file MulticastDataLink.cpp.

References OpenDDS::DCPS::TransportSendStrategy::send_buffer(), send_buffer_, and send_strategy_.

66 {
67  if (send_buffer_) {
69  }
70 }
void send_buffer(TransportSendBuffer *send_buffer)
Assigns an optional send buffer.
MulticastSendStrategy_rch send_strategy_
unique_ptr< SingleSendBuffer > send_buffer_

Member Function Documentation

◆ check_header() [1/2]

bool OpenDDS::DCPS::MulticastDataLink::check_header ( const TransportHeader & header)

Definition at line 208 of file MulticastDataLink.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_RECURSIVE_MUTEX, OpenDDS::DCPS::DataLink::is_active(), session_lock_, sessions_, and OpenDDS::DCPS::TransportHeader::source_.

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::check_header().

209 {
210  ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
211  guard,
212  this->session_lock_,
213  false);
214 
215  MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
216  if (it == this->sessions_.end() && is_active()) {
217  return false;
218  }
219  if (it != this->sessions_.end() && it->second->acked()) {
220  return it->second->check_header(header);
221  }
222 
223  return true;
224 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ check_header() [2/2]

bool OpenDDS::DCPS::MulticastDataLink::check_header ( const DataSampleHeader & header)

Definition at line 227 of file MulticastDataLink.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_RECURSIVE_MUTEX, OpenDDS::DCPS::DataSampleHeader::message_id_, receive_strategy(), session_lock_, sessions_, and OpenDDS::DCPS::TRANSPORT_CONTROL.

228 {
229  if (header.message_id_ == TRANSPORT_CONTROL) return true;
230 
231  ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
232  guard,
233  this->session_lock_,
234  false);
235 
236  // Skip data sample unless there is a session for it.
237  return (this->sessions_.count(receive_strategy()->received_header().source_) > 0);
238 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
MulticastReceiveStrategy * receive_strategy()
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ client_stop()

void OpenDDS::DCPS::MulticastDataLink::client_stop ( const GUID_t & localId)

Definition at line 461 of file MulticastDataLink.cpp.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, and send_buffer_.

462 {
463  if (send_buffer_) {
464  send_buffer_->retain_all(localId);
465  send_buffer_.reset();
466  }
467 }
unique_ptr< SingleSendBuffer > send_buffer_

◆ config()

ACE_INLINE MulticastInst_rch OpenDDS::DCPS::MulticastDataLink::config ( )

◆ find_or_create_session()

MulticastSession_rch OpenDDS::DCPS::MulticastDataLink::find_or_create_session ( MulticastPeer  remote_peer)

Definition at line 166 of file MulticastDataLink.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_SYNCH_RECURSIVE_MUTEX, ACE_TEXT(), OpenDDS::DCPS::MulticastSessionFactory::create(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_ERROR, session_factory_, session_lock_, sessions_, and transport().

167 {
168  ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
169  guard,
170  this->session_lock_,
172 
173  MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
174  if (it != this->sessions_.end()) {
175  return it->second;
176  }
177 
178  MulticastSession_rch session;
180  if (mt) {
181  session = session_factory_->create(mt->reactor_task()->interceptor(), this, remote_peer);
182  if (session.is_nil()) {
183  ACE_ERROR_RETURN((LM_ERROR,
184  ACE_TEXT("(%P|%t) ERROR: ")
185  ACE_TEXT("MulticastDataLink::find_or_create_session: ")
186  ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
187  (unsigned int) (remote_peer >> 32),
188  (unsigned int) remote_peer),
190  }
191 
192  std::pair<MulticastSessionMap::iterator, bool> pair = this->sessions_.insert(
193  MulticastSessionMap::value_type(remote_peer, session));
194  if (pair.first == this->sessions_.end()) {
195  ACE_ERROR_RETURN((LM_ERROR,
196  ACE_TEXT("(%P|%t) ERROR: ")
197  ACE_TEXT("MulticastDataLink::find_or_create_session: ")
198  ACE_TEXT("failed to insert session for remote peer: %#08x%08x!\n"),
199  (unsigned int) (remote_peer >> 32),
200  (unsigned int) remote_peer),
202  }
203  }
204  return session;
205 }
virtual MulticastSession_rch create(RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)=0
MulticastTransport_rch transport()
MulticastSessionFactory_rch session_factory_
RcHandle< MulticastSession > MulticastSession_rch
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_TEXT("TCP_Factory")
RcHandle< MulticastTransport > MulticastTransport_rch
#define ACE_ERROR_RETURN(X, Y)
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ find_session()

MulticastSession_rch OpenDDS::DCPS::MulticastDataLink::find_session ( MulticastPeer  remote_peer)

Definition at line 151 of file MulticastDataLink.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_RECURSIVE_MUTEX, session_lock_, and sessions_.

Referenced by make_reservation(), and release_reservations_i().

152 {
153  ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
154  guard,
155  this->session_lock_,
157 
158  MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
159  if (it != this->sessions_.end()) {
160  return it->second;
161  }
162  else return MulticastSession_rch();
163 }
RcHandle< MulticastSession > MulticastSession_rch
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ get_proactor()

ACE_INLINE ACE_Proactor * OpenDDS::DCPS::MulticastDataLink::get_proactor ( )

Definition at line 65 of file MulticastDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::get_proactor(), and reactor_task_.

Referenced by OpenDDS::DCPS::MulticastSendStrategy::async_send().

66 {
67  if (this->reactor_task_ == 0) return 0;
68  return this->reactor_task_->get_proactor();
69 }
ACE_Proactor * get_proactor()
Definition: ReactorTask.inl:37

◆ get_reactor()

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::MulticastDataLink::get_reactor ( void  )

◆ join()

bool OpenDDS::DCPS::MulticastDataLink::join ( const ACE_INET_Addr & group_address)

Definition at line 74 of file MulticastDataLink.cpp.

References ACE_DEFAULT_MAX_SOCKET_BUFSIZ, ACE_ERROR_RETURN, ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, ACE_SOCK::close(), config(), ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, ENOTSUP, ACE_IPC_SAP::get_handle(), ACE_SOCK_Dgram_Mcast::join(), LM_DEBUG, LM_ERROR, ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), recv_strategy_, send_strategy_, OpenDDS::DCPS::set_socket_multicast_ttl(), ACE_OS::setsockopt(), SO_RCVBUF, SO_SNDBUF, socket_, SOL_SOCKET, OpenDDS::DCPS::DataLink::start(), ACE_Utils::truncate_cast(), and VDBG_LVL.

75 {
76  MulticastInst_rch cfg = config();
77  if (!cfg) {
78  return false;
79  }
80 
81  const std::string& net_if = cfg->local_address_;
82 #ifdef ACE_HAS_MAC_OSX
85 #endif
86  if (this->socket_.join(group_address, 1,
87  net_if.empty() ? 0 :
88  ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
89  ACE_ERROR_RETURN((LM_ERROR,
90  ACE_TEXT("(%P|%t) ERROR: MulticastDataLink::join: ")
91  ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed %m.\n")),
92  false);
93  }
94  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::join OK\n")), 6);
95 
96  ACE_HANDLE handle = this->socket_.get_handle();
97 
98  if (!OpenDDS::DCPS::set_socket_multicast_ttl(this->socket_, cfg->ttl_)) {
99  ACE_ERROR_RETURN((LM_ERROR,
100  ACE_TEXT("(%P|%t) ERROR: ")
101  ACE_TEXT("MulticastDataLink::join: ")
102  ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl failed.\n")),
103  false);
104  }
105 
106  int rcv_buffer_size = ACE_Utils::truncate_cast<int>(cfg->rcv_buffer_size_);
107  if (rcv_buffer_size != 0
108  && ACE_OS::setsockopt(handle, SOL_SOCKET,
109  SO_RCVBUF,
110  (char *) &rcv_buffer_size,
111  sizeof (int)) < 0) {
112  ACE_ERROR_RETURN((LM_ERROR,
113  ACE_TEXT("(%P|%t) ERROR: ")
114  ACE_TEXT("MulticastDataLink::join: ")
115  ACE_TEXT("ACE_OS::setsockopt RCVBUF failed.\n")),
116  false);
117  }
118 
119 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
120  int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
121 
122  if (ACE_OS::setsockopt(handle, SOL_SOCKET,
123  SO_SNDBUF,
124  (char *) &snd_size,
125  sizeof(snd_size)) < 0
126  && errno != ENOTSUP) {
127  ACE_ERROR_RETURN((LM_ERROR,
128  ACE_TEXT("(%P|%t) ERROR: ")
129  ACE_TEXT("MulticastDataLink::join: ")
130  ACE_TEXT("ACE_OS::setsockopt SNDBUF failed to set the send buffer size to %d errno %m\n"),
131  snd_size),
132  false);
133  }
134 #endif /* ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
135 
136  if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
137  static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
138  != 0) {
139  this->socket_.close();
140  ACE_ERROR_RETURN((LM_ERROR,
141  ACE_TEXT("(%P|%t) ERROR: ")
142  ACE_TEXT("MulticastDataLink::join: ")
143  ACE_TEXT("DataLink::start failed!\n")),
144  false);
145  }
146 
147  return true;
148 }
int setsockopt(ACE_HANDLE handle, int level, int optname, const char *optval, int optlen)
int join(const ACE_INET_Addr &mcast_addr, int reuse_addr=1, const ACE_TCHAR *net_if=0)
void opts(int opts)
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
MulticastSendStrategy_rch send_strategy_
TO truncate_cast(FROM val)
MulticastReceiveStrategy_rch recv_strategy_
RcHandle< MulticastInst > MulticastInst_rch
ACE_HANDLE get_handle(void) const
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
#define ACE_DEFAULT_MAX_SOCKET_BUFSIZ
ACE_TEXT("TCP_Factory")
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define ACE_ERROR_RETURN(X, Y)
int close(void)

◆ local_peer()

ACE_INLINE MulticastPeer OpenDDS::DCPS::MulticastDataLink::local_peer ( ) const

◆ make_reservation()

int OpenDDS::DCPS::MulticastDataLink::make_reservation ( const GUID_t & remote_publication_id,
const GUID_t & local_subscription_id,
const TransportReceiveListener_wrch & receive_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 258 of file MulticastDataLink.cpp.

References OpenDDS::DCPS::RepoIdConverter::federationId(), find_session(), OpenDDS::DCPS::DataLink::make_reservation(), and OpenDDS::DCPS::RepoIdConverter::participantId().

262 {
263  int result = DataLink::make_reservation(rpi, lsi, trl, reliable);
264  if (reliable) {
265  const MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(rpi).federationId() << 32
266  | RepoIdConverter(rpi).participantId();
267  MulticastSession_rch session = find_session(remote_peer);
268  if (session) {
269  session->add_remote(lsi, rpi);
270  }
271  } else {
272  const MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(rpi).federationId() << 32
273  | RepoIdConverter(rpi).participantId();
274  MulticastSession_rch session = find_session(remote_peer);
275  if (session) {
276  session->add_remote(lsi);
277  }
278  }
279  return result;
280 }
MulticastSession_rch find_session(MulticastPeer remote_peer)
RcHandle< MulticastSession > MulticastSession_rch
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
long long ACE_INT64
ACE_INT64 MulticastPeer

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::MulticastDataLink::OPENDDS_MAP ( MulticastPeer  ,
MulticastSession_rch   
)
private

◆ reactor_task()

ACE_INLINE ReactorTask_rch OpenDDS::DCPS::MulticastDataLink::reactor_task ( )

Definition at line 52 of file MulticastDataLink.inl.

References ACE_INLINE, and reactor_task_.

53 {
54  return this->reactor_task_;
55 }

◆ ready_to_deliver()

bool OpenDDS::DCPS::MulticastDataLink::ready_to_deliver ( const ReceivedDataSample & data)
private

Definition at line 359 of file MulticastDataLink.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_RECURSIVE_MUTEX, receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), session_lock_, sessions_, and OpenDDS::DCPS::TransportHeader::source_.

Referenced by sample_received().

360 {
361  ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
362  guard,
363  this->session_lock_, false);
364 
365  const TransportHeader& theader = receive_strategy()->received_header();
366 
367  MulticastSessionMap::iterator session_it = sessions_.find(theader.source_);
368  if (session_it != sessions_.end()) {
369  MulticastSession_rch sess_rch(session_it->second);
370  guard.release();
371  return sess_rch->ready_to_deliver(theader, data);
372  }
373 
374  return true;
375 }
RcHandle< MulticastSession > MulticastSession_rch
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
MulticastReceiveStrategy * receive_strategy()
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ reassemble()

bool OpenDDS::DCPS::MulticastDataLink::reassemble ( ReceivedDataSample & data,
const TransportHeader & header
)

Definition at line 241 of file MulticastDataLink.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_RECURSIVE_MUTEX, session_lock_, sessions_, and OpenDDS::DCPS::TransportHeader::source_.

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::reassemble().

243 {
244  ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
245  guard,
246  this->session_lock_,
247  false);
248 
249  MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
250  if (it == this->sessions_.end()) return false;
251  if (it->second->acked()) {
252  return it->second->reassemble(data, header);
253  }
254  return false;
255 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ receive_strategy()

ACE_INLINE MulticastReceiveStrategy * OpenDDS::DCPS::MulticastDataLink::receive_strategy ( )

◆ release_remote_i()

void OpenDDS::DCPS::MulticastDataLink::release_remote_i ( const GUID_t & remote)
private virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 378 of file MulticastDataLink.cpp.

References ACE_GUARD, ACE_SYNCH_RECURSIVE_MUTEX, OpenDDS::DCPS::RepoIdConverter::federationId(), OpenDDS::DCPS::RepoIdConverter::participantId(), session_lock_, and sessions_.

379 {
380  ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX, guard, session_lock_);
381  MulticastPeer remote_source = (ACE_INT64)RepoIdConverter(remote).federationId() << 32
382  | RepoIdConverter(remote).participantId();
383  MulticastSessionMap::iterator session_it = sessions_.find(remote_source);
384  if (session_it != sessions_.end() && session_it->second->is_reliable()) {
385  session_it->second->release_remote(remote);
386  }
387 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
long long ACE_INT64
ACE_INT64 MulticastPeer
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ release_reservations_i()

void OpenDDS::DCPS::MulticastDataLink::release_reservations_i ( const GUID_t & remote_id,
const GUID_t & local_id
)
virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 283 of file MulticastDataLink.cpp.

References OpenDDS::DCPS::RepoIdConverter::federationId(), find_session(), and OpenDDS::DCPS::RepoIdConverter::participantId().

285 {
286  const MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote_id).federationId() << 32
287  | RepoIdConverter(remote_id).participantId();
288  MulticastSession_rch session = find_session(remote_peer);
289  if (session) {
290  session->remove_remote(local_id, remote_id);
291  }
292 }
MulticastSession_rch find_session(MulticastPeer remote_peer)
RcHandle< MulticastSession > MulticastSession_rch
long long ACE_INT64
ACE_INT64 MulticastPeer

◆ sample_received()

void OpenDDS::DCPS::MulticastDataLink::sample_received ( ReceivedDataSample & sample)

Definition at line 295 of file MulticastDataLink.cpp.

References ACE_GUARD, ACE_SYNCH_RECURSIVE_MUTEX, OpenDDS::DCPS::ReceivedDataSample::data(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::MULTICAST_SYN, ACE_Message_Block::rd_ptr(), ready_to_deliver(), receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), session_lock_, sessions_, OpenDDS::DCPS::TransportHeader::source_, OpenDDS::DCPS::DataSampleHeader::submessage_id_, OpenDDS::DCPS::TransportHeader::swap_bytes(), syn_received_no_session(), and OpenDDS::DCPS::TRANSPORT_CONTROL.

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::deliver_sample().

296 {
297  switch (sample.header_.message_id_) {
298  case TRANSPORT_CONTROL: {
299  // Transport control samples are delivered to all sessions
300  // regardless of association status:
301  {
302  Message_Block_Ptr payload(sample.data());
303  char* const ptr = payload ? payload->rd_ptr() : 0;
304 
305  ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
306  guard,
307  this->session_lock_);
308 
309  const TransportHeader& theader = receive_strategy()->received_header();
310 
311  if (!is_active() && sample.header_.submessage_id_ == MULTICAST_SYN &&
312  sessions_.find(theader.source_) == sessions_.end()) {
313  // We have received a SYN but there is no session (yet) for this source.
314  // Depending on the data, we may need to send SYNACK.
315 
316  guard.release();
317  syn_received_no_session(theader.source_, payload,
318  theader.swap_bytes());
319 
320  guard.acquire();
321  MulticastSessionMap::iterator s_itr = sessions_.find(theader.source_);
322  if (s_itr != sessions_.end()) {
323  s_itr->second->record_header_received(theader);
324  }
325 
326  if (ptr) {
327  payload->rd_ptr(ptr);
328  }
329  return;
330  }
331 
332  MulticastSessionMap temp_sessions(sessions_);
333  guard.release();
334 
335  for (MulticastSessionMap::iterator it(temp_sessions.begin());
336  it != temp_sessions.end(); ++it) {
337  it->second->control_received(sample.header_.submessage_id_,
338  payload);
339  it->second->record_header_received(theader);
340 
341  // reset read pointer
342  if (ptr) {
343  payload->rd_ptr(ptr);
344  }
345  }
346  }
347  } break;
348 
349  default:
350 
351  if (ready_to_deliver(sample)) {
352  data_received(sample);
353  }
354  break;
355  }
356 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void syn_received_no_session(MulticastPeer source, const Message_Block_Ptr &data, bool swap_bytes)
char * rd_ptr(void) const
MulticastReceiveStrategy * receive_strategy()
bool ready_to_deliver(const ReceivedDataSample &data)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ send_buffer()

ACE_INLINE SingleSendBuffer * OpenDDS::DCPS::MulticastDataLink::send_buffer ( )

Definition at line 38 of file MulticastDataLink.inl.

References ACE_INLINE, and send_buffer_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received().

// Accessor for this link's send buffer: hands back the raw pointer held by
// the send_buffer_ unique_ptr; get() does not give up ownership.
// NOTE(review): presumably null if no buffer was ever installed - confirm
// against the code that populates send_buffer_.
39 {
40  return this->send_buffer_.get();
41 }
unique_ptr< SingleSendBuffer > send_buffer_

◆ send_strategy()

ACE_INLINE MulticastSendStrategy * OpenDDS::DCPS::MulticastDataLink::send_strategy ( )

Definition at line 26 of file MulticastDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::RcHandle< T >::in(), and send_strategy_.

// Accessor for this link's send strategy: returns the raw pointer obtained
// from the send_strategy_ handle through RcHandle::in().
// NOTE(review): in() presumably does not add a reference, so the pointer is
// only safe while this link still holds send_strategy_ - confirm.
27 {
28  return this->send_strategy_.in();
29 }
MulticastSendStrategy_rch send_strategy_

◆ socket()

ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::MulticastDataLink::socket ( void  )

◆ stop_i()

void OpenDDS::DCPS::MulticastDataLink::stop_i ( )
private virtual

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 445 of file MulticastDataLink.cpp.

References ACE_GUARD, ACE_SYNCH_RECURSIVE_MUTEX, ACE_SOCK::close(), session_lock_, sessions_, and socket_.

// Tears the link down: stops every known session, empties the session map
// and finally closes the multicast socket.
446 {
// `guard` is the guard object ACE_GUARD declares on session_lock_; it
// protects sessions_ for the rest of this function.
447  ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
448  guard,
449  this->session_lock_);
450 
// Stop each session first, then drop all map entries in one go.
451  for (MulticastSessionMap::iterator it(this->sessions_.begin());
452  it != this->sessions_.end(); ++it) {
453  it->second->stop();
454  }
455  this->sessions_.clear();
456 
// NOTE(review): the guard is still in scope here, so the socket is
// presumably closed while session_lock_ is held - confirm this is intended.
457  this->socket_.close();
458 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
int close(void)
ACE_SYNCH_RECURSIVE_MUTEX session_lock_

◆ syn_received_no_session()

void OpenDDS::DCPS::MulticastDataLink::syn_received_no_session ( MulticastPeer  source,
const Message_Block_Ptr & data,
bool  swap_bytes 
)
private

Definition at line 390 of file MulticastDataLink.cpp.

References ACE_ERROR, ACE_TEXT(), config(), OpenDDS::DCPS::DataLink::create_control(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, LM_DEBUG, LM_ERROR, local_peer(), local_peer_, OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_SYNACK, OpenDDS::DCPS::DataLink::send_control(), OpenDDS::DCPS::SEND_CONTROL_OK, transport(), and VDBG_LVL.

Referenced by sample_received().

// Handles a SYN from `source` that arrived while no session exists for that
// peer. If the SYN is addressed to this link's local peer, a SYNACK carrying
// `source` is sent back and the transport is told about the passive
// connection; otherwise the SYN is ignored.
//   source     - peer id taken from the received transport header
//   data       - SYN payload; the addressed peer id is deserialized from it
//   swap_bytes - byte-swap flag handed to the Serializer that reads `data`
393 {
394  Serializer serializer_read(data.get(), encoding_kind, swap_bytes);
395 
// The payload holds the id of the peer the sender wants to reach.
// NOTE(review): the result of the extraction is not checked; if it can fail,
// local_peer would be compared below without having been set - confirm.
396  MulticastPeer local_peer;
397  serializer_read >> local_peer;
398 
// A SYN addressed to some other peer is not ours to answer.
399  if (local_peer != local_peer_) {
400  return;
401  }
402 
403  {
404  MulticastInst_rch cfg = config();
405  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastDataLink[%C]::syn_received_no_session "
406  "send_synack local %#08x%08x remote %#08x%08x\n",
407  cfg ? cfg->name().c_str() : "",
408  (unsigned int) (local_peer >> 32),
409  (unsigned int) local_peer,
410  (unsigned int) (source >> 32),
411  (unsigned int) source), 2);
412  }
413 
// Build the SYNACK: its payload is the id of the peer that sent the SYN.
414  Message_Block_Ptr synack_data(new ACE_Message_Block(sizeof(MulticastPeer)));
415 
416  Serializer serializer_write(synack_data.get(), encoding_kind);
417  serializer_write << source;
418 
// create_control() takes over synack_data and fills in `header`.
419  DataSampleHeader header;
420  Message_Block_Ptr control(
421  create_control(MULTICAST_SYNACK, header, move(synack_data)));
422 
423  if (control == 0) {
424  ACE_ERROR((LM_ERROR,
425  ACE_TEXT("(%P|%t) ERROR: ")
426  ACE_TEXT("MulticastDataLink::syn_received_no_session: ")
427  ACE_TEXT("create_control failed!\n")));
428  return;
429  }
430 
431  const int error = send_control(header, move(control));
432  if (error != SEND_CONTROL_OK) {
433  ACE_ERROR((LM_ERROR, "(%P|%t) MulticastDataLink::syn_received_no_session: "
434  "ERROR: send_control failed: %d!\n", error));
435  return;
436  }
437 
// Only after the SYNACK went out is the transport told about the new
// passive connection; transport() may yield a nil handle, hence the test.
438  MulticastTransport_rch mt = transport();
439  if (mt) {
440  mt->passive_connection(local_peer, source);
441  }
442 }
#define ACE_ERROR(X)
MulticastTransport_rch transport()
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
ACE_Message_Block * create_control(char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:628
RcHandle< MulticastInst > MulticastInst_rch
ACE_TEXT("TCP_Factory")
#define VDBG_LVL(DBG_ARGS, LEVEL)
RcHandle< MulticastTransport > MulticastTransport_rch
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:668
ACE_INT64 MulticastPeer

◆ transport()

ACE_INLINE MulticastTransport_rch OpenDDS::DCPS::MulticastDataLink::transport ( void  )

Definition at line 14 of file MulticastDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::dynamic_rchandle_cast(), and OpenDDS::DCPS::DataLink::impl().

Referenced by config(), find_or_create_session(), OpenDDS::DCPS::MulticastSession::syn_received(), and syn_received_no_session().

// Returns the transport behind this link as a MulticastTransport handle by
// down-casting the handle from impl() with dynamic_rchandle_cast.
// NOTE(review): the result is presumably a nil handle when impl() is empty or
// is not a MulticastTransport, so callers should test it before use - confirm.
15 {
16  return dynamic_rchandle_cast<MulticastTransport>(impl());
17 }
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214

Member Data Documentation

◆ local_peer_

MulticastPeer OpenDDS::DCPS::MulticastDataLink::local_peer_
private

Definition at line 87 of file MulticastDataLink.h.

Referenced by local_peer(), and syn_received_no_session().

◆ reactor_task_

ReactorTask_rch OpenDDS::DCPS::MulticastDataLink::reactor_task_
private

Definition at line 89 of file MulticastDataLink.h.

Referenced by get_proactor(), get_reactor(), and reactor_task().

◆ readers_selected_

RepoIdSet OpenDDS::DCPS::MulticastDataLink::readers_selected_
private

Definition at line 109 of file MulticastDataLink.h.

◆ readers_withheld_

RepoIdSet OpenDDS::DCPS::MulticastDataLink::readers_withheld_
private

Definition at line 109 of file MulticastDataLink.h.

◆ recv_strategy_

MulticastReceiveStrategy_rch OpenDDS::DCPS::MulticastDataLink::recv_strategy_
private

Definition at line 92 of file MulticastDataLink.h.

Referenced by join(), and receive_strategy().

◆ send_buffer_

unique_ptr<SingleSendBuffer> OpenDDS::DCPS::MulticastDataLink::send_buffer_
private

◆ send_strategy_

MulticastSendStrategy_rch OpenDDS::DCPS::MulticastDataLink::send_strategy_
private

Definition at line 91 of file MulticastDataLink.h.

Referenced by join(), MulticastDataLink(), send_strategy(), and ~MulticastDataLink().

◆ session_factory_

MulticastSessionFactory_rch OpenDDS::DCPS::MulticastDataLink::session_factory_
private

Definition at line 85 of file MulticastDataLink.h.

Referenced by find_or_create_session(), and MulticastDataLink().

◆ session_lock_

ACE_SYNCH_RECURSIVE_MUTEX OpenDDS::DCPS::MulticastDataLink::session_lock_
private

◆ sessions_

MulticastSessionMap OpenDDS::DCPS::MulticastDataLink::sessions_
private

◆ socket_

ACE_SOCK_Dgram_Mcast OpenDDS::DCPS::MulticastDataLink::socket_
private

Definition at line 96 of file MulticastDataLink.h.

Referenced by join(), socket(), and stop_i().


The documentation for this class was generated from the following files: