MulticastDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "MulticastDataLink.h"
00009 #include "MulticastSession.h"
00010 #include "MulticastSessionFactory.h"
00011 #include "MulticastTransport.h"
00012 
00013 #include "ace/Default_Constants.h"
00014 #include "ace/Global_Macros.h"
00015 #include "ace/Log_Msg.h"
00016 #include "ace/Truncate.h"
00017 #include "ace/OS_NS_sys_socket.h"
00018 
00019 #include "tao/ORB_Core.h"
00020 
00021 #include "dds/DCPS/Service_Participant.h"
00022 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00023 #include "dds/DCPS/GuidConverter.h"
00024 #include "dds/DCPS/RepoIdConverter.h"
00025 
00026 #ifndef __ACE_INLINE__
00027 # include "MulticastDataLink.inl"
00028 #endif  /* __ACE_INLINE__ */
00029 
00030 namespace OpenDDS {
00031 namespace DCPS {
00032 
00033 MulticastDataLink::MulticastDataLink(MulticastTransport* transport,
00034     MulticastSessionFactory* session_factory,
00035     MulticastPeer local_peer,
00036     bool is_active)
00037 : DataLink(transport, 0 /*priority*/, false /*loopback*/, is_active),
00038   transport_(transport),
00039   session_factory_(session_factory, false),
00040   local_peer_(local_peer),
00041   config_(0),
00042   reactor_task_(0),
00043   send_buffer_(0)
00044 {
00045 }
00046 
00047 MulticastDataLink::~MulticastDataLink()
00048 {
00049   if (this->send_buffer_) {
00050     this->send_strategy_->send_buffer(0);
00051     delete this->send_buffer_;
00052   }
00053 }
00054 
00055 void
00056 MulticastDataLink::configure(MulticastInst* config,
00057     TransportReactorTask* reactor_task)
00058 {
00059   this->config_ = config;
00060   this->reactor_task_ = reactor_task;
00061 }
00062 
00063 void
00064 MulticastDataLink::send_strategy(MulticastSendStrategy* send_strategy)
00065 {
00066   // A send buffer may be bound to the send strategy to ensure a
00067   // configured number of most-recent datagrams are retained:
00068   if (this->session_factory_->requires_send_buffer()) {
00069     ACE_NEW_NORETURN(this->send_buffer_,
00070         SingleSendBuffer(this->config_->nak_depth_,
00071             this->config_->max_samples_per_packet_));
00072     if (!this->send_buffer_) {
00073       ACE_ERROR((LM_ERROR,
00074           ACE_TEXT("(%P|%t) ERROR: ")
00075           ACE_TEXT("MulticastDataLink::send_strategy: ")
00076           ACE_TEXT("failed to create SingleSendBuffer!\n")));
00077       return;
00078     }
00079     send_strategy->send_buffer(this->send_buffer_);
00080   }
00081   this->send_strategy_ = send_strategy;
00082 }
00083 
00084 void
00085 MulticastDataLink::receive_strategy(MulticastReceiveStrategy* recv_strategy)
00086 {
00087   this->recv_strategy_ = recv_strategy;
00088 }
00089 
00090 bool
00091 MulticastDataLink::join(const ACE_INET_Addr& group_address)
00092 {
00093   const std::string& net_if = this->config_->local_address_;
00094 #ifdef ACE_HAS_MAC_OSX
00095   socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00096                ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00097 #endif
00098   if (this->socket_.join(group_address, 1,
00099       net_if.empty() ? 0 :
00100           ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00101     ACE_ERROR_RETURN((LM_ERROR,
00102         ACE_TEXT("(%P|%t) ERROR: MulticastDataLink::join: ")
00103         ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed %m.\n")),
00104         false);
00105   }
00106   VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::join OK\n")), 6);
00107 
00108   ACE_HANDLE handle = this->socket_.get_handle();
00109 
00110   if (!OpenDDS::DCPS::set_socket_multicast_ttl(this->socket_, this->config_->ttl_)) {
00111     ACE_ERROR_RETURN((LM_ERROR,
00112         ACE_TEXT("(%P|%t) ERROR: ")
00113         ACE_TEXT("MulticastDataLink::join: ")
00114         ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl failed.\n")),
00115         false);
00116   }
00117 
00118   int rcv_buffer_size = ACE_Utils::truncate_cast<int>(this->config_->rcv_buffer_size_);
00119   if (rcv_buffer_size != 0
00120       && ACE_OS::setsockopt(handle, SOL_SOCKET,
00121           SO_RCVBUF,
00122           (char *) &rcv_buffer_size,
00123           sizeof (int)) < 0) {
00124     ACE_ERROR_RETURN((LM_ERROR,
00125         ACE_TEXT("(%P|%t) ERROR: ")
00126         ACE_TEXT("MulticastDataLink::join: ")
00127         ACE_TEXT("ACE_OS::setsockopt RCVBUF failed.\n")),
00128         false);
00129   }
00130 
00131 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00132   int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00133 
00134   if (ACE_OS::setsockopt(handle, SOL_SOCKET,
00135       SO_SNDBUF,
00136       (char *) &snd_size,
00137       sizeof(snd_size)) < 0
00138       && errno != ENOTSUP) {
00139     ACE_ERROR_RETURN((LM_ERROR,
00140         ACE_TEXT("(%P|%t) ERROR: ")
00141         ACE_TEXT("MulticastDataLink::join: ")
00142         ACE_TEXT("ACE_OS::setsockopt SNDBUF failed to set the send buffer size to %d errno %m\n"),
00143         snd_size),
00144         false);
00145   }
00146 #endif /* ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
00147 
00148   if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00149       static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00150       != 0) {
00151     this->socket_.close();
00152     ACE_ERROR_RETURN((LM_ERROR,
00153         ACE_TEXT("(%P|%t) ERROR: ")
00154         ACE_TEXT("MulticastDataLink::join: ")
00155         ACE_TEXT("DataLink::start failed!\n")),
00156         false);
00157   }
00158 
00159   return true;
00160 }
00161 
00162 MulticastSession*
00163 MulticastDataLink::find_session(MulticastPeer remote_peer)
00164 {
00165   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00166       guard,
00167       this->session_lock_,
00168       0);
00169 
00170   MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00171   if (it != this->sessions_.end()) {
00172     MulticastSession_rch sess = it->second;
00173     return sess._retn();
00174   }
00175   else return 0;
00176 }
00177 
00178 MulticastSession*
00179 MulticastDataLink::find_or_create_session(MulticastPeer remote_peer)
00180 {
00181   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00182       guard,
00183       this->session_lock_,
00184       0);
00185 
00186   MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00187   if (it != this->sessions_.end()) {
00188     MulticastSession_rch sess = it->second;
00189     return sess._retn();
00190   }
00191 
00192   MulticastSession_rch session =
00193     this->session_factory_->create(transport()->reactor(), transport()->reactor_owner(), this, remote_peer);
00194   if (session.is_nil()) {
00195     ACE_ERROR_RETURN((LM_ERROR,
00196         ACE_TEXT("(%P|%t) ERROR: ")
00197         ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00198         ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00199         (unsigned int) (remote_peer >> 32),
00200         (unsigned int) remote_peer),
00201         0);
00202   }
00203 
00204   std::pair<MulticastSessionMap::iterator, bool> pair = this->sessions_.insert(
00205       MulticastSessionMap::value_type(remote_peer, session));
00206   if (pair.first == this->sessions_.end()) {
00207     ACE_ERROR_RETURN((LM_ERROR,
00208         ACE_TEXT("(%P|%t) ERROR: ")
00209         ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00210         ACE_TEXT("failed to insert session for remote peer: %#08x%08x!\n"),
00211         (unsigned int) (remote_peer >> 32),
00212         (unsigned int) remote_peer),
00213         0);
00214   }
00215   return session._retn();
00216 }
00217 
00218 bool
00219 MulticastDataLink::check_header(const TransportHeader& header)
00220 {
00221   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00222       guard,
00223       this->session_lock_,
00224       false);
00225 
00226   MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00227   if (it == this->sessions_.end() && is_active()) {
00228     return false;
00229   }
00230   if (it != this->sessions_.end() && it->second->acked()) {
00231     return it->second->check_header(header);
00232   }
00233 
00234   return true;
00235 }
00236 
00237 bool
00238 MulticastDataLink::check_header(const DataSampleHeader& header)
00239 {
00240   if (header.message_id_ == TRANSPORT_CONTROL) return true;
00241 
00242   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00243       guard,
00244       this->session_lock_,
00245       false);
00246 
00247   // Skip data sample unless there is a session for it.
00248   return (this->sessions_.count(receive_strategy()->received_header().source_) > 0);
00249 }
00250 
00251 bool
00252 MulticastDataLink::reassemble(ReceivedDataSample& data,
00253     const TransportHeader& header)
00254 {
00255   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00256       guard,
00257       this->session_lock_,
00258       false);
00259 
00260   MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00261   if (it == this->sessions_.end()) return false;
00262   if (it->second->acked()) {
00263     return it->second->reassemble(data, header);
00264   }
00265   return false;
00266 }
00267 
00268 void
00269 MulticastDataLink::sample_received(ReceivedDataSample& sample)
00270 {
00271   switch (sample.header_.message_id_) {
00272   case TRANSPORT_CONTROL: {
00273     // Transport control samples are delivered to all sessions
00274     // regardless of association status:
00275     {
00276       char* const ptr = sample.sample_ ? sample.sample_->rd_ptr() : 0;
00277 
00278       ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00279           guard,
00280           this->session_lock_);
00281 
00282       const TransportHeader& theader = receive_strategy()->received_header();
00283 
00284       if (!is_active() && sample.header_.submessage_id_ == MULTICAST_SYN &&
00285           sessions_.find(theader.source_) == sessions_.end()) {
00286         // We have received a SYN but there is no session (yet) for this source.
00287         // Depending on the data, we may need to send SYNACK.
00288 
00289         guard.release();
00290         syn_received_no_session(theader.source_, sample.sample_,
00291                                 theader.swap_bytes());
00292 
00293         guard.acquire();
00294         MulticastSessionMap::iterator s_itr = sessions_.find(theader.source_);
00295         if (s_itr != sessions_.end()) {
00296           s_itr->second->record_header_received(theader);
00297         }
00298 
00299         if (ptr) {
00300           sample.sample_->rd_ptr(ptr);
00301         }
00302         return;
00303       }
00304 
00305       MulticastSessionMap temp_sessions(sessions_);
00306       guard.release();
00307 
00308       for (MulticastSessionMap::iterator it(temp_sessions.begin());
00309           it != temp_sessions.end(); ++it) {
00310         it->second->control_received(sample.header_.submessage_id_,
00311                                      sample.sample_);
00312         it->second->record_header_received(theader);
00313 
00314         // reset read pointer
00315         if (ptr) {
00316           sample.sample_->rd_ptr(ptr);
00317         }
00318       }
00319     }
00320   } break;
00321 
00322   default:
00323 
00324     if (ready_to_deliver(sample)) {
00325       data_received(sample);
00326     }
00327     break;
00328   }
00329 }
00330 
00331 bool
00332 MulticastDataLink::ready_to_deliver(const ReceivedDataSample& data)
00333 {
00334   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00335       guard,
00336       this->session_lock_, false);
00337 
00338   const TransportHeader& theader = receive_strategy()->received_header();
00339 
00340   MulticastSessionMap::iterator session_it = sessions_.find(theader.source_);
00341   if (session_it != sessions_.end()) {
00342     MulticastSession_rch sess_rch(session_it->second);
00343     guard.release();
00344     return sess_rch->ready_to_deliver(theader, data);
00345   }
00346 
00347   return true;
00348 }
00349 
00350 void
00351 MulticastDataLink::release_remote_i(const RepoId& remote)
00352 {
00353   ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX, guard, session_lock_);
00354   MulticastPeer remote_source = (ACE_INT64)RepoIdConverter(remote).federationId() << 32
00355                               | RepoIdConverter(remote).participantId();
00356   MulticastSessionMap::iterator session_it = sessions_.find(remote_source);
00357   if (session_it != sessions_.end() && session_it->second->is_reliable()) {
00358     session_it->second->release_remote(remote);
00359   }
00360 }
00361 
00362 void
00363 MulticastDataLink::syn_received_no_session(MulticastPeer source,
00364     ACE_Message_Block* data,
00365     bool swap_bytes)
00366 {
00367   Serializer serializer_read(data, swap_bytes);
00368 
00369   MulticastPeer local_peer;
00370   serializer_read >> local_peer;
00371 
00372   if (local_peer != local_peer_) {
00373     return;
00374   }
00375 
00376   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastDataLink[%C]::syn_received_no_session "
00377       "send_synack local %#08x%08x remote %#08x%08x\n",
00378       config_->name().c_str(),
00379       (unsigned int) (local_peer >> 32),
00380       (unsigned int) local_peer,
00381       (unsigned int) (source >> 32),
00382       (unsigned int) source), 2);
00383 
00384   ACE_Message_Block* synack_data = new ACE_Message_Block(sizeof(MulticastPeer));
00385 
00386   Serializer serializer_write(synack_data);
00387   serializer_write << source;
00388 
00389   DataSampleHeader header;
00390   ACE_Message_Block* control =
00391       create_control(MULTICAST_SYNACK, header, synack_data);
00392 
00393   const int error = send_control(header, control);
00394   if (error != SEND_CONTROL_OK) {
00395     ACE_ERROR((LM_ERROR, "(%P|%t) MulticastDataLink::syn_received_no_session: "
00396         "ERROR: send_control failed: %d!\n", error));
00397     return;
00398   }
00399 
00400   transport_->passive_connection(local_peer, source);
00401 }
00402 
00403 void
00404 MulticastDataLink::stop_i()
00405 {
00406   ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00407       guard,
00408       this->session_lock_);
00409 
00410   for (MulticastSessionMap::iterator it(this->sessions_.begin());
00411       it != this->sessions_.end(); ++it) {
00412     it->second->stop();
00413   }
00414   this->sessions_.clear();
00415 
00416   this->socket_.close();
00417 }
00418 
00419 } // namespace DCPS
00420 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7