57 if (cfg && cfg->is_reliable()) {
58 session_factory = make_rch<ReliableSessionFactory>();
60 session_factory = make_rch<BestEffortSessionFactory>();
67 "peers: local %#08x%08x priority %d active %d\n",
68 cfg ? cfg->name().c_str() :
"", (
unsigned int)(local_peer >> 32), (
unsigned int)local_peer,
69 priority, active), 2);
79 if (!link->join(cfg->group_address_)) {
81 ACE_TEXT(
"failed to join multicast group: %C!\n"),
98 ACE_TEXT(
"MulticastTransport[%C]::start_session: ")
100 cfg ? cfg->name().c_str() :
""),
106 if (session.is_nil()) {
109 ACE_TEXT(
"MulticastTransport[%C]::start_session: ")
110 ACE_TEXT(
"failed to create session for remote peer: %#08x%08x!\n"),
111 cfg ? cfg->name().c_str() :
"",
112 (
unsigned int)(remote_peer >> 32),
113 (
unsigned int) remote_peer),
117 const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));
119 if (!session->start(active, acked)) {
122 ACE_TEXT(
"MulticastTransport[%C]::start_session: ")
123 ACE_TEXT(
"failed to start session for remote peer: %#08x%08x!\n"),
124 cfg ? cfg->name().c_str() :
"",
125 (
unsigned int)(remote_peer >> 32),
126 (
unsigned int) remote_peer),
139 const size_t len = remote.
blob_.length();
140 const char* buffer =
reinterpret_cast<const char*
>(remote.
blob_.get_buffer());
143 cdr >> network_resource;
168 Links::const_iterator link_iter = this->
client_links_.find(local_peer);
175 link = link_iter->second;
181 if (cfg->is_reliable()) {
182 link->add_on_start_callback(client, remote.
repo_id_);
189 Links::iterator to_remove = this->
client_links_.find(local_peer);
193 link->remove_on_start_callback(client, remote.
repo_id_);
197 if (cfg->is_reliable()) {
199 if (remote_peer != local_peer) {
230 Links::const_iterator link_iter = this->
server_links_.find(local_peer);
238 link = link_iter->second;
248 if (connections_.count(std::make_pair(remote_peer, local_peer))) {
253 VDBG((
LM_DEBUG,
"(%P|%t) MulticastTransport::accept_datalink found\n"));
264 if (cfg->is_reliable()) {
277 }
else if (cfg->is_reliable()) {
291 VDBG((
LM_DEBUG,
"(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));
297 bool erased_from_it =
false;
298 for (
size_t i = 0; i < it->second.size(); ++i) {
299 if (it->second[i].first == client && it->second[i].second == remote_id) {
300 erased_from_it =
true;
301 it->second.erase(it->second.begin() + i);
306 if (erased_from_it && it->second.empty()) {
321 "from remote peer %#08x%08x to local peer %#08x%08x\n",
322 cfg ? cfg->name().c_str() :
"",
323 (
unsigned int) (remote_peer >> 32),
324 (
unsigned int) remote_peer,
325 (
unsigned int) (local_peer >> 32),
326 (
unsigned int) local_peer), 2);
328 const Peers peers(remote_peer, local_peer);
333 this->connections_.insert(peers);
335 Links::const_iterator server_link = this->
server_links_.find(local_peer);
341 session->set_acked();
346 for (
size_t i = 0; i < tmp.size(); ++i) {
349 const Callbacks::iterator tmp_iter =
find(pend->second.begin(),
352 if (tmp_iter != pend->second.end()) {
354 GUID_t remote_repo = tmp.at(i).second;
358 client->use_datalink(remote_repo, link);
375 if (config->local_address_.empty() &&
378 config->local_address_ =
TheServiceParticipant->default_address().to_addr().get_host_addr(buffer,
sizeof buffer);
381 if (!config->group_address_.is_multicast()) {
383 ACE_TEXT(
"invalid configuration: address %C is not multicast.\n"),
384 this,
LogAddr::ip(config->group_address_).c_str()),
false);
396 Links::iterator link;
401 if (link->second.in()) {
402 link->second->transport_shutdown();
410 if (link->second.in()) {
411 link->second->transport_shutdown();
422 cfg->populate_locator(info, flags);
440 Links::const_iterator link_iter = this->
client_links_.find(local_peer);
444 link = link_iter->second;
449 link->client_stop(localId);
ThreadLockType connections_lock_
RcHandle< T > rchandle_from(T *pointer)
virtual void release_datalink(DataLink *link)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
static bool get_remote_reliability(const TransportImpl::RemoteTransport &remote)
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
Links server_links_
link for subs.
bool configure_i(const MulticastInst_rch &config)
ParticipantId participantId() const
Get the participant id from the GUID.
MulticastTransport(const MulticastInst_rch &inst)
RcHandle< MulticastSession > MulticastSession_rch
reference_wrapper< T > ref(T &r)
void passive_connection(MulticastPeer local_peer, MulticastPeer remote_peer)
OpenDDS::Federator::RepoKey federationId() const
Get the federation id from the GUID.
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
PendConnMap pending_connections_
MulticastDataLink_rch make_datalink(const GUID_t &local_id, Priority priority, bool active)
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
Defines a wrapper around address info which is used for advertise.
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
const char * c_str() const
void client_stop(const GUID_t &localId)
std::vector< DataLink::OnStartCallback > Callbacks
ReactorTask_rch reactor_task()
MulticastInst_rch config() const
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
ThreadLockType links_lock_
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
MulticastSession_rch start_session(const MulticastDataLink_rch &link, MulticastPeer remote_peer, bool active)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
std::pair< MulticastPeer, MulticastPeer > Peers
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > lock() const
static const String ip(const ACE_INET_Addr &addr)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
RcHandle< MulticastDataLink > MulticastDataLink_rch
virtual void shutdown_i()
size_t ConnectionInfoFlags
TransportInst_rch config() const
int find(Container &c, const Key &key, typename Container::mapped_type *&value)