OpenDDS::DCPS::MulticastDataLink Class Reference

#include <MulticastDataLink.h>

Inheritance diagram for OpenDDS::DCPS::MulticastDataLink:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MulticastDataLink:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 MulticastDataLink (MulticastTransport *transport, MulticastSessionFactory *session_factory, MulticastPeer local_peer, bool is_active)
virtual ~MulticastDataLink ()
MulticastTransport * transport ()
MulticastPeer local_peer () const
void configure (MulticastInst *config, TransportReactorTask *reactor_task)
void send_strategy (MulticastSendStrategy *send_strategy)
MulticastSendStrategy * send_strategy ()
void receive_strategy (MulticastReceiveStrategy *recv_strategy)
MulticastReceiveStrategy * receive_strategy ()
SingleSendBuffer * send_buffer ()
MulticastInst * config ()
TransportReactorTask * reactor_task ()
ACE_Reactor * get_reactor ()
ACE_Proactor * get_proactor ()
ACE_SOCK_Dgram_Mcast & socket ()
bool join (const ACE_INET_Addr &group_address)
MulticastSession * find_or_create_session (MulticastPeer remote_peer)
MulticastSession * find_session (MulticastPeer remote_peer)
bool check_header (const TransportHeader &header)
bool check_header (const DataSampleHeader &header)
void sample_received (ReceivedDataSample &sample)
bool reassemble (ReceivedDataSample &data, const TransportHeader &header)

Private Member Functions

typedef OPENDDS_MAP (MulticastPeer, MulticastSession_rch) MulticastSessionMap
virtual void stop_i ()
void syn_received_no_session (MulticastPeer source, ACE_Message_Block *data, bool swap_bytes)
void release_remote_i (const RepoId &remote)
bool ready_to_deliver (const ReceivedDataSample &data)

Private Attributes

MulticastTransport * transport_
MulticastSessionFactory_rch session_factory_
MulticastPeer local_peer_
MulticastInst * config_
TransportReactorTask * reactor_task_
MulticastSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink.
MulticastReceiveStrategy_rch recv_strategy_
SingleSendBuffer * send_buffer_
ACE_SOCK_Dgram_Mcast socket_
ACE_SYNCH_RECURSIVE_MUTEX session_lock_
MulticastSessionMap sessions_
RepoIdSet readers_selected_
RepoIdSet readers_withheld_

Detailed Description

Definition at line 39 of file MulticastDataLink.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::MulticastDataLink::MulticastDataLink ( MulticastTransport *  transport,
MulticastSessionFactory *  session_factory,
MulticastPeer  local_peer,
bool  is_active 
)

Definition at line 33 of file MulticastDataLink.cpp.

00037 : DataLink(transport, 0 /*priority*/, false /*loopback*/, is_active),
00038   transport_(transport),
00039   session_factory_(session_factory, false),
00040   local_peer_(local_peer),
00041   config_(0),
00042   reactor_task_(0),
00043   send_buffer_(0)
00044 {
00045 }

OpenDDS::DCPS::MulticastDataLink::~MulticastDataLink (  )  [virtual]

Definition at line 47 of file MulticastDataLink.cpp.

References send_buffer_, and send_strategy_.

00048 {
00049   if (this->send_buffer_) {
00050     this->send_strategy_->send_buffer(0);
00051     delete this->send_buffer_;
00052   }
00053 }


Member Function Documentation

bool OpenDDS::DCPS::MulticastDataLink::check_header ( const DataSampleHeader &  header  ) 

Definition at line 238 of file MulticastDataLink.cpp.

References header, receive_strategy(), and OpenDDS::DCPS::TRANSPORT_CONTROL.

00239 {
00240   if (header.message_id_ == TRANSPORT_CONTROL) return true;
00241 
00242   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00243       guard,
00244       this->session_lock_,
00245       false);
00246 
00247   // Skip data sample unless there is a session for it.
00248   return (this->sessions_.count(receive_strategy()->received_header().source_) > 0);
00249 }

bool OpenDDS::DCPS::MulticastDataLink::check_header ( const TransportHeader &  header  ) 

Definition at line 219 of file MulticastDataLink.cpp.

References header, and OpenDDS::DCPS::DataLink::is_active().

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::check_header().

00220 {
00221   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00222       guard,
00223       this->session_lock_,
00224       false);
00225 
00226   MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00227   if (it == this->sessions_.end() && is_active()) {
00228     return false;
00229   }
00230   if (it != this->sessions_.end() && it->second->acked()) {
00231     return it->second->check_header(header);
00232   }
00233 
00234   return true;
00235 }

