#include <MulticastSession.h>
Inheritance diagram for OpenDDS::DCPS::MulticastSession:
Public Member Functions | |
virtual | ~MulticastSession () |
MulticastDataLink * | link () |
MulticastPeer | remote_peer () const |
bool | acked () |
void | set_acked () |
virtual bool | is_reliable () |
void | syn_received (ACE_Message_Block *control) |
void | send_syn () |
void | synack_received (ACE_Message_Block *control) |
void | send_synack () |
virtual void | send_naks () |
virtual bool | check_header (const TransportHeader &header)=0 |
virtual void | record_header_received (const TransportHeader &header)=0 |
virtual bool | ready_to_deliver (const TransportHeader &header, const ReceivedDataSample &data)=0 |
virtual void | release_remote (const RepoId &) |
virtual bool | control_received (char submessage_id, ACE_Message_Block *control) |
virtual bool | start (bool active, bool acked)=0 |
virtual void | stop () |
bool | reassemble (ReceivedDataSample &data, const TransportHeader &header) |
Protected Types | |
typedef ACE_Reverse_Lock< ACE_Thread_Mutex > | Reverse_Lock_t |
Protected Member Functions | |
MulticastSession (ACE_Reactor *reactor, ACE_thread_t owner, MulticastDataLink *link, MulticastPeer remote_peer) | |
void | send_control (char submessage_id, ACE_Message_Block *data) |
bool | start_syn () |
virtual void | syn_hook (const SequenceNumber &) |
Protected Attributes | |
MulticastDataLink * | link_ |
MulticastPeer | remote_peer_ |
Reverse_Lock_t | reverse_start_lock_ |
ACE_Thread_Mutex | start_lock_ |
bool | started_ |
bool | active_ |
TransportReassembly | reassembly_ |
bool | acked_ |
Private Attributes | |
ACE_Thread_Mutex | ack_lock_ |
SynWatchdog * | syn_watchdog_ |
Definition at line 55 of file MulticastSession.h.
typedef ACE_Reverse_Lock<ACE_Thread_Mutex> OpenDDS::DCPS::MulticastSession::Reverse_Lock_t [protected] |
Definition at line 106 of file MulticastSession.h.
OpenDDS::DCPS::MulticastSession::~MulticastSession | ( | ) | [virtual] |
Definition at line 95 of file MulticastSession.cpp.
References OpenDDS::DCPS::DataLinkWatchdog::cancel(), OpenDDS::DCPS::ReactorInterceptor::destroy(), syn_watchdog_, and OpenDDS::DCPS::ReactorInterceptor::wait().
00096 { 00097 syn_watchdog_->cancel(); 00098 syn_watchdog_->wait(); 00099 syn_watchdog_->destroy(); 00100 }
OpenDDS::DCPS::MulticastSession::MulticastSession | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner, | |||
MulticastDataLink * | link, | |||
MulticastPeer | remote_peer | |||
) | [protected] |
Definition at line 81 of file MulticastSession.cpp.
00085 : link_(link) 00086 , remote_peer_(remote_peer) 00087 , reverse_start_lock_(start_lock_) 00088 , started_(false) 00089 , active_(true) 00090 , acked_(false) 00091 , syn_watchdog_(new SynWatchdog (reactor, owner, this)) 00092 { 00093 }
bool OpenDDS::DCPS::MulticastSession::acked | ( | ) |
Definition at line 103 of file MulticastSession.cpp.
References acked_.
Referenced by OpenDDS::DCPS::ReliableSession::ready_to_deliver().
00104 { 00105 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false); 00106 return this->acked_; 00107 }
virtual bool OpenDDS::DCPS::MulticastSession::check_header | ( | const TransportHeader & | header | ) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
bool OpenDDS::DCPS::MulticastSession::control_received | ( | char | submessage_id, | |
ACE_Message_Block * | control | |||
) | [virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 147 of file MulticastSession.cpp.
References OpenDDS::DCPS::MULTICAST_SYN, OpenDDS::DCPS::MULTICAST_SYNACK, syn_received(), and synack_received().
Referenced by OpenDDS::DCPS::ReliableSession::control_received().
00149 { 00150 switch (submessage_id) { 00151 case MULTICAST_SYN: 00152 syn_received(control); 00153 break; 00154 00155 case MULTICAST_SYNACK: 00156 synack_received(control); 00157 break; 00158 00159 default: 00160 return false; 00161 } 00162 00163 return true; 00164 }
virtual bool OpenDDS::DCPS::MulticastSession::is_reliable | ( | ) | [inline, virtual] |
Reimplemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
Definition at line 66 of file MulticastSession.h.
ACE_INLINE MulticastDataLink * OpenDDS::DCPS::MulticastSession::link | ( | ) |
Definition at line 12 of file MulticastSession.inl.
References link_.
Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::NakWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_timeout(), OpenDDS::DCPS::SynWatchdog::on_timeout(), OpenDDS::DCPS::NakWatchdog::reactor_is_shut_down(), OpenDDS::DCPS::SynWatchdog::reactor_is_shut_down(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), send_synack(), syn_received(), and synack_received().
00013 { 00014 return this->link_; 00015 }
virtual bool OpenDDS::DCPS::MulticastSession::ready_to_deliver | ( | const TransportHeader & | header, | |
const ReceivedDataSample & | data | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
bool OpenDDS::DCPS::MulticastSession::reassemble | ( | ReceivedDataSample & | data, | |
const TransportHeader & | header | |||
) |
Definition at line 310 of file MulticastSession.cpp.
References header, OpenDDS::DCPS::TransportReassembly::reassemble(), and reassembly_.
00312 { 00313 return this->reassembly_.reassemble(header.sequence_, 00314 header.first_fragment_, 00315 data); 00316 }
virtual void OpenDDS::DCPS::MulticastSession::record_header_received | ( | const TransportHeader & | header | ) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
virtual void OpenDDS::DCPS::MulticastSession::release_remote | ( | const RepoId & | ) | [inline, virtual] |
ACE_INLINE MulticastPeer OpenDDS::DCPS::MulticastSession::remote_peer | ( | ) | const |
Definition at line 18 of file MulticastSession.inl.
References remote_peer_.
Referenced by OpenDDS::DCPS::SynWatchdog::on_timeout().
00019 { 00020 return this->remote_peer_; 00021 }
void OpenDDS::DCPS::MulticastSession::send_control | ( | char | submessage_id, | |
ACE_Message_Block * | data | |||
) | [protected] |
Definition at line 122 of file MulticastSession.cpp.
References OpenDDS::DCPS::DataLink::create_control(), header, link_, OpenDDS::DCPS::DataLink::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.
Referenced by OpenDDS::DCPS::ReliableSession::send_nakack(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), and send_synack().
00123 { 00124 DataSampleHeader header; 00125 ACE_Message_Block* control = 00126 this->link_->create_control(submessage_id, header, data); 00127 if (control == 0) { 00128 ACE_ERROR((LM_ERROR, 00129 ACE_TEXT("(%P|%t) ERROR: ") 00130 ACE_TEXT("MulticastSession::send_control: ") 00131 ACE_TEXT("create_control failed!\n"))); 00132 return; 00133 } 00134 00135 int error = this->link_->send_control(header, control); 00136 if (error != SEND_CONTROL_OK) { 00137 ACE_ERROR((LM_ERROR, 00138 ACE_TEXT("(%P|%t) ERROR: ") 00139 ACE_TEXT("MulticastSession::send_control: ") 00140 ACE_TEXT("send_control failed: %d!\n"), 00141 error)); 00142 return; 00143 } 00144 }
virtual void OpenDDS::DCPS::MulticastSession::send_naks | ( | ) | [inline, virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 73 of file MulticastSession.h.
Referenced by send_synack().
void OpenDDS::DCPS::MulticastSession::send_syn | ( | ) |
Definition at line 211 of file MulticastSession.cpp.
References link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MULTICAST_SYN, remote_peer_, send_control(), and VDBG_LVL.
Referenced by OpenDDS::DCPS::SynWatchdog::on_interval().
00212 { 00213 size_t len = sizeof(this->remote_peer_); 00214 00215 ACE_Message_Block* data; 00216 ACE_NEW(data, ACE_Message_Block(len)); 00217 00218 Serializer serializer(data); 00219 00220 serializer << this->remote_peer_; 00221 00222 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_syn " 00223 "local %#08x%08x remote %#08x%08x\n", 00224 this->link()->config()->name().c_str(), 00225 (unsigned int)(this->link()->local_peer() >> 32), 00226 (unsigned int) this->link()->local_peer(), 00227 (unsigned int)(this->remote_peer_ >> 32), 00228 (unsigned int) this->remote_peer_), 2); 00229 00230 // Send control sample to remote peer: 00231 send_control(MULTICAST_SYN, data); 00232 }
void OpenDDS::DCPS::MulticastSession::send_synack | ( | ) |
Definition at line 275 of file MulticastSession.cpp.
References link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MULTICAST_SYNACK, remote_peer_, send_control(), send_naks(), and VDBG_LVL.
Referenced by syn_received().
00276 { 00277 size_t len = sizeof(this->remote_peer_); 00278 00279 ACE_Message_Block* data; 00280 ACE_NEW(data, ACE_Message_Block(len)); 00281 00282 Serializer serializer(data); 00283 00284 serializer << this->remote_peer_; 00285 00286 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack " 00287 "local %#08x%08x remote %#08x%08x active %d\n", 00288 this->link()->config()->name().c_str(), 00289 (unsigned int)(this->link()->local_peer() >> 32), 00290 (unsigned int) this->link()->local_peer(), 00291 (unsigned int)(this->remote_peer_ >> 32), 00292 (unsigned int) this->remote_peer_, 00293 this->active_ ? 1 : 0), 2); 00294 00295 // Send control sample to remote peer: 00296 send_control(MULTICAST_SYNACK, data); 00297 00298 // Send naks before sending synack to 00299 // reduce wait time for resends from remote. 00300 send_naks(); 00301 }
void OpenDDS::DCPS::MulticastSession::set_acked | ( | ) |
Definition at line 110 of file MulticastSession.cpp.
References acked_.
Referenced by OpenDDS::DCPS::ReliableSession::start().
00110 { 00111 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_); 00112 this->acked_ = true; 00113 }
virtual bool OpenDDS::DCPS::MulticastSession::start | ( | bool | active, | |
bool | acked | |||
) | [pure virtual] |
Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.
bool OpenDDS::DCPS::MulticastSession::start_syn | ( | ) | [protected] |
Definition at line 116 of file MulticastSession.cpp.
References OpenDDS::DCPS::DataLinkWatchdog::schedule_now(), and syn_watchdog_.
00117 { 00118 return this->syn_watchdog_->schedule_now(); 00119 }
void OpenDDS::DCPS::MulticastSession::stop | ( | ) | [virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 304 of file MulticastSession.cpp.
References OpenDDS::DCPS::DataLinkWatchdog::cancel(), and syn_watchdog_.
Referenced by OpenDDS::DCPS::ReliableSession::stop().
00305 { 00306 this->syn_watchdog_->cancel(); 00307 }
virtual void OpenDDS::DCPS::MulticastSession::syn_hook | ( | const SequenceNumber & | ) | [inline, protected, virtual] |
Reimplemented in OpenDDS::DCPS::ReliableSession.
Definition at line 104 of file MulticastSession.h.
Referenced by syn_received().
void OpenDDS::DCPS::MulticastSession::syn_received | ( | ACE_Message_Block * | control | ) |
Definition at line 167 of file MulticastSession.cpp.
References acked_, header, link(), link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MulticastTransport::passive_connection(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), remote_peer_, send_synack(), syn_hook(), OpenDDS::DCPS::MulticastDataLink::transport(), and VDBG_LVL.
Referenced by control_received().
00168 { 00169 if (this->active_) return; // pub send syn, then doesn't receive them. 00170 00171 const TransportHeader& header = 00172 this->link_->receive_strategy()->received_header(); 00173 00174 // Not from the remote peer for this session. 00175 if (this->remote_peer_ != header.source_) return; 00176 00177 Serializer serializer(control, header.swap_bytes()); 00178 00179 MulticastPeer local_peer; 00180 serializer >> local_peer; // sent as remote_peer 00181 00182 // Ignore sample if not destined for us: 00183 if (local_peer != this->link_->local_peer()) return; 00184 00185 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::syn_received " 00186 "local %#08x%08x remote %#08x%08x\n", 00187 this->link()->config()->name().c_str(), 00188 (unsigned int)(this->link()->local_peer() >> 32), 00189 (unsigned int) this->link()->local_peer(), 00190 (unsigned int)(this->remote_peer_ >> 32), 00191 (unsigned int) this->remote_peer_), 2); 00192 00193 { 00194 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_); 00195 00196 if (!this->acked_) { 00197 this->acked_ = true; 00198 syn_hook(header.sequence_); 00199 } 00200 } 00201 00202 // MULTICAST_SYN control samples are always positively 00203 // acknowledged by a matching remote peer: 00204 send_synack(); 00205 00206 this->link_->transport()->passive_connection(this->link_->local_peer(), this->remote_peer_); 00207 00208 }
void OpenDDS::DCPS::MulticastSession::synack_received | ( | ACE_Message_Block * | control | ) |
Definition at line 235 of file MulticastSession.cpp.
References acked_, OpenDDS::DCPS::DataLinkWatchdog::cancel(), header, link(), link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), syn_watchdog_, and VDBG_LVL.
Referenced by control_received().
00236 { 00237 if (!this->active_) return; // sub send synack, then doesn't receive them. 00238 00239 // Already received ack. 00240 if (this->acked()) return; 00241 00242 const TransportHeader& header = 00243 this->link_->receive_strategy()->received_header(); 00244 00245 // Not from the remote peer for this session. 00246 if (this->remote_peer_ != header.source_) return; 00247 00248 Serializer serializer(control, header.swap_bytes()); 00249 00250 MulticastPeer local_peer; 00251 serializer >> local_peer; // sent as remote_peer 00252 00253 // Ignore sample if not destined for us: 00254 if (local_peer != this->link_->local_peer()) return; 00255 00256 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::synack_received " 00257 "local %#08x%08x remote %#08x%08x\n", 00258 this->link()->config()->name().c_str(), 00259 (unsigned int)(this->link()->local_peer() >> 32), 00260 (unsigned int) this->link()->local_peer(), 00261 (unsigned int)(this->remote_peer_ >> 32), 00262 (unsigned int) this->remote_peer_), 2); 00263 00264 { 00265 ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_); 00266 00267 if (this->acked_) return; // already acked 00268 00269 this->syn_watchdog_->cancel(); 00270 this->acked_ = true; 00271 } 00272 }
ACE_Thread_Mutex OpenDDS::DCPS::MulticastSession::ack_lock_ [private] |
Definition at line 125 of file MulticastSession.h.
bool OpenDDS::DCPS::MulticastSession::acked_ [protected] |
Definition at line 122 of file MulticastSession.h.
Referenced by acked(), set_acked(), syn_received(), and synack_received().
bool OpenDDS::DCPS::MulticastSession::active_ [protected] |
Definition at line 118 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::start(), and OpenDDS::DCPS::BestEffortSession::start().
MulticastDataLink* OpenDDS::DCPS::MulticastSession::link_ [protected] |
Definition at line 90 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::expire_naks(), link(), OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), send_control(), syn_received(), and synack_received().
Definition at line 120 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), and reassemble().
Definition at line 92 of file MulticastSession.h.
Referenced by remote_peer(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), send_synack(), and syn_received().
Definition at line 107 of file MulticastSession.h.
ACE_Thread_Mutex OpenDDS::DCPS::MulticastSession::start_lock_ [protected] |
Definition at line 109 of file MulticastSession.h.
bool OpenDDS::DCPS::MulticastSession::started_ [protected] |
Definition at line 110 of file MulticastSession.h.
Referenced by OpenDDS::DCPS::ReliableSession::start(), and OpenDDS::DCPS::BestEffortSession::start().
Definition at line 126 of file MulticastSession.h.
Referenced by start_syn(), stop(), synack_received(), and ~MulticastSession().