MulticastSession.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "MulticastSession.h"
00009 
00010 #include "ace/Log_Msg.h"
00011 #include <cmath>
00012 #ifndef __ACE_INLINE__
00013 # include "MulticastSession.inl"
00014 #endif  /* __ACE_INLINE__ */
00015 
00016 namespace OpenDDS {
00017 namespace DCPS {
00018 
00019 SynWatchdog::SynWatchdog(ACE_Reactor* reactor,
00020                          ACE_thread_t owner,
00021                          MulticastSession* session)
00022   : DataLinkWatchdog (reactor, owner)
00023   , session_(session)
00024   , retries_(0)
00025 {
00026 }
00027 
00028 bool
00029 SynWatchdog::reactor_is_shut_down() const
00030 {
00031   return session_->link()->transport()->is_shut_down();
00032 }
00033 
00034 ACE_Time_Value
00035 SynWatchdog::next_interval()
00036 {
00037   MulticastInst* config = this->session_->link()->config();
00038   ACE_Time_Value interval(config->syn_interval_);
00039 
00040   // Apply exponential backoff based on number of retries:
00041   if (this->retries_ > 0) {
00042     interval *= std::pow(config->syn_backoff_, double(this->retries_));
00043   }
00044   ++this->retries_;
00045 
00046   return interval;
00047 }
00048 
00049 void
00050 SynWatchdog::on_interval(const void* /*arg*/)
00051 {
00052   // Initiate handshake by sending a MULTICAST_SYN control
00053   // sample to the assigned remote peer:
00054   this->session_->send_syn();
00055 }
00056 
00057 ACE_Time_Value
00058 SynWatchdog::next_timeout()
00059 {
00060   MulticastInst* config = this->session_->link()->config();
00061   return config->syn_timeout_;
00062 }
00063 
00064 void
00065 SynWatchdog::on_timeout(const void* /*arg*/)
00066 {
00067   // There is no recourse if a link is unable to handshake;
00068   // log an error and return:
00069   ACE_ERROR((LM_WARNING,
00070              ACE_TEXT("(%P|%t) WARNING: ")
00071              ACE_TEXT("SynWatchdog[transport=%C]::on_timeout: ")
00072              ACE_TEXT("timed out waiting on remote peer: %#08x%08x local: %#08x%08x\n"),
00073              this->session_->link()->config()->name().c_str(),
00074              (unsigned int)(this->session_->remote_peer() >> 32),
00075              (unsigned int) this->session_->remote_peer(),
00076              (unsigned int)(this->session_->link()->local_peer() >> 32),
00077              (unsigned int) this->session_->link()->local_peer()));
00078 }
00079 
00080 
00081 MulticastSession::MulticastSession(ACE_Reactor* reactor,
00082                                    ACE_thread_t owner,
00083                                    MulticastDataLink* link,
00084                                    MulticastPeer remote_peer)
00085   : link_(link)
00086   , remote_peer_(remote_peer)
00087   , reverse_start_lock_(start_lock_)
00088   , started_(false)
00089   , active_(true)
00090   , acked_(false)
00091   , syn_watchdog_(new SynWatchdog (reactor, owner, this))
00092 {
00093 }
00094 
00095 MulticastSession::~MulticastSession()
00096 {
00097   syn_watchdog_->cancel();
00098   syn_watchdog_->wait();
00099   syn_watchdog_->destroy();
00100 }
00101 
00102 bool
00103 MulticastSession::acked()
00104 {
00105   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false);
00106   return this->acked_;
00107 }
00108 
00109 void
00110 MulticastSession::set_acked() {
00111   ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00112   this->acked_ = true;
00113 }
00114 
00115 bool
00116 MulticastSession::start_syn()
00117 {
00118   return this->syn_watchdog_->schedule_now();
00119 }
00120 
00121 void
00122 MulticastSession::send_control(char submessage_id, ACE_Message_Block* data)
00123 {
00124   DataSampleHeader header;
00125   ACE_Message_Block* control =
00126     this->link_->create_control(submessage_id, header, data);
00127   if (control == 0) {
00128     ACE_ERROR((LM_ERROR,
00129                ACE_TEXT("(%P|%t) ERROR: ")
00130                ACE_TEXT("MulticastSession::send_control: ")
00131                ACE_TEXT("create_control failed!\n")));
00132     return;
00133   }
00134 
00135   int error = this->link_->send_control(header, control);
00136   if (error != SEND_CONTROL_OK) {
00137     ACE_ERROR((LM_ERROR,
00138                ACE_TEXT("(%P|%t) ERROR: ")
00139                ACE_TEXT("MulticastSession::send_control: ")
00140                ACE_TEXT("send_control failed: %d!\n"),
00141                error));
00142     return;
00143   }
00144 }
00145 
00146 bool
00147 MulticastSession::control_received(char submessage_id,
00148                                    ACE_Message_Block* control)
00149 {
00150   switch (submessage_id) {
00151   case MULTICAST_SYN:
00152     syn_received(control);
00153     break;
00154 
00155   case MULTICAST_SYNACK:
00156     synack_received(control);
00157     break;
00158 
00159   default:
00160     return false;
00161   }
00162 
00163   return true;
00164 }
00165 
00166 void
00167 MulticastSession::syn_received(ACE_Message_Block* control)
00168 {
00169   if (this->active_) return; // pub send syn, then doesn't receive them.
00170 
00171   const TransportHeader& header =
00172     this->link_->receive_strategy()->received_header();
00173 
00174   // Not from the remote peer for this session.
00175   if (this->remote_peer_ != header.source_) return;
00176 
00177   Serializer serializer(control, header.swap_bytes());
00178 
00179   MulticastPeer local_peer;
00180   serializer >> local_peer; // sent as remote_peer
00181 
00182   // Ignore sample if not destined for us:
00183   if (local_peer != this->link_->local_peer()) return;
00184 
00185   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::syn_received "
00186                     "local %#08x%08x remote %#08x%08x\n",
00187                     this->link()->config()->name().c_str(),
00188                     (unsigned int)(this->link()->local_peer() >> 32),
00189                     (unsigned int) this->link()->local_peer(),
00190                     (unsigned int)(this->remote_peer_ >> 32),
00191                     (unsigned int) this->remote_peer_), 2);
00192 
00193   {
00194     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00195 
00196     if (!this->acked_) {
00197       this->acked_ = true;
00198       syn_hook(header.sequence_);
00199     }
00200   }
00201 
00202   // MULTICAST_SYN control samples are always positively
00203   // acknowledged by a matching remote peer:
00204   send_synack();
00205 
00206   this->link_->transport()->passive_connection(this->link_->local_peer(), this->remote_peer_);
00207 
00208 }
00209 
00210 void
00211 MulticastSession::send_syn()
00212 {
00213   size_t len = sizeof(this->remote_peer_);
00214 
00215   ACE_Message_Block* data;
00216   ACE_NEW(data, ACE_Message_Block(len));
00217 
00218   Serializer serializer(data);
00219 
00220   serializer << this->remote_peer_;
00221 
00222   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_syn "
00223                       "local %#08x%08x remote %#08x%08x\n",
00224                       this->link()->config()->name().c_str(),
00225                       (unsigned int)(this->link()->local_peer() >> 32),
00226                       (unsigned int) this->link()->local_peer(),
00227                       (unsigned int)(this->remote_peer_ >> 32),
00228                       (unsigned int) this->remote_peer_), 2);
00229 
00230   // Send control sample to remote peer:
00231   send_control(MULTICAST_SYN, data);
00232 }
00233 
00234 void
00235 MulticastSession::synack_received(ACE_Message_Block* control)
00236 {
00237   if (!this->active_) return; // sub send synack, then doesn't receive them.
00238 
00239   // Already received ack.
00240   if (this->acked()) return;
00241 
00242   const TransportHeader& header =
00243     this->link_->receive_strategy()->received_header();
00244 
00245   // Not from the remote peer for this session.
00246   if (this->remote_peer_ != header.source_) return;
00247 
00248   Serializer serializer(control, header.swap_bytes());
00249 
00250   MulticastPeer local_peer;
00251   serializer >> local_peer; // sent as remote_peer
00252 
00253   // Ignore sample if not destined for us:
00254   if (local_peer != this->link_->local_peer()) return;
00255 
00256   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::synack_received "
00257                       "local %#08x%08x remote %#08x%08x\n",
00258                       this->link()->config()->name().c_str(),
00259                       (unsigned int)(this->link()->local_peer() >> 32),
00260                       (unsigned int) this->link()->local_peer(),
00261                       (unsigned int)(this->remote_peer_ >> 32),
00262                       (unsigned int) this->remote_peer_), 2);
00263 
00264   {
00265     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00266 
00267     if (this->acked_) return; // already acked
00268 
00269     this->syn_watchdog_->cancel();
00270     this->acked_ = true;
00271   }
00272 }
00273 
00274 void
00275 MulticastSession::send_synack()
00276 {
00277   size_t len = sizeof(this->remote_peer_);
00278 
00279   ACE_Message_Block* data;
00280   ACE_NEW(data, ACE_Message_Block(len));
00281 
00282   Serializer serializer(data);
00283 
00284   serializer << this->remote_peer_;
00285 
00286   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack "
00287                       "local %#08x%08x remote %#08x%08x active %d\n",
00288                       this->link()->config()->name().c_str(),
00289                       (unsigned int)(this->link()->local_peer() >> 32),
00290                       (unsigned int) this->link()->local_peer(),
00291                       (unsigned int)(this->remote_peer_ >> 32),
00292                       (unsigned int) this->remote_peer_,
00293                       this->active_ ? 1 : 0), 2);
00294 
00295   // Send control sample to remote peer:
00296   send_control(MULTICAST_SYNACK, data);
00297 
00298   // Send naks before sending synack to
00299   // reduce wait time for resends from remote.
00300   send_naks();
00301 }
00302 
00303 void
00304 MulticastSession::stop()
00305 {
00306   this->syn_watchdog_->cancel();
00307 }
00308 
00309 bool
00310 MulticastSession::reassemble(ReceivedDataSample& data,
00311                              const TransportHeader& header)
00312 {
00313   return this->reassembly_.reassemble(header.sequence_,
00314                                       header.first_fragment_,
00315                                       data);
00316 }
00317 
00318 } // namespace DCPS
00319 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7