MulticastSession.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "MulticastSession.h"
00009 
00010 #include "ace/Log_Msg.h"
00011 #include <cmath>
00012 #ifndef __ACE_INLINE__
00013 # include "MulticastSession.inl"
00014 #endif  /* __ACE_INLINE__ */
00015 
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 namespace OpenDDS {
00019 namespace DCPS {
00020 
00021 SynWatchdog::SynWatchdog(ACE_Reactor* reactor,
00022                          ACE_thread_t owner,
00023                          MulticastSession* session)
00024   : DataLinkWatchdog (reactor, owner)
00025   , session_(session)
00026   , retries_(0)
00027 {
00028 }
00029 
00030 bool
00031 SynWatchdog::reactor_is_shut_down() const
00032 {
00033   return session_->link()->transport().is_shut_down();
00034 }
00035 
00036 ACE_Time_Value
00037 SynWatchdog::next_interval()
00038 {
00039   MulticastInst& config = this->session_->link()->config();
00040   ACE_Time_Value interval(config.syn_interval_);
00041 
00042   // Apply exponential backoff based on number of retries:
00043   if (this->retries_ > 0) {
00044     interval *= std::pow(config.syn_backoff_, double(this->retries_));
00045   }
00046   ++this->retries_;
00047 
00048   return interval;
00049 }
00050 
00051 void
00052 SynWatchdog::on_interval(const void* /*arg*/)
00053 {
00054   // Initiate handshake by sending a MULTICAST_SYN control
00055   // sample to the assigned remote peer:
00056   this->session_->send_syn();
00057 }
00058 
00059 ACE_Time_Value
00060 SynWatchdog::next_timeout()
00061 {
00062   return this->session_->link()->config().syn_timeout_;
00063 }
00064 
00065 void
00066 SynWatchdog::on_timeout(const void* /*arg*/)
00067 {
00068   // There is no recourse if a link is unable to handshake;
00069   // log an error and return:
00070   ACE_ERROR((LM_WARNING,
00071              ACE_TEXT("(%P|%t) WARNING: ")
00072              ACE_TEXT("SynWatchdog[transport=%C]::on_timeout: ")
00073              ACE_TEXT("timed out waiting on remote peer: %#08x%08x local: %#08x%08x\n"),
00074              this->session_->link()->config().name().c_str(),
00075              (unsigned int)(this->session_->remote_peer() >> 32),
00076              (unsigned int) this->session_->remote_peer(),
00077              (unsigned int)(this->session_->link()->local_peer() >> 32),
00078              (unsigned int) this->session_->link()->local_peer()));
00079 }
00080 
00081 
00082 MulticastSession::MulticastSession(ACE_Reactor* reactor,
00083                                    ACE_thread_t owner,
00084                                    MulticastDataLink* link,
00085                                    MulticastPeer remote_peer)
00086   : link_(link)
00087   , remote_peer_(remote_peer)
00088   , reverse_start_lock_(start_lock_)
00089   , started_(false)
00090   , active_(true)
00091   , acked_(false)
00092   , syn_watchdog_(make_rch<SynWatchdog> (reactor, owner, this))
00093 {
00094 }
00095 
00096 MulticastSession::~MulticastSession()
00097 {
00098   syn_watchdog_->cancel();
00099   syn_watchdog_->wait();
00100 }
00101 
00102 bool
00103 MulticastSession::acked()
00104 {
00105   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false);
00106   return this->acked_;
00107 }
00108 
00109 void
00110 MulticastSession::set_acked() {
00111   ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00112   this->acked_ = true;
00113 }
00114 
00115 bool
00116 MulticastSession::start_syn()
00117 {
00118   return this->syn_watchdog_->schedule_now();
00119 }
00120 
00121 void
00122 MulticastSession::send_control(char submessage_id, Message_Block_Ptr data)
00123 {
00124   DataSampleHeader header;
00125   Message_Block_Ptr control(
00126     this->link_->create_control(submessage_id, header, move(data)));
00127   if (!control) {
00128     ACE_ERROR((LM_ERROR,
00129                ACE_TEXT("(%P|%t) ERROR: ")
00130                ACE_TEXT("MulticastSession::send_control: ")
00131                ACE_TEXT("create_control failed!\n")));
00132     return;
00133   }
00134 
00135   int error = this->link_->send_control(header, move(control));
00136   if (error != SEND_CONTROL_OK) {
00137     ACE_ERROR((LM_ERROR,
00138                ACE_TEXT("(%P|%t) ERROR: ")
00139                ACE_TEXT("MulticastSession::send_control: ")
00140                ACE_TEXT("send_control failed: %d!\n"),
00141                error));
00142     return;
00143   }
00144 }
00145 
00146 bool
00147 MulticastSession::control_received(char submessage_id,
00148                                    const Message_Block_Ptr& control)
00149 {
00150   switch (submessage_id) {
00151   case MULTICAST_SYN:
00152     syn_received(control);
00153     break;
00154 
00155   case MULTICAST_SYNACK:
00156     synack_received(control);
00157     break;
00158 
00159   default:
00160     return false;
00161   }
00162 
00163   return true;
00164 }
00165 
00166 void
00167 MulticastSession::syn_received(const Message_Block_Ptr& control)
00168 {
00169   if (this->active_) return; // pub send syn, then doesn't receive them.
00170 
00171   const TransportHeader& header =
00172     this->link_->receive_strategy()->received_header();
00173 
00174   // Not from the remote peer for this session.
00175   if (this->remote_peer_ != header.source_) return;
00176 
00177   Serializer serializer(control.get(), header.swap_bytes());
00178 
00179   MulticastPeer local_peer;
00180   serializer >> local_peer; // sent as remote_peer
00181 
00182   // Ignore sample if not destined for us:
00183   if (local_peer != this->link_->local_peer()) return;
00184 
00185   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::syn_received "
00186                     "local %#08x%08x remote %#08x%08x\n",
00187                     this->link()->config().name().c_str(),
00188                     (unsigned int)(this->link()->local_peer() >> 32),
00189                     (unsigned int) this->link()->local_peer(),
00190                     (unsigned int)(this->remote_peer_ >> 32),
00191                     (unsigned int) this->remote_peer_), 2);
00192 
00193   {
00194     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00195 
00196     if (!this->acked_) {
00197       this->acked_ = true;
00198       syn_hook(header.sequence_);
00199     }
00200   }
00201 
00202   // MULTICAST_SYN control samples are always positively
00203   // acknowledged by a matching remote peer:
00204   send_synack();
00205 
00206   this->link_->transport().passive_connection(this->link_->local_peer(), this->remote_peer_);
00207 
00208 }
00209 
00210 void
00211 MulticastSession::send_syn()
00212 {
00213   size_t len = sizeof(this->remote_peer_);
00214 
00215   Message_Block_Ptr data( new ACE_Message_Block(len));
00216 
00217   Serializer serializer(data.get());
00218 
00219   serializer << this->remote_peer_;
00220 
00221   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_syn "
00222                       "local %#08x%08x remote %#08x%08x\n",
00223                       this->link()->config().name().c_str(),
00224                       (unsigned int)(this->link()->local_peer() >> 32),
00225                       (unsigned int) this->link()->local_peer(),
00226                       (unsigned int)(this->remote_peer_ >> 32),
00227                       (unsigned int) this->remote_peer_), 2);
00228 
00229   // Send control sample to remote peer:
00230   send_control(MULTICAST_SYN, move(data));
00231 }
00232 
00233 void
00234 MulticastSession::synack_received(const Message_Block_Ptr& control)
00235 {
00236   if (!this->active_) return; // sub send synack, then doesn't receive them.
00237 
00238   // Already received ack.
00239   if (this->acked()) return;
00240 
00241   const TransportHeader& header =
00242     this->link_->receive_strategy()->received_header();
00243 
00244   // Not from the remote peer for this session.
00245   if (this->remote_peer_ != header.source_) return;
00246 
00247   Serializer serializer(control.get(), header.swap_bytes());
00248 
00249   MulticastPeer local_peer;
00250   serializer >> local_peer; // sent as remote_peer
00251 
00252   // Ignore sample if not destined for us:
00253   if (local_peer != this->link_->local_peer()) return;
00254 
00255   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::synack_received "
00256                       "local %#08x%08x remote %#08x%08x\n",
00257                       this->link()->config().name().c_str(),
00258                       (unsigned int)(this->link()->local_peer() >> 32),
00259                       (unsigned int) this->link()->local_peer(),
00260                       (unsigned int)(this->remote_peer_ >> 32),
00261                       (unsigned int) this->remote_peer_), 2);
00262 
00263   {
00264     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00265 
00266     if (this->acked_) return; // already acked
00267 
00268     this->syn_watchdog_->cancel();
00269     this->acked_ = true;
00270   }
00271 }
00272 
00273 void
00274 MulticastSession::send_synack()
00275 {
00276   size_t len = sizeof(this->remote_peer_);
00277 
00278   Message_Block_Ptr data(new ACE_Message_Block(len));
00279 
00280   Serializer serializer(data.get());
00281 
00282   serializer << this->remote_peer_;
00283 
00284   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack "
00285                       "local %#08x%08x remote %#08x%08x active %d\n",
00286                       this->link()->config().name().c_str(),
00287                       (unsigned int)(this->link()->local_peer() >> 32),
00288                       (unsigned int) this->link()->local_peer(),
00289                       (unsigned int)(this->remote_peer_ >> 32),
00290                       (unsigned int) this->remote_peer_,
00291                       this->active_ ? 1 : 0), 2);
00292 
00293   // Send control sample to remote peer:
00294   send_control(MULTICAST_SYNACK, move(data));
00295 
00296   // Send naks before sending synack to
00297   // reduce wait time for resends from remote.
00298   send_naks();
00299 }
00300 
00301 void
00302 MulticastSession::stop()
00303 {
00304   this->syn_watchdog_->cancel();
00305 }
00306 
00307 bool
00308 MulticastSession::reassemble(ReceivedDataSample& data,
00309                              const TransportHeader& header)
00310 {
00311   return this->reassembly_.reassemble(header.sequence_,
00312                                       header.first_fragment_,
00313                                       data);
00314 }
00315 
00316 } // namespace DCPS
00317 } // namespace OpenDDS
00318 
00319 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1