FaceTSS.h

Go to the documentation of this file.
00001 #ifndef OPENDDS_FACE_TSS_H
00002 #define OPENDDS_FACE_TSS_H
00003 
00004 #include "FACE/TS.hpp"
00005 #include "dds/DCPS/PoolAllocator.h"
00006 #include "dds/DdsDcpsSubscriptionC.h"
00007 #include "dds/DCPS/TypeSupportImpl.h"
00008 #include "dds/DCPS/WaitSet.h"
00009 #include "dds/DCPS/SafetyProfileStreams.h"
00010 #include "dds/DCPS/RepoIdTypes.h"
00011 #include "ace/Singleton.h"
00012 
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014 
00015 namespace OpenDDS {
00016 namespace FaceTSS {
00017 
00018 class Entities {
00019   friend class ACE_Singleton<Entities, ACE_Thread_Mutex>;
00020 
00021   Entities();
00022   ~Entities();
00023 
00024 public:
00025   struct DDSAdapter : public OpenDDS::DCPS::PoolAllocationBase {
00026     DDSAdapter ()
00027       : status_valid(FACE::INVALID)
00028     {}
00029 
00030     FACE::VALIDITY_TYPE status_valid;
00031   };
00032   struct FaceSender : public DDSAdapter {
00033     FaceSender () {}
00034     DDS::DataWriter_var dw;
00035   };
00036 
00037   struct FaceReceiver : public DDSAdapter {
00038     FaceReceiver ()
00039       : last_msg_header(),
00040         last_msg_tid(0),
00041         sum_recvd_msgs_latency(0),
00042         total_msgs_recvd(0)
00043     {}
00044 
00045     virtual ~FaceReceiver() {}
00046     virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& /*num_waiting*/)
00047     {
00048       return FACE::NOT_AVAILABLE;
00049     };
00050     DDS::DataReader_var dr;
00051     FACE::TS::MessageHeader last_msg_header;
00052     FACE::TRANSACTION_ID_TYPE last_msg_tid;
00053     FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency;
00054     FACE::LongLong total_msgs_recvd;
00055   };
00056 
00057   template<typename Msg>
00058   class DDSTypedAdapter : public FaceReceiver {
00059   public:
00060     DDSTypedAdapter(FaceReceiver& rcvr);
00061     ~DDSTypedAdapter();
00062     virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting);
00063     typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00064   };
00065 
00066   typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceSender) ConnIdToSenderMap;
00067   typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceReceiver*) ConnIdToReceiverMap;
00068 
00069   OpenDDS_FACE_Export static Entities* instance();
00070   ConnIdToSenderMap senders_;
00071   ConnIdToReceiverMap receivers_;
00072 
00073   struct ConnectionInfo {
00074     OPENDDS_STRING connection_name;
00075     FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status;
00076     FACE::MESSAGE_TYPE_GUID platform_view_guid;
00077   };
00078   OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, ConnectionInfo ) connections_;
00079 };
00080 
00081 OpenDDS_FACE_Export DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout);
00082 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration);
00083 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp);
00084 OpenDDS_FACE_Export void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00085                                                   const DDS::DomainParticipant_var part,
00086                                                   const DDS::SampleInfo& sinfo,
00087                                                   FACE::RETURN_CODE_TYPE& return_code);
00088 
00089 OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub,
00090                                                                              const CORBA::LongLong& seq);
00091 
00092 OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00093     DDS::ReturnCode_t retcode);
00094 
00095 template <typename Msg>
00096 Entities::DDSTypedAdapter<Msg>::DDSTypedAdapter(FaceReceiver& rcvr)
00097   : FaceReceiver()
00098 {
00099   dr = rcvr.dr;
00100   last_msg_header = rcvr.last_msg_header;
00101   last_msg_tid = rcvr.last_msg_tid;
00102   sum_recvd_msgs_latency = rcvr.sum_recvd_msgs_latency;
00103   total_msgs_recvd = rcvr.total_msgs_recvd;
00104 }
00105 
00106 template <typename Msg>
00107 Entities::DDSTypedAdapter<Msg>::~DDSTypedAdapter()
00108 {
00109 }
00110 
00111 template <typename Msg>
00112 FACE::RETURN_CODE_TYPE Entities::DDSTypedAdapter<Msg>::messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting)
00113 {
00114   const typename DataReader::_var_type typedReader =
00115     DataReader::_narrow(dr);
00116   if (!typedReader) {
00117     return FACE::INVALID_PARAM;
00118   }
00119   const DDS::ReadCondition_var rc =
00120       typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00121                                 DDS::ANY_VIEW_STATE,
00122                                 DDS::ALIVE_INSTANCE_STATE);
00123 
00124   DDS::ReturnCode_t ret;
00125   typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00126   DDS::SampleInfoSeq sinfo;
00127   FACE::WAITING_RANGE_TYPE valid_waiting = 0;
00128   ret = typedReader->read_w_condition(seq, sinfo, DDS::LENGTH_UNLIMITED, rc);
00129   if (ret == DDS::RETCODE_OK) {
00130     for (CORBA::ULong i = 0; i < seq.length(); ++i) {
00131       if (sinfo[i].valid_data) {
00132         ++valid_waiting;
00133       }
00134     }
00135     num_waiting = valid_waiting;
00136     return FACE::RC_NO_ERROR;
00137   } else if (ret == DDS::RETCODE_NO_DATA) {
00138     num_waiting = 0;
00139     return FACE::RC_NO_ERROR;
00140   }
00141   return FACE::NOT_AVAILABLE;
00142 }
00143 
00144 template <typename Msg>
00145 void receive_message(/*in*/    FACE::CONNECTION_ID_TYPE connection_id,
00146                      /*in*/    FACE::TIMEOUT_TYPE timeout,
00147                      /*inout*/ FACE::TRANSACTION_ID_TYPE& transaction_id,
00148                      /*inout*/ Msg& message,
00149                      /*in*/    FACE::MESSAGE_SIZE_TYPE message_size,
00150                      /*out*/   FACE::RETURN_CODE_TYPE& return_code)
00151 {
00152   try {
00153     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00154     if (!readers.count(connection_id)) {
00155       return_code = FACE::INVALID_PARAM;
00156       return;
00157     }
00158     if (!Entities::instance()->connections_.count(connection_id)) {
00159       return_code = FACE::INVALID_PARAM;
00160       return;
00161     }
00162     FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00163       Entities::instance()->connections_[connection_id].connection_status;
00164     if (message_size < status.MAX_MESSAGE_SIZE) {
00165       return_code = FACE::INVALID_PARAM;
00166       return;
00167     }
00168     typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00169     const typename DataReader::_var_type typedReader =
00170       DataReader::_narrow(readers[connection_id]->dr);
00171     if (!typedReader) {
00172       return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00173       return;
00174     }
00175     if (readers[connection_id]->status_valid != FACE::VALID) {
00176       Entities::FaceReceiver* tmp = readers[connection_id];
00177       readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00178       delete tmp;
00179     }
00180     readers[connection_id]->status_valid = FACE::VALID;
00181 
00182     const DDS::ReadCondition_var rc =
00183       typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00184         DDS::ANY_VIEW_STATE,
00185         DDS::ALIVE_INSTANCE_STATE);
00186     const DDS::WaitSet_var ws = new DDS::WaitSet;
00187     ws->attach_condition(rc);
00188 
00189     DDS::ConditionSeq active;
00190     const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
00191     DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
00192     ws->detach_condition(rc);
00193 
00194     if (ret == DDS::RETCODE_TIMEOUT) {
00195       typedReader->delete_readcondition(rc);
00196       return_code = update_status(connection_id, ret);
00197       return;
00198     }
00199 
00200     typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00201     DDS::SampleInfoSeq sinfo;
00202     ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc);
00203     typedReader->delete_readcondition(rc);
00204     if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
00205       DDS::Subscriber_var subscriber = typedReader->get_subscriber();
00206       DDS::DomainParticipant_var participant = subscriber->get_participant();
00207       FACE::RETURN_CODE_TYPE ret_code;
00208       populate_header_received(connection_id, participant, sinfo[0], ret_code);
00209       if (ret_code != FACE::RC_NO_ERROR) {
00210         return_code = update_status(connection_id, ret_code);
00211         return;
00212       }
00213 
00214       transaction_id = ++readers[connection_id]->last_msg_tid;
00215 
00216       message = seq[0];
00217       return_code = update_status(connection_id, ret);
00218       return;
00219     }
00220     return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
00221   } catch (const CORBA::BAD_PARAM&) {
00222     if (OpenDDS::DCPS::DCPS_debug_level) {
00223       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_message - INVALID_PARAM\n"));
00224     }
00225     return_code = FACE::INVALID_PARAM;
00226   }
00227 }
00228 
00229 template <typename Msg>
00230 void send_message(FACE::CONNECTION_ID_TYPE connection_id,
00231                   FACE::TIMEOUT_TYPE timeout,
00232                   FACE::TRANSACTION_ID_TYPE& /*transaction_id*/,
00233                   const Msg& message,
00234                   FACE::MESSAGE_SIZE_TYPE message_size,
00235                   FACE::RETURN_CODE_TYPE& return_code)
00236 {
00237   if(!Entities::instance()->connections_.count(connection_id)) {
00238     return_code = FACE::INVALID_PARAM;
00239     return;
00240   }
00241   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00242     Entities::instance()->connections_[connection_id].connection_status;
00243   if (message_size < status.MAX_MESSAGE_SIZE) {
00244     return_code = FACE::INVALID_PARAM;
00245     return;
00246   }
00247   Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00248   if (!writers.count(connection_id)) {
00249     return_code = FACE::INVALID_PARAM;
00250     return;
00251   }
00252 
00253   typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter;
00254   const typename DataWriter::_var_type typedWriter =
00255     DataWriter::_narrow(writers[connection_id].dw);
00256   if (!typedWriter) {
00257     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00258     return;
00259   }
00260   writers[connection_id].status_valid = FACE::VALID;
00261 
00262   DDS::DataWriterQos dw_qos;
00263   typedWriter->get_qos(dw_qos);
00264   FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
00265   if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS &&
00266       timeout != FACE::INF_TIME_VALUE &&
00267       ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
00268     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00269     return;
00270   }
00271 
00272   return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
00273 }
00274 
00275 template <typename Msg>
00276 class Listener : public DCPS::LocalObject<DDS::DataReaderListener> {
00277 public:
00278   typedef void (*Callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00279                            FACE::MESSAGE_TYPE_GUID,
00280                            FACE::MESSAGE_SIZE_TYPE,
00281                            const FACE::WAITSET_TYPE,
00282                            FACE::RETURN_CODE_TYPE&);
00283 
00284   Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
00285     : connection_id_(connection_id)
00286   {
00287     callbacks_.push_back(callback);
00288   }
00289 
00290   void add_callback(Callback callback) {
00291     GuardType guard(callbacks_lock_);
00292     callbacks_.push_back(callback);
00293   }
00294 
00295 private:
00296   void on_requested_deadline_missed(DDS::DataReader_ptr,
00297     const DDS::RequestedDeadlineMissedStatus&) {}
00298 
00299   void on_requested_incompatible_qos(DDS::DataReader_ptr,
00300     const DDS::RequestedIncompatibleQosStatus&) {}
00301 
00302   void on_sample_rejected(DDS::DataReader_ptr,
00303     const DDS::SampleRejectedStatus&) {}
00304 
00305   void on_liveliness_changed(DDS::DataReader_ptr,
00306     const DDS::LivelinessChangedStatus&) {}
00307 
00308   void on_subscription_matched(DDS::DataReader_ptr,
00309     const DDS::SubscriptionMatchedStatus&) {}
00310 
00311   void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus&) {}
00312 
00313   void on_data_available(DDS::DataReader_ptr reader)
00314   {
00315     typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00316     const typename DataReader::_var_type typedReader =
00317       DataReader::_narrow(reader);
00318     if (!typedReader) {
00319       update_status(connection_id_, DDS::RETCODE_BAD_PARAMETER);
00320       return;
00321     }
00322 
00323     FACE::MESSAGE_TYPE_GUID& msg_id = Entities::instance()->connections_[connection_id_].platform_view_guid;
00324     Msg sample;
00325     DDS::SampleInfo sinfo;
00326     while (typedReader->take_next_sample(sample, sinfo) == DDS::RETCODE_OK) {
00327       if (sinfo.valid_data) {
00328         DDS::Subscriber_var subscriber = typedReader->get_subscriber();
00329         DDS::DomainParticipant_var participant = subscriber->get_participant();
00330         FACE::RETURN_CODE_TYPE ret_code;
00331         populate_header_received(connection_id_, participant, sinfo, ret_code);
00332         if (ret_code != FACE::RC_NO_ERROR) {
00333           update_status(connection_id_, ret_code);
00334           return;
00335         }
00336 
00337         FACE::TRANSACTION_ID_TYPE transaction_id = ++Entities::instance()->receivers_[connection_id_]->last_msg_tid;
00338         update_status(connection_id_, DDS::RETCODE_OK);
00339         FACE::RETURN_CODE_TYPE retcode;
00340         GuardType guard(callbacks_lock_);
00341         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00342           ACE_DEBUG((LM_DEBUG, "Listener::on_data_available - invoking %d callbacks\n", callbacks_.size()));
00343         }
00344         for (size_t i = 0; i < callbacks_.size(); ++i) {
00345           retcode = FACE::RC_NO_ERROR;
00346           callbacks_.at(i)(transaction_id /*Transaction_ID*/, sample, msg_id, sizeof(Msg), 0 /*WAITSET_TYPE*/, retcode);
00347           if (retcode != FACE::RC_NO_ERROR) {
00348             ACE_ERROR((LM_ERROR, "ERROR: Listener::on_data_available - callback %d returned retcode: %d\n", i, retcode));
00349           }
00350         }
00351       }
00352     }
00353   }
00354 
00355   typedef ACE_SYNCH_MUTEX     LockType;
00356   typedef ACE_Guard<LockType> GuardType;
00357   LockType callbacks_lock_;
00358   OPENDDS_VECTOR(Callback) callbacks_;
00359   const FACE::CONNECTION_ID_TYPE connection_id_;
00360 };
00361 
00362 template <typename Msg>
00363 void register_callback(FACE::CONNECTION_ID_TYPE connection_id,
00364                        const FACE::WAITSET_TYPE /*waitset*/,
00365                        void (*callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00366                                         FACE::MESSAGE_TYPE_GUID,
00367                                         FACE::MESSAGE_SIZE_TYPE,
00368                                         const FACE::WAITSET_TYPE,
00369                                         FACE::RETURN_CODE_TYPE&),
00370                        FACE::MESSAGE_SIZE_TYPE max_message_size,
00371                        FACE::RETURN_CODE_TYPE& return_code)
00372 {
00373   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00374   if (!readers.count(connection_id)) {
00375     return_code = FACE::INVALID_PARAM;
00376     return;
00377   }
00378   if(!Entities::instance()->connections_.count(connection_id)) {
00379     return_code = FACE::INVALID_PARAM;
00380     return;
00381   }
00382   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00383     Entities::instance()->connections_[connection_id].connection_status;
00384   if (max_message_size < status.MAX_MESSAGE_SIZE) {
00385     return_code = FACE::INVALID_PARAM;
00386     return;
00387   }
00388   DDS::DataReaderListener_var existing_listener = readers[connection_id]->dr->get_listener();
00389   if (existing_listener.in()) {
00390     Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener.in());
00391     if (typedListener) {
00392       typedListener->add_callback(callback);
00393     } else {
00394       ACE_ERROR((LM_ERROR, "ERROR: register_callback - failed to obtain typed listener\n"));
00395       return_code = FACE::INVALID_PARAM;
00396       return;
00397     }
00398   } else {
00399     DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
00400     readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
00401   }
00402   if (readers[connection_id]->status_valid != FACE::VALID) {
00403     Entities::FaceReceiver* tmp = readers[connection_id];
00404     readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00405     delete tmp;
00406   }
00407   readers[connection_id]->status_valid = FACE::VALID;
00408 
00409   return_code = FACE::RC_NO_ERROR;
00410 }
00411 
00412 } }
00413 
00414 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00415 
00416 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1