OpenDDS  Snapshot(2023/04/28-20:55)
FaceTSS.h
Go to the documentation of this file.
1 #ifndef OPENDDS_FACE_FACETSS_H
2 #define OPENDDS_FACE_FACETSS_H
3 
4 #include "FACE/TS.hpp"
6 #include "dds/DdsDcpsSubscriptionC.h"
8 #include "dds/DCPS/WaitSet.h"
10 #include "ace/Singleton.h"
11 
13 
14 namespace OpenDDS {
15 namespace FaceTSS {
16 
// Container for the per-connection state used by the FACE TS template
// functions later in this header (they reach it through Entities::instance()).
// It holds three maps keyed by FACE::CONNECTION_ID_TYPE: senders_, receivers_
// and connections_.
// NOTE(review): this listing omits some lines of the original header (the
// embedded line numbers skip), so a few declarations below appear without
// their opening line.
17 class Entities {
19 
    // Declared before `public:`, so these have the class's default (private) access.
20  Entities();
21  ~Entities();
22 
23 public:
    // Initializer list and body of a constructor whose declaring lines are
    // missing from this listing (presumably the DDSAdapter struct that
    // FaceSender and FaceReceiver derive from -- confirm). status_valid starts
    // out as FACE::INVALID.
26  : status_valid(FACE::INVALID)
27  {}
28 
29  FACE::VALIDITY_TYPE status_valid;
30  };
    // Sender-side state: DDSAdapter plus a DataWriter reference.
31  struct FaceSender : public DDSAdapter {
32  FaceSender () {}
33  DDS::DataWriter_var dw;
34  };
35 
    // Receiver-side state: DDSAdapter plus a DataReader reference and
    // bookkeeping fields about received messages.
36  struct FaceReceiver : public DDSAdapter {
    // Constructor initializer list (the constructor's own line is missing from
    // this listing): the transaction id and both statistics fields start at 0.
38  : last_msg_header(),
39  last_msg_tid(0),
40  sum_recvd_msgs_latency(0),
41  total_msgs_recvd(0)
42  {}
43 
44  virtual ~FaceReceiver() {}
    // Base implementation: leaves its argument untouched and always returns
    // FACE::NOT_AVAILABLE. DDSTypedAdapter declares its own version below.
45  virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& /*num_waiting*/)
46  {
47  return FACE::NOT_AVAILABLE;
48  };
49  DDS::DataReader_var dr;
50  FACE::TS::MessageHeader last_msg_header;
    // Pre-incremented by receive_message() and Listener::on_data_available()
    // to produce the transaction id handed to the application.
51  FACE::TRANSACTION_ID_TYPE last_msg_tid;
52  FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency;
53  FACE::LongLong total_msgs_recvd;
54  };
55 
    // FaceReceiver subclass parameterized on the message type; it declares a
    // messages_waiting() that is defined further down in this header.
    // NOTE(review): two member lines (embedded numbers 59 and 62) are missing
    // here; the definitions below imply a constructor taking a receiver and a
    // `DataReader` typedef -- confirm against the full header.
56  template<typename Msg>
57  class DDSTypedAdapter : public FaceReceiver {
58  public:
60  ~DDSTypedAdapter();
61  virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting);
63  };
64 
    // Connection id -> sender (stored by value) / receiver (stored as a raw
    // pointer). receive_message() and register_callback() replace a receiver
    // entry with a newly allocated DDSTypedAdapter and delete the old object.
65  typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceSender) ConnIdToSenderMap;
66  typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceReceiver*) ConnIdToReceiverMap;
67 
    // Returns a pointer to an Entities object; defined outside this header.
68  OpenDDS_FACE_Export static Entities* instance();
69  ConnIdToSenderMap senders_;
70  ConnIdToReceiverMap receivers_;
71 
    // Per-connection record: transport connection status plus the platform
    // view GUID. NOTE(review): one member line (embedded number 73) is missing
    // from this listing.
72  struct ConnectionInfo {
74  FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status;
75  FACE::MESSAGE_TYPE_GUID platform_view_guid;
76  };
77  OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, ConnectionInfo ) connections_;
78 };
79 
// Exported helper functions used by the templates below; they are only
// declared here, so their exact behavior is not visible in this header.

