00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastTransport.h"
00009 #include "MulticastDataLink.h"
00010 #include "MulticastReceiveStrategy.h"
00011 #include "MulticastSendStrategy.h"
00012 #include "MulticastSession.h"
00013 #include "BestEffortSessionFactory.h"
00014 #include "ReliableSessionFactory.h"
00015
00016 #include "ace/Log_Msg.h"
00017 #include "ace/Truncate.h"
00018
00019 #include "dds/DCPS/RepoIdConverter.h"
00020 #include "dds/DCPS/AssociationData.h"
00021 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00022 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00023 #include "dds/DCPS/transport/framework/TransportClient.h"
00024
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 namespace OpenDDS {
00028 namespace DCPS {
00029
00030 MulticastTransport::MulticastTransport(MulticastInst& inst)
00031 : TransportImpl(inst)
00032 {
00033 if (! (configure_i(inst) && open())) {
00034 throw Transport::UnableToCreate();
00035 }
00036 }
00037
00038 MulticastTransport::~MulticastTransport()
00039 {
00040 }
00041
00042
00043 MulticastInst&
00044 MulticastTransport::config() const
00045 {
00046 return static_cast<MulticastInst&>(TransportImpl::config());
00047 }
00048
00049 MulticastDataLink_rch
00050 MulticastTransport::make_datalink(const RepoId& local_id,
00051 Priority priority,
00052 bool active)
00053 {
00054
00055 RcHandle<MulticastSessionFactory> session_factory;
00056
00057 if (this->config().is_reliable()) {
00058 session_factory = make_rch<ReliableSessionFactory>();
00059
00060 } else {
00061 session_factory = make_rch<BestEffortSessionFactory>();
00062 }
00063
00064 MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32
00065 | RepoIdConverter(local_id).participantId();
00066
00067 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink "
00068 "peers: local %#08x%08x priority %d active %d\n",
00069 this->config().name().c_str(), (unsigned int)(local_peer >> 32), (unsigned int)local_peer,
00070 priority, active), 2);
00071
00072 TransportReactorTask_rch rtask(reactor_task());
00073 MulticastDataLink_rch link(make_rch<MulticastDataLink>(ref(*this),
00074 session_factory,
00075 local_peer,
00076 ref(config()),
00077 rtask.in(),
00078 active));
00079
00080
00081 if (!link->join(this->config().group_address_)) {
00082 ACE_TCHAR str[64];
00083 this->config().group_address_.addr_to_string(str,
00084 sizeof(str)/sizeof(str[0]));
00085 ACE_ERROR((LM_ERROR,
00086 ACE_TEXT("(%P|%t) ERROR: ")
00087 ACE_TEXT("MulticastTransport::make_datalink: ")
00088 ACE_TEXT("failed to join multicast group: %s!\n"),
00089 str));
00090 return MulticastDataLink_rch();
00091 }
00092
00093 return link;
00094 }
00095
00096 MulticastSession_rch
00097 MulticastTransport::start_session(const MulticastDataLink_rch& link,
00098 MulticastPeer remote_peer, bool active)
00099 {
00100 if (link.is_nil()) {
00101 ACE_ERROR_RETURN((LM_ERROR,
00102 ACE_TEXT("(%P|%t) ERROR: ")
00103 ACE_TEXT("MulticastTransport[%C]::start_session: ")
00104 ACE_TEXT("link is nil\n"),
00105 this->config().name().c_str()),
00106 MulticastSession_rch());
00107 }
00108
00109 MulticastSession_rch session(link->find_or_create_session(remote_peer));
00110
00111 if (session.is_nil()) {
00112 ACE_ERROR_RETURN((LM_ERROR,
00113 ACE_TEXT("(%P|%t) ERROR: ")
00114 ACE_TEXT("MulticastTransport[%C]::start_session: ")
00115 ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00116 this->config().name().c_str(),
00117 (unsigned int)(remote_peer >> 32),
00118 (unsigned int) remote_peer),
00119 MulticastSession_rch());
00120 }
00121
00122 const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));
00123
00124 if (!session->start(active, acked)) {
00125 ACE_ERROR_RETURN((LM_ERROR,
00126 ACE_TEXT("(%P|%t) ERROR: ")
00127 ACE_TEXT("MulticastTransport[%C]::start_session: ")
00128 ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"),
00129 this->config().name().c_str(),
00130 (unsigned int)(remote_peer >> 32),
00131 (unsigned int) remote_peer),
00132 MulticastSession_rch());
00133 }
00134
00135 return session;
00136 }
00137
00138 static bool
00139 get_remote_reliability(const TransportImpl::RemoteTransport& remote)
00140 {
00141 NetworkAddress network_address;
00142 ACE_CDR::Boolean reliable;
00143
00144 const size_t len = remote.blob_.length();
00145 const char* buffer = reinterpret_cast<const char*>(remote.blob_.get_buffer());
00146
00147 ACE_InputCDR cdr(buffer, len);
00148 cdr >> network_address;
00149 cdr >> ACE_InputCDR::to_boolean(reliable);
00150
00151 return reliable;
00152 }
00153
00154 TransportImpl::AcceptConnectResult
00155 MulticastTransport::connect_datalink(const RemoteTransport& remote,
00156 const ConnectionAttribs& attribs,
00157 const TransportClient_rch&)
00158 {
00159
00160 if (get_remote_reliability(remote) != this->config().is_reliable()) {
00161 return AcceptConnectResult();
00162 }
00163
00164 GuardThreadType guard_links(this->links_lock_);
00165 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00166 | RepoIdConverter(attribs.local_id_).participantId();
00167 Links::const_iterator link_iter = this->client_links_.find(local_peer);
00168 MulticastDataLink_rch link;
00169
00170 if (link_iter == this->client_links_.end()) {
00171 link = this->make_datalink(attribs.local_id_, attribs.priority_, true );
00172 this->client_links_[local_peer] = link;
00173 } else {
00174 link = link_iter->second;
00175 }
00176
00177 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00178 | RepoIdConverter(remote.repo_id_).participantId();
00179
00180 MulticastSession_rch session(
00181 this->start_session(link, remote_peer, true ));
00182
00183 if (session.is_nil()) {
00184 Links::iterator to_remove = this->client_links_.find(local_peer);
00185 if (to_remove != this->client_links_.end()) {
00186 this->client_links_.erase(to_remove);
00187 }
00188 return AcceptConnectResult();
00189 }
00190 return AcceptConnectResult(link);
00191 }
00192
00193 TransportImpl::AcceptConnectResult
00194 MulticastTransport::accept_datalink(const RemoteTransport& remote,
00195 const ConnectionAttribs& attribs,
00196 const TransportClient_rch& client)
00197 {
00198
00199 if (get_remote_reliability(remote) != this->config().is_reliable()) {
00200 return AcceptConnectResult();
00201 }
00202
00203 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00204 | RepoIdConverter(attribs.local_id_).participantId();
00205
00206 GuardThreadType guard_links(this->links_lock_);
00207
00208 Links::const_iterator link_iter = this->server_links_.find(local_peer);
00209 MulticastDataLink_rch link;
00210
00211 if (link_iter == this->server_links_.end()) {
00212
00213 link = this->make_datalink(attribs.local_id_, attribs.priority_, false );
00214 this->server_links_[local_peer] = link;
00215 } else {
00216 link = link_iter->second;
00217 }
00218
00219 guard_links.release();
00220
00221 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00222 | RepoIdConverter(remote.repo_id_).participantId();
00223 GuardThreadType guard(this->connections_lock_);
00224
00225 if (connections_.count(std::make_pair(remote_peer, local_peer))) {
00226
00227
00228 guard.release();
00229
00230 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n"));
00231 MulticastSession_rch session(
00232 this->start_session(link, remote_peer, false ));
00233
00234 if (session.is_nil()) {
00235 link.reset();
00236 }
00237 return AcceptConnectResult(link);
00238
00239 } else {
00240
00241 this->pending_connections_[std::make_pair(remote_peer, local_peer)].
00242 push_back(std::make_pair(client, remote.repo_id_));
00243
00244
00245 guard.release();
00246 MulticastSession_rch session(
00247 this->start_session(link, remote_peer, false ));
00248
00249 return AcceptConnectResult(
00250 session ? AcceptConnectResult::ACR_SUCCESS : AcceptConnectResult::ACR_FAILED
00251 );
00252
00253 }
00254 }
00255
00256 void
00257 MulticastTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00258 const RepoId& remote_id)
00259 {
00260 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));
00261
00262 GuardThreadType guard(this->connections_lock_);
00263
00264 for (PendConnMap::iterator it = this->pending_connections_.begin();
00265 it != this->pending_connections_.end(); ++it) {
00266 bool erased_from_it = false;
00267 for (size_t i = 0; i < it->second.size(); ++i) {
00268 if (it->second[i].first == client && it->second[i].second == remote_id) {
00269 erased_from_it = true;
00270 it->second.erase(it->second.begin() + i);
00271 break;
00272 }
00273 }
00274
00275 if (erased_from_it && it->second.empty()) {
00276 this->pending_connections_.erase(it);
00277 return;
00278 }
00279 }
00280 }
00281
00282 void
00283 MulticastTransport::passive_connection(MulticastPeer local_peer, MulticastPeer remote_peer)
00284 {
00285 GuardThreadType guard(this->connections_lock_);
00286
00287 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection "
00288 "from remote peer %#08x%08x to local peer %#08x%08x\n",
00289 this->config().name().c_str(),
00290 (unsigned int) (remote_peer >> 32),
00291 (unsigned int) remote_peer,
00292 (unsigned int) (local_peer >> 32),
00293 (unsigned int) local_peer), 2);
00294
00295 const Peers peers(remote_peer, local_peer);
00296 const PendConnMap::iterator pend = this->pending_connections_.find(peers);
00297
00298
00299 this->connections_.insert(peers);
00300
00301 Links::const_iterator server_link = this->server_links_.find(local_peer);
00302 DataLink_rch link;
00303
00304 if (server_link != this->server_links_.end()) {
00305 link = static_rchandle_cast<DataLink>(server_link->second);
00306 MulticastSession_rch session (server_link->second->find_or_create_session(remote_peer));
00307 session->set_acked();
00308 }
00309
00310 if (pend != pending_connections_.end()) {
00311 Callbacks tmp(pend->second);
00312 for (size_t i = 0; i < tmp.size(); ++i) {
00313 const PendConnMap::iterator pend = pending_connections_.find(peers);
00314 if (pend != pending_connections_.end()) {
00315 const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00316 pend->second.end(),
00317 tmp.at(i));
00318 if (tmp_iter != pend->second.end()) {
00319 TransportClient_wrch pend_client = tmp.at(i).first;
00320 RepoId remote_repo = tmp.at(i).second;
00321 guard.release();
00322 TransportClient_rch client = pend_client.lock();
00323 if (client)
00324 client->use_datalink(remote_repo, link);
00325 guard.acquire();
00326 }
00327 }
00328 }
00329 }
00330 }
00331
00332 bool
00333 MulticastTransport::configure_i(MulticastInst& config)
00334 {
00335
00336 if (config.local_address_.empty () &&
00337 !TheServiceParticipant->default_address ().empty ()) {
00338 config.local_address_ = TheServiceParticipant->default_address ().c_str ();
00339 }
00340
00341 if (!config.group_address_.is_multicast()) {
00342 ACE_ERROR_RETURN((LM_ERROR,
00343 ACE_TEXT("(%P|%t) ERROR: ")
00344 ACE_TEXT("MulticastTransport[%@]::configure_i: ")
00345 ACE_TEXT("invalid configuration: address %C is not ")
00346 ACE_TEXT("multicast.\n"),
00347 this, this->config().group_address_.get_host_addr()),
00348 false);
00349 }
00350
00351 this->create_reactor_task(config.async_send_);
00352
00353 return true;
00354 }
00355
00356 void
00357 MulticastTransport::shutdown_i()
00358 {
00359 GuardThreadType guard_links(this->links_lock_);
00360 Links::iterator link;
00361
00362 for (link = this->client_links_.begin();
00363 link != this->client_links_.end();
00364 ++link) {
00365 if (link->second.in()) {
00366 link->second->transport_shutdown();
00367 }
00368 }
00369 client_links_.clear();
00370
00371 for (link = this->server_links_.begin();
00372 link != this->server_links_.end();
00373 ++link) {
00374 if (link->second.in()) {
00375 link->second->transport_shutdown();
00376 }
00377 }
00378 server_links_.clear();
00379 }
00380
00381 bool
00382 MulticastTransport::connection_info_i(TransportLocator& info) const
00383 {
00384 this->config().populate_locator(info);
00385 return true;
00386 }
00387
00388 void
00389 MulticastTransport::release_datalink(DataLink* )
00390 {
00391
00392
00393 }
00394
00395 }
00396 }
00397
00398 OPENDDS_END_VERSIONED_NAMESPACE_DECL