1 #ifndef OPENDDS_FACE_FACETSS_H 2 #define OPENDDS_FACE_FACETSS_H 6 #include "dds/DdsDcpsSubscriptionC.h" 33 DDS::DataWriter_var
dw;
40 sum_recvd_msgs_latency(0),
47 return FACE::NOT_AVAILABLE;
49 DDS::DataReader_var
dr;
56 template<
typename Msg>
61 virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting);
84 const DDS::DomainParticipant_var part,
86 FACE::RETURN_CODE_TYPE& return_code);
91 OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE
update_status(FACE::CONNECTION_ID_TYPE connection_id,
94 template <
typename Msg>
105 template <
typename Msg>
110 template <
typename Msg>
113 const typename DataReader::_var_type typedReader =
114 DataReader::_narrow(
dr);
116 return FACE::INVALID_PARAM;
118 const DDS::ReadCondition_var rc =
126 FACE::WAITING_RANGE_TYPE valid_waiting = 0;
130 if (sinfo[i].valid_data) {
134 num_waiting = valid_waiting;
135 return FACE::RC_NO_ERROR;
138 return FACE::RC_NO_ERROR;
140 return FACE::NOT_AVAILABLE;
143 template <
typename Msg>
145 FACE::TIMEOUT_TYPE timeout,
146 FACE::TRANSACTION_ID_TYPE& transaction_id,
148 FACE::MESSAGE_SIZE_TYPE message_size,
149 FACE::RETURN_CODE_TYPE& return_code)
153 if (!readers.count(connection_id)) {
154 return_code = FACE::INVALID_PARAM;
158 return_code = FACE::INVALID_PARAM;
161 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
163 if (message_size < status.MAX_MESSAGE_SIZE) {
164 return_code = FACE::INVALID_PARAM;
168 const typename DataReader::_var_type typedReader =
169 DataReader::_narrow(readers[connection_id]->
dr);
174 if (readers[connection_id]->
status_valid != FACE::VALID) {
181 const DDS::ReadCondition_var rc =
186 ws->attach_condition(rc);
191 ws->detach_condition(rc);
194 typedReader->delete_readcondition(rc);
201 ret = typedReader->take_w_condition(seq, sinfo, 1 , rc);
202 typedReader->delete_readcondition(rc);
204 DDS::Subscriber_var subscriber = typedReader->get_subscriber();
205 DDS::DomainParticipant_var participant = subscriber->get_participant();
206 FACE::RETURN_CODE_TYPE ret_code;
208 if (ret_code != FACE::RC_NO_ERROR) {
213 transaction_id = ++readers[connection_id]->last_msg_tid;
220 }
catch (
const CORBA::BAD_PARAM&) {
224 return_code = FACE::INVALID_PARAM;
228 template <
typename Msg>
230 FACE::TIMEOUT_TYPE timeout,
231 FACE::TRANSACTION_ID_TYPE& ,
233 FACE::MESSAGE_SIZE_TYPE message_size,
234 FACE::RETURN_CODE_TYPE& return_code)
237 return_code = FACE::INVALID_PARAM;
240 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
242 if (message_size < status.MAX_MESSAGE_SIZE) {
243 return_code = FACE::INVALID_PARAM;
247 if (!writers.count(connection_id)) {
248 return_code = FACE::INVALID_PARAM;
253 const typename DataWriter::_var_type typedWriter =
254 DataWriter::_narrow(writers[connection_id].dw);
259 writers[connection_id].status_valid = FACE::VALID;
262 typedWriter->get_qos(dw_qos);
265 timeout != FACE::INF_TIME_VALUE &&
266 ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
274 template <
typename Msg>
277 typedef void (*Callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
278 FACE::MESSAGE_TYPE_GUID,
279 FACE::MESSAGE_SIZE_TYPE,
280 const FACE::WAITSET_TYPE,
281 FACE::RETURN_CODE_TYPE&);
283 Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
284 : connection_id_(connection_id)
286 callbacks_.push_back(callback);
291 callbacks_.push_back(callback);
315 const typename DataReader::_var_type typedReader =
316 DataReader::_narrow(reader);
322 FACE::MESSAGE_TYPE_GUID& msg_id =
Entities::instance()->connections_[connection_id_].platform_view_guid;
325 while (typedReader->take_next_sample(sample, sinfo) ==
DDS::RETCODE_OK) {
327 DDS::Subscriber_var subscriber = typedReader->get_subscriber();
328 DDS::DomainParticipant_var participant = subscriber->get_participant();
329 FACE::RETURN_CODE_TYPE ret_code;
331 if (ret_code != FACE::RC_NO_ERROR) {
338 FACE::RETURN_CODE_TYPE retcode;
341 ACE_DEBUG((
LM_DEBUG,
"Listener::on_data_available - invoking %d callbacks\n", callbacks_.size()));
343 for (
size_t i = 0; i < callbacks_.size(); ++i) {
344 retcode = FACE::RC_NO_ERROR;
345 callbacks_.at(i)(transaction_id , sample, msg_id,
sizeof(Msg), 0 , retcode);
346 if (retcode != FACE::RC_NO_ERROR) {
347 ACE_ERROR((
LM_ERROR,
"ERROR: Listener::on_data_available - callback %d returned retcode: %d\n", i, retcode));
361 template <
typename Msg>
363 const FACE::WAITSET_TYPE ,
364 void (*callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
365 FACE::MESSAGE_TYPE_GUID,
366 FACE::MESSAGE_SIZE_TYPE,
367 const FACE::WAITSET_TYPE,
368 FACE::RETURN_CODE_TYPE&),
369 FACE::MESSAGE_SIZE_TYPE max_message_size,
370 FACE::RETURN_CODE_TYPE& return_code)
373 if (!readers.count(connection_id)) {
374 return_code = FACE::INVALID_PARAM;
378 return_code = FACE::INVALID_PARAM;
381 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
383 if (max_message_size < status.MAX_MESSAGE_SIZE) {
384 return_code = FACE::INVALID_PARAM;
387 DDS::DataReaderListener_var existing_listener = readers[connection_id]->dr->get_listener();
388 if (existing_listener.in()) {
393 ACE_ERROR((
LM_ERROR,
"ERROR: register_callback - failed to obtain typed listener\n"));
394 return_code = FACE::INVALID_PARAM;
398 DDS::DataReaderListener_var listener =
new Listener<Msg>(callback, connection_id);
401 if (readers[connection_id]->
status_valid != FACE::VALID) {
408 return_code = FACE::RC_NO_ERROR;
void populate_header_received(const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code)
const InstanceHandle_t HANDLE_NIL
FACE::LongLong total_msgs_recvd
ReliabilityQosPolicy reliability
Duration_t max_blocking_time
void send_message(FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &, const Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::GUID_t &pub, const CORBA::LongLong &orig_seq)
ACE_CDR::LongLong LongLong
sequence< SampleInfo > SampleInfoSeq
const SampleStateMask ANY_SAMPLE_STATE
virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE &num_waiting)
ConnIdToReceiverMap receivers_
sequence< Condition > ConditionSeq
void register_callback(FACE::CONNECTION_ID_TYPE connection_id, const FACE::WAITSET_TYPE, void(*callback)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &), FACE::MESSAGE_SIZE_TYPE max_message_size, FACE::RETURN_CODE_TYPE &return_code)
ACE_Guard< LockType > GuardType
void on_sample_rejected(DDS::DataReader_ptr, const DDS::SampleRejectedStatus &)
const FACE::CONNECTION_ID_TYPE connection_id_
FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t &duration)
local interface<%TYPE%> DataWriter
DCPS::DDSTraits< Msg >::DataReaderType DataReader
void receive_message(FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &transaction_id, Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
static OpenDDS_FACE_Export Entities * instance()
FACE::TRANSACTION_ID_TYPE last_msg_tid
ReliabilityQosPolicyKind kind
FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t &timestamp)
void on_requested_incompatible_qos(DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &)
const ReturnCode_t RETCODE_TIMEOUT
const StatusKind DATA_AVAILABLE_STATUS
void on_requested_deadline_missed(DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus &)
void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus &)
void on_subscription_matched(DDS::DataReader_ptr, const DDS::SubscriptionMatchedStatus &)
const ViewStateMask ANY_VIEW_STATE
void on_data_available(DDS::DataReader_ptr reader)
typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceSender) ConnIdToSenderMap
OPENDDS_STRING connection_name
FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency
FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
ConnIdToSenderMap senders_
const ReturnCode_t RETCODE_NO_DATA
#define OPENDDS_VECTOR(T)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
DDSTypedAdapter(FaceReceiver &rcvr)
FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status
FACE::TS::MessageHeader last_msg_header
FACE::VALIDITY_TYPE status_valid
Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void on_liveliness_changed(DDS::DataReader_ptr, const DDS::LivelinessChangedStatus &)
const ReturnCode_t RETCODE_OK
const long LENGTH_UNLIMITED
virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE &)
The Internal API and Implementation of OpenDDS.
FACE::MESSAGE_TYPE_GUID platform_view_guid
const InstanceStateKind ALIVE_INSTANCE_STATE
DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
void add_callback(Callback callback)
const ReturnCode_t RETCODE_BAD_PARAMETER