OpenDDS::DCPS::MulticastSession Class Reference

#include <MulticastSession.h>

Inheritance diagram for OpenDDS::DCPS::MulticastSession:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MulticastSession:

Collaboration graph
[legend]
List of all members.

Public Member Functions

virtual ~MulticastSession ()
MulticastDataLink * link ()
MulticastPeer remote_peer () const
bool acked ()
void set_acked ()
virtual bool is_reliable ()
void syn_received (ACE_Message_Block *control)
void send_syn ()
void synack_received (ACE_Message_Block *control)
void send_synack ()
virtual void send_naks ()
virtual bool check_header (const TransportHeader &header)=0
virtual void record_header_received (const TransportHeader &header)=0
virtual bool ready_to_deliver (const TransportHeader &header, const ReceivedDataSample &data)=0
virtual void release_remote (const RepoId &)
virtual bool control_received (char submessage_id, ACE_Message_Block *control)
virtual bool start (bool active, bool acked)=0
virtual void stop ()
bool reassemble (ReceivedDataSample &data, const TransportHeader &header)

Protected Types

typedef ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t

Protected Member Functions

 MulticastSession (ACE_Reactor *reactor, ACE_thread_t owner, MulticastDataLink *link, MulticastPeer remote_peer)
void send_control (char submessage_id, ACE_Message_Block *data)
bool start_syn ()
virtual void syn_hook (const SequenceNumber &)

Protected Attributes

MulticastDataLink * link_
MulticastPeer remote_peer_
Reverse_Lock_t reverse_start_lock_
ACE_Thread_Mutex start_lock_
bool started_
bool active_
TransportReassembly reassembly_
bool acked_

Private Attributes

ACE_Thread_Mutex ack_lock_
SynWatchdog * syn_watchdog_

Detailed Description

Definition at line 55 of file MulticastSession.h.


Member Typedef Documentation

typedef ACE_Reverse_Lock<ACE_Thread_Mutex> OpenDDS::DCPS::MulticastSession::Reverse_Lock_t [protected]

Definition at line 106 of file MulticastSession.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::MulticastSession::~MulticastSession (  )  [virtual]

Definition at line 95 of file MulticastSession.cpp.

References OpenDDS::DCPS::DataLinkWatchdog::cancel(), OpenDDS::DCPS::ReactorInterceptor::destroy(), syn_watchdog_, and OpenDDS::DCPS::ReactorInterceptor::wait().

00096 {
00097   syn_watchdog_->cancel();
00098   syn_watchdog_->wait();
00099   syn_watchdog_->destroy();
00100 }

OpenDDS::DCPS::MulticastSession::MulticastSession ( ACE_Reactor *  reactor,
ACE_thread_t  owner,
MulticastDataLink *  link,
MulticastPeer  remote_peer 
) [protected]

Definition at line 81 of file MulticastSession.cpp.

00085   : link_(link)
00086   , remote_peer_(remote_peer)
00087   , reverse_start_lock_(start_lock_)
00088   , started_(false)
00089   , active_(true)
00090   , acked_(false)
00091   , syn_watchdog_(new SynWatchdog (reactor, owner, this))
00092 {
00093 }


Member Function Documentation

bool OpenDDS::DCPS::MulticastSession::acked (  ) 

Definition at line 103 of file MulticastSession.cpp.

References acked_.

Referenced by OpenDDS::DCPS::ReliableSession::ready_to_deliver().

00104 {
00105   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false);
00106   return this->acked_;
00107 }

virtual bool OpenDDS::DCPS::MulticastSession::check_header ( const TransportHeader &  header  )  [pure virtual]

Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.

bool OpenDDS::DCPS::MulticastSession::control_received ( char  submessage_id,
ACE_Message_Block *  control 
) [virtual]

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 147 of file MulticastSession.cpp.

References OpenDDS::DCPS::MULTICAST_SYN, OpenDDS::DCPS::MULTICAST_SYNACK, syn_received(), and synack_received().

Referenced by OpenDDS::DCPS::ReliableSession::control_received().

00149 {
00150   switch (submessage_id) {
00151   case MULTICAST_SYN:
00152     syn_received(control);
00153     break;
00154 
00155   case MULTICAST_SYNACK:
00156     synack_received(control);
00157     break;
00158 
00159   default:
00160     return false;
00161   }
00162 
00163   return true;
00164 }