ACE_INLINE MulticastInst * OpenDDS::DCPS::MulticastDataLink::config (  ) 

Definition at line 42 of file MulticastDataLink.inl.

References config_.

Referenced by configure(), OpenDDS::DCPS::ReliableSession::expire_naks(), OpenDDS::DCPS::NakWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_interval(), OpenDDS::DCPS::SynWatchdog::next_timeout(), and OpenDDS::DCPS::ReliableSession::send_naks().

00043 {
00044   return this->config_;
00045 }

void OpenDDS::DCPS::MulticastDataLink::configure ( MulticastInst *  config,
TransportReactorTask *  reactor_task 
)

Definition at line 56 of file MulticastDataLink.cpp.

References config(), config_, reactor_task(), and reactor_task_.

00058 {
00059   this->config_ = config;
00060   this->reactor_task_ = reactor_task;
00061 }

MulticastSession * OpenDDS::DCPS::MulticastDataLink::find_or_create_session ( MulticastPeer  remote_peer  ) 

Definition at line 179 of file MulticastDataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::RcHandle< T >::is_nil(), session_factory_, sessions_, and transport().

00180 {
00181   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00182       guard,
00183       this->session_lock_,
00184       0);
00185 
00186   MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00187   if (it != this->sessions_.end()) {
00188     MulticastSession_rch sess = it->second;
00189     return sess._retn();
00190   }
00191 
00192   MulticastSession_rch session =
00193     this->session_factory_->create(transport()->reactor(), transport()->reactor_owner(), this, remote_peer);
00194   if (session.is_nil()) {
00195     ACE_ERROR_RETURN((LM_ERROR,
00196         ACE_TEXT("(%P|%t) ERROR: ")
00197         ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00198         ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
00199         (unsigned int) (remote_peer >> 32),
00200         (unsigned int) remote_peer),
00201         0);
00202   }
00203 
00204   std::pair<MulticastSessionMap::iterator, bool> pair = this->sessions_.insert(
00205       MulticastSessionMap::value_type(remote_peer, session));
00206   if (pair.first == this->sessions_.end()) {
00207     ACE_ERROR_RETURN((LM_ERROR,
00208         ACE_TEXT("(%P|%t) ERROR: ")
00209         ACE_TEXT("MulticastDataLink::find_or_create_session: ")
00210         ACE_TEXT("failed to insert session for remote peer: %#08x%08x!\n"),
00211         (unsigned int) (remote_peer >> 32),
00212         (unsigned int) remote_peer),
00213         0);
00214   }
00215   return session._retn();
00216 }

MulticastSession * OpenDDS::DCPS::MulticastDataLink::find_session ( MulticastPeer  remote_peer  ) 

Definition at line 163 of file MulticastDataLink.cpp.

00164 {
00165   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00166       guard,
00167       this->session_lock_,
00168       0);
00169 
00170   MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
00171   if (it != this->sessions_.end()) {
00172     MulticastSession_rch sess = it->second;
00173     return sess._retn();
00174   }
00175   else return 0;
00176 }

ACE_INLINE ACE_Proactor * OpenDDS::DCPS::MulticastDataLink::get_proactor (  ) 

Definition at line 61 of file MulticastDataLink.inl.

References OpenDDS::DCPS::TransportReactorTask::get_proactor(), and reactor_task_.

Referenced by OpenDDS::DCPS::MulticastSendStrategy::async_send().

00062 {
00063   if (this->reactor_task_ == 0) return 0;
00064   return this->reactor_task_->get_proactor();
00065 }

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::MulticastDataLink::get_reactor (  ) 

Definition at line 54 of file MulticastDataLink.inl.

References OpenDDS::DCPS::TransportReactorTask::get_reactor(), and reactor_task_.

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::start_i(), and OpenDDS::DCPS::MulticastReceiveStrategy::stop_i().

00055 {
00056   if (this->reactor_task_ == 0) return 0;
00057   return this->reactor_task_->get_reactor();
00058 }

bool OpenDDS::DCPS::MulticastDataLink::join ( const ACE_INET_Addr &  group_address  ) 

Definition at line 91 of file MulticastDataLink.cpp.

References config_, OpenDDS::DCPS::MulticastInst::local_address_, OpenDDS::DCPS::MulticastInst::rcv_buffer_size_, OpenDDS::DCPS::set_socket_multicast_ttl(), socket_, OpenDDS::DCPS::DataLink::start(), and VDBG_LVL.

