#include <MulticastSession.h>
Class OpenDDS::DCPS::MulticastSession: definition at line 58 of file MulticastSession.h.
typedef ACE_Reverse_Lock<ACE_Thread_Mutex> OpenDDS::DCPS::MulticastSession::Reverse_Lock_t [protected] |
Definition at line 109 of file MulticastSession.h.
OpenDDS::DCPS::MulticastSession::~MulticastSession | ( | ) | [virtual] |
Definition at line 96 of file MulticastSession.cpp.
References syn_watchdog_.
00097 { 00098 syn_watchdog_->cancel(); 00099 syn_watchdog_->wait(); 00100 }
OpenDDS::DCPS::MulticastSession::MulticastSession | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner, | |||
MulticastDataLink * | link, | |||
MulticastPeer | remote_peer | |||
) | [protected] |
Definition at line 82 of file MulticastSession.cpp.
00086 : link_(link) 00087 , remote_peer_(remote_peer) 00088 , reverse_start_lock_(start_lock_) 00089 , started_(false) 00090 , active_(true) 00091 , acked_(false) 00092 , syn_watchdog_(make_rch<SynWatchdog> (reactor, owner, this)) 00093 { 00094 }
bool OpenDDS::DCPS::MulticastSession::acked | ( | ) |
Definition at line 103 of file MulticastSession.cpp.
References ack_lock_, and acked_.
Referenced by OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::ReliableSession::send_naks(), and synack_received().
00104 { 00105 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false); 00106 return this->acked_; 00107 }
virtual bool OpenDDS::DCPS::MulticastSession::check_header | ( | const TransportHeader & | header | ) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
bool OpenDDS::DCPS::MulticastSession::control_received | ( | char | submessage_id, | |
const Message_Block_Ptr & | control | |||
) | [virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 147 of file MulticastSession.cpp.
References OpenDDS::DCPS::MULTICAST_SYN, OpenDDS::DCPS::MULTICAST_SYNACK, syn_received(), and synack_received().
Referenced by OpenDDS::DCPS::ReliableSession::control_received().
00149 { 00150 switch (submessage_id) { 00151 case MULTICAST_SYN: 00152 syn_received(control); 00153 break; 00154 00155 case MULTICAST_SYNACK: 00156 synack_received(control); 00157 break; 00158 00159 default: 00160 return false; 00161 } 00162 00163 return true; 00164 }
virtual bool OpenDDS::DCPS::MulticastSession::is_reliable | ( | ) | [inline, virtual] |
Reimplemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
Definition at line 69 of file MulticastSession.h.
ACE_INLINE MulticastDataLink * OpenDDS::DCPS::MulticastSession::link | ( | void | ) |
Definition at line 14 of file MulticastSession.inl.
References link_.
Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::NakWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_timeout(), OpenDDS::DCPS::SynWatchdog::on_timeout(), OpenDDS::DCPS::NakWatchdog::reactor_is_shut_down(), OpenDDS::DCPS::SynWatchdog::reactor_is_shut_down(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), send_synack(), syn_received(), and synack_received().
00015 { 00016 return this->link_; 00017 }
virtual bool OpenDDS::DCPS::MulticastSession::ready_to_deliver | ( | const TransportHeader & | header, | |
const ReceivedDataSample & | data | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
bool OpenDDS::DCPS::MulticastSession::reassemble | ( | ReceivedDataSample & | data, | |
const TransportHeader & | header | |||
) |
Definition at line 308 of file MulticastSession.cpp.
References OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportReassembly::reassemble(), reassembly_, and OpenDDS::DCPS::TransportHeader::sequence_.
00310 { 00311 return this->reassembly_.reassemble(header.sequence_, 00312 header.first_fragment_, 00313 data); 00314 }
virtual void OpenDDS::DCPS::MulticastSession::record_header_received | ( | const TransportHeader & | header | ) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
virtual void OpenDDS::DCPS::MulticastSession::release_remote | ( | const RepoId & | ) | [inline, virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 82 of file MulticastSession.h.
ACE_INLINE MulticastPeer OpenDDS::DCPS::MulticastSession::remote_peer | ( | ) | const |
Definition at line 20 of file MulticastSession.inl.
References remote_peer_.
Referenced by OpenDDS::DCPS::SynWatchdog::on_timeout().
00021 { 00022 return this->remote_peer_; 00023 }
void OpenDDS::DCPS::MulticastSession::send_control | ( | char | submessage_id, | |
Message_Block_Ptr | data | |||
) | [protected] |
Definition at line 122 of file MulticastSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DataLink::create_control(), header, link_, LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataLink::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.
Referenced by OpenDDS::DCPS::ReliableSession::send_nakack(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), and send_synack().
00123 { 00124 DataSampleHeader header; 00125 Message_Block_Ptr control( 00126 this->link_->create_control(submessage_id, header, move(data))); 00127 if (!control) { 00128 ACE_ERROR((LM_ERROR, 00129 ACE_TEXT("(%P|%t) ERROR: ") 00130 ACE_TEXT("MulticastSession::send_control: ") 00131 ACE_TEXT("create_control failed!\n"))); 00132 return; 00133 } 00134 00135 int error = this->link_->send_control(header, move(control)); 00136 if (error != SEND_CONTROL_OK) { 00137 ACE_ERROR((LM_ERROR, 00138 ACE_TEXT("(%P|%t) ERROR: ") 00139 ACE_TEXT("MulticastSession::send_control: ") 00140 ACE_TEXT("send_control failed: %d!\n"), 00141 error)); 00142 return; 00143 } 00144 }
virtual void OpenDDS::DCPS::MulticastSession::send_naks | ( | ) | [inline, virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 76 of file MulticastSession.h.
Referenced by send_synack().
void OpenDDS::DCPS::MulticastSession::send_syn | ( | ) |
Definition at line 211 of file MulticastSession.cpp.
References OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, link(), LM_DEBUG, OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_SYN, remote_peer_, send_control(), and VDBG_LVL.
Referenced by OpenDDS::DCPS::SynWatchdog::on_interval().
00212 { 00213 size_t len = sizeof(this->remote_peer_); 00214 00215 Message_Block_Ptr data( new ACE_Message_Block(len)); 00216 00217 Serializer serializer(data.get()); 00218 00219 serializer << this->remote_peer_; 00220 00221 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_syn " 00222 "local %#08x%08x remote %#08x%08x\n", 00223 this->link()->config().name().c_str(), 00224 (unsigned int)(this->link()->local_peer() >> 32), 00225 (unsigned int) this->link()->local_peer(), 00226 (unsigned int)(this->remote_peer_ >> 32), 00227 (unsigned int) this->remote_peer_), 2); 00228 00229 // Send control sample to remote peer: 00230 send_control(MULTICAST_SYN, move(data)); 00231 }
void OpenDDS::DCPS::MulticastSession::send_synack | ( | ) |
Definition at line 274 of file MulticastSession.cpp.
References active_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, link(), LM_DEBUG, OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_SYNACK, remote_peer_, send_control(), send_naks(), and VDBG_LVL.
Referenced by syn_received().
00275 { 00276 size_t len = sizeof(this->remote_peer_); 00277 00278 Message_Block_Ptr data(new ACE_Message_Block(len)); 00279 00280 Serializer serializer(data.get()); 00281 00282 serializer << this->remote_peer_; 00283 00284 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack " 00285 "local %#08x%08x remote %#08x%08x active %d\n", 00286 this->link()->config().name().c_str(), 00287 (unsigned int)(this->link()->local_peer() >> 32), 00288 (unsigned int) this->link()->local_peer(), 00289 (unsigned int)(this->remote_peer_ >> 32), 00290 (unsigned int) this->remote_peer_, 00291 this->active_ ? 1 : 0), 2); 00292 00293 // Send control sample to remote peer: 00294 send_control(MULTICAST_SYNACK, move(data)); 00295 00296 // Send naks right after the synack to 00297 // reduce wait time for resends from remote. 00298 send_naks(); 00299 }
void OpenDDS::DCPS::MulticastSession::set_acked | ( | ) |
Definition at line 110 of file MulticastSession.cpp.
References ack_lock_, and acked_.
Referenced by OpenDDS::DCPS::ReliableSession::start().
00110 { 00111 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_); 00112 this->acked_ = true; 00113 }
virtual bool OpenDDS::DCPS::MulticastSession::start | ( | bool | active, | |
bool | acked | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
bool OpenDDS::DCPS::MulticastSession::start_syn | ( | ) | [protected] |
Definition at line 116 of file MulticastSession.cpp.
References syn_watchdog_.
Referenced by OpenDDS::DCPS::ReliableSession::start(), and OpenDDS::DCPS::BestEffortSession::start().
00117 { 00118 return this->syn_watchdog_->schedule_now(); 00119 }
void OpenDDS::DCPS::MulticastSession::stop | ( | void | ) | [virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 302 of file MulticastSession.cpp.
References syn_watchdog_.
00303 { 00304 this->syn_watchdog_->cancel(); 00305 }
virtual void OpenDDS::DCPS::MulticastSession::syn_hook | ( | const SequenceNumber & | ) | [inline, protected, virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 107 of file MulticastSession.h.
Referenced by syn_received().
void OpenDDS::DCPS::MulticastSession::syn_received | ( | const Message_Block_Ptr & | control | ) |
Definition at line 167 of file MulticastSession.cpp.
References ack_lock_, acked_, active_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, link(), link_, LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MulticastTransport::passive_connection(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), remote_peer_, send_synack(), OpenDDS::DCPS::TransportHeader::sequence_, OpenDDS::DCPS::TransportHeader::source_, OpenDDS::DCPS::TransportHeader::swap_bytes(), syn_hook(), OpenDDS::DCPS::MulticastDataLink::transport(), and VDBG_LVL.
Referenced by control_received().
00168 { 00169 if (this->active_) return; // pub send syn, then doesn't receive them. 00170 00171 const TransportHeader& header = 00172 this->link_->receive_strategy()->received_header(); 00173 00174 // Not from the remote peer for this session. 00175 if (this->remote_peer_ != header.source_) return; 00176 00177 Serializer serializer(control.get(), header.swap_bytes()); 00178 00179 MulticastPeer local_peer; 00180 serializer >> local_peer; // sent as remote_peer 00181 00182 // Ignore sample if not destined for us: 00183 if (local_peer != this->link_->local_peer()) return; 00184 00185 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::syn_received " 00186 "local %#08x%08x remote %#08x%08x\n", 00187 this->link()->config().name().c_str(), 00188 (unsigned int)(this->link()->local_peer() >> 32), 00189 (unsigned int) this->link()->local_peer(), 00190 (unsigned int)(this->remote_peer_ >> 32), 00191 (unsigned int) this->remote_peer_), 2); 00192 00193 { 00194 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_); 00195 00196 if (!this->acked_) { 00197 this->acked_ = true; 00198 syn_hook(header.sequence_); 00199 } 00200 } 00201 00202 // MULTICAST_SYN control samples are always positively 00203 // acknowledged by a matching remote peer: 00204 send_synack(); 00205 00206 this->link_->transport().passive_connection(this->link_->local_peer(), this->remote_peer_); 00207 00208 }
void OpenDDS::DCPS::MulticastSession::synack_received | ( | const Message_Block_Ptr & | control | ) |
Definition at line 234 of file MulticastSession.cpp.
References ack_lock_, acked(), acked_, active_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, link(), link_, LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), remote_peer_, OpenDDS::DCPS::TransportHeader::source_, OpenDDS::DCPS::TransportHeader::swap_bytes(), syn_watchdog_, and VDBG_LVL.
Referenced by control_received().
00235 { 00236 if (!this->active_) return; // sub send synack, then doesn't receive them. 00237 00238 // Already received ack. 00239 if (this->acked()) return; 00240 00241 const TransportHeader& header = 00242 this->link_->receive_strategy()->received_header(); 00243 00244 // Not from the remote peer for this session. 00245 if (this->remote_peer_ != header.source_) return; 00246 00247 Serializer serializer(control.get(), header.swap_bytes()); 00248 00249 MulticastPeer local_peer; 00250 serializer >> local_peer; // sent as remote_peer 00251 00252 // Ignore sample if not destined for us: 00253 if (local_peer != this->link_->local_peer()) return; 00254 00255 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::synack_received " 00256 "local %#08x%08x remote %#08x%08x\n", 00257 this->link()->config().name().c_str(), 00258 (unsigned int)(this->link()->local_peer() >> 32), 00259 (unsigned int) this->link()->local_peer(), 00260 (unsigned int)(this->remote_peer_ >> 32), 00261 (unsigned int) this->remote_peer_), 2); 00262 00263 { 00264 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_); 00265 00266 if (this->acked_) return; // already acked 00267 00268 this->syn_watchdog_->cancel(); 00269 this->acked_ = true; 00270 } 00271 }
OpenDDS::DCPS::MulticastSession::ack_lock_
Definition at line 128 of file MulticastSession.h.
Referenced by acked(), set_acked(), syn_received(), and synack_received().
bool OpenDDS::DCPS::MulticastSession::acked_ [protected] |
Definition at line 125 of file MulticastSession.h.
Referenced by acked(), set_acked(), syn_received(), and synack_received().
bool OpenDDS::DCPS::MulticastSession::active_ [protected] |
Definition at line 121 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::check_header(), OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::ReliableSession::record_header_received(), send_synack(), OpenDDS::DCPS::ReliableSession::start(), OpenDDS::DCPS::BestEffortSession::start(), syn_received(), and synack_received().
MulticastDataLink* OpenDDS::DCPS::MulticastSession::link_ [protected] |
Definition at line 93 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::deliver_held_data(), OpenDDS::DCPS::ReliableSession::expire_naks(), link(), OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), send_control(), syn_received(), and synack_received().
OpenDDS::DCPS::MulticastSession::reassembly_
Definition at line 123 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), OpenDDS::DCPS::ReliableSession::expire_naks(), OpenDDS::DCPS::ReliableSession::nakack_received(), and reassemble().
OpenDDS::DCPS::MulticastSession::remote_peer_
Definition at line 95 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::check_header(), OpenDDS::DCPS::ReliableSession::expire_naks(), OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::ReliableSession::record_header_received(), OpenDDS::DCPS::BestEffortSession::record_header_received(), remote_peer(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), send_synack(), syn_received(), and synack_received().
OpenDDS::DCPS::MulticastSession::reverse_start_lock_
Definition at line 110 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::start().
OpenDDS::DCPS::MulticastSession::start_lock_
Definition at line 112 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::start(), and OpenDDS::DCPS::BestEffortSession::start().
bool OpenDDS::DCPS::MulticastSession::started_ [protected] |
Definition at line 113 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::start(), and OpenDDS::DCPS::BestEffortSession::start().
OpenDDS::DCPS::MulticastSession::syn_watchdog_
Definition at line 129 of file MulticastSession.h.
Referenced by start_syn(), stop(), synack_received(), and ~MulticastSession().