virtual bool OpenDDS::DCPS::MulticastSession::is_reliable (  )  [inline, virtual]

Reimplemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.

Definition at line 66 of file MulticastSession.h.

00066 { return false;}

ACE_INLINE MulticastDataLink * OpenDDS::DCPS::MulticastSession::link (  ) 

Definition at line 12 of file MulticastSession.inl.

References link_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::NakWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_timeout(), OpenDDS::DCPS::SynWatchdog::on_timeout(), OpenDDS::DCPS::NakWatchdog::reactor_is_shut_down(), OpenDDS::DCPS::SynWatchdog::reactor_is_shut_down(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), send_synack(), syn_received(), and synack_received().

00013 {
00014   return this->link_;
00015 }

virtual bool OpenDDS::DCPS::MulticastSession::ready_to_deliver ( const TransportHeader &  header,
const ReceivedDataSample &  data 
) [pure virtual]

Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.

bool OpenDDS::DCPS::MulticastSession::reassemble ( ReceivedDataSample &  data,
const TransportHeader &  header 
)

Definition at line 310 of file MulticastSession.cpp.

References header, OpenDDS::DCPS::TransportReassembly::reassemble(), and reassembly_.

00312 {
00313   return this->reassembly_.reassemble(header.sequence_,
00314                                       header.first_fragment_,
00315                                       data);
00316 }

virtual void OpenDDS::DCPS::MulticastSession::record_header_received ( const TransportHeader &  header  )  [pure virtual]

Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.

virtual void OpenDDS::DCPS::MulticastSession::release_remote ( const RepoId &  )  [inline, virtual]

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 79 of file MulticastSession.h.

00079 {};

ACE_INLINE MulticastPeer OpenDDS::DCPS::MulticastSession::remote_peer (  )  const

Definition at line 18 of file MulticastSession.inl.

References remote_peer_.

Referenced by OpenDDS::DCPS::SynWatchdog::on_timeout().

00019 {
00020   return this->remote_peer_;
00021 }

void OpenDDS::DCPS::MulticastSession::send_control ( char  submessage_id,
ACE_Message_Block *  data 
) [protected]

Definition at line 122 of file MulticastSession.cpp.

References OpenDDS::DCPS::DataLink::create_control(), header, link_, OpenDDS::DCPS::DataLink::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.

Referenced by OpenDDS::DCPS::ReliableSession::send_nakack(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), and send_synack().

00123 {
00124   DataSampleHeader header;
00125   ACE_Message_Block* control =
00126     this->link_->create_control(submessage_id, header, data);
00127   if (control == 0) {
00128     ACE_ERROR((LM_ERROR,
00129                ACE_TEXT("(%P|%t) ERROR: ")
00130                ACE_TEXT("MulticastSession::send_control: ")
00131                ACE_TEXT("create_control failed!\n")));
00132     return;
00133   }
00134 
00135   int error = this->link_->send_control(header, control);
00136   if (error != SEND_CONTROL_OK) {
00137     ACE_ERROR((LM_ERROR,
00138                ACE_TEXT("(%P|%t) ERROR: ")
00139                ACE_TEXT("MulticastSession::send_control: ")
00140                ACE_TEXT("send_control failed: %d!\n"),
00141                error));
00142     return;
00143   }
00144 }

virtual void OpenDDS::DCPS::MulticastSession::send_naks (  )  [inline, virtual]

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 73 of file MulticastSession.h.

Referenced by send_synack().

00073 {}

void OpenDDS::DCPS::MulticastSession::send_syn (  ) 

Definition at line 211 of file MulticastSession.cpp.

References link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MULTICAST_SYN, remote_peer_, send_control(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::SynWatchdog::on_interval().

00212 {
00213   size_t len = sizeof(this->remote_peer_);
00214 
00215   ACE_Message_Block* data;
00216   ACE_NEW(data, ACE_Message_Block(len));
00217 
00218   Serializer serializer(data);
00219 
00220   serializer << this->remote_peer_;
00221 
00222   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_syn "
00223                       "local %#08x%08x remote %#08x%08x\n",
00224                       this->link()->config()->name().c_str(),
00225                       (unsigned int)(this->link()->local_peer() >> 32),
00226                       (unsigned int) this->link()->local_peer(),
00227                       (unsigned int)(this->remote_peer_ >> 32),
00228                       (unsigned int) this->remote_peer_), 2);
00229 
00230   // Send control sample to remote peer:
00231   send_control(MULTICAST_SYN, data);
00232 }