// Takes a FACE timeout and returns a DDS::Duration_t.
80 OpenDDS_FACE_Export DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout);
// Takes a DDS::Duration_t and returns a FACE::SYSTEM_TIME_TYPE.
81 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration);
// Takes a DDS::Time_t and returns a FACE::SYSTEM_TIME_TYPE.
82 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp);
// Called by the receive paths with the connection id, the reader's participant
// and the SampleInfo of a received sample; reports its outcome through
// return_code (callers compare it against FACE::RC_NO_ERROR).
83 OpenDDS_FACE_Export void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
84  const DDS::DomainParticipant_var part,
85  const DDS::SampleInfo& sinfo,
86  FACE::RETURN_CODE_TYPE& return_code);
87 
// Produces a FACE::MESSAGE_INSTANCE_GUID from a GUID_t and a 64-bit value
// named `seq` (presumably publication id and sequence number -- confirm).
88 OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::GUID_t& pub,
89  const CORBA::LongLong& seq);
90 
// Takes a DDS return code for a connection and returns a FACE return code;
// the send/receive functions below assign its result to their return_code.
91 OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
92  DDS::ReturnCode_t retcode);
93 
// Constructor definition for a template on Msg that delegates to
// FaceReceiver() and copies `dr` and `last_msg_tid` from `rcvr`.
// NOTE(review): the signature line (embedded number 95) and three body lines
// (99, 101, 102) are missing from this listing; presumably this is
// Entities::DDSTypedAdapter<Msg>'s constructor with `rcvr` as its parameter
// and the missing lines copy further fields -- confirm against the full header.
94 template <typename Msg>
96  : FaceReceiver()
97 {
98  dr = rcvr.dr;
100  last_msg_tid = rcvr.last_msg_tid;
103 }
104 
// Empty function body for a template on Msg.
// NOTE(review): the signature line (embedded number 106) is missing from this
// listing; presumably this is Entities::DDSTypedAdapter<Msg>'s destructor,
// which the class declares -- confirm.
105 template <typename Msg>
107 {
108 }
109 
// Counts the samples currently readable on this adapter's DataReader that
// carry valid data and stores the count in num_waiting.
//
// Returns FACE::INVALID_PARAM when `dr` cannot be narrowed to the typed
// reader, FACE::RC_NO_ERROR when the read succeeds or reports no data
// (num_waiting is 0 in the latter case), and FACE::NOT_AVAILABLE for any other
// read result (num_waiting is then left unchanged).
//
// NOTE(review): the remaining create_readcondition arguments (embedded
// numbers 120-121) and the declaration of `seq` (124) are missing from this
// listing.
110 template <typename Msg>
111 FACE::RETURN_CODE_TYPE Entities::DDSTypedAdapter<Msg>::messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting)
112 {
113  const typename DataReader::_var_type typedReader =
114  DataReader::_narrow(dr);
115  if (!typedReader) {
116  return FACE::INVALID_PARAM;
117  }
118  const DDS::ReadCondition_var rc =
119  typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
122 
123  DDS::ReturnCode_t ret;
125  DDS::SampleInfoSeq sinfo;
126  FACE::WAITING_RANGE_TYPE valid_waiting = 0;
    // read (not take): the samples are not removed from the reader here.
127  ret = typedReader->read_w_condition(seq, sinfo, DDS::LENGTH_UNLIMITED, rc);
128  if (ret == DDS::RETCODE_OK) {
    // Only entries whose SampleInfo has valid_data set are counted.
129  for (CORBA::ULong i = 0; i < seq.length(); ++i) {
130  if (sinfo[i].valid_data) {
131  ++valid_waiting;
132  }
133  }
134  num_waiting = valid_waiting;
135  return FACE::RC_NO_ERROR;
136  } else if (ret == DDS::RETCODE_NO_DATA) {
137  num_waiting = 0;
138  return FACE::RC_NO_ERROR;
139  }
    // NOTE(review): none of the return paths after the read calls
    // delete_readcondition(rc) (receive_message() does) -- confirm whether the
    // condition is meant to be released here.
140  return FACE::NOT_AVAILABLE;
141 }
142 
// Waits up to `timeout` for a sample on the reader registered for
// connection_id, takes at most one sample, copies it into `message` and sets
// `transaction_id` to the receiver's incremented last_msg_tid.
//
// return_code is set to FACE::INVALID_PARAM when the connection id is unknown
// to receivers_ or connections_, when message_size is smaller than the
// connection's MAX_MESSAGE_SIZE, or when CORBA::BAD_PARAM is caught; in the
// other paths it is whatever update_status() returns for the DDS result.
//
// NOTE(review): this listing omits embedded lines 167, 183-184, 199 and 221
// (presumably the DataReader typedef, the remaining create_readcondition
// arguments, the declaration of `seq`, and the opener of the conditional in
// the catch block) -- confirm against the full header.
143 template <typename Msg>
144 void receive_message(/*in*/ FACE::CONNECTION_ID_TYPE connection_id,
145  /*in*/ FACE::TIMEOUT_TYPE timeout,
146  /*inout*/ FACE::TRANSACTION_ID_TYPE& transaction_id,
147  /*inout*/ Msg& message,
148  /*in*/ FACE::MESSAGE_SIZE_TYPE message_size,
149  /*out*/ FACE::RETURN_CODE_TYPE& return_code)
150 {
151  try {
152  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
153  if (!readers.count(connection_id)) {
154  return_code = FACE::INVALID_PARAM;
155  return;
156  }
157  if (!Entities::instance()->connections_.count(connection_id)) {
158  return_code = FACE::INVALID_PARAM;
159  return;
160  }
161  FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
162  Entities::instance()->connections_[connection_id].connection_status;
    // The caller's buffer size must be at least the connection's maximum.
163  if (message_size < status.MAX_MESSAGE_SIZE) {
164  return_code = FACE::INVALID_PARAM;
165  return;
166  }
168  const typename DataReader::_var_type typedReader =
169  DataReader::_narrow(readers[connection_id]->dr);
170  if (!typedReader) {
171  return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
172  return;
173  }
    // While the receiver is not yet marked VALID, replace the map entry with a
    // DDSTypedAdapter<Msg> constructed from it and delete the old object.
174  if (readers[connection_id]->status_valid != FACE::VALID) {
175  Entities::FaceReceiver* tmp = readers[connection_id];
176  readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
177  delete tmp;
178  }
179  readers[connection_id]->status_valid = FACE::VALID;
180 
181  const DDS::ReadCondition_var rc =
182  typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
185  const DDS::WaitSet_var ws = new DDS::WaitSet;
186  ws->attach_condition(rc);
187 
188  DDS::ConditionSeq active;
189  const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
190  DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
191  ws->detach_condition(rc);
192 
    // Only a timeout ends the call here; any other wait() result falls
    // through to the take below.
193  if (ret == DDS::RETCODE_TIMEOUT) {
194  typedReader->delete_readcondition(rc);
195  return_code = update_status(connection_id, ret);
196  return;
197  }
198 
200  DDS::SampleInfoSeq sinfo;
201  ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc);
202  typedReader->delete_readcondition(rc);
203  if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
204  DDS::Subscriber_var subscriber = typedReader->get_subscriber();
205  DDS::DomainParticipant_var participant = subscriber->get_participant();
206  FACE::RETURN_CODE_TYPE ret_code;
207  populate_header_received(connection_id, participant, sinfo[0], ret_code);
    // NOTE(review): ret_code is a FACE::RETURN_CODE_TYPE but is passed to
    // update_status(), whose second parameter is declared as
    // DDS::ReturnCode_t -- confirm this conversion is intended.
208  if (ret_code != FACE::RC_NO_ERROR) {
209  return_code = update_status(connection_id, ret_code);
210  return;
211  }
212 
213  transaction_id = ++readers[connection_id]->last_msg_tid;
214 
215  message = seq[0];
216  return_code = update_status(connection_id, ret);
217  return;
218  }
    // Reached when the take did not return OK or the taken sample carries no
    // valid data: reported to update_status() as RETCODE_NO_DATA.
