00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastSession.h"
00009
00010 #include "ace/Log_Msg.h"
00011 #include <cmath>
00012 #ifndef __ACE_INLINE__
00013 # include "MulticastSession.inl"
00014 #endif
00015
00016 namespace OpenDDS {
00017 namespace DCPS {
00018
00019 SynWatchdog::SynWatchdog(ACE_Reactor* reactor,
00020 ACE_thread_t owner,
00021 MulticastSession* session)
00022 : DataLinkWatchdog (reactor, owner)
00023 , session_(session)
00024 , retries_(0)
00025 {
00026 }
00027
00028 bool
00029 SynWatchdog::reactor_is_shut_down() const
00030 {
00031 return session_->link()->transport()->is_shut_down();
00032 }
00033
00034 ACE_Time_Value
00035 SynWatchdog::next_interval()
00036 {
00037 MulticastInst* config = this->session_->link()->config();
00038 ACE_Time_Value interval(config->syn_interval_);
00039
00040
00041 if (this->retries_ > 0) {
00042 interval *= std::pow(config->syn_backoff_, double(this->retries_));
00043 }
00044 ++this->retries_;
00045
00046 return interval;
00047 }
00048
00049 void
00050 SynWatchdog::on_interval(const void* )
00051 {
00052
00053
00054 this->session_->send_syn();
00055 }
00056
00057 ACE_Time_Value
00058 SynWatchdog::next_timeout()
00059 {
00060 MulticastInst* config = this->session_->link()->config();
00061 return config->syn_timeout_;
00062 }
00063
00064 void
00065 SynWatchdog::on_timeout(const void* )
00066 {
00067
00068
00069 ACE_ERROR((LM_WARNING,
00070 ACE_TEXT("(%P|%t) WARNING: ")
00071 ACE_TEXT("SynWatchdog[transport=%C]::on_timeout: ")
00072 ACE_TEXT("timed out waiting on remote peer: %#08x%08x local: %#08x%08x\n"),
00073 this->session_->link()->config()->name().c_str(),
00074 (unsigned int)(this->session_->remote_peer() >> 32),
00075 (unsigned int) this->session_->remote_peer(),
00076 (unsigned int)(this->session_->link()->local_peer() >> 32),
00077 (unsigned int) this->session_->link()->local_peer()));
00078 }
00079
00080
00081 MulticastSession::MulticastSession(ACE_Reactor* reactor,
00082 ACE_thread_t owner,
00083 MulticastDataLink* link,
00084 MulticastPeer remote_peer)
00085 : link_(link)
00086 , remote_peer_(remote_peer)
00087 , reverse_start_lock_(start_lock_)
00088 , started_(false)
00089 , active_(true)
00090 , acked_(false)
00091 , syn_watchdog_(new SynWatchdog (reactor, owner, this))
00092 {
00093 }
00094
00095 MulticastSession::~MulticastSession()
00096 {
00097 syn_watchdog_->cancel();
00098 syn_watchdog_->wait();
00099 syn_watchdog_->destroy();
00100 }
00101
00102 bool
00103 MulticastSession::acked()
00104 {
00105 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false);
00106 return this->acked_;
00107 }
00108
00109 void
00110 MulticastSession::set_acked() {
00111 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00112 this->acked_ = true;
00113 }
00114
00115 bool
00116 MulticastSession::start_syn()
00117 {
00118 return this->syn_watchdog_->schedule_now();
00119 }
00120
00121 void
00122 MulticastSession::send_control(char submessage_id, ACE_Message_Block* data)
00123 {
00124 DataSampleHeader header;
00125 ACE_Message_Block* control =
00126 this->link_->create_control(submessage_id, header, data);
00127 if (control == 0) {
00128 ACE_ERROR((LM_ERROR,
00129 ACE_TEXT("(%P|%t) ERROR: ")
00130 ACE_TEXT("MulticastSession::send_control: ")
00131 ACE_TEXT("create_control failed!\n")));
00132 return;
00133 }
00134
00135 int error = this->link_->send_control(header, control);
00136 if (error != SEND_CONTROL_OK) {
00137 ACE_ERROR((LM_ERROR,
00138 ACE_TEXT("(%P|%t) ERROR: ")
00139 ACE_TEXT("MulticastSession::send_control: ")
00140 ACE_TEXT("send_control failed: %d!\n"),
00141 error));
00142 return;
00143 }
00144 }
00145
00146 bool
00147 MulticastSession::control_received(char submessage_id,
00148 ACE_Message_Block* control)
00149 {
00150 switch (submessage_id) {
00151 case MULTICAST_SYN:
00152 syn_received(control);
00153 break;
00154
00155 case MULTICAST_SYNACK:
00156 synack_received(control);
00157 break;
00158
00159 default:
00160 return false;
00161 }
00162
00163 return true;
00164 }
00165
00166 void
00167 MulticastSession::syn_received(ACE_Message_Block* control)
00168 {
00169 if (this->active_) return;
00170
00171 const TransportHeader& header =
00172 this->link_->receive_strategy()->received_header();
00173
00174
00175 if (this->remote_peer_ != header.source_) return;
00176
00177 Serializer serializer(control, header.swap_bytes());
00178
00179 MulticastPeer local_peer;
00180 serializer >> local_peer;
00181
00182
00183 if (local_peer != this->link_->local_peer()) return;
00184
00185 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::syn_received "
00186 "local %#08x%08x remote %#08x%08x\n",
00187 this->link()->config()->name().c_str(),
00188 (unsigned int)(this->link()->local_peer() >> 32),
00189 (unsigned int) this->link()->local_peer(),
00190 (unsigned int)(this->remote_peer_ >> 32),
00191 (unsigned int) this->remote_peer_), 2);
00192
00193 {
00194 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00195
00196 if (!this->acked_) {
00197 this->acked_ = true;
00198 syn_hook(header.sequence_);
00199 }
00200 }
00201
00202
00203
00204 send_synack();
00205
00206 this->link_->transport()->passive_connection(this->link_->local_peer(), this->remote_peer_);
00207
00208 }
00209
00210 void
00211 MulticastSession::send_syn()
00212 {
00213 size_t len = sizeof(this->remote_peer_);
00214
00215 ACE_Message_Block* data;
00216 ACE_NEW(data, ACE_Message_Block(len));
00217
00218 Serializer serializer(data);
00219
00220 serializer << this->remote_peer_;
00221
00222 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_syn "
00223 "local %#08x%08x remote %#08x%08x\n",
00224 this->link()->config()->name().c_str(),
00225 (unsigned int)(this->link()->local_peer() >> 32),
00226 (unsigned int) this->link()->local_peer(),
00227 (unsigned int)(this->remote_peer_ >> 32),
00228 (unsigned int) this->remote_peer_), 2);
00229
00230
00231 send_control(MULTICAST_SYN, data);
00232 }
00233
00234 void
00235 MulticastSession::synack_received(ACE_Message_Block* control)
00236 {
00237 if (!this->active_) return;
00238
00239
00240 if (this->acked()) return;
00241
00242 const TransportHeader& header =
00243 this->link_->receive_strategy()->received_header();
00244
00245
00246 if (this->remote_peer_ != header.source_) return;
00247
00248 Serializer serializer(control, header.swap_bytes());
00249
00250 MulticastPeer local_peer;
00251 serializer >> local_peer;
00252
00253
00254 if (local_peer != this->link_->local_peer()) return;
00255
00256 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::synack_received "
00257 "local %#08x%08x remote %#08x%08x\n",
00258 this->link()->config()->name().c_str(),
00259 (unsigned int)(this->link()->local_peer() >> 32),
00260 (unsigned int) this->link()->local_peer(),
00261 (unsigned int)(this->remote_peer_ >> 32),
00262 (unsigned int) this->remote_peer_), 2);
00263
00264 {
00265 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00266
00267 if (this->acked_) return;
00268
00269 this->syn_watchdog_->cancel();
00270 this->acked_ = true;
00271 }
00272 }
00273
00274 void
00275 MulticastSession::send_synack()
00276 {
00277 size_t len = sizeof(this->remote_peer_);
00278
00279 ACE_Message_Block* data;
00280 ACE_NEW(data, ACE_Message_Block(len));
00281
00282 Serializer serializer(data);
00283
00284 serializer << this->remote_peer_;
00285
00286 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack "
00287 "local %#08x%08x remote %#08x%08x active %d\n",
00288 this->link()->config()->name().c_str(),
00289 (unsigned int)(this->link()->local_peer() >> 32),
00290 (unsigned int) this->link()->local_peer(),
00291 (unsigned int)(this->remote_peer_ >> 32),
00292 (unsigned int) this->remote_peer_,
00293 this->active_ ? 1 : 0), 2);
00294
00295
00296 send_control(MULTICAST_SYNACK, data);
00297
00298
00299
00300 send_naks();
00301 }
00302
00303 void
00304 MulticastSession::stop()
00305 {
00306 this->syn_watchdog_->cancel();
00307 }
00308
00309 bool
00310 MulticastSession::reassemble(ReceivedDataSample& data,
00311 const TransportHeader& header)
00312 {
00313 return this->reassembly_.reassemble(header.sequence_,
00314 header.first_fragment_,
00315 data);
00316 }
00317
00318 }
00319 }