00092 {
00093   const std::string& net_if = this->config_->local_address_;
00094 #ifdef ACE_HAS_MAC_OSX
00095   socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00096                ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00097 #endif
00098   if (this->socket_.join(group_address, 1,
00099       net_if.empty() ? 0 :
00100           ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00101     ACE_ERROR_RETURN((LM_ERROR,
00102         ACE_TEXT("(%P|%t) ERROR: MulticastDataLink::join: ")
00103         ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed %m.\n")),
00104         false);
00105   }
00106   VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::join OK\n")), 6);
00107 
00108   ACE_HANDLE handle = this->socket_.get_handle();
00109 
00110   if (!OpenDDS::DCPS::set_socket_multicast_ttl(this->socket_, this->config_->ttl_)) {
00111     ACE_ERROR_RETURN((LM_ERROR,
00112         ACE_TEXT("(%P|%t) ERROR: ")
00113         ACE_TEXT("MulticastDataLink::join: ")
00114         ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl failed.\n")),
00115         false);
00116   }
00117 
00118   int rcv_buffer_size = ACE_Utils::truncate_cast<int>(this->config_->rcv_buffer_size_);
00119   if (rcv_buffer_size != 0
00120       && ACE_OS::setsockopt(handle, SOL_SOCKET,
00121           SO_RCVBUF,
00122           (char *) &rcv_buffer_size,
00123           sizeof (int)) < 0) {
00124     ACE_ERROR_RETURN((LM_ERROR,
00125         ACE_TEXT("(%P|%t) ERROR: ")
00126         ACE_TEXT("MulticastDataLink::join: ")
00127         ACE_TEXT("ACE_OS::setsockopt RCVBUF failed.\n")),
00128         false);
00129   }
00130 
00131 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00132   int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00133 
00134   if (ACE_OS::setsockopt(handle, SOL_SOCKET,
00135       SO_SNDBUF,
00136       (char *) &snd_size,
00137       sizeof(snd_size)) < 0
00138       && errno != ENOTSUP) {
00139     ACE_ERROR_RETURN((LM_ERROR,
00140         ACE_TEXT("(%P|%t) ERROR: ")
00141         ACE_TEXT("MulticastDataLink::join: ")
00142         ACE_TEXT("ACE_OS::setsockopt SNDBUF failed to set the send buffer size to %d errno %m\n"),
00143         snd_size),
00144         false);
00145   }
00146 #endif /* ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
00147 
00148   if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00149       static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00150       != 0) {
00151     this->socket_.close();
00152     ACE_ERROR_RETURN((LM_ERROR,
00153         ACE_TEXT("(%P|%t) ERROR: ")
00154         ACE_TEXT("MulticastDataLink::join: ")
00155         ACE_TEXT("DataLink::start failed!\n")),
00156         false);
00157   }
00158 
00159   return true;
00160 }

ACE_INLINE MulticastPeer OpenDDS::DCPS::MulticastDataLink::local_peer (  )  const

Definition at line 18 of file MulticastDataLink.inl.

References local_peer_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::SynWatchdog::on_timeout(), OpenDDS::DCPS::MulticastSendStrategy::prepare_header_i(), OpenDDS::DCPS::ReliableSession::send_naks(), OpenDDS::DCPS::MulticastSession::send_syn(), OpenDDS::DCPS::MulticastSession::send_synack(), OpenDDS::DCPS::MulticastSession::syn_received(), syn_received_no_session(), and OpenDDS::DCPS::MulticastSession::synack_received().

00019 {
00020   return this->local_peer_;
00021 }

typedef OpenDDS::DCPS::MulticastDataLink::OPENDDS_MAP ( MulticastPeer  ,
MulticastSession_rch   
) MulticastSessionMap [private]

ACE_INLINE TransportReactorTask * OpenDDS::DCPS::MulticastDataLink::reactor_task (  ) 

Definition at line 48 of file MulticastDataLink.inl.

References reactor_task_.

Referenced by configure().

00049 {
00050   return this->reactor_task_;
00051 }

bool OpenDDS::DCPS::MulticastDataLink::ready_to_deliver ( const ReceivedDataSample &  data  )  [private]

Definition at line 332 of file MulticastDataLink.cpp.

References receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), sessions_, and OpenDDS::DCPS::TransportHeader::source_.

Referenced by sample_received().

00333 {
00334   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00335       guard,
00336       this->session_lock_, false);
00337 
00338   const TransportHeader& theader = receive_strategy()->received_header();
00339 
00340   MulticastSessionMap::iterator session_it = sessions_.find(theader.source_);
00341   if (session_it != sessions_.end()) {
00342     MulticastSession_rch sess_rch(session_it->second);
00343     guard.release();
00344     return sess_rch->ready_to_deliver(theader, data);
00345   }
00346 
00347   return true;
00348 }

