00001
00002
00003
00004
00005
00006
00007
00008 #include "UpdateListener_T.h"
00009 #include "FederatorConfig.h"
00010 #include "dds/DCPS/debug.h"
00011
00012 namespace OpenDDS {
00013 namespace Federator {
00014
00015 template<class DataType, class ReaderType>
00016 UpdateListener<DataType, ReaderType>::UpdateListener(UpdateProcessor<DataType>& processor)
00017 : federationId_(NIL_REPOSITORY),
00018 receiver_(processor)
00019 {
00020 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00021 ACE_DEBUG((LM_DEBUG,
00022 ACE_TEXT("(%P|%t) UpdateListener::UpdateListener\n")));
00023 }
00024 }
00025
00026 template<class DataType, class ReaderType>
00027 UpdateListener<DataType, ReaderType>::~UpdateListener()
00028 {
00029 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00030 ACE_DEBUG((LM_DEBUG,
00031 ACE_TEXT("(%P|%t) UpdateListener::~UpdateListener\n")));
00032 }
00033 }
00034
00035 template<class DataType, class ReaderType>
00036 void
00037 UpdateListener<DataType, ReaderType>::federationId(const TAO_DDS_DCPSFederationId& id)
00038 {
00039 this->federationId_ = id;
00040 }
00041
00042 template<class DataType, class ReaderType>
00043 const TAO_DDS_DCPSFederationId&
00044 UpdateListener<DataType, ReaderType>::federationId() const
00045 {
00046 return this->federationId_;
00047 }
00048
00049 template<class DataType, class ReaderType>
00050 void
00051 UpdateListener<DataType, ReaderType>::on_data_available(
00052 DDS::DataReader_ptr reader)
00053 {
00054 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00055 ACE_DEBUG((LM_DEBUG,
00056 ACE_TEXT("(%P|%t) UpdateListener::on_data_available\n")));
00057 }
00058
00059 try {
00060
00061 typename ReaderType::_var_type dataReader = ReaderType::_narrow(reader);
00062
00063 if (CORBA::is_nil(dataReader.in())) {
00064 ACE_ERROR((LM_ERROR,
00065 ACE_TEXT("(%P|%t) UpdateListener::on_data_available - _narrow failed.\n")));
00066 return;
00067 }
00068
00069
00070 while (true) {
00071 DataType* sample = new DataType();
00072 DDS::SampleInfo* info = new DDS::SampleInfo();
00073 DDS::ReturnCode_t status = dataReader->read_next_sample(*sample, *info);
00074
00075 if (status == DDS::RETCODE_OK) {
00076
00077 if (this->federationId_.overridden() &&
00078 this->federationId_.id() != sample->sender) {
00079
00080
00081 this->receiver_.add(sample, info);
00082 }
00083
00084 } else if (status == DDS::RETCODE_NO_DATA) {
00085 break;
00086
00087 } else {
00088 ACE_ERROR((LM_ERROR,
00089 ACE_TEXT("(%P|%t) ERROR: UpdateListener::on_data_available: read status==%d\n"),
00090 status));
00091 break;
00092 }
00093 }
00094
00095 } catch (const CORBA::Exception& ex) {
00096 ex._tao_print_exception("(%P|%t) UpdateListener::read - ");
00097 }
00098 }
00099
00100 template<class DataType, class ReaderType>
00101 void
00102 UpdateListener<DataType, ReaderType>::on_requested_deadline_missed(
00103 DDS::DataReader_ptr ,
00104 const DDS::RequestedDeadlineMissedStatus & )
00105 {
00106 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00107 ACE_DEBUG((LM_DEBUG,
00108 ACE_TEXT("(%P|%t) ")
00109 ACE_TEXT("Federatorer::on_requested_deadline_missed\n")));
00110 }
00111 }
00112
00113 template<class DataType, class ReaderType>
00114 void
00115 UpdateListener<DataType, ReaderType>::on_requested_incompatible_qos(
00116 DDS::DataReader_ptr ,
00117 const DDS::RequestedIncompatibleQosStatus & )
00118 {
00119 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00120 ACE_DEBUG((LM_DEBUG,
00121 ACE_TEXT("(%P|%t) UpdateListener::")
00122 ACE_TEXT("on_requested_incompatible_qos\n")));
00123 }
00124 }
00125
00126 template<class DataType, class ReaderType>
00127 void
00128 UpdateListener<DataType, ReaderType>::on_liveliness_changed(
00129 DDS::DataReader_ptr ,
00130 const DDS::LivelinessChangedStatus & )
00131 {
00132 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00133 ACE_DEBUG((LM_DEBUG,
00134 ACE_TEXT("(%P|%t) UpdateListener::on_liveliness_changed\n")));
00135 }
00136 }
00137
00138 template<class DataType, class ReaderType>
00139 void
00140 UpdateListener<DataType, ReaderType>::on_subscription_matched(
00141 DDS::DataReader_ptr ,
00142 const DDS::SubscriptionMatchedStatus & )
00143 {
00144 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00145 ACE_DEBUG((LM_DEBUG,
00146 ACE_TEXT("(%P|%t) UpdateListener::on_subscription_matched\n")));
00147 }
00148 }
00149
00150 template<class DataType, class ReaderType>
00151 void
00152 UpdateListener<DataType, ReaderType>::on_sample_rejected(
00153 DDS::DataReader_ptr ,
00154 const DDS::SampleRejectedStatus& )
00155 {
00156 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00157 ACE_DEBUG((LM_DEBUG,
00158 ACE_TEXT("(%P|%t) UpdateListener::on_sample_rejected\n")));
00159 }
00160 }
00161
00162 template<class DataType, class ReaderType>
00163 void
00164 UpdateListener<DataType, ReaderType>::on_sample_lost(
00165 DDS::DataReader_ptr ,
00166 const DDS::SampleLostStatus& )
00167 {
00168 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00169 ACE_DEBUG((LM_DEBUG,
00170 ACE_TEXT("(%P|%t) UpdateListener::on_sample_lost\n")));
00171 }
00172 }
00173
00174 template<class DataType, class ReaderType>
00175 void
00176 UpdateListener<DataType, ReaderType>::stop()
00177 {
00178 this->receiver_.stop();
00179 }
00180
00181 template<class DataType, class ReaderType>
00182 void
00183 UpdateListener<DataType, ReaderType>::join()
00184 {
00185 this->receiver_.wait();
00186 }
00187
00188 }
00189 }