219  return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
220  } catch (const CORBA::BAD_PARAM&) {
    // The line preceding this log call is missing from the listing; the extra
    // closing brace below suggests it opened a conditional around the log.
222  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_message - INVALID_PARAM\n"));
223  }
224  return_code = FACE::INVALID_PARAM;
225  }
226 }
227 
// Writes `message` through the DataWriter registered for connection_id and
// stores the FACE return code in return_code.
//
// return_code is FACE::INVALID_PARAM when the connection id is unknown to
// connections_ or senders_, or when message_size is smaller than the
// connection's MAX_MESSAGE_SIZE; otherwise it is whatever update_status()
// returns for the DDS result. The transaction_id parameter is unused.
//
// NOTE(review): this listing omits embedded lines 252 and 264 (presumably the
// DataWriter typedef and the first line of the timeout condition) -- confirm
// against the full header.
228 template <typename Msg>
229 void send_message(FACE::CONNECTION_ID_TYPE connection_id,
230  FACE::TIMEOUT_TYPE timeout,
231  FACE::TRANSACTION_ID_TYPE& /*transaction_id*/,
232  const Msg& message,
233  FACE::MESSAGE_SIZE_TYPE message_size,
234  FACE::RETURN_CODE_TYPE& return_code)
235 {
236  if(!Entities::instance()->connections_.count(connection_id)) {
237  return_code = FACE::INVALID_PARAM;
238  return;
239  }
240  FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
241  Entities::instance()->connections_[connection_id].connection_status;
    // The declared message size must be at least the connection's maximum.
242  if (message_size < status.MAX_MESSAGE_SIZE) {
243  return_code = FACE::INVALID_PARAM;
244  return;
245  }
246  Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
247  if (!writers.count(connection_id)) {
248  return_code = FACE::INVALID_PARAM;
249  return;
250  }
251 
253  const typename DataWriter::_var_type typedWriter =
254  DataWriter::_narrow(writers[connection_id].dw);
255  if (!typedWriter) {
256  return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
257  return;
258  }
259  writers[connection_id].status_valid = FACE::VALID;
260 
261  DDS::DataWriterQos dw_qos;
262  typedWriter->get_qos(dw_qos);
263  FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
    // Tail of a condition whose first line is missing from this listing. The
    // visible part rejects a finite timeout when the writer's
    // max_blocking_time is infinite or larger than that timeout.
265  timeout != FACE::INF_TIME_VALUE &&
266  ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
267  return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
268  return;
269  }
270 
    // `timeout` is only validated above; it is not passed to write().
