MulticastReceiveStrategy.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastReceiveStrategy.h"
00009 #include "MulticastDataLink.h"
00010
00011 #include "ace/Reactor.h"
00012
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 MulticastReceiveStrategy::MulticastReceiveStrategy(MulticastDataLink* link)
00019 : link_(link)
00020 {
00021 }
00022
00023 ACE_HANDLE
00024 MulticastReceiveStrategy::get_handle() const
00025 {
00026 ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00027 return socket.get_handle();
00028 }
00029
00030 int
00031 MulticastReceiveStrategy::handle_input(ACE_HANDLE fd)
00032 {
00033 const int result = this->handle_dds_input(fd);
00034 if (result >= 0 && this->pdu_remaining()) {
00035 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastReceiveStrategy[%@]::handle_input "
00036 "resetting with %B bytes remaining\n", this, this->pdu_remaining()), 4);
00037 this->reset();
00038 }
00039 return result;
00040 }
00041
00042 ssize_t
00043 MulticastReceiveStrategy::receive_bytes(iovec iov[],
00044 int n,
00045 ACE_INET_Addr& remote_address,
00046 ACE_HANDLE )
00047 {
00048 ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00049 return socket.recv(iov, n, remote_address);
00050 }
00051
00052 bool
00053 MulticastReceiveStrategy::check_header(const TransportHeader& header)
00054 {
00055 return this->link_->check_header(header);
00056 }
00057
00058 bool
00059 MulticastReceiveStrategy::check_header(const DataSampleHeader& header)
00060 {
00061 return this->link_->check_header(header);
00062 }
00063
00064 void
00065 MulticastReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00066 const ACE_INET_Addr& )
00067 {
00068 this->link_->sample_received(sample);
00069 }
00070
00071 bool
00072 MulticastReceiveStrategy::reassemble(ReceivedDataSample& data)
00073 {
00074 return this->link_->reassemble(data, received_header());
00075 }
00076
00077 int
00078 MulticastReceiveStrategy::start_i()
00079 {
00080 ACE_Reactor* reactor = this->link_->get_reactor();
00081 if (reactor == 0) {
00082 ACE_ERROR_RETURN((LM_ERROR,
00083 ACE_TEXT("(%P|%t) ERROR: ")
00084 ACE_TEXT("MulticastReceiveStrategy::start_i: ")
00085 ACE_TEXT("NULL reactor reference!\n")),
00086 -1);
00087 }
00088
00089 if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
00090 ACE_ERROR_RETURN((LM_ERROR,
00091 ACE_TEXT("(%P|%t) ERROR: ")
00092 ACE_TEXT("MulticastReceiveStrategy::start_i: ")
00093 ACE_TEXT("failed to register handler for DataLink!\n")),
00094 -1);
00095 }
00096
00097 return 0;
00098 }
00099
00100 void
00101 MulticastReceiveStrategy::stop_i()
00102 {
00103 ACE_Reactor* reactor = this->link_->get_reactor();
00104 if (reactor == 0) {
00105 ACE_ERROR((LM_ERROR,
00106 ACE_TEXT("(%P|%t) ERROR: ")
00107 ACE_TEXT("MulticastReceiveStrategy::stop_i: ")
00108 ACE_TEXT("NULL reactor reference!\n")));
00109 return;
00110 }
00111
00112 reactor->remove_handler(this, ACE_Event_Handler::READ_MASK);
00113 }
00114
00115 }
00116 }
00117
00118 OPENDDS_END_VERSIONED_NAMESPACE_DECL