00001
00002
00003
00004
00005
00006
00007
00008 #include "MulticastReceiveStrategy.h"
00009 #include "MulticastDataLink.h"
00010
00011 #include "ace/Reactor.h"
00012
00013 namespace OpenDDS {
00014 namespace DCPS {
00015
00016 MulticastReceiveStrategy::MulticastReceiveStrategy(MulticastDataLink* link)
00017 : link_(link)
00018 {
00019 }
00020
00021 ACE_HANDLE
00022 MulticastReceiveStrategy::get_handle() const
00023 {
00024 ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00025 return socket.get_handle();
00026 }
00027
00028 int
00029 MulticastReceiveStrategy::handle_input(ACE_HANDLE fd)
00030 {
00031 const int result = this->handle_dds_input(fd);
00032 if (result >= 0 && this->pdu_remaining()) {
00033 VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastReceiveStrategy[%@]::handle_input "
00034 "resetting with %B bytes remaining\n", this, this->pdu_remaining()), 4);
00035 this->reset();
00036 }
00037 return result;
00038 }
00039
00040 ssize_t
00041 MulticastReceiveStrategy::receive_bytes(iovec iov[],
00042 int n,
00043 ACE_INET_Addr& remote_address,
00044 ACE_HANDLE )
00045 {
00046 ACE_SOCK_Dgram_Mcast& socket = this->link_->socket();
00047 return socket.recv(iov, n, remote_address);
00048 }
00049
00050 bool
00051 MulticastReceiveStrategy::check_header(const TransportHeader& header)
00052 {
00053 return this->link_->check_header(header);
00054 }
00055
00056 bool
00057 MulticastReceiveStrategy::check_header(const DataSampleHeader& header)
00058 {
00059 return this->link_->check_header(header);
00060 }
00061
00062 void
00063 MulticastReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00064 const ACE_INET_Addr& )
00065 {
00066 this->link_->sample_received(sample);
00067 }
00068
00069 bool
00070 MulticastReceiveStrategy::reassemble(ReceivedDataSample& data)
00071 {
00072 return this->link_->reassemble(data, received_header());
00073 }
00074
00075 int
00076 MulticastReceiveStrategy::start_i()
00077 {
00078 ACE_Reactor* reactor = this->link_->get_reactor();
00079 if (reactor == 0) {
00080 ACE_ERROR_RETURN((LM_ERROR,
00081 ACE_TEXT("(%P|%t) ERROR: ")
00082 ACE_TEXT("MulticastReceiveStrategy::start_i: ")
00083 ACE_TEXT("NULL reactor reference!\n")),
00084 -1);
00085 }
00086
00087 if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
00088 ACE_ERROR_RETURN((LM_ERROR,
00089 ACE_TEXT("(%P|%t) ERROR: ")
00090 ACE_TEXT("MulticastReceiveStrategy::start_i: ")
00091 ACE_TEXT("failed to register handler for DataLink!\n")),
00092 -1);
00093 }
00094
00095 return 0;
00096 }
00097
00098 void
00099 MulticastReceiveStrategy::stop_i()
00100 {
00101 ACE_Reactor* reactor = this->link_->get_reactor();
00102 if (reactor == 0) {
00103 ACE_ERROR((LM_ERROR,
00104 ACE_TEXT("(%P|%t) ERROR: ")
00105 ACE_TEXT("MulticastReceiveStrategy::stop_i: ")
00106 ACE_TEXT("NULL reactor reference!\n")));
00107 return;
00108 }
00109
00110 reactor->remove_handler(this, ACE_Event_Handler::READ_MASK);
00111 }
00112
00113 }
00114 }