00001
00002
00003
00004
00005
00006
00007
00008 #include "UpdateListener_T.h"
00009 #include "FederatorConfig.h"
00010 #include "dds/DCPS/debug.h"
00011
00012 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00013
00014 namespace OpenDDS {
00015 namespace Federator {
00016
00017 template<class DataType, class ReaderType>
00018 UpdateListener<DataType, ReaderType>::UpdateListener(UpdateProcessor<DataType>& processor)
00019 : federationId_(NIL_REPOSITORY),
00020 receiver_(processor)
00021 {
00022 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00023 ACE_DEBUG((LM_DEBUG,
00024 ACE_TEXT("(%P|%t) UpdateListener::UpdateListener\n")));
00025 }
00026 }
00027
00028 template<class DataType, class ReaderType>
00029 UpdateListener<DataType, ReaderType>::~UpdateListener()
00030 {
00031 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00032 ACE_DEBUG((LM_DEBUG,
00033 ACE_TEXT("(%P|%t) UpdateListener::~UpdateListener\n")));
00034 }
00035 }
00036
00037 template<class DataType, class ReaderType>
00038 void
00039 UpdateListener<DataType, ReaderType>::federationId(const TAO_DDS_DCPSFederationId& id)
00040 {
00041 this->federationId_ = id;
00042 }
00043
00044 template<class DataType, class ReaderType>
00045 const TAO_DDS_DCPSFederationId&
00046 UpdateListener<DataType, ReaderType>::federationId() const
00047 {
00048 return this->federationId_;
00049 }
00050
00051 template<class DataType, class ReaderType>
00052 void
00053 UpdateListener<DataType, ReaderType>::on_data_available(
00054 DDS::DataReader_ptr reader)
00055 {
00056 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00057 ACE_DEBUG((LM_DEBUG,
00058 ACE_TEXT("(%P|%t) UpdateListener::on_data_available\n")));
00059 }
00060
00061 try {
00062
00063 typename ReaderType::_var_type dataReader = ReaderType::_narrow(reader);
00064
00065 if (CORBA::is_nil(dataReader.in())) {
00066 ACE_ERROR((LM_ERROR,
00067 ACE_TEXT("(%P|%t) UpdateListener::on_data_available - _narrow failed.\n")));
00068 return;
00069 }
00070
00071
00072 while (true) {
00073 using namespace OpenDDS::DCPS;
00074 unique_ptr<DataType> sample(new DataType);
00075 unique_ptr<DDS::SampleInfo> info(new DDS::SampleInfo);
00076 DDS::ReturnCode_t status = dataReader->read_next_sample(*sample, *info);
00077
00078 if (status == DDS::RETCODE_OK) {
00079
00080 if (this->federationId_.overridden() &&
00081 this->federationId_.id() != sample->sender) {
00082
00083
00084 this->receiver_.add(move(sample), move(info));
00085 }
00086
00087 } else if (status == DDS::RETCODE_NO_DATA) {
00088 break;
00089
00090 } else {
00091 ACE_ERROR((LM_ERROR,
00092 ACE_TEXT("(%P|%t) ERROR: UpdateListener::on_data_available: read status==%d\n"),
00093 status));
00094 break;
00095 }
00096 }
00097
00098 } catch (const CORBA::Exception& ex) {
00099 ex._tao_print_exception("(%P|%t) UpdateListener::read - ");
00100 }
00101 }
00102
00103 template<class DataType, class ReaderType>
00104 void
00105 UpdateListener<DataType, ReaderType>::on_requested_deadline_missed(
00106 DDS::DataReader_ptr ,
00107 const DDS::RequestedDeadlineMissedStatus & )
00108 {
00109 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00110 ACE_DEBUG((LM_DEBUG,
00111 ACE_TEXT("(%P|%t) ")
00112 ACE_TEXT("Federatorer::on_requested_deadline_missed\n")));
00113 }
00114 }
00115
00116 template<class DataType, class ReaderType>
00117 void
00118 UpdateListener<DataType, ReaderType>::on_requested_incompatible_qos(
00119 DDS::DataReader_ptr ,
00120 const DDS::RequestedIncompatibleQosStatus & )
00121 {
00122 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00123 ACE_DEBUG((LM_DEBUG,
00124 ACE_TEXT("(%P|%t) UpdateListener::")
00125 ACE_TEXT("on_requested_incompatible_qos\n")));
00126 }
00127 }
00128
00129 template<class DataType, class ReaderType>
00130 void
00131 UpdateListener<DataType, ReaderType>::on_liveliness_changed(
00132 DDS::DataReader_ptr ,
00133 const DDS::LivelinessChangedStatus & )
00134 {
00135 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00136 ACE_DEBUG((LM_DEBUG,
00137 ACE_TEXT("(%P|%t) UpdateListener::on_liveliness_changed\n")));
00138 }
00139 }
00140
00141 template<class DataType, class ReaderType>
00142 void
00143 UpdateListener<DataType, ReaderType>::on_subscription_matched(
00144 DDS::DataReader_ptr ,
00145 const DDS::SubscriptionMatchedStatus & )
00146 {
00147 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00148 ACE_DEBUG((LM_DEBUG,
00149 ACE_TEXT("(%P|%t) UpdateListener::on_subscription_matched\n")));
00150 }
00151 }
00152
00153 template<class DataType, class ReaderType>
00154 void
00155 UpdateListener<DataType, ReaderType>::on_sample_rejected(
00156 DDS::DataReader_ptr ,
00157 const DDS::SampleRejectedStatus& )
00158 {
00159 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00160 ACE_DEBUG((LM_DEBUG,
00161 ACE_TEXT("(%P|%t) UpdateListener::on_sample_rejected\n")));
00162 }
00163 }
00164
00165 template<class DataType, class ReaderType>
00166 void
00167 UpdateListener<DataType, ReaderType>::on_sample_lost(
00168 DDS::DataReader_ptr ,
00169 const DDS::SampleLostStatus& )
00170 {
00171 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00172 ACE_DEBUG((LM_DEBUG,
00173 ACE_TEXT("(%P|%t) UpdateListener::on_sample_lost\n")));
00174 }
00175 }
00176
00177 template<class DataType, class ReaderType>
00178 void
00179 UpdateListener<DataType, ReaderType>::stop()
00180 {
00181 this->receiver_.stop();
00182 }
00183
00184 template<class DataType, class ReaderType>
00185 void
00186 UpdateListener<DataType, ReaderType>::join()
00187 {
00188 this->receiver_.wait();
00189 }
00190
00191 }
00192 }
00193
00194 OPENDDS_END_VERSIONED_NAMESPACE_DECL