MulticastDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "MulticastDataLink.h"
00009 #include "MulticastSession.h"
00010 #include "MulticastSessionFactory.h"
00011 #include "MulticastTransport.h"
00012 #include "MulticastSendStrategy.h"
00013 #include "MulticastReceiveStrategy.h"
00014 
00015 #include "ace/Default_Constants.h"
00016 #include "ace/Global_Macros.h"
00017 #include "ace/Log_Msg.h"
00018 #include "ace/Truncate.h"
00019 #include "ace/OS_NS_sys_socket.h"
00020 
00021 #include "tao/ORB_Core.h"
00022 
00023 #include "dds/DCPS/Service_Participant.h"
00024 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00025 #include "dds/DCPS/GuidConverter.h"
00026 #include "dds/DCPS/RepoIdConverter.h"
00027 
00028 #ifndef __ACE_INLINE__
00029 # include "MulticastDataLink.inl"
00030 #endif  /* __ACE_INLINE__ */
00031 
00032 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00033 
00034 namespace OpenDDS {
00035 namespace DCPS {
00036 
00037 MulticastDataLink::MulticastDataLink(MulticastTransport& transport,
00038     const MulticastSessionFactory_rch& session_factory,
00039     MulticastPeer local_peer,
00040     MulticastInst& config,
00041     TransportReactorTask* reactor_task,
00042     bool is_active)
00043 : DataLink(transport, 0 /*priority*/, false /*loopback*/, is_active),
00044   session_factory_(session_factory),
00045   local_peer_(local_peer),
00046   reactor_task_(reactor_task),
00047   send_strategy_(make_rch<MulticastSendStrategy>(this)),
00048   recv_strategy_(make_rch<MulticastReceiveStrategy>(this))
00049 {
00050   // A send buffer may be bound to the send strategy to ensure a
00051   // configured number of most-recent datagrams are retained:
00052   if (this->session_factory_->requires_send_buffer()) {
00053     this->send_buffer_.reset(new SingleSendBuffer(config.nak_depth_,
00054                                               config.max_samples_per_packet_));
00055     this->send_strategy_->send_buffer(this->send_buffer_.get());
00056   }
00057 }
00058 
00059 MulticastDataLink::~MulticastDataLink()
00060 {
00061   if (this->send_buffer_) {
00062     this->send_strategy_->send_buffer(0);
00063   }
00064 }
00065 
00066 
00067 bool
00068 MulticastDataLink::join(const ACE_INET_Addr& group_address)
00069 {
00070 
00071   const std::string& net_if = this->config().local_address_;
00072 #ifdef ACE_HAS_MAC_OSX
00073   socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00074                ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00075 #endif
00076   if (this->socket_.join(group_address, 1,
00077       net_if.empty() ? 0 :
00078           ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00079     ACE_ERROR_RETURN((LM_ERROR,
00080         ACE_TEXT("(%P|%t) ERROR: MulticastDataLink::join: ")
00081         ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed %m.\n")),
00082         false);
00083   }
00084   VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::join OK\n")), 6);
00085 
00086   ACE_HANDLE handle = this->socket_.get_handle();
00087 
00088   if (!OpenDDS::DCPS::set_socket_multicast_ttl(this->socket_, this->config().ttl_)) {
00089     ACE_ERROR_RETURN((LM_ERROR,
00090         ACE_TEXT("(%P|%t) ERROR: ")
00091         ACE_TEXT("MulticastDataLink::join: ")
00092         ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl failed.\n")),
00093         false);
00094   }
00095 
00096   int rcv_buffer_size = ACE_Utils::truncate_cast<int>(this->config().rcv_buffer_size_);
00097   if (rcv_buffer_size != 0
00098       && ACE_OS::setsockopt(handle, SOL_SOCKET,
00099           SO_RCVBUF,
00100           (char *) &rcv_buffer_size,
00101           sizeof (int)) < 0) {
00102     ACE_ERROR_RETURN((LM_ERROR,
00103         ACE_TEXT("(%P|%t) ERROR: ")
00104         ACE_TEXT("MulticastDataLink::join: ")
00105         ACE_TEXT("ACE_OS::setsockopt RCVBUF failed.\n")),
00106         false);
00107   }
00108 
00109 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00110   int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00111 
00112   if (ACE_OS::setsockopt(handle, SOL_SOCKET,
00113       SO_SNDBUF,
00114       (char *) &snd_size,
00115       sizeof(snd_size)) < 0
00116       && errno != ENOTSUP) {
00117     ACE_ERROR_RETURN((LM_ERROR,
00118         ACE_TEXT("(%P|%t) ERROR: ")
00119         ACE_TEXT("MulticastDataLink::join: ")
00120         ACE_TEXT("ACE_OS::setsockopt SNDBUF failed to set the send buffer size to %d errno %m\n"),
00121         snd_size),
00122         false);
00123   }
00124 #endif /* ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
00125 
00126   if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00127       static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00128       != 0) {
00129     this->socket_.close();
00130     ACE_ERROR_RETURN((LM_ERROR,
00131         ACE_TEXT("(%P|%t) ERROR: ")
00132         ACE_TEXT("MulticastDataLink::join: ")
00133         ACE_TEXT("DataLink::start failed!\n")),
00134         false);
00135   }
00136 
00137   return true;
00138 }
00139 
00140 MulticastSession_rch
00141 MulticastDataLink::find_session(MulticastPeer remote_peer)
00142 {
00143   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00144       guard,
00145       this->session_lock_,
00146       MulticastSession_rch());
00147 
00148   MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00149   if (it != this->sessions_.end()) {
00150     return it->second;
00151   }
00152   else return MulticastSession_rch();
00153 }
00154 
00155 MulticastSession_rch
00156 MulticastDataLink::find_or_create_session(MulticastPeer remote_peer)
00157 {
00158   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00159       guard,
00160       this->session_lock_,
00161       MulticastSession_rch());
00162 
00163   MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00164   if (it != this->sessions_.end()) {
00165     return it->second;
00166   }
00167 
00168   MulticastSession_rch session =
00169     this->session_factory_->create(transport().reactor(), transport().reactor_owner(), this, remote_peer);
00170   if (session.is_nil()) {
00171     ACE_ERROR_RETURN((LM_ERROR,
00172         ACE_TEXT("(%P|%t) ERROR: ")
00173         ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00174         ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00175         (unsigned int) (remote_peer >> 32),
00176         (unsigned int) remote_peer),
00177         MulticastSession_rch());
00178   }
00179 
00180   std::pair<MulticastSessionMap::iterator, bool> pair = this->sessions_.insert(
00181       MulticastSessionMap::value_type(remote_peer, session));
00182   if (pair.first == this->sessions_.end()) {
00183     ACE_ERROR_RETURN((LM_ERROR,
00184         ACE_TEXT("(%P|%t) ERROR: ")
00185         ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00186         ACE_TEXT("failed to insert session for remote peer: %#08x%08x!\n"),
00187         (unsigned int) (remote_peer >> 32),
00188         (unsigned int) remote_peer),
00189         MulticastSession_rch());
00190   }
00191   return session;
00192 }
00193 
00194 bool
00195 MulticastDataLink::check_header(const TransportHeader& header)
00196 {
00197   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00198       guard,
00199       this->session_lock_,
00200       false);
00201 
00202   MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00203   if (it == this->sessions_.end() && is_active()) {
00204     return false;
00205   }
00206   if (it != this->sessions_.end() && it->second->acked()) {
00207     return it->second->check_header(header);
00208   }
00209 
00210   return true;
00211 }
00212 
00213 bool
00214 MulticastDataLink::check_header(const DataSampleHeader& header)
00215 {
00216   if (header.message_id_ == TRANSPORT_CONTROL) return true;
00217 
00218   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00219       guard,
00220       this->session_lock_,
00221       false);
00222 
00223   // Skip data sample unless there is a session for it.
00224   return (this->sessions_.count(receive_strategy()->received_header().source_) > 0);
00225 }
00226 
00227 bool
00228 MulticastDataLink::reassemble(ReceivedDataSample& data,
00229     const TransportHeader& header)
00230 {
00231   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00232       guard,
00233       this->session_lock_,
00234       false);
00235 
00236   MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00237   if (it == this->sessions_.end()) return false;
00238   if (it->second->acked()) {
00239     return it->second->reassemble(data, header);
00240   }
00241   return false;
00242 }
00243 
00244 void
00245 MulticastDataLink::sample_received(ReceivedDataSample& sample)
00246 {
00247   switch (sample.header_.message_id_) {
00248   case TRANSPORT_CONTROL: {
00249     // Transport control samples are delivered to all sessions
00250     // regardless of association status:
00251     {
00252       char* const ptr = sample.sample_ ? sample.sample_->rd_ptr() : 0;
00253 
00254       ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00255           guard,
00256           this->session_lock_);
00257 
00258       const TransportHeader& theader = receive_strategy()->received_header();
00259 
00260       if (!is_active() && sample.header_.submessage_id_ == MULTICAST_SYN &&
00261           sessions_.find(theader.source_) == sessions_.end()) {
00262         // We have received a SYN but there is no session (yet) for this source.
00263         // Depending on the data, we may need to send SYNACK.
00264 
00265         guard.release();
00266         syn_received_no_session(theader.source_, sample.sample_,
00267                                 theader.swap_bytes());
00268 
00269         guard.acquire();
00270         MulticastSessionMap::iterator s_itr = sessions_.find(theader.source_);
00271         if (s_itr != sessions_.end()) {
00272           s_itr->second->record_header_received(theader);
00273         }
00274 
00275         if (ptr) {
00276           sample.sample_->rd_ptr(ptr);
00277         }
00278         return;
00279       }
00280 
00281       MulticastSessionMap temp_sessions(sessions_);
00282       guard.release();
00283 
00284       for (MulticastSessionMap::iterator it(temp_sessions.begin());
00285           it != temp_sessions.end(); ++it) {
00286         it->second->control_received(sample.header_.submessage_id_,
00287                                      sample.sample_);
00288         it->second->record_header_received(theader);
00289 
00290         // reset read pointer
00291         if (ptr) {
00292           sample.sample_->rd_ptr(ptr);
00293         }
00294       }
00295     }
00296   } break;
00297 
00298   default:
00299 
00300     if (ready_to_deliver(sample)) {
00301       data_received(sample);
00302     }
00303     break;
00304   }
00305 }
00306 
00307 bool
00308 MulticastDataLink::ready_to_deliver(const ReceivedDataSample& data)
00309 {
00310   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00311       guard,
00312       this->session_lock_, false);
00313 
00314   const TransportHeader& theader = receive_strategy()->received_header();
00315 
00316   MulticastSessionMap::iterator session_it = sessions_.find(theader.source_);
00317   if (session_it != sessions_.end()) {
00318     MulticastSession_rch sess_rch(session_it->second);
00319     guard.release();
00320     return sess_rch->ready_to_deliver(theader, data);
00321   }
00322 
00323   return true;
00324 }
00325 
00326 void
00327 MulticastDataLink::release_remote_i(const RepoId& remote)
00328 {
00329   ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX, guard, session_lock_);
00330   MulticastPeer remote_source = (ACE_INT64)RepoIdConverter(remote).federationId() << 32
00331                               | RepoIdConverter(remote).participantId();
00332   MulticastSessionMap::iterator session_it = sessions_.find(remote_source);
00333   if (session_it != sessions_.end() && session_it->second->is_reliable()) {
00334     session_it->second->release_remote(remote);
00335   }
00336 }
00337 
00338 void
00339 MulticastDataLink::syn_received_no_session(MulticastPeer source,
00340     const Message_Block_Ptr& data,
00341     bool swap_bytes)
00342 {
00343   Serializer serializer_read(data.get(), swap_bytes);
00344 
00345   MulticastPeer local_peer;
00346   serializer_read >> local_peer;
00347 
00348   if (local_peer != local_peer_) {
00349     return;
00350   }
00351 
00352   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastDataLink[%C]::syn_received_no_session "
00353       "send_synack local %#08x%08x remote %#08x%08x\n",
00354       this->config().name().c_str(),
00355       (unsigned int) (local_peer >> 32),
00356       (unsigned int) local_peer,
00357       (unsigned int) (source >> 32),
00358       (unsigned int) source), 2);
00359 
00360   Message_Block_Ptr synack_data(new ACE_Message_Block(sizeof(MulticastPeer)));
00361 
00362   Serializer serializer_write(synack_data.get());
00363   serializer_write << source;
00364 
00365   DataSampleHeader header;
00366   Message_Block_Ptr control(
00367       create_control(MULTICAST_SYNACK, header, move(synack_data)));
00368 
00369   if (control == 0) {
00370     ACE_ERROR((LM_ERROR,
00371                ACE_TEXT("(%P|%t) ERROR: ")
00372                ACE_TEXT("MulticastDataLink::syn_received_no_session: ")
00373                ACE_TEXT("create_control failed!\n")));
00374     return;
00375   }
00376 
00377   const int error = send_control(header, move(control));
00378   if (error != SEND_CONTROL_OK) {
00379     ACE_ERROR((LM_ERROR, "(%P|%t) MulticastDataLink::syn_received_no_session: "
00380         "ERROR: send_control failed: %d!\n", error));
00381     return;
00382   }
00383 
00384   transport().passive_connection(local_peer, source);
00385 }
00386 
00387 void
00388 MulticastDataLink::stop_i()
00389 {
00390   ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00391       guard,
00392       this->session_lock_);
00393 
00394   for (MulticastSessionMap::iterator it(this->sessions_.begin());
00395       it != this->sessions_.end(); ++it) {
00396     it->second->stop();
00397   }
00398   this->sessions_.clear();
00399 
00400   this->socket_.close();
00401 }
00402 
00403 } // namespace DCPS
00404 } // namespace OpenDDS
00405 
00406 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1