void OpenDDS::DCPS::MulticastSession::send_synack (  ) 

Definition at line 275 of file MulticastSession.cpp.

References link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MULTICAST_SYNACK, remote_peer_, send_control(), send_naks(), and VDBG_LVL.

Referenced by syn_received().

00276 {
00277   size_t len = sizeof(this->remote_peer_);
00278 
00279   ACE_Message_Block* data;
00280   ACE_NEW(data, ACE_Message_Block(len));
00281 
00282   Serializer serializer(data);
00283 
00284   serializer << this->remote_peer_;
00285 
00286   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack "
00287                       "local %#08x%08x remote %#08x%08x active %d\n",
00288                       this->link()->config()->name().c_str(),
00289                       (unsigned int)(this->link()->local_peer() >> 32),
00290                       (unsigned int) this->link()->local_peer(),
00291                       (unsigned int)(this->remote_peer_ >> 32),
00292                       (unsigned int) this->remote_peer_,
00293                       this->active_ ? 1 : 0), 2);
00294 
00295   // Send control sample to remote peer:
00296   send_control(MULTICAST_SYNACK, data);
00297 
00298   // Send naks before sending synack to
00299   // reduce wait time for resends from remote.
00300   send_naks();
00301 }

void OpenDDS::DCPS::MulticastSession::set_acked (  ) 

Definition at line 110 of file MulticastSession.cpp.

References acked_.

Referenced by OpenDDS::DCPS::ReliableSession::start().

00110                             {
00111   ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00112   this->acked_ = true;
00113 }

virtual bool OpenDDS::DCPS::MulticastSession::start ( bool  active,
bool  acked 
) [pure virtual]

Implemented in OpenDDS::DCPS::BestEffortSession, and OpenDDS::DCPS::ReliableSession.

bool OpenDDS::DCPS::MulticastSession::start_syn (  )  [protected]

Definition at line 116 of file MulticastSession.cpp.

References OpenDDS::DCPS::DataLinkWatchdog::schedule_now(), and syn_watchdog_.

00117 {
00118   return this->syn_watchdog_->schedule_now();
00119 }

void OpenDDS::DCPS::MulticastSession::stop (  )  [virtual]

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 304 of file MulticastSession.cpp.

References OpenDDS::DCPS::DataLinkWatchdog::cancel(), and syn_watchdog_.

Referenced by OpenDDS::DCPS::ReliableSession::stop().

00305 {
00306   this->syn_watchdog_->cancel();
00307 }

virtual void OpenDDS::DCPS::MulticastSession::syn_hook ( const SequenceNumber &  )  [inline, protected, virtual]

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 104 of file MulticastSession.h.

Referenced by syn_received().

00104 {}

void OpenDDS::DCPS::MulticastSession::syn_received ( ACE_Message_Block *  control  ) 

Definition at line 167 of file MulticastSession.cpp.

References acked_, header, link(), link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MulticastTransport::passive_connection(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), remote_peer_, send_synack(), syn_hook(), OpenDDS::DCPS::MulticastDataLink::transport(), and VDBG_LVL.

Referenced by control_received().

00168 {
00169   if (this->active_) return; // pub send syn, then doesn't receive them.
00170 
00171   const TransportHeader& header =
00172     this->link_->receive_strategy()->received_header();
00173 
00174   // Not from the remote peer for this session.
00175   if (this->remote_peer_ != header.source_) return;
00176 
00177   Serializer serializer(control, header.swap_bytes());
00178 
00179   MulticastPeer local_peer;
00180   serializer >> local_peer; // sent as remote_peer
00181 
00182   // Ignore sample if not destined for us:
00183   if (local_peer != this->link_->local_peer()) return;
00184 
00185   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::syn_received "
00186                     "local %#08x%08x remote %#08x%08x\n",
00187                     this->link()->config()->name().c_str(),
00188                     (unsigned int)(this->link()->local_peer() >> 32),
00189                     (unsigned int) this->link()->local_peer(),
00190                     (unsigned int)(this->remote_peer_ >> 32),
00191                     (unsigned int) this->remote_peer_), 2);
00192 
00193   {
00194     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00195 
00196     if (!this->acked_) {
00197       this->acked_ = true;
00198       syn_hook(header.sequence_);
00199     }
00200   }
00201 
00202   // MULTICAST_SYN control samples are always positively
00203   // acknowledged by a matching remote peer:
00204   send_synack();
00205 
00206   this->link_->transport()->passive_connection(this->link_->local_peer(), this->remote_peer_);
00207 
00208 }

