00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastSession.h"
00009
00010 #include "ace/Log_Msg.h"
00011 #include <cmath>
00012 #ifndef __ACE_INLINE__
00013 # include "MulticastSession.inl"
00014 #endif
00015
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 SynWatchdog::SynWatchdog(ACE_Reactor* reactor,
00022 ACE_thread_t owner,
00023 MulticastSession* session)
00024 : DataLinkWatchdog (reactor, owner)
00025 , session_(session)
00026 , retries_(0)
00027 {
00028 }
00029
00030 bool
00031 SynWatchdog::reactor_is_shut_down() const
00032 {
00033 return session_->link()->transport().is_shut_down();
00034 }
00035
00036 ACE_Time_Value
00037 SynWatchdog::next_interval()
00038 {
00039 MulticastInst& config = this->session_->link()->config();
00040 ACE_Time_Value interval(config.syn_interval_);
00041
00042
00043 if (this->retries_ > 0) {
00044 interval *= std::pow(config.syn_backoff_, double(this->retries_));
00045 }
00046 ++this->retries_;
00047
00048 return interval;
00049 }
00050
00051 void
00052 SynWatchdog::on_interval(const void* )
00053 {
00054
00055
00056 this->session_->send_syn();
00057 }
00058
00059 ACE_Time_Value
00060 SynWatchdog::next_timeout()
00061 {
00062 return this->session_->link()->config().syn_timeout_;
00063 }
00064
00065 void
00066 SynWatchdog::on_timeout(const void* )
00067 {
00068
00069
00070 ACE_ERROR((LM_WARNING,
00071 ACE_TEXT("(%P|%t) WARNING: ")
00072 ACE_TEXT("SynWatchdog[transport=%C]::on_timeout: ")
00073 ACE_TEXT("timed out waiting on remote peer: %#08x%08x local: %#08x%08x\n"),
00074 this->session_->link()->config().name().c_str(),
00075 (unsigned int)(this->session_->remote_peer() >> 32),
00076 (unsigned int) this->session_->remote_peer(),
00077 (unsigned int)(this->session_->link()->local_peer() >> 32),
00078 (unsigned int) this->session_->link()->local_peer()));
00079 }
00080
00081
00082 MulticastSession::MulticastSession(ACE_Reactor* reactor,
00083 ACE_thread_t owner,
00084 MulticastDataLink* link,
00085 MulticastPeer remote_peer)
00086 : link_(link)
00087 , remote_peer_(remote_peer)
00088 , reverse_start_lock_(start_lock_)
00089 , started_(false)
00090 , active_(true)
00091 , acked_(false)
00092 , syn_watchdog_(make_rch<SynWatchdog> (reactor, owner, this))
00093 {
00094 }
00095
00096 MulticastSession::~MulticastSession()
00097 {
00098 syn_watchdog_->cancel();
00099 syn_watchdog_->wait();
00100 }
00101
00102 bool
00103 MulticastSession::acked()
00104 {
00105 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false);
00106 return this->acked_;
00107 }
00108
00109 void
00110 MulticastSession::set_acked() {
00111 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00112 this->acked_ = true;
00113 }
00114
00115 bool
00116 MulticastSession::start_syn()
00117 {
00118 return this->syn_watchdog_->schedule_now();
00119 }
00120
00121 void
00122 MulticastSession::send_control(char submessage_id, Message_Block_Ptr data)
00123 {
00124 DataSampleHeader header;
00125 Message_Block_Ptr control(
00126 this->link_->create_control(submessage_id, header, move(data)));
00127 if (!control) {
00128 ACE_ERROR((LM_ERROR,
00129 ACE_TEXT("(%P|%t) ERROR: ")
00130 ACE_TEXT("MulticastSession::send_control: ")
00131 ACE_TEXT("create_control failed!\n")));
00132 return;
00133 }
00134
00135 int error = this->link_->send_control(header, move(control));
00136 if (error != SEND_CONTROL_OK) {
00137 ACE_ERROR((LM_ERROR,
00138 ACE_TEXT("(%P|%t) ERROR: ")
00139 ACE_TEXT("MulticastSession::send_control: ")
00140 ACE_TEXT("send_control failed: %d!\n"),
00141 error));
00142 return;
00143 }
00144 }
00145
00146 bool
00147 MulticastSession::control_received(char submessage_id,
00148 const Message_Block_Ptr& control)
00149 {
00150 switch (submessage_id) {
00151 case MULTICAST_SYN:
00152 syn_received(control);
00153 break;
00154
00155 case MULTICAST_SYNACK:
00156 synack_received(control);
00157 break;
00158
00159 default:
00160 return false;
00161 }
00162
00163 return true;
00164 }
00165
00166 void
00167 MulticastSession::syn_received(const Message_Block_Ptr& control)
00168 {
00169 if (this->active_) return;
00170
00171 const TransportHeader& header =
00172 this->link_->receive_strategy()->received_header();
00173
00174
00175 if (this->remote_peer_ != header.source_) return;
00176
00177 Serializer serializer(control.get(), header.swap_bytes());
00178
00179 MulticastPeer local_peer;
00180 serializer >> local_peer;
00181
00182
00183 if (local_peer != this->link_->local_peer()) return;
00184
00185 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::syn_received "
00186 "local %#08x%08x remote %#08x%08x\n",
00187 this->link()->config().name().c_str(),
00188 (unsigned int)(this->link()->local_peer() >> 32),
00189 (unsigned int) this->link()->local_peer(),
00190 (unsigned int)(this->remote_peer_ >> 32),
00191 (unsigned int) this->remote_peer_), 2);
00192
00193 {
00194 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00195
00196 if (!this->acked_) {
00197 this->acked_ = true;
00198 syn_hook(header.sequence_);
00199 }
00200 }
00201
00202
00203
00204 send_synack();
00205
00206 this->link_->transport().passive_connection(this->link_->local_peer(), this->remote_peer_);
00207
00208 }
00209
00210 void
00211 MulticastSession::send_syn()
00212 {
00213 size_t len = sizeof(this->remote_peer_);
00214
00215 Message_Block_Ptr data( new ACE_Message_Block(len));
00216
00217 Serializer serializer(data.get());
00218
00219 serializer << this->remote_peer_;
00220
00221 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_syn "
00222 "local %#08x%08x remote %#08x%08x\n",
00223 this->link()->config().name().c_str(),
00224 (unsigned int)(this->link()->local_peer() >> 32),
00225 (unsigned int) this->link()->local_peer(),
00226 (unsigned int)(this->remote_peer_ >> 32),
00227 (unsigned int) this->remote_peer_), 2);
00228
00229
00230 send_control(MULTICAST_SYN, move(data));
00231 }
00232
00233 void
00234 MulticastSession::synack_received(const Message_Block_Ptr& control)
00235 {
00236 if (!this->active_) return;
00237
00238
00239 if (this->acked()) return;
00240
00241 const TransportHeader& header =
00242 this->link_->receive_strategy()->received_header();
00243
00244
00245 if (this->remote_peer_ != header.source_) return;
00246
00247 Serializer serializer(control.get(), header.swap_bytes());
00248
00249 MulticastPeer local_peer;
00250 serializer >> local_peer;
00251
00252
00253 if (local_peer != this->link_->local_peer()) return;
00254
00255 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::synack_received "
00256 "local %#08x%08x remote %#08x%08x\n",
00257 this->link()->config().name().c_str(),
00258 (unsigned int)(this->link()->local_peer() >> 32),
00259 (unsigned int) this->link()->local_peer(),
00260 (unsigned int)(this->remote_peer_ >> 32),
00261 (unsigned int) this->remote_peer_), 2);
00262
00263 {
00264 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00265
00266 if (this->acked_) return;
00267
00268 this->syn_watchdog_->cancel();
00269 this->acked_ = true;
00270 }
00271 }
00272
00273 void
00274 MulticastSession::send_synack()
00275 {
00276 size_t len = sizeof(this->remote_peer_);
00277
00278 Message_Block_Ptr data(new ACE_Message_Block(len));
00279
00280 Serializer serializer(data.get());
00281
00282 serializer << this->remote_peer_;
00283
00284 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack "
00285 "local %#08x%08x remote %#08x%08x active %d\n",
00286 this->link()->config().name().c_str(),
00287 (unsigned int)(this->link()->local_peer() >> 32),
00288 (unsigned int) this->link()->local_peer(),
00289 (unsigned int)(this->remote_peer_ >> 32),
00290 (unsigned int) this->remote_peer_,
00291 this->active_ ? 1 : 0), 2);
00292
00293
00294 send_control(MULTICAST_SYNACK, move(data));
00295
00296
00297
00298 send_naks();
00299 }
00300
00301 void
00302 MulticastSession::stop()
00303 {
00304 this->syn_watchdog_->cancel();
00305 }
00306
00307 bool
00308 MulticastSession::reassemble(ReceivedDataSample& data,
00309 const TransportHeader& header)
00310 {
00311 return this->reassembly_.reassemble(header.sequence_,
00312 header.first_fragment_,
00313 data);
00314 }
00315
00316 }
00317 }
00318
00319 OPENDDS_END_VERSIONED_NAMESPACE_DECL