00001 #ifndef OPENDDS_FACE_TSS_H
00002 #define OPENDDS_FACE_TSS_H
00003
00004 #include "FACE/TS.hpp"
00005 #include "dds/DCPS/PoolAllocator.h"
00006 #include "dds/DdsDcpsSubscriptionC.h"
00007 #include "dds/DCPS/TypeSupportImpl.h"
00008 #include "dds/DCPS/WaitSet.h"
00009 #include "dds/DCPS/SafetyProfileStreams.h"
00010 #include "dds/DCPS/RepoIdTypes.h"
00011 #include "ace/Singleton.h"
00012
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace OpenDDS {
00016 namespace FaceTSS {
00017
00018 class Entities {
00019 friend class ACE_Singleton<Entities, ACE_Thread_Mutex>;
00020
00021 Entities();
00022 ~Entities();
00023
00024 public:
00025 struct DDSAdapter : public OpenDDS::DCPS::PoolAllocationBase {
00026 DDSAdapter ()
00027 : status_valid(FACE::INVALID)
00028 {}
00029
00030 FACE::VALIDITY_TYPE status_valid;
00031 };
00032 struct FaceSender : public DDSAdapter {
00033 FaceSender () {}
00034 DDS::DataWriter_var dw;
00035 };
00036
00037 struct FaceReceiver : public DDSAdapter {
00038 FaceReceiver ()
00039 : last_msg_header(),
00040 last_msg_tid(0),
00041 sum_recvd_msgs_latency(0),
00042 total_msgs_recvd(0)
00043 {}
00044
00045 virtual ~FaceReceiver() {}
00046 virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& )
00047 {
00048 return FACE::NOT_AVAILABLE;
00049 };
00050 DDS::DataReader_var dr;
00051 FACE::TS::MessageHeader last_msg_header;
00052 FACE::TRANSACTION_ID_TYPE last_msg_tid;
00053 FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency;
00054 FACE::LongLong total_msgs_recvd;
00055 };
00056
00057 template<typename Msg>
00058 class DDSTypedAdapter : public FaceReceiver {
00059 public:
00060 DDSTypedAdapter(FaceReceiver& rcvr);
00061 ~DDSTypedAdapter();
00062 virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting);
00063 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00064 };
00065
00066 typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceSender) ConnIdToSenderMap;
00067 typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceReceiver*) ConnIdToReceiverMap;
00068
00069 OpenDDS_FACE_Export static Entities* instance();
00070 ConnIdToSenderMap senders_;
00071 ConnIdToReceiverMap receivers_;
00072
00073 struct ConnectionInfo {
00074 OPENDDS_STRING connection_name;
00075 FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status;
00076 FACE::MESSAGE_TYPE_GUID platform_view_guid;
00077 };
00078 OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, ConnectionInfo ) connections_;
00079 };
00080
00081 OpenDDS_FACE_Export DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout);
00082 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration);
00083 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp);
00084 OpenDDS_FACE_Export void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00085 const DDS::DomainParticipant_var part,
00086 const DDS::SampleInfo& sinfo,
00087 FACE::RETURN_CODE_TYPE& return_code);
00088
00089 OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub,
00090 const CORBA::LongLong& seq);
00091
00092 OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00093 DDS::ReturnCode_t retcode);
00094
00095 template <typename Msg>
00096 Entities::DDSTypedAdapter<Msg>::DDSTypedAdapter(FaceReceiver& rcvr)
00097 : FaceReceiver()
00098 {
00099 dr = rcvr.dr;
00100 last_msg_header = rcvr.last_msg_header;
00101 last_msg_tid = rcvr.last_msg_tid;
00102 sum_recvd_msgs_latency = rcvr.sum_recvd_msgs_latency;
00103 total_msgs_recvd = rcvr.total_msgs_recvd;
00104 }
00105
00106 template <typename Msg>
00107 Entities::DDSTypedAdapter<Msg>::~DDSTypedAdapter()
00108 {
00109 }
00110
00111 template <typename Msg>
00112 FACE::RETURN_CODE_TYPE Entities::DDSTypedAdapter<Msg>::messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting)
00113 {
00114 const typename DataReader::_var_type typedReader =
00115 DataReader::_narrow(dr);
00116 if (!typedReader) {
00117 return FACE::INVALID_PARAM;
00118 }
00119 const DDS::ReadCondition_var rc =
00120 typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00121 DDS::ANY_VIEW_STATE,
00122 DDS::ALIVE_INSTANCE_STATE);
00123
00124 DDS::ReturnCode_t ret;
00125 typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00126 DDS::SampleInfoSeq sinfo;
00127 FACE::WAITING_RANGE_TYPE valid_waiting = 0;
00128 ret = typedReader->read_w_condition(seq, sinfo, DDS::LENGTH_UNLIMITED, rc);
00129 if (ret == DDS::RETCODE_OK) {
00130 for (CORBA::ULong i = 0; i < seq.length(); ++i) {
00131 if (sinfo[i].valid_data) {
00132 ++valid_waiting;
00133 }
00134 }
00135 num_waiting = valid_waiting;
00136 return FACE::RC_NO_ERROR;
00137 } else if (ret == DDS::RETCODE_NO_DATA) {
00138 num_waiting = 0;
00139 return FACE::RC_NO_ERROR;
00140 }
00141 return FACE::NOT_AVAILABLE;
00142 }
00143
00144 template <typename Msg>
00145 void receive_message( FACE::CONNECTION_ID_TYPE connection_id,
00146 FACE::TIMEOUT_TYPE timeout,
00147 FACE::TRANSACTION_ID_TYPE& transaction_id,
00148 Msg& message,
00149 FACE::MESSAGE_SIZE_TYPE message_size,
00150 FACE::RETURN_CODE_TYPE& return_code)
00151 {
00152 try {
00153 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00154 if (!readers.count(connection_id)) {
00155 return_code = FACE::INVALID_PARAM;
00156 return;
00157 }
00158 if (!Entities::instance()->connections_.count(connection_id)) {
00159 return_code = FACE::INVALID_PARAM;
00160 return;
00161 }
00162 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00163 Entities::instance()->connections_[connection_id].connection_status;
00164 if (message_size < status.MAX_MESSAGE_SIZE) {
00165 return_code = FACE::INVALID_PARAM;
00166 return;
00167 }
00168 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00169 const typename DataReader::_var_type typedReader =
00170 DataReader::_narrow(readers[connection_id]->dr);
00171 if (!typedReader) {
00172 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00173 return;
00174 }
00175 if (readers[connection_id]->status_valid != FACE::VALID) {
00176 Entities::FaceReceiver* tmp = readers[connection_id];
00177 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00178 delete tmp;
00179 }
00180 readers[connection_id]->status_valid = FACE::VALID;
00181
00182 const DDS::ReadCondition_var rc =
00183 typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00184 DDS::ANY_VIEW_STATE,
00185 DDS::ALIVE_INSTANCE_STATE);
00186 const DDS::WaitSet_var ws = new DDS::WaitSet;
00187 ws->attach_condition(rc);
00188
00189 DDS::ConditionSeq active;
00190 const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
00191 DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
00192 ws->detach_condition(rc);
00193
00194 if (ret == DDS::RETCODE_TIMEOUT) {
00195 typedReader->delete_readcondition(rc);
00196 return_code = update_status(connection_id, ret);
00197 return;
00198 }
00199
00200 typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00201 DDS::SampleInfoSeq sinfo;
00202 ret = typedReader->take_w_condition(seq, sinfo, 1 , rc);
00203 typedReader->delete_readcondition(rc);
00204 if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
00205 DDS::Subscriber_var subscriber = typedReader->get_subscriber();
00206 DDS::DomainParticipant_var participant = subscriber->get_participant();
00207 FACE::RETURN_CODE_TYPE ret_code;
00208 populate_header_received(connection_id, participant, sinfo[0], ret_code);
00209 if (ret_code != FACE::RC_NO_ERROR) {
00210 return_code = update_status(connection_id, ret_code);
00211 return;
00212 }
00213
00214 transaction_id = ++readers[connection_id]->last_msg_tid;
00215
00216 message = seq[0];
00217 return_code = update_status(connection_id, ret);
00218 return;
00219 }
00220 return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
00221 } catch (const CORBA::BAD_PARAM&) {
00222 if (OpenDDS::DCPS::DCPS_debug_level) {
00223 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_message - INVALID_PARAM\n"));
00224 }
00225 return_code = FACE::INVALID_PARAM;
00226 }
00227 }
00228
00229 template <typename Msg>
00230 void send_message(FACE::CONNECTION_ID_TYPE connection_id,
00231 FACE::TIMEOUT_TYPE timeout,
00232 FACE::TRANSACTION_ID_TYPE& ,
00233 const Msg& message,
00234 FACE::MESSAGE_SIZE_TYPE message_size,
00235 FACE::RETURN_CODE_TYPE& return_code)
00236 {
00237 if(!Entities::instance()->connections_.count(connection_id)) {
00238 return_code = FACE::INVALID_PARAM;
00239 return;
00240 }
00241 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00242 Entities::instance()->connections_[connection_id].connection_status;
00243 if (message_size < status.MAX_MESSAGE_SIZE) {
00244 return_code = FACE::INVALID_PARAM;
00245 return;
00246 }
00247 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00248 if (!writers.count(connection_id)) {
00249 return_code = FACE::INVALID_PARAM;
00250 return;
00251 }
00252
00253 typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter;
00254 const typename DataWriter::_var_type typedWriter =
00255 DataWriter::_narrow(writers[connection_id].dw);
00256 if (!typedWriter) {
00257 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00258 return;
00259 }
00260 writers[connection_id].status_valid = FACE::VALID;
00261
00262 DDS::DataWriterQos dw_qos;
00263 typedWriter->get_qos(dw_qos);
00264 FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
00265 if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS &&
00266 timeout != FACE::INF_TIME_VALUE &&
00267 ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
00268 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00269 return;
00270 }
00271
00272 return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
00273 }
00274
00275 template <typename Msg>
00276 class Listener : public DCPS::LocalObject<DDS::DataReaderListener> {
00277 public:
00278 typedef void (*Callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00279 FACE::MESSAGE_TYPE_GUID,
00280 FACE::MESSAGE_SIZE_TYPE,
00281 const FACE::WAITSET_TYPE,
00282 FACE::RETURN_CODE_TYPE&);
00283
00284 Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
00285 : connection_id_(connection_id)
00286 {
00287 callbacks_.push_back(callback);
00288 }
00289
00290 void add_callback(Callback callback) {
00291 GuardType guard(callbacks_lock_);
00292 callbacks_.push_back(callback);
00293 }
00294
00295 private:
00296 void on_requested_deadline_missed(DDS::DataReader_ptr,
00297 const DDS::RequestedDeadlineMissedStatus&) {}
00298
00299 void on_requested_incompatible_qos(DDS::DataReader_ptr,
00300 const DDS::RequestedIncompatibleQosStatus&) {}
00301
00302 void on_sample_rejected(DDS::DataReader_ptr,
00303 const DDS::SampleRejectedStatus&) {}
00304
00305 void on_liveliness_changed(DDS::DataReader_ptr,
00306 const DDS::LivelinessChangedStatus&) {}
00307
00308 void on_subscription_matched(DDS::DataReader_ptr,
00309 const DDS::SubscriptionMatchedStatus&) {}
00310
00311 void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus&) {}
00312
00313 void on_data_available(DDS::DataReader_ptr reader)
00314 {
00315 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00316 const typename DataReader::_var_type typedReader =
00317 DataReader::_narrow(reader);
00318 if (!typedReader) {
00319 update_status(connection_id_, DDS::RETCODE_BAD_PARAMETER);
00320 return;
00321 }
00322
00323 FACE::MESSAGE_TYPE_GUID& msg_id = Entities::instance()->connections_[connection_id_].platform_view_guid;
00324 Msg sample;
00325 DDS::SampleInfo sinfo;
00326 while (typedReader->take_next_sample(sample, sinfo) == DDS::RETCODE_OK) {
00327 if (sinfo.valid_data) {
00328 DDS::Subscriber_var subscriber = typedReader->get_subscriber();
00329 DDS::DomainParticipant_var participant = subscriber->get_participant();
00330 FACE::RETURN_CODE_TYPE ret_code;
00331 populate_header_received(connection_id_, participant, sinfo, ret_code);
00332 if (ret_code != FACE::RC_NO_ERROR) {
00333 update_status(connection_id_, ret_code);
00334 return;
00335 }
00336
00337 FACE::TRANSACTION_ID_TYPE transaction_id = ++Entities::instance()->receivers_[connection_id_]->last_msg_tid;
00338 update_status(connection_id_, DDS::RETCODE_OK);
00339 FACE::RETURN_CODE_TYPE retcode;
00340 GuardType guard(callbacks_lock_);
00341 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00342 ACE_DEBUG((LM_DEBUG, "Listener::on_data_available - invoking %d callbacks\n", callbacks_.size()));
00343 }
00344 for (size_t i = 0; i < callbacks_.size(); ++i) {
00345 retcode = FACE::RC_NO_ERROR;
00346 callbacks_.at(i)(transaction_id , sample, msg_id, sizeof(Msg), 0 , retcode);
00347 if (retcode != FACE::RC_NO_ERROR) {
00348 ACE_ERROR((LM_ERROR, "ERROR: Listener::on_data_available - callback %d returned retcode: %d\n", i, retcode));
00349 }
00350 }
00351 }
00352 }
00353 }
00354
00355 typedef ACE_SYNCH_MUTEX LockType;
00356 typedef ACE_Guard<LockType> GuardType;
00357 LockType callbacks_lock_;
00358 OPENDDS_VECTOR(Callback) callbacks_;
00359 const FACE::CONNECTION_ID_TYPE connection_id_;
00360 };
00361
00362 template <typename Msg>
00363 void register_callback(FACE::CONNECTION_ID_TYPE connection_id,
00364 const FACE::WAITSET_TYPE ,
00365 void (*callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00366 FACE::MESSAGE_TYPE_GUID,
00367 FACE::MESSAGE_SIZE_TYPE,
00368 const FACE::WAITSET_TYPE,
00369 FACE::RETURN_CODE_TYPE&),
00370 FACE::MESSAGE_SIZE_TYPE max_message_size,
00371 FACE::RETURN_CODE_TYPE& return_code)
00372 {
00373 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00374 if (!readers.count(connection_id)) {
00375 return_code = FACE::INVALID_PARAM;
00376 return;
00377 }
00378 if(!Entities::instance()->connections_.count(connection_id)) {
00379 return_code = FACE::INVALID_PARAM;
00380 return;
00381 }
00382 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00383 Entities::instance()->connections_[connection_id].connection_status;
00384 if (max_message_size < status.MAX_MESSAGE_SIZE) {
00385 return_code = FACE::INVALID_PARAM;
00386 return;
00387 }
00388 DDS::DataReaderListener_var existing_listener = readers[connection_id]->dr->get_listener();
00389 if (existing_listener.in()) {
00390 Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener.in());
00391 if (typedListener) {
00392 typedListener->add_callback(callback);
00393 } else {
00394 ACE_ERROR((LM_ERROR, "ERROR: register_callback - failed to obtain typed listener\n"));
00395 return_code = FACE::INVALID_PARAM;
00396 return;
00397 }
00398 } else {
00399 DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
00400 readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
00401 }
00402 if (readers[connection_id]->status_valid != FACE::VALID) {
00403 Entities::FaceReceiver* tmp = readers[connection_id];
00404 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00405 delete tmp;
00406 }
00407 readers[connection_id]->status_valid = FACE::VALID;
00408
00409 return_code = FACE::RC_NO_ERROR;
00410 }
00411
00412 } }
00413
00414 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00415
00416 #endif