void OpenDDS::DCPS::MulticastSession::synack_received ( ACE_Message_Block *  control  ) 

Definition at line 235 of file MulticastSession.cpp.

References acked_, OpenDDS::DCPS::DataLinkWatchdog::cancel(), header, link(), link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), syn_watchdog_, and VDBG_LVL.

Referenced by control_received().

00236 {
00237   if (!this->active_) return; // sub send synack, then doesn't receive them.
00238 
00239   // Already received ack.
00240   if (this->acked()) return;
00241 
00242   const TransportHeader& header =
00243     this->link_->receive_strategy()->received_header();
00244 
00245   // Not from the remote peer for this session.
00246   if (this->remote_peer_ != header.source_) return;
00247 
00248   Serializer serializer(control, header.swap_bytes());
00249 
00250   MulticastPeer local_peer;
00251   serializer >> local_peer; // sent as remote_peer
00252 
00253   // Ignore sample if not destined for us:
00254   if (local_peer != this->link_->local_peer()) return;
00255 
00256   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::synack_received "
00257                       "local %#08x%08x remote %#08x%08x\n",
00258                       this->link()->config()->name().c_str(),
00259                       (unsigned int)(this->link()->local_peer() >> 32),
00260                       (unsigned int) this->link()->local_peer(),
00261                       (unsigned int)(this->remote_peer_ >> 32),
00262                       (unsigned int) this->remote_peer_), 2);
00263 
00264   {
00265     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
00266 
00267     if (this->acked_) return; // already acked
00268 
00269     this->syn_watchdog_->cancel();
00270     this->acked_ = true;
00271   }
00272 }


Member Data Documentation

ACE_Thread_Mutex OpenDDS::DCPS::MulticastSession::ack_lock_ [private]

Definition at line 125 of file MulticastSession.h.

bool OpenDDS::DCPS::MulticastSession::acked_ [protected]

Definition at line 122 of file MulticastSession.h.

Referenced by acked(), set_acked(), syn_received(), and synack_received().

bool OpenDDS::DCPS::MulticastSession::active_ [protected]

Definition at line 118 of file MulticastSession.h.

Referenced by OpenDDS::DCPS::ReliableSession::start(), and OpenDDS::DCPS::BestEffortSession::start().

MulticastDataLink* OpenDDS::DCPS::MulticastSession::link_ [protected]

Definition at line 90 of file MulticastSession.h.

Referenced by OpenDDS::DCPS::ReliableSession::expire_naks(), link(), OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), send_control(), syn_received(), and synack_received().

TransportReassembly OpenDDS::DCPS::MulticastSession::reassembly_ [protected]

Definition at line 120 of file MulticastSession.h.

Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), and reassemble().

MulticastPeer OpenDDS::DCPS::MulticastSession::remote_peer_ [protected]

Definition at line 92 of file MulticastSession.h.

Referenced by remote_peer(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), send_synack(), and syn_received().

Reverse_Lock_t OpenDDS::DCPS::MulticastSession::reverse_start_lock_ [protected]

Definition at line 107 of file MulticastSession.h.

ACE_Thread_Mutex OpenDDS::DCPS::MulticastSession::start_lock_ [protected]

Definition at line 109 of file MulticastSession.h.

bool OpenDDS::DCPS::MulticastSession::started_ [protected]

Definition at line 110 of file MulticastSession.h.

Referenced by OpenDDS::DCPS::ReliableSession::start(), and OpenDDS::DCPS::BestEffortSession::start().

SynWatchdog* OpenDDS::DCPS::MulticastSession::syn_watchdog_ [private]

Definition at line 126 of file MulticastSession.h.

Referenced by start_syn(), stop(), synack_received(), and ~MulticastSession().


The documentation for this class was generated from the following files:
MulticastSession.h
MulticastSession.cpp
MulticastSession.inl
Generated on Fri Feb 12 20:06:35 2016 for OpenDDS by  doxygen 1.4.7