00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastDataLink.h"
00009 #include "MulticastSession.h"
00010 #include "MulticastSessionFactory.h"
00011 #include "MulticastTransport.h"
00012 #include "MulticastSendStrategy.h"
00013 #include "MulticastReceiveStrategy.h"
00014
00015 #include "ace/Default_Constants.h"
00016 #include "ace/Global_Macros.h"
00017 #include "ace/Log_Msg.h"
00018 #include "ace/Truncate.h"
00019 #include "ace/OS_NS_sys_socket.h"
00020
00021 #include "tao/ORB_Core.h"
00022
00023 #include "dds/DCPS/Service_Participant.h"
00024 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00025 #include "dds/DCPS/GuidConverter.h"
00026 #include "dds/DCPS/RepoIdConverter.h"
00027
00028 #ifndef __ACE_INLINE__
00029 # include "MulticastDataLink.inl"
00030 #endif
00031
00032 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00033
00034 namespace OpenDDS {
00035 namespace DCPS {
00036
00037 MulticastDataLink::MulticastDataLink(MulticastTransport& transport,
00038 const MulticastSessionFactory_rch& session_factory,
00039 MulticastPeer local_peer,
00040 MulticastInst& config,
00041 TransportReactorTask* reactor_task,
00042 bool is_active)
00043 : DataLink(transport, 0 , false , is_active),
00044 session_factory_(session_factory),
00045 local_peer_(local_peer),
00046 reactor_task_(reactor_task),
00047 send_strategy_(make_rch<MulticastSendStrategy>(this)),
00048 recv_strategy_(make_rch<MulticastReceiveStrategy>(this))
00049 {
00050
00051
00052 if (this->session_factory_->requires_send_buffer()) {
00053 this->send_buffer_.reset(new SingleSendBuffer(config.nak_depth_,
00054 config.max_samples_per_packet_));
00055 this->send_strategy_->send_buffer(this->send_buffer_.get());
00056 }
00057 }
00058
00059 MulticastDataLink::~MulticastDataLink()
00060 {
00061 if (this->send_buffer_) {
00062 this->send_strategy_->send_buffer(0);
00063 }
00064 }
00065
00066
00067 bool
00068 MulticastDataLink::join(const ACE_INET_Addr& group_address)
00069 {
00070
00071 const std::string& net_if = this->config().local_address_;
00072 #ifdef ACE_HAS_MAC_OSX
00073 socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00074 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00075 #endif
00076 if (this->socket_.join(group_address, 1,
00077 net_if.empty() ? 0 :
00078 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00079 ACE_ERROR_RETURN((LM_ERROR,
00080 ACE_TEXT("(%P|%t) ERROR: MulticastDataLink::join: ")
00081 ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed %m.\n")),
00082 false);
00083 }
00084 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::join OK\n")), 6);
00085
00086 ACE_HANDLE handle = this->socket_.get_handle();
00087
00088 if (!OpenDDS::DCPS::set_socket_multicast_ttl(this->socket_, this->config().ttl_)) {
00089 ACE_ERROR_RETURN((LM_ERROR,
00090 ACE_TEXT("(%P|%t) ERROR: ")
00091 ACE_TEXT("MulticastDataLink::join: ")
00092 ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl failed.\n")),
00093 false);
00094 }
00095
00096 int rcv_buffer_size = ACE_Utils::truncate_cast<int>(this->config().rcv_buffer_size_);
00097 if (rcv_buffer_size != 0
00098 && ACE_OS::setsockopt(handle, SOL_SOCKET,
00099 SO_RCVBUF,
00100 (char *) &rcv_buffer_size,
00101 sizeof (int)) < 0) {
00102 ACE_ERROR_RETURN((LM_ERROR,
00103 ACE_TEXT("(%P|%t) ERROR: ")
00104 ACE_TEXT("MulticastDataLink::join: ")
00105 ACE_TEXT("ACE_OS::setsockopt RCVBUF failed.\n")),
00106 false);
00107 }
00108
00109 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00110 int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00111
00112 if (ACE_OS::setsockopt(handle, SOL_SOCKET,
00113 SO_SNDBUF,
00114 (char *) &snd_size,
00115 sizeof(snd_size)) < 0
00116 && errno != ENOTSUP) {
00117 ACE_ERROR_RETURN((LM_ERROR,
00118 ACE_TEXT("(%P|%t) ERROR: ")
00119 ACE_TEXT("MulticastDataLink::join: ")
00120 ACE_TEXT("ACE_OS::setsockopt SNDBUF failed to set the send buffer size to %d errno %m\n"),
00121 snd_size),
00122 false);
00123 }
00124 #endif
00125
00126 if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00127 static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00128 != 0) {
00129 this->socket_.close();
00130 ACE_ERROR_RETURN((LM_ERROR,
00131 ACE_TEXT("(%P|%t) ERROR: ")
00132 ACE_TEXT("MulticastDataLink::join: ")
00133 ACE_TEXT("DataLink::start failed!\n")),
00134 false);
00135 }
00136
00137 return true;
00138 }
00139
00140 MulticastSession_rch
00141 MulticastDataLink::find_session(MulticastPeer remote_peer)
00142 {
00143 ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00144 guard,
00145 this->session_lock_,
00146 MulticastSession_rch());
00147
00148 MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00149 if (it != this->sessions_.end()) {
00150 return it->second;
00151 }
00152 else return MulticastSession_rch();
00153 }
00154
00155 MulticastSession_rch
00156 MulticastDataLink::find_or_create_session(MulticastPeer remote_peer)
00157 {
00158 ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00159 guard,
00160 this->session_lock_,
00161 MulticastSession_rch());
00162
00163 MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00164 if (it != this->sessions_.end()) {
00165 return it->second;
00166 }
00167
00168 MulticastSession_rch session =
00169 this->session_factory_->create(transport().reactor(), transport().reactor_owner(), this, remote_peer);
00170 if (session.is_nil()) {
00171 ACE_ERROR_RETURN((LM_ERROR,
00172 ACE_TEXT("(%P|%t) ERROR: ")
00173 ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00174 ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00175 (unsigned int) (remote_peer >> 32),
00176 (unsigned int) remote_peer),
00177 MulticastSession_rch());
00178 }
00179
00180 std::pair<MulticastSessionMap::iterator, bool> pair = this->sessions_.insert(
00181 MulticastSessionMap::value_type(remote_peer, session));
00182 if (pair.first == this->sessions_.end()) {
00183 ACE_ERROR_RETURN((LM_ERROR,
00184 ACE_TEXT("(%P|%t) ERROR: ")
00185 ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00186 ACE_TEXT("failed to insert session for remote peer: %#08x%08x!\n"),
00187 (unsigned int) (remote_peer >> 32),
00188 (unsigned int) remote_peer),
00189 MulticastSession_rch());
00190 }
00191 return session;
00192 }
00193
00194 bool
00195 MulticastDataLink::check_header(const TransportHeader& header)
00196 {
00197 ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00198 guard,
00199 this->session_lock_,
00200 false);
00201
00202 MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00203 if (it == this->sessions_.end() && is_active()) {
00204 return false;
00205 }
00206 if (it != this->sessions_.end() && it->second->acked()) {
00207 return it->second->check_header(header);
00208 }
00209
00210 return true;
00211 }
00212
00213 bool
00214 MulticastDataLink::check_header(const DataSampleHeader& header)
00215 {
00216 if (header.message_id_ == TRANSPORT_CONTROL) return true;
00217
00218 ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00219 guard,
00220 this->session_lock_,
00221 false);
00222
00223
00224 return (this->sessions_.count(receive_strategy()->received_header().source_) > 0);
00225 }
00226
00227 bool
00228 MulticastDataLink::reassemble(ReceivedDataSample& data,
00229 const TransportHeader& header)
00230 {
00231 ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00232 guard,
00233 this->session_lock_,
00234 false);
00235
00236 MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00237 if (it == this->sessions_.end()) return false;
00238 if (it->second->acked()) {
00239 return it->second->reassemble(data, header);
00240 }
00241 return false;
00242 }
00243
00244 void
00245 MulticastDataLink::sample_received(ReceivedDataSample& sample)
00246 {
00247 switch (sample.header_.message_id_) {
00248 case TRANSPORT_CONTROL: {
00249
00250
00251 {
00252 char* const ptr = sample.sample_ ? sample.sample_->rd_ptr() : 0;
00253
00254 ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00255 guard,
00256 this->session_lock_);
00257
00258 const TransportHeader& theader = receive_strategy()->received_header();
00259
00260 if (!is_active() && sample.header_.submessage_id_ == MULTICAST_SYN &&
00261 sessions_.find(theader.source_) == sessions_.end()) {
00262
00263
00264
00265 guard.release();
00266 syn_received_no_session(theader.source_, sample.sample_,
00267 theader.swap_bytes());
00268
00269 guard.acquire();
00270 MulticastSessionMap::iterator s_itr = sessions_.find(theader.source_);
00271 if (s_itr != sessions_.end()) {
00272 s_itr->second->record_header_received(theader);
00273 }
00274
00275 if (ptr) {
00276 sample.sample_->rd_ptr(ptr);
00277 }
00278 return;
00279 }
00280
00281 MulticastSessionMap temp_sessions(sessions_);
00282 guard.release();
00283
00284 for (MulticastSessionMap::iterator it(temp_sessions.begin());
00285 it != temp_sessions.end(); ++it) {
00286 it->second->control_received(sample.header_.submessage_id_,
00287 sample.sample_);
00288 it->second->record_header_received(theader);
00289
00290
00291 if (ptr) {
00292 sample.sample_->rd_ptr(ptr);
00293 }
00294 }
00295 }
00296 } break;
00297
00298 default:
00299
00300 if (ready_to_deliver(sample)) {
00301 data_received(sample);
00302 }
00303 break;
00304 }
00305 }
00306
00307 bool
00308 MulticastDataLink::ready_to_deliver(const ReceivedDataSample& data)
00309 {
00310 ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00311 guard,
00312 this->session_lock_, false);
00313
00314 const TransportHeader& theader = receive_strategy()->received_header();
00315
00316 MulticastSessionMap::iterator session_it = sessions_.find(theader.source_);
00317 if (session_it != sessions_.end()) {
00318 MulticastSession_rch sess_rch(session_it->second);
00319 guard.release();
00320 return sess_rch->ready_to_deliver(theader, data);
00321 }
00322
00323 return true;
00324 }
00325
00326 void
00327 MulticastDataLink::release_remote_i(const RepoId& remote)
00328 {
00329 ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX, guard, session_lock_);
00330 MulticastPeer remote_source = (ACE_INT64)RepoIdConverter(remote).federationId() << 32
00331 | RepoIdConverter(remote).participantId();
00332 MulticastSessionMap::iterator session_it = sessions_.find(remote_source);
00333 if (session_it != sessions_.end() && session_it->second->is_reliable()) {
00334 session_it->second->release_remote(remote);
00335 }
00336 }
00337
00338 void
00339 MulticastDataLink::syn_received_no_session(MulticastPeer source,
00340 const Message_Block_Ptr& data,
00341 bool swap_bytes)
00342 {
00343 Serializer serializer_read(data.get(), swap_bytes);
00344
00345 MulticastPeer local_peer;
00346 serializer_read >> local_peer;
00347
00348 if (local_peer != local_peer_) {
00349 return;
00350 }
00351
00352 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastDataLink[%C]::syn_received_no_session "
00353 "send_synack local %#08x%08x remote %#08x%08x\n",
00354 this->config().name().c_str(),
00355 (unsigned int) (local_peer >> 32),
00356 (unsigned int) local_peer,
00357 (unsigned int) (source >> 32),
00358 (unsigned int) source), 2);
00359
00360 Message_Block_Ptr synack_data(new ACE_Message_Block(sizeof(MulticastPeer)));
00361
00362 Serializer serializer_write(synack_data.get());
00363 serializer_write << source;
00364
00365 DataSampleHeader header;
00366 Message_Block_Ptr control(
00367 create_control(MULTICAST_SYNACK, header, move(synack_data)));
00368
00369 if (control == 0) {
00370 ACE_ERROR((LM_ERROR,
00371 ACE_TEXT("(%P|%t) ERROR: ")
00372 ACE_TEXT("MulticastDataLink::syn_received_no_session: ")
00373 ACE_TEXT("create_control failed!\n")));
00374 return;
00375 }
00376
00377 const int error = send_control(header, move(control));
00378 if (error != SEND_CONTROL_OK) {
00379 ACE_ERROR((LM_ERROR, "(%P|%t) MulticastDataLink::syn_received_no_session: "
00380 "ERROR: send_control failed: %d!\n", error));
00381 return;
00382 }
00383
00384 transport().passive_connection(local_peer, source);
00385 }
00386
00387 void
00388 MulticastDataLink::stop_i()
00389 {
00390 ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00391 guard,
00392 this->session_lock_);
00393
00394 for (MulticastSessionMap::iterator it(this->sessions_.begin());
00395 it != this->sessions_.end(); ++it) {
00396 it->second->stop();
00397 }
00398 this->sessions_.clear();
00399
00400 this->socket_.close();
00401 }
00402
00403 }
00404 }
00405
00406 OPENDDS_END_VERSIONED_NAMESPACE_DECL