00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastTransport.h"
00009 #include "MulticastDataLink.h"
00010 #include "MulticastReceiveStrategy.h"
00011 #include "MulticastSendStrategy.h"
00012 #include "MulticastSession.h"
00013 #include "BestEffortSessionFactory.h"
00014 #include "ReliableSessionFactory.h"
00015
00016 #include "ace/Log_Msg.h"
00017 #include "ace/Truncate.h"
00018
00019 #include "dds/DCPS/RepoIdConverter.h"
00020 #include "dds/DCPS/AssociationData.h"
00021 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00022 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00023 #include "dds/DCPS/transport/framework/TransportClient.h"
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028 MulticastTransport::MulticastTransport(const TransportInst_rch& inst)
00029 : config_i_(0)
00030 {
00031 if (!inst.is_nil()) {
00032 if (!configure(inst.in())) {
00033 throw Transport::UnableToCreate();
00034 }
00035 }
00036
00037 }
00038
00039 MulticastTransport::~MulticastTransport()
00040 {
00041 }
00042
00043 MulticastDataLink*
00044 MulticastTransport::make_datalink(const RepoId& local_id,
00045 Priority priority,
00046 bool active)
00047 {
00048 RcHandle<MulticastSessionFactory> session_factory;
00049
00050 if (this->config_i_->reliable_) {
00051 ACE_NEW_RETURN(session_factory, ReliableSessionFactory, 0);
00052
00053 } else {
00054 ACE_NEW_RETURN(session_factory, BestEffortSessionFactory, 0);
00055 }
00056
00057 MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(local_id).federationId() << 32
00058 | RepoIdConverter(local_id).participantId();
00059
00060 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::make_datalink "
00061 "peers: local %#08x%08x priority %d active %d\n",
00062 this->config_i_->name().c_str(), (unsigned int)(local_peer >> 32), (unsigned int)local_peer,
00063 priority, active), 2);
00064
00065 MulticastDataLink_rch link;
00066 ACE_NEW_RETURN(link,
00067 MulticastDataLink(this,
00068 session_factory.in(),
00069 local_peer,
00070 active),
00071 0);
00072
00073
00074 TransportReactorTask_rch rtask = reactor_task();
00075
00076 link->configure(config_i_.in(), rtask.in());
00077
00078
00079 MulticastSendStrategy* send_strategy;
00080 ACE_NEW_RETURN(send_strategy, MulticastSendStrategy(link.in()), 0);
00081 link->send_strategy(send_strategy);
00082
00083
00084 MulticastReceiveStrategy* recv_strategy;
00085 ACE_NEW_RETURN(recv_strategy, MulticastReceiveStrategy(link.in()), 0);
00086 link->receive_strategy(recv_strategy);
00087
00088
00089 if (!link->join(this->config_i_->group_address_)) {
00090 ACE_TCHAR str[64];
00091 this->config_i_->group_address_.addr_to_string(str,
00092 sizeof(str)/sizeof(str[0]));
00093 ACE_ERROR_RETURN((LM_ERROR,
00094 ACE_TEXT("(%P|%t) ERROR: ")
00095 ACE_TEXT("MulticastTransport::make_datalink: ")
00096 ACE_TEXT("failed to join multicast group: %s!\n"),
00097 str),
00098 0);
00099 }
00100
00101 return link._retn();
00102 }
00103
00104 MulticastSession*
00105 MulticastTransport::start_session(const MulticastDataLink_rch& link,
00106 MulticastPeer remote_peer, bool active)
00107 {
00108 if (link.is_nil()) {
00109 ACE_ERROR_RETURN((LM_ERROR,
00110 ACE_TEXT("(%P|%t) ERROR: ")
00111 ACE_TEXT("MulticastTransport[%C]::start_session: ")
00112 ACE_TEXT("link is nil\n"),
00113 this->config_i_->name().c_str()),
00114 0);
00115 }
00116
00117 MulticastSession_rch session = link->find_or_create_session(remote_peer);
00118
00119 if (session.is_nil()) {
00120 ACE_ERROR_RETURN((LM_ERROR,
00121 ACE_TEXT("(%P|%t) ERROR: ")
00122 ACE_TEXT("MulticastTransport[%C]::start_session: ")
00123 ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00124 this->config_i_->name().c_str(),
00125 (unsigned int)(remote_peer >> 32),
00126 (unsigned int) remote_peer),
00127 0);
00128 }
00129
00130 const bool acked = this->connections_.count(std::make_pair(remote_peer, link->local_peer()));
00131
00132 if (!session->start(active, acked)) {
00133 ACE_ERROR_RETURN((LM_ERROR,
00134 ACE_TEXT("(%P|%t) ERROR: ")
00135 ACE_TEXT("MulticastTransport[%C]::start_session: ")
00136 ACE_TEXT("failed to start session for remote peer: %#08x%08x!\n"),
00137 this->config_i_->name().c_str(),
00138 (unsigned int)(remote_peer >> 32),
00139 (unsigned int) remote_peer),
00140 0);
00141 }
00142
00143 return session._retn();
00144 }
00145
00146 static bool
00147 get_remote_reliability(const TransportImpl::RemoteTransport& remote)
00148 {
00149 NetworkAddress network_address;
00150 ACE_CDR::Boolean reliable;
00151
00152 size_t len = remote.blob_.length();
00153 const char* buffer = reinterpret_cast<const char*>(remote.blob_.get_buffer());
00154
00155 ACE_InputCDR cdr(buffer, len);
00156 cdr >> network_address;
00157 cdr >> ACE_InputCDR::to_boolean(reliable);
00158
00159 return reliable;
00160 }
00161
00162 TransportImpl::AcceptConnectResult
00163 MulticastTransport::connect_datalink(const RemoteTransport& remote,
00164 const ConnectionAttribs& attribs,
00165 TransportClient*)
00166 {
00167
00168 if (get_remote_reliability(remote) != this->config_i_->is_reliable()) {
00169 return AcceptConnectResult();
00170 }
00171
00172 GuardThreadType guard_links(this->links_lock_);
00173 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00174 | RepoIdConverter(attribs.local_id_).participantId();
00175 Links::const_iterator link_iter = this->client_links_.find(local_peer);
00176 MulticastDataLink_rch link;
00177
00178 if (link_iter == this->client_links_.end()) {
00179
00180 link = this->make_datalink(attribs.local_id_, attribs.priority_, true );
00181 this->client_links_[local_peer] = link;
00182 } else {
00183 link = link_iter->second;
00184 }
00185
00186 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00187 | RepoIdConverter(remote.repo_id_).participantId();
00188
00189 MulticastSession_rch session =
00190 this->start_session(link, remote_peer, true );
00191
00192 if (session.is_nil()) {
00193 Links::iterator to_remove = this->client_links_.find(local_peer);
00194 if (to_remove != this->client_links_.end()) {
00195 this->client_links_.erase(to_remove);
00196 }
00197 return AcceptConnectResult();
00198 }
00199 return AcceptConnectResult(link._retn());
00200 }
00201
00202 TransportImpl::AcceptConnectResult
00203 MulticastTransport::accept_datalink(const RemoteTransport& remote,
00204 const ConnectionAttribs& attribs,
00205 TransportClient* client)
00206 {
00207
00208 if (get_remote_reliability(remote) != this->config_i_->is_reliable()) {
00209 return AcceptConnectResult();
00210 }
00211
00212 const MulticastPeer local_peer = (ACE_INT64)RepoIdConverter(attribs.local_id_).federationId() << 32
00213 | RepoIdConverter(attribs.local_id_).participantId();
00214
00215 GuardThreadType guard_links(this->links_lock_);
00216
00217 Links::const_iterator link_iter = this->server_links_.find(local_peer);
00218 MulticastDataLink_rch link;
00219
00220 if (link_iter == this->server_links_.end()) {
00221
00222 link = this->make_datalink(attribs.local_id_, attribs.priority_, false );
00223 this->server_links_[local_peer] = link;
00224 } else {
00225 link = link_iter->second;
00226 }
00227
00228 guard_links.release();
00229
00230 MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote.repo_id_).federationId() << 32
00231 | RepoIdConverter(remote.repo_id_).participantId();
00232 GuardThreadType guard(this->connections_lock_);
00233
00234 if (connections_.count(std::make_pair(remote_peer, local_peer))) {
00235
00236
00237 guard.release();
00238
00239 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::accept_datalink found\n"));
00240 MulticastSession_rch session =
00241 this->start_session(link, remote_peer, false );
00242
00243 if (session.is_nil()) {
00244 link = 0;
00245 }
00246 return AcceptConnectResult(link._retn());
00247
00248 } else {
00249
00250 this->pending_connections_[std::make_pair(remote_peer, local_peer)].
00251 push_back(std::pair<TransportClient*, RepoId>(client, remote.repo_id_));
00252
00253
00254 guard.release();
00255 MulticastSession_rch session =
00256 this->start_session(link, remote_peer, false );
00257
00258 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00259
00260 }
00261 }
00262
00263 void
00264 MulticastTransport::stop_accepting_or_connecting(TransportClient* client,
00265 const RepoId& remote_id)
00266 {
00267 VDBG((LM_DEBUG, "(%P|%t) MulticastTransport::stop_accepting_or_connecting\n"));
00268
00269 GuardThreadType guard(this->connections_lock_);
00270
00271 for (PendConnMap::iterator it = this->pending_connections_.begin();
00272 it != this->pending_connections_.end(); ++it) {
00273 bool erased_from_it = false;
00274 for (size_t i = 0; i < it->second.size(); ++i) {
00275 if (it->second[i].first == client && it->second[i].second == remote_id) {
00276 erased_from_it = true;
00277 it->second.erase(it->second.begin() + i);
00278 break;
00279 }
00280 }
00281
00282 if (erased_from_it && it->second.empty()) {
00283 this->pending_connections_.erase(it);
00284 return;
00285 }
00286 }
00287 }
00288
00289 void
00290 MulticastTransport::passive_connection(MulticastPeer local_peer, MulticastPeer remote_peer)
00291 {
00292 GuardThreadType guard(this->connections_lock_);
00293
00294 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastTransport[%C]::passive_connection "
00295 "from remote peer %#08x%08x to local peer %#08x%08x\n",
00296 this->config_i_->name().c_str(),
00297 (unsigned int) (remote_peer >> 32),
00298 (unsigned int) remote_peer,
00299 (unsigned int) (local_peer >> 32),
00300 (unsigned int) local_peer), 2);
00301
00302 const Peers peers(remote_peer, local_peer);
00303 const PendConnMap::iterator pend = this->pending_connections_.find(peers);
00304
00305
00306 this->connections_.insert(peers);
00307
00308 Links::const_iterator server_link = this->server_links_.find(local_peer);
00309 DataLink_rch link;
00310
00311 if (server_link != this->server_links_.end()) {
00312 link = static_rchandle_cast<DataLink>(server_link->second);
00313 MulticastSession_rch session = server_link->second->find_or_create_session(remote_peer);
00314 session->set_acked();
00315 }
00316
00317 if (pend != pending_connections_.end()) {
00318 Callbacks tmp(pend->second);
00319 for (size_t i = 0; i < tmp.size(); ++i) {
00320 const PendConnMap::iterator pend = pending_connections_.find(peers);
00321 if (pend != pending_connections_.end()) {
00322 const Callbacks::iterator tmp_iter = find(pend->second.begin(),
00323 pend->second.end(),
00324 tmp.at(i));
00325 if (tmp_iter != pend->second.end()) {
00326 TransportClient* pend_client = tmp.at(i).first;
00327 RepoId remote_repo = tmp.at(i).second;
00328 guard.release();
00329 pend_client->use_datalink(remote_repo, link);
00330 guard.acquire();
00331 }
00332 }
00333 }
00334 }
00335 }
00336
00337 bool
00338 MulticastTransport::configure_i(TransportInst* config)
00339 {
00340 this->config_i_ = dynamic_cast<MulticastInst*>(config);
00341
00342 if (this->config_i_ == 0) {
00343 ACE_ERROR_RETURN((LM_ERROR,
00344 ACE_TEXT("(%P|%t) ERROR: ")
00345 ACE_TEXT("MulticastTransport[%@]::configure_i: ")
00346 ACE_TEXT("invalid configuration!\n"), this),
00347 false);
00348 }
00349
00350 this->config_i_->_add_ref();
00351
00352
00353 if (this->config_i_->local_address_.empty () &&
00354 !TheServiceParticipant->default_address ().empty ()) {
00355 this->config_i_->local_address_ = TheServiceParticipant->default_address ().c_str ();
00356 }
00357
00358 if (!this->config_i_->group_address_.is_multicast()) {
00359 ACE_ERROR_RETURN((LM_ERROR,
00360 ACE_TEXT("(%P|%t) ERROR: ")
00361 ACE_TEXT("MulticastTransport[%@]::configure_i: ")
00362 ACE_TEXT("invalid configuration: address %C is not ")
00363 ACE_TEXT("multicast.\n"),
00364 this, this->config_i_->group_address_.get_host_addr()),
00365 false);
00366 }
00367
00368 this->create_reactor_task(this->config_i_->async_send_);
00369
00370 return true;
00371 }
00372
00373 void
00374 MulticastTransport::shutdown_i()
00375 {
00376 GuardThreadType guard_links(this->links_lock_);
00377 Links::iterator link;
00378
00379 for (link = this->client_links_.begin();
00380 link != this->client_links_.end();
00381 ++link) {
00382 if (link->second.in()) {
00383 link->second->transport_shutdown();
00384 }
00385 }
00386
00387 for (link = this->server_links_.begin();
00388 link != this->server_links_.end();
00389 ++link) {
00390 if (link->second.in()) {
00391 link->second->transport_shutdown();
00392 }
00393 }
00394
00395 this->config_i_ = 0;
00396 }
00397
00398 bool
00399 MulticastTransport::connection_info_i(TransportLocator& info) const
00400 {
00401 this->config_i_->populate_locator(info);
00402 return true;
00403 }
00404
00405 void
00406 MulticastTransport::release_datalink(DataLink* )
00407 {
00408
00409
00410 }
00411
00412 }
00413 }