bool OpenDDS::DCPS::MulticastDataLink::reassemble ( ReceivedDataSample &  data,
const TransportHeader &  header 
)

Definition at line 252 of file MulticastDataLink.cpp.

References header.

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::reassemble().

00254 {
00255   ACE_GUARD_RETURN(ACE_SYNCH_RECURSIVE_MUTEX,
00256       guard,
00257       this->session_lock_,
00258       false);
00259 
00260   MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
00261   if (it == this->sessions_.end()) return false;
00262   if (it->second->acked()) {
00263     return it->second->reassemble(data, header);
00264   }
00265   return false;
00266 }

ACE_INLINE MulticastReceiveStrategy * OpenDDS::DCPS::MulticastDataLink::receive_strategy (  ) 

Definition at line 30 of file MulticastDataLink.inl.

References OpenDDS::DCPS::RcHandle< T >::in(), and recv_strategy_.

Referenced by check_header(), ready_to_deliver(), and sample_received().

00031 {
00032   return this->recv_strategy_.in();
00033 }

void OpenDDS::DCPS::MulticastDataLink::receive_strategy ( MulticastReceiveStrategy *  recv_strategy  ) 

Definition at line 85 of file MulticastDataLink.cpp.

References recv_strategy_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::MulticastSession::syn_received(), and OpenDDS::DCPS::MulticastSession::synack_received().

00086 {
00087   this->recv_strategy_ = recv_strategy;
00088 }

void OpenDDS::DCPS::MulticastDataLink::release_remote_i ( const RepoId &  remote  )  [private, virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 351 of file MulticastDataLink.cpp.

References session_lock_, and sessions_.

00352 {
00353   ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX, guard, session_lock_);
00354   MulticastPeer remote_source = (ACE_INT64)RepoIdConverter(remote).federationId() << 32
00355                               | RepoIdConverter(remote).participantId();
00356   MulticastSessionMap::iterator session_it = sessions_.find(remote_source);
00357   if (session_it != sessions_.end() && session_it->second->is_reliable()) {
00358     session_it->second->release_remote(remote);
00359   }
00360 }

void OpenDDS::DCPS::MulticastDataLink::sample_received ( ReceivedDataSample &  sample  ) 

Definition at line 269 of file MulticastDataLink.cpp.

References OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::MULTICAST_SYN, ready_to_deliver(), receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), OpenDDS::DCPS::ReceivedDataSample::sample_, sessions_, OpenDDS::DCPS::TransportHeader::source_, OpenDDS::DCPS::DataSampleHeader::submessage_id_, OpenDDS::DCPS::TransportHeader::swap_bytes(), syn_received_no_session(), and OpenDDS::DCPS::TRANSPORT_CONTROL.

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::deliver_sample().

00270 {
00271   switch (sample.header_.message_id_) {
00272   case TRANSPORT_CONTROL: {
00273     // Transport control samples are delivered to all sessions
00274     // regardless of association status:
00275     {
00276       char* const ptr = sample.sample_ ? sample.sample_->rd_ptr() : 0;
00277 
00278       ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00279           guard,
00280           this->session_lock_);
00281 
00282       const TransportHeader& theader = receive_strategy()->received_header();
00283 
00284       if (!is_active() && sample.header_.submessage_id_ == MULTICAST_SYN &&
00285           sessions_.find(theader.source_) == sessions_.end()) {
00286         // We have received a SYN but there is no session (yet) for this source.
00287         // Depending on the data, we may need to send SYNACK.
00288 
00289         guard.release();
00290         syn_received_no_session(theader.source_, sample.sample_,
00291                                 theader.swap_bytes());
00292 
00293         guard.acquire();
00294         MulticastSessionMap::iterator s_itr = sessions_.find(theader.source_);
00295         if (s_itr != sessions_.end()) {
00296           s_itr->second->record_header_received(theader);
00297         }
00298 
00299         if (ptr) {
00300           sample.sample_->rd_ptr(ptr);
00301         }
00302         return;
00303       }
00304 
00305       MulticastSessionMap temp_sessions(sessions_);
00306       guard.release();
00307 
00308       for (MulticastSessionMap::iterator it(temp_sessions.begin());
00309           it != temp_sessions.end(); ++it) {
00310         it->second->control_received(sample.header_.submessage_id_,
00311                                      sample.sample_);
00312         it->second->record_header_received(theader);
00313 
00314         // reset read pointer
00315         if (ptr) {
00316           sample.sample_->rd_ptr(ptr);
00317         }
00318       }
00319     }
00320   } break;
00321 
00322   default:
00323 
00324     if (ready_to_deliver(sample)) {
00325       data_received(sample);
00326     }
00327     break;
00328   }
00329 }

