UpdateListener_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UpdateListener_T.h"
00009 #include "FederatorConfig.h"
00010 #include "dds/DCPS/debug.h"
00011 
00012 namespace OpenDDS {
00013 namespace Federator {
00014 
00015 template<class DataType, class ReaderType>
00016 UpdateListener<DataType, ReaderType>::UpdateListener(UpdateProcessor<DataType>& processor)
00017   : federationId_(NIL_REPOSITORY),
00018     receiver_(processor)
00019 {
00020   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00021     ACE_DEBUG((LM_DEBUG,
00022                ACE_TEXT("(%P|%t) UpdateListener::UpdateListener\n")));
00023   }
00024 }
00025 
00026 template<class DataType, class ReaderType>
00027 UpdateListener<DataType, ReaderType>::~UpdateListener()
00028 {
00029   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00030     ACE_DEBUG((LM_DEBUG,
00031                ACE_TEXT("(%P|%t) UpdateListener::~UpdateListener\n")));
00032   }
00033 }
00034 
00035 template<class DataType, class ReaderType>
00036 void
00037 UpdateListener<DataType, ReaderType>::federationId(const TAO_DDS_DCPSFederationId& id)
00038 {
00039   this->federationId_ = id;
00040 }
00041 
00042 template<class DataType, class ReaderType>
00043 const TAO_DDS_DCPSFederationId&
00044 UpdateListener<DataType, ReaderType>::federationId() const
00045 {
00046   return this->federationId_;
00047 }
00048 
00049 template<class DataType, class ReaderType>
00050 void
00051 UpdateListener<DataType, ReaderType>::on_data_available(
00052   DDS::DataReader_ptr reader)
00053 {
00054   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00055     ACE_DEBUG((LM_DEBUG,
00056                ACE_TEXT("(%P|%t) UpdateListener::on_data_available\n")));
00057   }
00058 
00059   try {
00060     // Get the type specific reader.
00061     typename ReaderType::_var_type dataReader = ReaderType::_narrow(reader);
00062 
00063     if (CORBA::is_nil(dataReader.in())) {
00064       ACE_ERROR((LM_ERROR,
00065                  ACE_TEXT("(%P|%t) UpdateListener::on_data_available - _narrow failed.\n")));
00066       return;
00067     }
00068 
00069     // Process all available data.
00070     while (true) {
00071       DataType*           sample = new DataType();
00072       DDS::SampleInfo*  info   = new DDS::SampleInfo();
00073       DDS::ReturnCode_t status = dataReader->read_next_sample(*sample, *info);
00074 
00075       if (status == DDS::RETCODE_OK) {
00076         // Check if we should process the sample.
00077         if (this->federationId_.overridden() &&
00078             this->federationId_.id() != sample->sender) {
00079 
00080           // Delegate processing to the federation manager.
00081           this->receiver_.add(sample, info);
00082         }
00083 
00084       } else if (status == DDS::RETCODE_NO_DATA) {
00085         break;
00086 
00087       } else {
00088         ACE_ERROR((LM_ERROR,
00089                    ACE_TEXT("(%P|%t) ERROR: UpdateListener::on_data_available: read status==%d\n"),
00090                    status));
00091         break;
00092       }
00093     }
00094 
00095   } catch (const CORBA::Exception& ex) {
00096     ex._tao_print_exception("(%P|%t) UpdateListener::read - ");
00097   }
00098 }
00099 
00100 template<class DataType, class ReaderType>
00101 void
00102 UpdateListener<DataType, ReaderType>::on_requested_deadline_missed(
00103   DDS::DataReader_ptr /* reader */,
00104   const DDS::RequestedDeadlineMissedStatus & /* status */)
00105 {
00106   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00107     ACE_DEBUG((LM_DEBUG,
00108                ACE_TEXT("(%P|%t) ")
00109                ACE_TEXT("Federatorer::on_requested_deadline_missed\n")));
00110   }
00111 }
00112 
00113 template<class DataType, class ReaderType>
00114 void
00115 UpdateListener<DataType, ReaderType>::on_requested_incompatible_qos(
00116   DDS::DataReader_ptr /* reader */,
00117   const DDS::RequestedIncompatibleQosStatus & /* status */)
00118 {
00119   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00120     ACE_DEBUG((LM_DEBUG,
00121                ACE_TEXT("(%P|%t) UpdateListener::")
00122                ACE_TEXT("on_requested_incompatible_qos\n")));
00123   }
00124 }
00125 
00126 template<class DataType, class ReaderType>
00127 void
00128 UpdateListener<DataType, ReaderType>::on_liveliness_changed(
00129   DDS::DataReader_ptr /* reader */,
00130   const DDS::LivelinessChangedStatus & /* status */)
00131 {
00132   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00133     ACE_DEBUG((LM_DEBUG,
00134                ACE_TEXT("(%P|%t) UpdateListener::on_liveliness_changed\n")));
00135   }
00136 }
00137 
00138 template<class DataType, class ReaderType>
00139 void
00140 UpdateListener<DataType, ReaderType>::on_subscription_matched(
00141   DDS::DataReader_ptr /* reader */,
00142   const DDS::SubscriptionMatchedStatus & /* status */)
00143 {
00144   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00145     ACE_DEBUG((LM_DEBUG,
00146                ACE_TEXT("(%P|%t) UpdateListener::on_subscription_matched\n")));
00147   }
00148 }
00149 
00150 template<class DataType, class ReaderType>
00151 void
00152 UpdateListener<DataType, ReaderType>::on_sample_rejected(
00153   DDS::DataReader_ptr /* reader */,
00154   const DDS::SampleRejectedStatus& /* status */)
00155 {
00156   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00157     ACE_DEBUG((LM_DEBUG,
00158                ACE_TEXT("(%P|%t) UpdateListener::on_sample_rejected\n")));
00159   }
00160 }
00161 
00162 template<class DataType, class ReaderType>
00163 void
00164 UpdateListener<DataType, ReaderType>::on_sample_lost(
00165   DDS::DataReader_ptr /* reader */,
00166   const DDS::SampleLostStatus& /* status */)
00167 {
00168   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00169     ACE_DEBUG((LM_DEBUG,
00170                ACE_TEXT("(%P|%t) UpdateListener::on_sample_lost\n")));
00171   }
00172 }
00173 
00174 template<class DataType, class ReaderType>
00175 void
00176 UpdateListener<DataType, ReaderType>::stop()
00177 {
00178   this->receiver_.stop();
00179 }
00180 
00181 template<class DataType, class ReaderType>
00182 void
00183 UpdateListener<DataType, ReaderType>::join()
00184 {
00185   this->receiver_.wait();
00186 }
00187 
00188 } // namespace Federator
00189 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7