28 #ifndef __ACE_INLINE__ 47 :
DataLink(transport, 0 , false , is_active),
48 session_factory_(session_factory),
49 local_peer_(local_peer),
50 reactor_task_(reactor_task),
59 const size_t max_samples_per_packet = config ? config->max_samples_per_packet_ : default_max_samples;
81 const std::string& net_if = cfg->local_address_;
82 #ifdef ACE_HAS_MAC_OSX 90 ACE_TEXT(
"(%P|%t) ERROR: MulticastDataLink::join: ")
91 ACE_TEXT(
"ACE_SOCK_Dgram_Mcast::join failed %m.\n")),
101 ACE_TEXT(
"MulticastDataLink::join: ")
102 ACE_TEXT(
"OpenDDS::DCPS::set_socket_multicast_ttl failed.\n")),
107 if (rcv_buffer_size != 0
110 (
char *) &rcv_buffer_size,
114 ACE_TEXT(
"MulticastDataLink::join: ")
115 ACE_TEXT(
"ACE_OS::setsockopt RCVBUF failed.\n")),
119 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ) 125 sizeof(snd_size)) < 0
129 ACE_TEXT(
"MulticastDataLink::join: ")
130 ACE_TEXT(
"ACE_OS::setsockopt SNDBUF failed to set the send buffer size to %d errno %m\n"),
142 ACE_TEXT(
"MulticastDataLink::join: ")
143 ACE_TEXT(
"DataLink::start failed!\n")),
158 MulticastSessionMap::iterator it(this->
sessions_.find(remote_peer));
173 MulticastSessionMap::iterator it(this->
sessions_.find(remote_peer));
185 ACE_TEXT(
"MulticastDataLink::find_or_create_session: ")
186 ACE_TEXT(
"failed to create session for remote peer: %#08x%08x!\n"),
187 (
unsigned int) (remote_peer >> 32),
188 (
unsigned int) remote_peer),
192 std::pair<MulticastSessionMap::iterator, bool> pair = this->
sessions_.insert(
193 MulticastSessionMap::value_type(remote_peer, session));
194 if (pair.first == this->sessions_.end()) {
197 ACE_TEXT(
"MulticastDataLink::find_or_create_session: ")
198 ACE_TEXT(
"failed to insert session for remote peer: %#08x%08x!\n"),
199 (
unsigned int) (remote_peer >> 32),
200 (
unsigned int) remote_peer),
219 if (it != this->
sessions_.end() && it->second->acked()) {
220 return it->second->check_header(header);
250 if (it == this->
sessions_.end())
return false;
251 if (it->second->acked()) {
252 return it->second->reassemble(data, header);
269 session->add_remote(lsi, rpi);
276 session->add_remote(lsi);
290 session->remove_remote(local_id, remote_id);
303 char*
const ptr = payload ? payload->
rd_ptr() : 0;
323 s_itr->second->record_header_received(theader);
327 payload->rd_ptr(ptr);
332 MulticastSessionMap temp_sessions(
sessions_);
335 for (MulticastSessionMap::iterator it(temp_sessions.begin());
336 it != temp_sessions.end(); ++it) {
339 it->second->record_header_received(theader);
343 payload->rd_ptr(ptr);
371 return sess_rch->ready_to_deliver(theader, data);
383 MulticastSessionMap::iterator session_it =
sessions_.find(remote_source);
384 if (session_it !=
sessions_.end() && session_it->second->is_reliable()) {
385 session_it->second->release_remote(remote);
394 Serializer serializer_read(data.
get(), encoding_kind, swap_bytes);
405 VDBG_LVL((
LM_DEBUG,
"(%P|%t) MulticastDataLink[%C]::syn_received_no_session " 406 "send_synack local %#08x%08x remote %#08x%08x\n",
407 cfg ? cfg->name().c_str() :
"",
408 (
unsigned int) (local_peer >> 32),
409 (
unsigned int) local_peer,
410 (
unsigned int) (source >> 32),
411 (
unsigned int) source), 2);
416 Serializer serializer_write(synack_data.
get(), encoding_kind);
417 serializer_write << source;
426 ACE_TEXT(
"MulticastDataLink::syn_received_no_session: ")
427 ACE_TEXT(
"create_control failed!\n")));
434 "ERROR: send_control failed: %d!\n", error));
440 mt->passive_connection(local_peer, source);
451 for (MulticastSessionMap::iterator it(this->
sessions_.begin());
DataSampleHeader header_
The demarshalled sample header.
int setsockopt(ACE_HANDLE handle, int level, int optname, const char *optval, int optlen)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual int requires_send_buffer() const =0
MulticastSessionMap sessions_
char message_id_
The enum MessageId.
MulticastTransport_rch transport()
int join(const ACE_INET_Addr &mcast_addr, int reuse_addr=1, const ACE_TCHAR *net_if=0)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
MulticastDataLink(const MulticastTransport_rch &transport, const MulticastSessionFactory_rch &session_factory, MulticastPeer local_peer, const MulticastInst_rch &config, const ReactorTask_rch &reactor_task, bool is_active)
virtual MulticastSession_rch create(RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)=0
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
bool swap_bytes() const
Determine if the serializer should swap bytes.
ACE_Message_Block * create_control(char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
ParticipantId participantId() const
Get the participant id from the GUID.
RcHandle< MulticastSession > MulticastSession_rch
OpenDDS::Federator::RepoKey federationId() const
Get the federation id from the GUID.
T::rv_reference move(T &p)
char * rd_ptr(void) const
void client_stop(const GUID_t &localId)
void release_reservations_i(const GUID_t &remote_id, const GUID_t &local_id)
MulticastPeer local_peer() const
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
MulticastReceiveStrategy_rch recv_strategy_
TO truncate_cast(FROM val)
Class to serialize and deserialize data for DDS.
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
MulticastSendStrategy_rch send_strategy_
#define ACE_SYNCH_RECURSIVE_MUTEX
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
static const size_t DEFAULT_NAK_DEPTH
Holds a data sample received by the transport.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
MulticastSession_rch find_session(MulticastPeer remote_peer)
MulticastPeer local_peer_
const TH & received_header() const
Defines class that represents a transport packet header.
int make_reservation(const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
ACE_HANDLE get_handle(void) const
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
#define ACE_DEFAULT_MAX_SOCKET_BUFSIZ
char submessage_id_
Implementation-specific sub-message Ids.
unique_ptr< SingleSendBuffer > send_buffer_
virtual ~MulticastDataLink()
bool join(const ACE_INET_Addr &group_address)
void release_remote_i(const GUID_t &remote)
ACE_SOCK_Dgram_Mcast socket_
bool check_header(const TransportHeader &header)
void syn_received_no_session(MulticastPeer source, const Message_Block_Ptr &data, bool swap_bytes)
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
#define VDBG_LVL(DBG_ARGS, LEVEL)
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr data)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void send_buffer(TransportSendBuffer *send_buffer)
Assigns an optional send buffer.
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
MulticastSessionFactory_rch session_factory_
bool ready_to_deliver(const ReceivedDataSample &data)
ACE_SYNCH_RECURSIVE_MUTEX session_lock_
#define ACE_ERROR_RETURN(X, Y)
MulticastInst_rch config()
bool reassemble(ReceivedDataSample &data, const TransportHeader &header)
MulticastSession_rch find_or_create_session(MulticastPeer remote_peer)
The Internal API and Implementation of OpenDDS.
MulticastReceiveStrategy * receive_strategy()
void sample_received(ReceivedDataSample &sample)