ACE_INLINE SingleSendBuffer * OpenDDS::DCPS::MulticastDataLink::send_buffer (  ) 

Definition at line 36 of file MulticastDataLink.inl.

References send_buffer_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received().

00037 {
00038   return this->send_buffer_;
00039 }

ACE_INLINE MulticastSendStrategy * OpenDDS::DCPS::MulticastDataLink::send_strategy (  ) 

Definition at line 24 of file MulticastDataLink.inl.

References OpenDDS::DCPS::RcHandle< T >::in(), and send_strategy_.

Referenced by send_strategy().

00025 {
00026   return this->send_strategy_.in();
00027 }

void OpenDDS::DCPS::MulticastDataLink::send_strategy ( MulticastSendStrategy *  send_strategy  ) 

Definition at line 64 of file MulticastDataLink.cpp.

References OpenDDS::DCPS::TransportSendStrategy::send_buffer(), send_strategy(), and send_strategy_.

00065 {
00066   // A send buffer may be bound to the send strategy to ensure a
00067   // configured number of most-recent datagrams are retained:
00068   if (this->session_factory_->requires_send_buffer()) {
00069     ACE_NEW_NORETURN(this->send_buffer_,
00070         SingleSendBuffer(this->config_->nak_depth_,
00071             this->config_->max_samples_per_packet_));
00072     if (!this->send_buffer_) {
00073       ACE_ERROR((LM_ERROR,
00074           ACE_TEXT("(%P|%t) ERROR: ")
00075           ACE_TEXT("MulticastDataLink::send_strategy: ")
00076           ACE_TEXT("failed to create SingleSendBuffer!\n")));
00077       return;
00078     }
00079     send_strategy->send_buffer(this->send_buffer_);
00080   }
00081   this->send_strategy_ = send_strategy;
00082 }

ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::MulticastDataLink::socket (  ) 

Definition at line 68 of file MulticastDataLink.inl.

References socket_.

Referenced by OpenDDS::DCPS::MulticastSendStrategy::async_send(), OpenDDS::DCPS::MulticastReceiveStrategy::get_handle(), OpenDDS::DCPS::MulticastReceiveStrategy::receive_bytes(), and OpenDDS::DCPS::MulticastSendStrategy::sync_send().

00069 {
00070   return this->socket_;
00071 }

void OpenDDS::DCPS::MulticastDataLink::stop_i (  )  [private, virtual]

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 404 of file MulticastDataLink.cpp.

References sessions_, and socket_.

00405 {
00406   ACE_GUARD(ACE_SYNCH_RECURSIVE_MUTEX,
00407       guard,
00408       this->session_lock_);
00409 
00410   for (MulticastSessionMap::iterator it(this->sessions_.begin());
00411       it != this->sessions_.end(); ++it) {
00412     it->second->stop();
00413   }
00414   this->sessions_.clear();
00415 
00416   this->socket_.close();
00417 }

void OpenDDS::DCPS::MulticastDataLink::syn_received_no_session ( MulticastPeer  source,
ACE_Message_Block *  data,
bool  swap_bytes 
) [private]

Definition at line 363 of file MulticastDataLink.cpp.

References config_, OpenDDS::DCPS::DataLink::create_control(), header, local_peer(), local_peer_, OpenDDS::DCPS::MULTICAST_SYNACK, OpenDDS::DCPS::TransportInst::name(), OpenDDS::DCPS::MulticastTransport::passive_connection(), OpenDDS::DCPS::DataLink::send_control(), OpenDDS::DCPS::SEND_CONTROL_OK, transport_, and VDBG_LVL.

Referenced by sample_received().

