OpenDDS  Snapshot(2023/04/28-20:55)
UpdateListener_T.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "UpdateListener_T.h"
9 #include "FederatorConfig.h"
10 #include "dds/DCPS/debug.h"
11 
13 
14 namespace OpenDDS {
15 namespace Federator {
16 
17 template<class DataType, class ReaderType>
19  : federationId_(NIL_REPOSITORY),
20  receiver_(processor)
21 {
24  ACE_TEXT("(%P|%t) UpdateListener::UpdateListener\n")));
25  }
26 }
27 
28 template<class DataType, class ReaderType>
30 {
33  ACE_TEXT("(%P|%t) UpdateListener::~UpdateListener\n")));
34  }
35 }
36 
37 template<class DataType, class ReaderType>
38 void
40 {
41  this->federationId_ = id;
42 }
43 
44 template<class DataType, class ReaderType>
47 {
48  return this->federationId_;
49 }
50 
51 template<class DataType, class ReaderType>
52 void
54  DDS::DataReader_ptr reader)
55 {
58  ACE_TEXT("(%P|%t) UpdateListener::on_data_available\n")));
59  }
60 
61  try {
62  // Get the type specific reader.
63  typename ReaderType::_var_type dataReader = ReaderType::_narrow(reader);
64 
65  if (CORBA::is_nil(dataReader.in())) {
67  ACE_TEXT("(%P|%t) UpdateListener::on_data_available - _narrow failed.\n")));
68  return;
69  }
70 
71  // Process all available data.
72  while (true) {
73  using namespace OpenDDS::DCPS;
74  unique_ptr<DataType> sample(new DataType);
76  DDS::ReturnCode_t status = dataReader->read_next_sample(*sample, *info);
77 
78  if (status == DDS::RETCODE_OK) {
79  // Check if we should process the sample.
80  if (this->federationId_.overridden() &&
81  this->federationId_.id() != sample->sender) {
82 
83  // Delegate processing to the federation manager.
84  this->receiver_.add(move(sample), move(info));
85  }
86 
87  } else if (status == DDS::RETCODE_NO_DATA) {
88  break;
89 
90  } else {
92  ACE_TEXT("(%P|%t) ERROR: UpdateListener::on_data_available: read status==%d\n"),
93  status));
94  break;
95  }
96  }
97 
98  } catch (const CORBA::Exception& ex) {
99  ex._tao_print_exception("(%P|%t) UpdateListener::read - ");
100  }
101 }
102 
103 template<class DataType, class ReaderType>
104 void
106  DDS::DataReader_ptr /* reader */,
107  const DDS::RequestedDeadlineMissedStatus & /* status */)
108 {
111  ACE_TEXT("(%P|%t) ")
112  ACE_TEXT("Federatorer::on_requested_deadline_missed\n")));
113  }
114 }
115 
116 template<class DataType, class ReaderType>
117 void
119  DDS::DataReader_ptr /* reader */,
120  const DDS::RequestedIncompatibleQosStatus & /* status */)
121 {
124  ACE_TEXT("(%P|%t) UpdateListener::")
125  ACE_TEXT("on_requested_incompatible_qos\n")));
126  }
127 }
128 
129 template<class DataType, class ReaderType>
130 void
132  DDS::DataReader_ptr /* reader */,
133  const DDS::LivelinessChangedStatus & /* status */)
134 {
137  ACE_TEXT("(%P|%t) UpdateListener::on_liveliness_changed\n")));
138  }
139 }
140 
141 template<class DataType, class ReaderType>
142 void
144  DDS::DataReader_ptr /* reader */,
145  const DDS::SubscriptionMatchedStatus & /* status */)
146 {
149  ACE_TEXT("(%P|%t) UpdateListener::on_subscription_matched\n")));
150  }
151 }
152 
153 template<class DataType, class ReaderType>
154 void
156  DDS::DataReader_ptr /* reader */,
157  const DDS::SampleRejectedStatus& /* status */)
158 {
161  ACE_TEXT("(%P|%t) UpdateListener::on_sample_rejected\n")));
162  }
163 }
164 
165 template<class DataType, class ReaderType>
166 void
168  DDS::DataReader_ptr /* reader */,
169  const DDS::SampleLostStatus& /* status */)
170 {
173  ACE_TEXT("(%P|%t) UpdateListener::on_sample_lost\n")));
174  }
175 }
176 
177 template<class DataType, class ReaderType>
178 void
180 {
181  this->receiver_.stop();
182 }
183 
184 template<class DataType, class ReaderType>
185 void
187 {
188  this->receiver_.wait();
189 }
190 
191 } // namespace Federator
192 } // namespace OpenDDS
193 
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
UpdateListener(UpdateProcessor< DataType > &processor)
Interface for managing update publications.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
const RepoKey NIL_REPOSITORY
Definition: Federator.idl:36
LM_DEBUG
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_NO_DATA
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const ReturnCode_t RETCODE_OK
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
Boolean is_nil(T x)