UpdateListener_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UpdateListener_T.h"
00009 #include "FederatorConfig.h"
00010 #include "dds/DCPS/debug.h"
00011 
00012 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00013 
00014 namespace OpenDDS {
00015 namespace Federator {
00016 
00017 template<class DataType, class ReaderType>
00018 UpdateListener<DataType, ReaderType>::UpdateListener(UpdateProcessor<DataType>& processor)
00019   : federationId_(NIL_REPOSITORY),
00020     receiver_(processor)
00021 {
00022   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00023     ACE_DEBUG((LM_DEBUG,
00024                ACE_TEXT("(%P|%t) UpdateListener::UpdateListener\n")));
00025   }
00026 }
00027 
00028 template<class DataType, class ReaderType>
00029 UpdateListener<DataType, ReaderType>::~UpdateListener()
00030 {
00031   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00032     ACE_DEBUG((LM_DEBUG,
00033                ACE_TEXT("(%P|%t) UpdateListener::~UpdateListener\n")));
00034   }
00035 }
00036 
00037 template<class DataType, class ReaderType>
00038 void
00039 UpdateListener<DataType, ReaderType>::federationId(const TAO_DDS_DCPSFederationId& id)
00040 {
00041   this->federationId_ = id;
00042 }
00043 
00044 template<class DataType, class ReaderType>
00045 const TAO_DDS_DCPSFederationId&
00046 UpdateListener<DataType, ReaderType>::federationId() const
00047 {
00048   return this->federationId_;
00049 }
00050 
00051 template<class DataType, class ReaderType>
00052 void
00053 UpdateListener<DataType, ReaderType>::on_data_available(
00054   DDS::DataReader_ptr reader)
00055 {
00056   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00057     ACE_DEBUG((LM_DEBUG,
00058                ACE_TEXT("(%P|%t) UpdateListener::on_data_available\n")));
00059   }
00060 
00061   try {
00062     // Get the type specific reader.
00063     typename ReaderType::_var_type dataReader = ReaderType::_narrow(reader);
00064 
00065     if (CORBA::is_nil(dataReader.in())) {
00066       ACE_ERROR((LM_ERROR,
00067                  ACE_TEXT("(%P|%t) UpdateListener::on_data_available - _narrow failed.\n")));
00068       return;
00069     }
00070 
00071     // Process all available data.
00072     while (true) {
00073       using namespace OpenDDS::DCPS;
00074       unique_ptr<DataType> sample(new DataType);
00075       unique_ptr<DDS::SampleInfo>  info(new DDS::SampleInfo);
00076       DDS::ReturnCode_t status = dataReader->read_next_sample(*sample, *info);
00077 
00078       if (status == DDS::RETCODE_OK) {
00079         // Check if we should process the sample.
00080         if (this->federationId_.overridden() &&
00081             this->federationId_.id() != sample->sender) {
00082 
00083           // Delegate processing to the federation manager.
00084           this->receiver_.add(move(sample), move(info));
00085         }
00086 
00087       } else if (status == DDS::RETCODE_NO_DATA) {
00088         break;
00089 
00090       } else {
00091         ACE_ERROR((LM_ERROR,
00092                    ACE_TEXT("(%P|%t) ERROR: UpdateListener::on_data_available: read status==%d\n"),
00093                    status));
00094         break;
00095       }
00096     }
00097 
00098   } catch (const CORBA::Exception& ex) {
00099     ex._tao_print_exception("(%P|%t) UpdateListener::read - ");
00100   }
00101 }
00102 
00103 template<class DataType, class ReaderType>
00104 void
00105 UpdateListener<DataType, ReaderType>::on_requested_deadline_missed(
00106   DDS::DataReader_ptr /* reader */,
00107   const DDS::RequestedDeadlineMissedStatus & /* status */)
00108 {
00109   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00110     ACE_DEBUG((LM_DEBUG,
00111                ACE_TEXT("(%P|%t) ")
00112                ACE_TEXT("Federatorer::on_requested_deadline_missed\n")));
00113   }
00114 }
00115 
00116 template<class DataType, class ReaderType>
00117 void
00118 UpdateListener<DataType, ReaderType>::on_requested_incompatible_qos(
00119   DDS::DataReader_ptr /* reader */,
00120   const DDS::RequestedIncompatibleQosStatus & /* status */)
00121 {
00122   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00123     ACE_DEBUG((LM_DEBUG,
00124                ACE_TEXT("(%P|%t) UpdateListener::")
00125                ACE_TEXT("on_requested_incompatible_qos\n")));
00126   }
00127 }
00128 
00129 template<class DataType, class ReaderType>
00130 void
00131 UpdateListener<DataType, ReaderType>::on_liveliness_changed(
00132   DDS::DataReader_ptr /* reader */,
00133   const DDS::LivelinessChangedStatus & /* status */)
00134 {
00135   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00136     ACE_DEBUG((LM_DEBUG,
00137                ACE_TEXT("(%P|%t) UpdateListener::on_liveliness_changed\n")));
00138   }
00139 }
00140 
00141 template<class DataType, class ReaderType>
00142 void
00143 UpdateListener<DataType, ReaderType>::on_subscription_matched(
00144   DDS::DataReader_ptr /* reader */,
00145   const DDS::SubscriptionMatchedStatus & /* status */)
00146 {
00147   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00148     ACE_DEBUG((LM_DEBUG,
00149                ACE_TEXT("(%P|%t) UpdateListener::on_subscription_matched\n")));
00150   }
00151 }
00152 
00153 template<class DataType, class ReaderType>
00154 void
00155 UpdateListener<DataType, ReaderType>::on_sample_rejected(
00156   DDS::DataReader_ptr /* reader */,
00157   const DDS::SampleRejectedStatus& /* status */)
00158 {
00159   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00160     ACE_DEBUG((LM_DEBUG,
00161                ACE_TEXT("(%P|%t) UpdateListener::on_sample_rejected\n")));
00162   }
00163 }
00164 
00165 template<class DataType, class ReaderType>
00166 void
00167 UpdateListener<DataType, ReaderType>::on_sample_lost(
00168   DDS::DataReader_ptr /* reader */,
00169   const DDS::SampleLostStatus& /* status */)
00170 {
00171   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00172     ACE_DEBUG((LM_DEBUG,
00173                ACE_TEXT("(%P|%t) UpdateListener::on_sample_lost\n")));
00174   }
00175 }
00176 
00177 template<class DataType, class ReaderType>
00178 void
00179 UpdateListener<DataType, ReaderType>::stop()
00180 {
00181   this->receiver_.stop();
00182 }
00183 
00184 template<class DataType, class ReaderType>
00185 void
00186 UpdateListener<DataType, ReaderType>::join()
00187 {
00188   this->receiver_.wait();
00189 }
00190 
00191 } // namespace Federator
00192 } // namespace OpenDDS
00193 
00194 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1