FaceTSS.h

Go to the documentation of this file.
00001 #ifndef OPENDDS_FACE_TSS_H
00002 #define OPENDDS_FACE_TSS_H
00003 
00004 #include "FACE/TS.hpp"
00005 #include "dds/DCPS/PoolAllocator.h"
00006 #include "dds/DdsDcpsPublicationC.h"
00007 #include "dds/DdsDcpsSubscriptionC.h"
00008 #include "dds/DCPS/TypeSupportImpl.h"
00009 #include "dds/DCPS/WaitSet.h"
00010 #include "dds/DCPS/SafetyProfileStreams.h"
00011 #include "ace/Singleton.h"
00012 
00013 
00014 namespace OpenDDS {
00015 namespace FaceTSS {
00016 
00017 class Entities {
00018   friend class ACE_Singleton<Entities, ACE_Thread_Mutex>;
00019 
00020   Entities();
00021   ~Entities();
00022 
00023 public:
00024   struct DDSAdapter : public OpenDDS::DCPS::PoolAllocationBase {
00025     DDSAdapter ()
00026       : status_valid(FACE::INVALID)
00027     {}
00028 
00029     FACE::VALIDITY_TYPE status_valid;
00030   };
00031   struct FaceSender : public DDSAdapter {
00032     FaceSender () {}
00033     DDS::DataWriter_var dw;
00034   };
00035 
00036   struct FaceReceiver : public DDSAdapter {
00037     FaceReceiver ()
00038       : last_msg_tid(0),
00039         sum_recvd_msgs_latency(0),
00040         total_msgs_recvd(0)
00041     {}
00042 
00043     virtual ~FaceReceiver() {}
00044     virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& /*num_waiting*/)
00045     {
00046       return FACE::NOT_AVAILABLE;
00047     };
00048     DDS::DataReader_var dr;
00049     FACE::TS::MessageHeader last_msg_header;
00050     FACE::TRANSACTION_ID_TYPE last_msg_tid;
00051     FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency;
00052     FACE::LongLong total_msgs_recvd;
00053   };
00054 
00055   template<typename Msg>
00056   class DDSTypedAdapter : public FaceReceiver {
00057   public:
00058     DDSTypedAdapter(FaceReceiver& rcvr);
00059     ~DDSTypedAdapter();
00060     virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting);
00061     typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00062   };
00063 
00064   typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceSender) ConnIdToSenderMap;
00065   typedef OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, FaceReceiver*) ConnIdToReceiverMap;
00066 
00067   OpenDDS_FACE_Export static Entities* instance();
00068   ConnIdToSenderMap senders_;
00069   ConnIdToReceiverMap receivers_;
00070 
00071   struct ConnectionInfo {
00072     OPENDDS_STRING connection_name;
00073     FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status;
00074     FACE::MESSAGE_TYPE_GUID platform_view_guid;
00075   };
00076   OPENDDS_MAP(FACE::CONNECTION_ID_TYPE, ConnectionInfo ) connections_;
00077 };
00078 
00079 OpenDDS_FACE_Export DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout);
00080 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration);
00081 OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp);
00082 OpenDDS_FACE_Export void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00083                                                   const DDS::DomainParticipant_var part,
00084                                                   const DDS::SampleInfo& sinfo,
00085                                                   FACE::RETURN_CODE_TYPE& return_code);
00086 
00087 OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub,
00088                                                                              const CORBA::LongLong& seq);
00089 
00090 OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00091     DDS::ReturnCode_t retcode);
00092 
00093 template <typename Msg>
00094 Entities::DDSTypedAdapter<Msg>::DDSTypedAdapter(FaceReceiver& rcvr)
00095   : FaceReceiver()
00096 {
00097   dr = rcvr.dr;
00098   last_msg_header = rcvr.last_msg_header;
00099   last_msg_tid = rcvr.last_msg_tid;
00100   sum_recvd_msgs_latency = rcvr.sum_recvd_msgs_latency;
00101   total_msgs_recvd = rcvr.total_msgs_recvd;
00102 }
00103 
00104 template <typename Msg>
00105 Entities::DDSTypedAdapter<Msg>::~DDSTypedAdapter()
00106 {
00107 }
00108 
00109 template <typename Msg>
00110 FACE::RETURN_CODE_TYPE Entities::DDSTypedAdapter<Msg>::messages_waiting(FACE::WAITING_RANGE_TYPE& num_waiting)
00111 {
00112   const typename DataReader::_var_type typedReader =
00113     DataReader::_narrow(dr);
00114   if (!typedReader) {
00115     return FACE::INVALID_PARAM;
00116   }
00117   const DDS::ReadCondition_var rc =
00118       typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00119                                 DDS::ANY_VIEW_STATE,
00120                                 DDS::ALIVE_INSTANCE_STATE);
00121 
00122   DDS::ReturnCode_t ret;
00123   typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00124   DDS::SampleInfoSeq sinfo;
00125   FACE::WAITING_RANGE_TYPE valid_waiting = 0;
00126   ret = typedReader->read_w_condition(seq, sinfo, DDS::LENGTH_UNLIMITED, rc);
00127   if (ret == DDS::RETCODE_OK) {
00128     for (CORBA::ULong i = 0; i < seq.length(); ++i) {
00129       if (sinfo[i].valid_data) {
00130         ++valid_waiting;
00131       }
00132     }
00133     num_waiting = valid_waiting;
00134     return FACE::RC_NO_ERROR;
00135   } else if (ret == DDS::RETCODE_NO_DATA) {
00136     num_waiting = 0;
00137     return FACE::RC_NO_ERROR;
00138   }
00139   return FACE::NOT_AVAILABLE;
00140 }
00141 
00142 template <typename Msg>
00143 void receive_message(/*in*/    FACE::CONNECTION_ID_TYPE connection_id,
00144                      /*in*/    FACE::TIMEOUT_TYPE timeout,
00145                      /*inout*/ FACE::TRANSACTION_ID_TYPE& transaction_id,
00146                      /*inout*/ Msg& message,
00147                      /*in*/    FACE::MESSAGE_SIZE_TYPE message_size,
00148                      /*out*/   FACE::RETURN_CODE_TYPE& return_code)
00149 {
00150   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00151   if (!readers.count(connection_id)) {
00152     return_code = FACE::INVALID_PARAM;
00153     return;
00154   }
00155   if(!Entities::instance()->connections_.count(connection_id)) {
00156     return_code = FACE::INVALID_PARAM;
00157     return;
00158   }
00159   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00160     Entities::instance()->connections_[connection_id].connection_status;
00161   if (message_size < status.MAX_MESSAGE_SIZE) {
00162     return_code = FACE::INVALID_PARAM;
00163     return;
00164   }
00165   typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00166   const typename DataReader::_var_type typedReader =
00167     DataReader::_narrow(readers[connection_id]->dr);
00168   if (!typedReader) {
00169     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00170     return;
00171   }
00172   if (readers[connection_id]->status_valid != FACE::VALID) {
00173     Entities::FaceReceiver* tmp = readers[connection_id];
00174     readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00175     delete tmp;
00176   }
00177   readers[connection_id]->status_valid = FACE::VALID;
00178 
00179   const DDS::ReadCondition_var rc =
00180     typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00181                                       DDS::ANY_VIEW_STATE,
00182                                       DDS::ALIVE_INSTANCE_STATE);
00183   const DDS::WaitSet_var ws = new DDS::WaitSet;
00184   ws->attach_condition(rc);
00185 
00186   DDS::ConditionSeq active;
00187   const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
00188   DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
00189   ws->detach_condition(rc);
00190 
00191   if (ret == DDS::RETCODE_TIMEOUT) {
00192     return_code = update_status(connection_id, ret);
00193     return;
00194   }
00195 
00196   typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00197   DDS::SampleInfoSeq sinfo;
00198   ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc);
00199   if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
00200     DDS::DomainParticipant_var participant = typedReader->get_subscriber()->get_participant();
00201     FACE::RETURN_CODE_TYPE ret_code;
00202     populate_header_received(connection_id, participant, sinfo[0], ret_code);
00203     if (ret_code != FACE::RC_NO_ERROR) {
00204       return_code = update_status(connection_id, ret_code);
00205       return;
00206     }
00207 
00208     transaction_id = ++readers[connection_id]->last_msg_tid;
00209 
00210     message = seq[0];
00211     return_code = update_status(connection_id, ret);
00212     return;
00213   }
00214   return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
00215 }
00216 
00217 template <typename Msg>
00218 void send_message(FACE::CONNECTION_ID_TYPE connection_id,
00219                   FACE::TIMEOUT_TYPE timeout,
00220                   FACE::TRANSACTION_ID_TYPE& /*transaction_id*/,
00221                   const Msg& message,
00222                   FACE::MESSAGE_SIZE_TYPE message_size,
00223                   FACE::RETURN_CODE_TYPE& return_code)
00224 {
00225   if(!Entities::instance()->connections_.count(connection_id)) {
00226     return_code = FACE::INVALID_PARAM;
00227     return;
00228   }
00229   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00230     Entities::instance()->connections_[connection_id].connection_status;
00231   if (message_size < status.MAX_MESSAGE_SIZE) {
00232     return_code = FACE::INVALID_PARAM;
00233     return;
00234   }
00235   Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00236   if (!writers.count(connection_id)) {
00237     return_code = FACE::INVALID_PARAM;
00238     return;
00239   }
00240 
00241   typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter;
00242   const typename DataWriter::_var_type typedWriter =
00243     DataWriter::_narrow(writers[connection_id].dw);
00244   if (!typedWriter) {
00245     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00246     return;
00247   }
00248   writers[connection_id].status_valid = FACE::VALID;
00249 
00250   DDS::DataWriterQos dw_qos;
00251   typedWriter->get_qos(dw_qos);
00252   FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
00253   if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS &&
00254       timeout != FACE::INF_TIME_VALUE &&
00255       ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
00256     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00257     return;
00258   }
00259 
00260   return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
00261 }
00262 
00263 template <typename Msg>
00264 class Listener : public DCPS::LocalObject<DDS::DataReaderListener> {
00265 public:
00266   typedef void (*Callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00267                            FACE::MESSAGE_TYPE_GUID,
00268                            FACE::MESSAGE_SIZE_TYPE,
00269                            const FACE::WAITSET_TYPE,
00270                            FACE::RETURN_CODE_TYPE&);
00271 
00272   Listener(Callback callback, FACE::CONNECTION_ID_TYPE connection_id)
00273     : connection_id_(connection_id)
00274   {
00275     callbacks_.push_back(callback);
00276   }
00277 
00278   void add_callback(Callback callback) {
00279     GuardType guard(callbacks_lock_);
00280     callbacks_.push_back(callback);
00281   }
00282 
00283 private:
00284   void on_requested_deadline_missed(DDS::DataReader_ptr,
00285     const DDS::RequestedDeadlineMissedStatus&) {}
00286 
00287   void on_requested_incompatible_qos(DDS::DataReader_ptr,
00288     const DDS::RequestedIncompatibleQosStatus&) {}
00289 
00290   void on_sample_rejected(DDS::DataReader_ptr,
00291     const DDS::SampleRejectedStatus&) {}
00292 
00293   void on_liveliness_changed(DDS::DataReader_ptr,
00294     const DDS::LivelinessChangedStatus&) {}
00295 
00296   void on_subscription_matched(DDS::DataReader_ptr,
00297     const DDS::SubscriptionMatchedStatus&) {}
00298 
00299   void on_sample_lost(DDS::DataReader_ptr, const DDS::SampleLostStatus&) {}
00300 
00301   void on_data_available(DDS::DataReader_ptr reader)
00302   {
00303     typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00304     const typename DataReader::_var_type typedReader =
00305       DataReader::_narrow(reader);
00306     if (!typedReader) {
00307       update_status(connection_id_, DDS::RETCODE_BAD_PARAMETER);
00308       return;
00309     }
00310 
00311     FACE::MESSAGE_TYPE_GUID& msg_id = Entities::instance()->connections_[connection_id_].platform_view_guid;
00312     Msg sample;
00313     DDS::SampleInfo sinfo;
00314     while (typedReader->take_next_sample(sample, sinfo) == DDS::RETCODE_OK) {
00315       if (sinfo.valid_data) {
00316         DDS::DomainParticipant_var participant = typedReader->get_subscriber()->get_participant();
00317         FACE::RETURN_CODE_TYPE ret_code;
00318         populate_header_received(connection_id_, participant, sinfo, ret_code);
00319         if (ret_code != FACE::RC_NO_ERROR) {
00320           update_status(connection_id_, ret_code);
00321           return;
00322         }
00323 
00324         FACE::TRANSACTION_ID_TYPE transaction_id = ++Entities::instance()->receivers_[connection_id_]->last_msg_tid;
00325         update_status(connection_id_, DDS::RETCODE_OK);
00326         FACE::RETURN_CODE_TYPE retcode;
00327         GuardType guard(callbacks_lock_);
00328         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00329           ACE_DEBUG((LM_DEBUG, "Listener::on_data_available - invoking %d callbacks\n", callbacks_.size()));
00330         }
00331         for (size_t i = 0; i < callbacks_.size(); ++i) {
00332           retcode = FACE::RC_NO_ERROR;
00333           callbacks_.at(i)(transaction_id /*Transaction_ID*/, sample, msg_id, sizeof(Msg), 0 /*WAITSET_TYPE*/, retcode);
00334           if (retcode != FACE::RC_NO_ERROR) {
00335             ACE_ERROR((LM_ERROR, "ERROR: Listener::on_data_available - callback %d returned retcode: %d\n", i, retcode));
00336           }
00337         }
00338       }
00339     }
00340   }
00341 
00342   typedef ACE_SYNCH_MUTEX     LockType;
00343   typedef ACE_Guard<LockType> GuardType;
00344   LockType callbacks_lock_;
00345   OPENDDS_VECTOR(Callback) callbacks_;
00346   const FACE::CONNECTION_ID_TYPE connection_id_;
00347 };
00348 
00349 template <typename Msg>
00350 void register_callback(FACE::CONNECTION_ID_TYPE connection_id,
00351                        const FACE::WAITSET_TYPE /*waitset*/,
00352                        void (*callback)(FACE::TRANSACTION_ID_TYPE, Msg&,
00353                                         FACE::MESSAGE_TYPE_GUID,
00354                                         FACE::MESSAGE_SIZE_TYPE,
00355                                         const FACE::WAITSET_TYPE,
00356                                         FACE::RETURN_CODE_TYPE&),
00357                        FACE::MESSAGE_SIZE_TYPE max_message_size,
00358                        FACE::RETURN_CODE_TYPE& return_code)
00359 {
00360   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00361   if (!readers.count(connection_id)) {
00362     return_code = FACE::INVALID_PARAM;
00363     return;
00364   }
00365   if(!Entities::instance()->connections_.count(connection_id)) {
00366     return_code = FACE::INVALID_PARAM;
00367     return;
00368   }
00369   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00370     Entities::instance()->connections_[connection_id].connection_status;
00371   if (max_message_size < status.MAX_MESSAGE_SIZE) {
00372     return_code = FACE::INVALID_PARAM;
00373     return;
00374   }
00375   DDS::DataReaderListener_ptr existing_listener = readers[connection_id]->dr->get_listener();
00376   if (existing_listener) {
00377     Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener);
00378     typedListener->add_callback(callback);
00379   } else {
00380     DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
00381     readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
00382   }
00383   if (readers[connection_id]->status_valid != FACE::VALID) {
00384     Entities::FaceReceiver* tmp = readers[connection_id];
00385     readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00386     delete tmp;
00387   }
00388   readers[connection_id]->status_valid = FACE::VALID;
00389 
00390   return_code = FACE::RC_NO_ERROR;
00391 }
00392 
00393 } }
00394 
00395 #endif

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7