00366 {
00367   Serializer serializer_read(data, swap_bytes);
00368 
00369   MulticastPeer local_peer;
00370   serializer_read >> local_peer;
00371 
00372   if (local_peer != local_peer_) {
00373     return;
00374   }
00375 
00376   VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastDataLink[%C]::syn_received_no_session "
00377       "send_synack local %#08x%08x remote %#08x%08x\n",
00378       config_->name().c_str(),
00379       (unsigned int) (local_peer >> 32),
00380       (unsigned int) local_peer,
00381       (unsigned int) (source >> 32),
00382       (unsigned int) source), 2);
00383 
00384   ACE_Message_Block* synack_data = new ACE_Message_Block(sizeof(MulticastPeer));
00385 
00386   Serializer serializer_write(synack_data);
00387   serializer_write << source;
00388 
00389   DataSampleHeader header;
00390   ACE_Message_Block* control =
00391       create_control(MULTICAST_SYNACK, header, synack_data);
00392 
00393   const int error = send_control(header, control);
00394   if (error != SEND_CONTROL_OK) {
00395     ACE_ERROR((LM_ERROR, "(%P|%t) MulticastDataLink::syn_received_no_session: "
00396         "ERROR: send_control failed: %d!\n", error));
00397     return;
00398   }
00399 
00400   transport_->passive_connection(local_peer, source);
00401 }

ACE_INLINE MulticastTransport * OpenDDS::DCPS::MulticastDataLink::transport (  ) 

Definition at line 12 of file MulticastDataLink.inl.

References transport_.

Referenced by find_or_create_session(), OpenDDS::DCPS::NakWatchdog::reactor_is_shut_down(), OpenDDS::DCPS::SynWatchdog::reactor_is_shut_down(), and OpenDDS::DCPS::MulticastSession::syn_received().

00013 {
00014   return this->transport_;
00015 }


Member Data Documentation

MulticastInst* OpenDDS::DCPS::MulticastDataLink::config_ [private]

Definition at line 89 of file MulticastDataLink.h.

Referenced by config(), configure(), join(), and syn_received_no_session().

MulticastPeer OpenDDS::DCPS::MulticastDataLink::local_peer_ [private]

Definition at line 87 of file MulticastDataLink.h.

Referenced by local_peer(), and syn_received_no_session().

TransportReactorTask* OpenDDS::DCPS::MulticastDataLink::reactor_task_ [private]

Definition at line 91 of file MulticastDataLink.h.

Referenced by configure(), get_proactor(), get_reactor(), and reactor_task().

RepoIdSet OpenDDS::DCPS::MulticastDataLink::readers_selected_ [private]

Definition at line 111 of file MulticastDataLink.h.

RepoIdSet OpenDDS::DCPS::MulticastDataLink::readers_withheld_ [private]

Definition at line 111 of file MulticastDataLink.h.

MulticastReceiveStrategy_rch OpenDDS::DCPS::MulticastDataLink::recv_strategy_ [private]

Definition at line 94 of file MulticastDataLink.h.

Referenced by receive_strategy().

SingleSendBuffer* OpenDDS::DCPS::MulticastDataLink::send_buffer_ [private]

Definition at line 96 of file MulticastDataLink.h.

Referenced by send_buffer(), and ~MulticastDataLink().

MulticastSendStrategy_rch OpenDDS::DCPS::MulticastDataLink::send_strategy_ [private]

The transport send strategy object for this DataLink.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 93 of file MulticastDataLink.h.

Referenced by send_strategy(), and ~MulticastDataLink().

MulticastSessionFactory_rch OpenDDS::DCPS::MulticastDataLink::session_factory_ [private]

Definition at line 85 of file MulticastDataLink.h.

Referenced by find_or_create_session().

ACE_SYNCH_RECURSIVE_MUTEX OpenDDS::DCPS::MulticastDataLink::session_lock_ [private]

Definition at line 100 of file MulticastDataLink.h.

Referenced by release_remote_i().

MulticastSessionMap OpenDDS::DCPS::MulticastDataLink::sessions_ [private]

Definition at line 103 of file MulticastDataLink.h.

Referenced by find_or_create_session(), ready_to_deliver(), release_remote_i(), sample_received(), and stop_i().

ACE_SOCK_Dgram_Mcast OpenDDS::DCPS::MulticastDataLink::socket_ [private]

Definition at line 98 of file MulticastDataLink.h.

Referenced by join(), socket(), and stop_i().

MulticastTransport* OpenDDS::DCPS::MulticastDataLink::transport_ [private]

Definition at line 83 of file MulticastDataLink.h.

Referenced by syn_received_no_session(), and transport().


The documentation for this class was generated from the following files:
MulticastDataLink.h
MulticastDataLink.cpp
MulticastDataLink.inl
Generated on Fri Feb 12 20:06:34 2016 for OpenDDS by  doxygen 1.4.7