00001 #ifndef OPENDDS_FACE_TSS_H
00002 #define OPENDDS_FACE_TSS_H
00003
00004 #include "FACE/TS.hpp"
00005 #include "dds/DCPS/PoolAllocator.h"
00006 #include "dds/DdsDcpsPublicationC.h"
00007 #include "dds/DdsDcpsSubscriptionC.h"
00008 #include "dds/DCPS/TypeSupportImpl.h"
00009 #include "dds/DCPS/WaitSet.h"
00010 #include "dds/DCPS/SafetyProfileStreams.h"
00011 #include "ace/Singleton.h"
00012
00013
00014 namespace OpenDDS {
00015 namespace FaceTSS {
00016
00017 class Entities {
00018 friend class ACE_Singleton<Entities, ACE_Thread_Mutex>;
00019
00020 Entities();
00021 ~Entities();
00022
00023 public:
00024 struct DDSAdapter : public OpenDDS::DCPS::PoolAllocationBase {
00025 DDSAdapter ()
00026 : status_valid(FACE::INVALID)
00027 {}
00028
00029 FACE::VALIDITY_TYPE status_valid;
00030 };
00031 struct FaceSender : public DDSAdapter {
00032 FaceSender () {}
00033 DDS::DataWriter_var dw;
00034 };
00035
00036 struct FaceReceiver : public DDSAdapter {
00037 FaceReceiver ()
00038 : last_msg_tid(0),
00039 sum_recvd_msgs_latency(0),
00040 total_msgs_recvd(0)
00041 {}
00042
00043 virtual ~FaceReceiver() {}
00044 virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& )
00045 {
00046 return FACE::NOT_AVAILABLE;
00047 };
00048 DDS::DataReader_var dr;
00049 FACE::TS::MessageHeader last_msg_header;
00050 FACE::TRANSACTION_ID_TYPE last_msg_tid;
00051 FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency;
00052 FACE::LongLong total_msgs_recvd;
00053 };
00054
00055 template<typename Msg>
00056 class DDSTypedAdapter : public FaceReceiver {
00057 public:
00058 DDSTypedAdapter(FaceReceiver& rcvr);
00059 ~DDSTypedAdapter();
00060 virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting);
00061 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00062 };
00063
00064 typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceSender) ConnIdToSenderMap;
00065 typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceReceiver*) ConnIdToReceiverMap;
00066
00067 OpenDDS_FACE_Export static Entities* instance();
00068 ConnIdToSenderMap senders_;
00069 ConnIdToReceiverMap receivers_;
00070
00071 struct ConnectionInfo {
00072 OPENDDS_STRING connection_name;
00073 FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status;
00074 FACE::MESSAGE_TYPE_GUID platform_view_guid;
00075 };
00076 OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, ConnectionInfo ) connections_;
00077 };
00078
00079 OpenDDS_FACE_Export DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout);
00080 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration);
00081 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp);
00082 OpenDDS_FACE_Export void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00083 const DDS::DomainParticipant_var part,
00084 const DDS::SampleInfo& sinfo,
00085 FACE::RETURN_CODE_TYPE& return_code);
00086
00087 OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub,
00088 const CORBA::LongLong& seq);
00089
00090 OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00091 DDS::ReturnCode_t retcode);
00092
00093 template <typename Msg>
00094 Entities::DDSTypedAdapter<Msg>::DDSTypedAdapter(FaceReceiver& rcvr)
00095 : FaceReceiver()
00096 {
00097 dr = rcvr.dr;
00098 last_msg_header = rcvr.last_msg_header;
00099 last_msg_tid = rcvr.last_msg_tid;
00100 sum_recvd_msgs_latency = rcvr.sum_recvd_msgs_latency;
00101 total_msgs_recvd = rcvr.total_msgs_recvd;
00102 }
00103
00104 template <typename Msg>
00105 Entities::DDSTypedAdapter<Msg>::~DDSTypedAdapter()
00106 {
00107 }
00108
00109 template <typename Msg>
00110 FACE::RETURN_CODE_TYPE Entities::DDSTypedAdapter<Msg>::messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting)
00111 {
00112 const typename DataReader::_var_type typedReader =
00113 DataReader::_narrow(dr);
00114 if (!typedReader) {
00115 return FACE::INVALID_PARAM;
00116 }
00117 const DDS::ReadCondition_var rc =
00118 typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00119 DDS::ANY_VIEW_STATE,
00120 DDS::ALIVE_INSTANCE_STATE);
00121
00122 DDS::ReturnCode_t ret;
00123 typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00124 DDS::SampleInfoSeq sinfo;
00125 FACE::WAITING_RANGE_TYPE valid_waiting = 0;
00126 ret = typedReader->read_w_condition(seq, sinfo, DDS::LENGTH_UNLIMITED, rc);
00127 if (ret == DDS::RETCODE_OK) {
00128 for (CORBA::ULong i = 0; i < seq.length(); ++i) {
00129 if (sinfo[i].valid_data) {
00130 ++valid_waiting;
00131 }
00132 }
00133 num_waiting = valid_waiting;
00134 return FACE::RC_NO_ERROR;
00135 } else if (ret == DDS::RETCODE_NO_DATA) {
00136 num_waiting = 0;
00137 return FACE::RC_NO_ERROR;
00138 }
00139 return FACE::NOT_AVAILABLE;
00140 }
00141
00142 template <typename Msg>
00143 void receive_message( FACE::CONNECTION_ID_TYPE connection_id,
00144 FACE::TIMEOUT_TYPE timeout,
00145 FACE::TRANSACTION_ID_TYPE& transaction_id,
00146 Msg& message,
00147 FACE::MESSAGE_SIZE_TYPE message_size,
00148 FACE::RETURN_CODE_TYPE& return_code)
00149 {
00150 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00151 if (!readers.count(connection_id)) {
00152 return_code = FACE::INVALID_PARAM;
00153 return;
00154 }
00155 if(!Entities::instance()->connections_.count(connection_id)) {
00156 return_code = FACE::INVALID_PARAM;
00157 return;
00158 }
00159 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00160 Entities::instance()->connections_[connection_id].connection_status;
00161 if (message_size < status.MAX_MESSAGE_SIZE) {
00162 return_code = FACE::INVALID_PARAM;
00163 return;
00164 }
00165 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00166 const typename DataReader::_var_type typedReader =
00167 DataReader::_narrow(readers[connection_id]->dr);
00168 if (!typedReader) {
00169 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00170 return;
00171 }
00172 if (readers[connection_id]->status_valid != FACE::VALID) {
00173 Entities::FaceReceiver* tmp = readers[connection_id];
00174 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00175 delete tmp;
00176 }
00177 readers[connection_id]->status_valid = FACE::VALID;
00178
00179 const DDS::ReadCondition_var rc =
00180 typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00181 DDS::ANY_VIEW_STATE,
00182 DDS::ALIVE_INSTANCE_STATE);
00183 const DDS::WaitSet_var ws = new DDS::WaitSet;
00184 ws->attach_condition(rc);
00185
00186 DDS::ConditionSeq active;
00187 const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
00188 DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
00189 ws->detach_condition(rc);
00190
00191 if (ret == DDS::RETCODE_TIMEOUT) {
00192 return_code = update_status(connection_id, ret);
00193 return;
00194 }
00195
00196 typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00197 DDS::SampleInfoSeq sinfo;
00198 ret = typedReader->take_w_condition(seq, sinfo, 1 , rc);
00199 if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
00200 DDS::DomainParticipant_var participant = typedReader->get_subscriber()->get_participant();
00201 FACE::RETURN_CODE_TYPE ret_code;
00202 populate_header_received(connection_id, participant, sinfo[0], ret_code);
00203 if (ret_code != FACE::RC_NO_ERROR) {
00204 return_code = update_status(connection_id, ret_code);
00205 return;
00206 }
00207
00208 transaction_id = ++readers[connection_id]->last_msg_tid;
00209
00210 message = seq[0];
00211 return_code = update_status(connection_id, ret);
00212 return;
00213 }
00214 return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
00215 }
00216
00217 template <typename Msg>
00218 void send_message(FACE::CONNECTION_ID_TYPE connection_id,
00219 FACE::TIMEOUT_TYPE timeout,
00220 FACE::TRANSACTION_ID_TYPE& ,
00221 const Msg& message,
00222 FACE::MESSAGE_SIZE_TYPE message_size,
00223 FACE::RETURN_CODE_TYPE& return_code)
00224 {
00225 if(!Entities::instance()->connections_.count(connection_id)) {
00226 return_code = FACE::INVALID_PARAM;
00227 return;
00228 }
00229 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00230 Entities::instance()->connections_[connection_id].connection_status;
00231 if (message_size < status.MAX_MESSAGE_SIZE) {
00232 return_code = FACE::INVALID_PARAM;
00233 return;
00234 }
00235 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00236 if (!writers.count(connection_id)) {
00237 return_code = FACE::INVALID_PARAM;
00238 return;
00239 }
00240
00241 typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter;
00242 const typename DataWriter::_var_type typedWriter =
00243 DataWriter::_narrow(writers[connection_id].dw);
00244 if (!typedWriter) {
00245 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00246 return;
00247 }
00248 writers[connection_id].status_valid = FACE::VALID;
00249
00250 DDS::DataWriterQos dw_qos;
00251 typedWriter->get_qos(dw_qos);
00252 FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
00253 if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS &&
00254 timeout != FACE::INF_TIME_VALUE &&
00255 ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
00256 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00257 return;
00258 }
00259
00260 return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
00261 }
00262
00263 template <typename Msg>
00264 class Listener : public DCPS::LocalObject<DDS::DataReaderListener> {
00265 public:
00266 typedef void (*Callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00267 FACE::MESSAGE_TYPE_GUID,
00268 FACE::MESSAGE_SIZE_TYPE,
00269 const FACE::WAITSET_TYPE,
00270 FACE::RETURN_CODE_TYPE&);
00271
00272 Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
00273 : connection_id_(connection_id)
00274 {
00275 callbacks_.push_back(callback);
00276 }
00277
00278 void add_callback(Callback callback) {
00279 GuardType guard(callbacks_lock_);
00280 callbacks_.push_back(callback);
00281 }
00282
00283 private:
00284 void on_requested_deadline_missed(DDS::DataReader_ptr,
00285 const DDS::RequestedDeadlineMissedStatus&) {}
00286
00287 void on_requested_incompatible_qos(DDS::DataReader_ptr,
00288 const DDS::RequestedIncompatibleQosStatus&) {}
00289
00290 void on_sample_rejected(DDS::DataReader_ptr,
00291 const DDS::SampleRejectedStatus&) {}
00292
00293 void on_liveliness_changed(DDS::DataReader_ptr,
00294 const DDS::LivelinessChangedStatus&) {}
00295
00296 void on_subscription_matched(DDS::DataReader_ptr,
00297 const DDS::SubscriptionMatchedStatus&) {}
00298
00299 void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus&) {}
00300
00301 void on_data_available(DDS::DataReader_ptr reader)
00302 {
00303 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00304 const typename DataReader::_var_type typedReader =
00305 DataReader::_narrow(reader);
00306 if (!typedReader) {
00307 update_status(connection_id_, DDS::RETCODE_BAD_PARAMETER);
00308 return;
00309 }
00310
00311 FACE::MESSAGE_TYPE_GUID& msg_id = Entities::instance()->connections_[connection_id_].platform_view_guid;
00312 Msg sample;
00313 DDS::SampleInfo sinfo;
00314 while (typedReader->take_next_sample(sample, sinfo) == DDS::RETCODE_OK) {
00315 if (sinfo.valid_data) {
00316 DDS::DomainParticipant_var participant = typedReader->get_subscriber()->get_participant();
00317 FACE::RETURN_CODE_TYPE ret_code;
00318 populate_header_received(connection_id_, participant, sinfo, ret_code);
00319 if (ret_code != FACE::RC_NO_ERROR) {
00320 update_status(connection_id_, ret_code);
00321 return;
00322 }
00323
00324 FACE::TRANSACTION_ID_TYPE transaction_id = ++Entities::instance()->receivers_[connection_id_]->last_msg_tid;
00325 update_status(connection_id_, DDS::RETCODE_OK);
00326 FACE::RETURN_CODE_TYPE retcode;
00327 GuardType guard(callbacks_lock_);
00328 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00329 ACE_DEBUG((LM_DEBUG, "Listener::on_data_available - invoking %d callbacks\n", callbacks_.size()));
00330 }
00331 for (size_t i = 0; i < callbacks_.size(); ++i) {
00332 retcode = FACE::RC_NO_ERROR;
00333 callbacks_.at(i)(transaction_id , sample, msg_id, sizeof(Msg), 0 , retcode);
00334 if (retcode != FACE::RC_NO_ERROR) {
00335 ACE_ERROR((LM_ERROR, "ERROR: Listener::on_data_available - callback %d returned retcode: %d\n", i, retcode));
00336 }
00337 }
00338 }
00339 }
00340 }
00341
00342 typedef ACE_SYNCH_MUTEX LockType;
00343 typedef ACE_Guard<LockType> GuardType;
00344 LockType callbacks_lock_;
00345 OPENDDS_VECTOR(Callback) callbacks_;
00346 const FACE::CONNECTION_ID_TYPE connection_id_;
00347 };
00348
00349 template <typename Msg>
00350 void register_callback(FACE::CONNECTION_ID_TYPE connection_id,
00351 const FACE::WAITSET_TYPE ,
00352 void (*callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00353 FACE::MESSAGE_TYPE_GUID,
00354 FACE::MESSAGE_SIZE_TYPE,
00355 const FACE::WAITSET_TYPE,
00356 FACE::RETURN_CODE_TYPE&),
00357 FACE::MESSAGE_SIZE_TYPE max_message_size,
00358 FACE::RETURN_CODE_TYPE& return_code)
00359 {
00360 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00361 if (!readers.count(connection_id)) {
00362 return_code = FACE::INVALID_PARAM;
00363 return;
00364 }
00365 if(!Entities::instance()->connections_.count(connection_id)) {
00366 return_code = FACE::INVALID_PARAM;
00367 return;
00368 }
00369 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00370 Entities::instance()->connections_[connection_id].connection_status;
00371 if (max_message_size < status.MAX_MESSAGE_SIZE) {
00372 return_code = FACE::INVALID_PARAM;
00373 return;
00374 }
00375 DDS::DataReaderListener_ptr existing_listener = readers[connection_id]->dr->get_listener();
00376 if (existing_listener) {
00377 Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener);
00378 typedListener->add_callback(callback);
00379 } else {
00380 DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
00381 readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
00382 }
00383 if (readers[connection_id]->status_valid != FACE::VALID) {
00384 Entities::FaceReceiver* tmp = readers[connection_id];
00385 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00386 delete tmp;
00387 }
00388 readers[connection_id]->status_valid = FACE::VALID;
00389
00390 return_code = FACE::RC_NO_ERROR;
00391 }
00392
00393 } }
00394
00395 #endif