271  return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
272 }
273 
// DataReaderListener for one connection that forwards each received sample
// with valid data to a list of user callbacks. register_callback() creates
// one and installs it on the reader, or adds a callback to an existing one.
//
// NOTE(review): this listing omits several lines of the class (embedded
// numbers 296, 299, 305, 308, 314, 340, 354, 355). The page's cross-reference
// index lists LockType (ACE_SYNCH_MUTEX) and GuardType (ACE_Guard<LockType>)
// at 354/355; the other gaps are presumably the second parameter and body of
// the listener hooks, the DataReader typedef, and the opener of the
// conditional around the debug log -- confirm.
274 template <typename Msg>
275 class Listener : public virtual DCPS::LocalObject<DDS::DataReaderListener> {
276 public:
    // Signature every registered callback must have.
277  typedef void (*Callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
278  FACE::MESSAGE_TYPE_GUID,
279  FACE::MESSAGE_SIZE_TYPE,
280  const FACE::WAITSET_TYPE,
281  FACE::RETURN_CODE_TYPE&);
282 
    // Stores the connection id and the first callback (no lock is taken here).
283  Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
284  : connection_id_(connection_id)
285  {
286  callbacks_.push_back(callback);
287  }
288 
    // Appends another callback while holding callbacks_lock_.
289  void add_callback(Callback callback) {
290  GuardType guard(callbacks_lock_);
291  callbacks_.push_back(callback);
292  }
293 
294 private:
    // Listener hooks other than on_data_available. The two that are fully
    // visible (on_sample_rejected, on_sample_lost) have empty bodies; the rest
    // are cut off in this listing.
295  void on_requested_deadline_missed(DDS::DataReader_ptr,
297 
298  void on_requested_incompatible_qos(DDS::DataReader_ptr,
300 
301  void on_sample_rejected(DDS::DataReader_ptr,
302  const DDS::SampleRejectedStatus&) {}
303 
304  void on_liveliness_changed(DDS::DataReader_ptr,
306 
307  void on_subscription_matched(DDS::DataReader_ptr,
309 
310  void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus&) {}
311 
    // Takes samples one at a time until take_next_sample stops returning
    // RETCODE_OK. For each sample with valid data it calls
    // populate_header_received(), increments the receiver's last_msg_tid,
    // records RETCODE_OK through update_status() and invokes every callback.
312  void on_data_available(DDS::DataReader_ptr reader)
313  {
315  const typename DataReader::_var_type typedReader =
316  DataReader::_narrow(reader);
317  if (!typedReader) {
318  update_status(connection_id_, DDS::RETCODE_BAD_PARAMETER);
319  return;
320  }
321 
322  FACE::MESSAGE_TYPE_GUID& msg_id = Entities::instance()->connections_[connection_id_].platform_view_guid;
323  Msg sample;
324  DDS::SampleInfo sinfo;
325  while (typedReader->take_next_sample(sample, sinfo) == DDS::RETCODE_OK) {
326  if (sinfo.valid_data) {
327  DDS::Subscriber_var subscriber = typedReader->get_subscriber();
328  DDS::DomainParticipant_var participant = subscriber->get_participant();
329  FACE::RETURN_CODE_TYPE ret_code;
330  populate_header_received(connection_id_, participant, sinfo, ret_code);
    // A header failure returns from the handler, so samples still queued on
    // the reader are not taken by this invocation.
    // NOTE(review): ret_code is a FACE::RETURN_CODE_TYPE passed where
    // update_status() declares a DDS::ReturnCode_t -- confirm.
331  if (ret_code != FACE::RC_NO_ERROR) {
332  update_status(connection_id_, ret_code);
333  return;
334  }
335 
336  FACE::TRANSACTION_ID_TYPE transaction_id = ++Entities::instance()->receivers_[connection_id_]->last_msg_tid;
337  update_status(connection_id_, DDS::RETCODE_OK);
338  FACE::RETURN_CODE_TYPE retcode;
    // The guard stays in scope for the callback loop below, so the callbacks
    // run while callbacks_lock_ is held.
339  GuardType guard(callbacks_lock_);
    // NOTE(review): "%d" is paired with callbacks_.size() here and with the
    // size_t index `i` in the error log below -- confirm the format specifiers.
341  ACE_DEBUG((LM_DEBUG, "Listener::on_data_available - invoking %d callbacks\n", callbacks_.size()));
342  }
343  for (size_t i = 0; i < callbacks_.size(); ++i) {
    // retcode is reset before each call; a callback reporting anything other
    // than RC_NO_ERROR is only logged, and the remaining callbacks still run.
344  retcode = FACE::RC_NO_ERROR;
345  callbacks_.at(i)(transaction_id /*Transaction_ID*/, sample, msg_id, sizeof(Msg), 0 /*WAITSET_TYPE*/, retcode);
346  if (retcode != FACE::RC_NO_ERROR) {
347  ACE_ERROR((LM_ERROR, "ERROR: Listener::on_data_available - callback %d returned retcode: %d\n", i, retcode));
348  }
349  }
350  }
351  }
352  }
353 
    // Lock guarding callbacks_, the registered callbacks in registration
    // order, and the connection this listener serves.
356  LockType callbacks_lock_;
357  OPENDDS_VECTOR(Callback) callbacks_;
358  const FACE::CONNECTION_ID_TYPE connection_id_;
359 };
360 
361 template <typename Msg>
362 void register_callback(FACE::CONNECTION_ID_TYPE connection_id,
363  const FACE::WAITSET_TYPE /*waitset*/,
364  void (*callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
365  FACE::MESSAGE_TYPE_GUID,
366  FACE::MESSAGE_SIZE_TYPE,
367  const FACE::WAITSET_TYPE,
368  FACE::RETURN_CODE_TYPE&),
369  FACE::MESSAGE_SIZE_TYPE max_message_size,
370  FACE::RETURN_CODE_TYPE& return_code)
371 {
372  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
373  if (!readers.count(connection_id)) {
374  return_code = FACE::INVALID_PARAM;
375  return;
376  }
377  if(!Entities::instance()->connections_.count(connection_id)) {
378  return_code = FACE::INVALID_PARAM;
379  return;
380  }
381  FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
382  Entities::instance()->connections_[connection_id].connection_status;
383  if (max_message_size < status.MAX_MESSAGE_SIZE) {
384  return_code = FACE::INVALID_PARAM;
385  return;
386  }
387  DDS::DataReaderListener_var existing_listener = readers[connection_id]->dr->get_listener();
388  if (existing_listener.in()) {
389  Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener.in());
390  if (typedListener) {
391  typedListener->add_callback(callback);
392  } else {
393  ACE_ERROR((LM_ERROR, "ERROR: register_callback - failed to obtain typed listener\n"));
394  return_code = FACE::INVALID_PARAM;
395  return;
396  }
397  } else {
398  DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
399  readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
400  }
401  if (readers[connection_id]->status_valid != FACE::VALID) {
402  Entities::FaceReceiver* tmp = readers[connection_id];
403  readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
404  delete tmp;
405  }
406  readers[connection_id]->status_valid = FACE::VALID;
407 
408  return_code = FACE::RC_NO_ERROR;
409 }
410 
411 } }
412 
414 
415 #endif
#define ACE_DEBUG(X)
void populate_header_received(const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:852
#define ACE_ERROR(X)
#define ACE_SYNCH_MUTEX
const InstanceHandle_t HANDLE_NIL
ReliabilityQosPolicy reliability
void send_message(FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &, const Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.h:229
FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::GUID_t &pub, const CORBA::LongLong &orig_seq)
Definition: FaceTSS.cpp:809
ACE_CDR::LongLong LongLong
sequence< SampleInfo > SampleInfoSeq
const SampleStateMask ANY_SAMPLE_STATE
virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE &num_waiting)
Definition: FaceTSS.h:111
ConnIdToReceiverMap receivers_
Definition: FaceTSS.h:70
sequence< Condition > ConditionSeq
void register_callback(FACE::CONNECTION_ID_TYPE connection_id, const FACE::WAITSET_TYPE, void(*callback)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &), FACE::MESSAGE_SIZE_TYPE max_message_size, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.h:362
ACE_Guard< LockType > GuardType
Definition: FaceTSS.h:355
void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus &)
Definition: FaceTSS.h:301
const FACE::CONNECTION_ID_TYPE connection_id_
Definition: FaceTSS.h:358
FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t &duration)
Definition: FaceTSS.cpp:792
local interface<%TYPE%> DataWriter
Definition: IDLTemplate.txt:18
#define OPENDDS_STRING
DCPS::DDSTraits< Msg >::DataReaderType DataReader
Definition: FaceTSS.h:62
void receive_message(FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &transaction_id, Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.h:144
LM_DEBUG
static OpenDDS_FACE_Export Entities * instance()
Definition: FaceTSS.cpp:725
FACE::TRANSACTION_ID_TYPE last_msg_tid
Definition: FaceTSS.h:51
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t &timestamp)
Definition: FaceTSS.cpp:802
void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &)
Definition: FaceTSS.h:298
const ReturnCode_t RETCODE_TIMEOUT
const StatusKind DATA_AVAILABLE_STATUS
void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus &)
Definition: FaceTSS.h:295
void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus &)
Definition: FaceTSS.h:310
void on_subscription_matched(DDS::DataReader_ptr, const DDS::SubscriptionMatchedStatus &)
Definition: FaceTSS.h:307
const ViewStateMask ANY_VIEW_STATE
void on_data_available(DDS::DataReader_ptr reader)
Definition: FaceTSS.h:312
typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceSender) ConnIdToSenderMap
FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency
Definition: FaceTSS.h:52
FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
Definition: FaceTSS.cpp:730
ConnIdToSenderMap senders_
Definition: FaceTSS.h:69
const ReturnCode_t RETCODE_NO_DATA
#define OPENDDS_VECTOR(T)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status
Definition: FaceTSS.h:74
ACE_SYNCH_MUTEX LockType
Definition: FaceTSS.h:354
FACE::TS::MessageHeader last_msg_header
Definition: FaceTSS.h:50
FACE::VALIDITY_TYPE status_valid
Definition: FaceTSS.h:29
Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
Definition: FaceTSS.h:283
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus &)
Definition: FaceTSS.h:304
const ReturnCode_t RETCODE_OK
const long LENGTH_UNLIMITED
virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE &)
Definition: FaceTSS.h:45
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
FACE::MESSAGE_TYPE_GUID platform_view_guid
Definition: FaceTSS.h:75
const InstanceStateKind ALIVE_INSTANCE_STATE
DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
Definition: FaceTSS.cpp:779
void add_callback(Callback callback)
Definition: FaceTSS.h:289
const ReturnCode_t RETCODE_BAD_PARAMETER
Definition: FaceTSS.cpp:24