OpenDDS::FaceTSS Namespace Reference

Namespaces

namespace  config

Classes

class  Entities
class  Listener

Enumerations

enum  { NSEC_PER_SEC = 1000000000 }

Functions

FACE::RETURN_CODE_TYPE update_status (FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
DDS::Duration_t convertTimeout (FACE::TIMEOUT_TYPE timeout)
FACE::SYSTEM_TIME_TYPE convertDuration (const DDS::Duration_t &duration)
FACE::SYSTEM_TIME_TYPE convertTime (const DDS::Time_t &timestamp)
FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid (const OpenDDS::DCPS::RepoId &pub, const CORBA::LongLong &orig_seq)
void populate_header_received (const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code)
template<typename Msg >
void receive_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &transaction_id, Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
template<typename Msg >
void send_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &, const Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
template<typename Msg >
void register_callback (FACE::CONNECTION_ID_TYPE connection_id, const FACE::WAITSET_TYPE, void(*callback)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &), FACE::MESSAGE_SIZE_TYPE max_message_size, FACE::RETURN_CODE_TYPE &return_code)

Enumeration Type Documentation

anonymous enum
Enumerator:
NSEC_PER_SEC 

Definition at line 775 of file FaceTSS.cpp.

00775 { NSEC_PER_SEC = 1000000000 };


Function Documentation

OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertDuration ( const DDS::Duration_t duration  ) 

Referenced by FACE::TS::Create_Connection(), and send_message().

Here is the caller graph for this function:

OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertTime ( const DDS::Time_t timestamp  ) 
OpenDDS_FACE_Export DDS::Duration_t OpenDDS::FaceTSS::convertTimeout ( FACE::TIMEOUT_TYPE  timeout  ) 

Referenced by receive_message().

Here is the caller graph for this function:

OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID OpenDDS::FaceTSS::create_message_instance_guid ( const OpenDDS::DCPS::RepoId pub,
const CORBA::LongLong orig_seq 
)
OpenDDS_FACE_Export void OpenDDS::FaceTSS::populate_header_received ( const FACE::CONNECTION_ID_TYPE &  connection_id,
const DDS::DomainParticipant_var  part,
const DDS::SampleInfo sinfo,
FACE::RETURN_CODE_TYPE &  return_code 
)

Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), and receive_message().

Here is the caller graph for this function:

template<typename Msg >
void OpenDDS::FaceTSS::receive_message ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  timeout,
FACE::TRANSACTION_ID_TYPE &  transaction_id,
Msg &  message,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
) [inline]

Definition at line 145 of file FaceTSS.h.

References DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, convertTimeout(), DataReader, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, populate_header_received(), OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, status, OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid, and update_status().

00151 {
00152   try {
00153     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00154     if (!readers.count(connection_id)) {
00155       return_code = FACE::INVALID_PARAM;
00156       return;
00157     }
00158     if (!Entities::instance()->connections_.count(connection_id)) {
00159       return_code = FACE::INVALID_PARAM;
00160       return;
00161     }
00162     FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00163       Entities::instance()->connections_[connection_id].connection_status;
00164     if (message_size < status.MAX_MESSAGE_SIZE) {
00165       return_code = FACE::INVALID_PARAM;
00166       return;
00167     }
00168     typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00169     const typename DataReader::_var_type typedReader =
00170       DataReader::_narrow(readers[connection_id]->dr);
00171     if (!typedReader) {
00172       return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00173       return;
00174     }
00175     if (readers[connection_id]->status_valid != FACE::VALID) {
00176       Entities::FaceReceiver* tmp = readers[connection_id];
00177       readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00178       delete tmp;
00179     }
00180     readers[connection_id]->status_valid = FACE::VALID;
00181 
00182     const DDS::ReadCondition_var rc =
00183       typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00184         DDS::ANY_VIEW_STATE,
00185         DDS::ALIVE_INSTANCE_STATE);
00186     const DDS::WaitSet_var ws = new DDS::WaitSet;
00187     ws->attach_condition(rc);
00188 
00189     DDS::ConditionSeq active;
00190     const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
00191     DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
00192     ws->detach_condition(rc);
00193 
00194     if (ret == DDS::RETCODE_TIMEOUT) {
00195       typedReader->delete_readcondition(rc);
00196       return_code = update_status(connection_id, ret);
00197       return;
00198     }
00199 
00200     typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00201     DDS::SampleInfoSeq sinfo;
00202     ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc);
00203     typedReader->delete_readcondition(rc);
00204     if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
00205       DDS::Subscriber_var subscriber = typedReader->get_subscriber();
00206       DDS::DomainParticipant_var participant = subscriber->get_participant();
00207       FACE::RETURN_CODE_TYPE ret_code;
00208       populate_header_received(connection_id, participant, sinfo[0], ret_code);
00209       if (ret_code != FACE::RC_NO_ERROR) {
00210         return_code = update_status(connection_id, ret_code);
00211         return;
00212       }
00213 
00214       transaction_id = ++readers[connection_id]->last_msg_tid;
00215 
00216       message = seq[0];
00217       return_code = update_status(connection_id, ret);
00218       return;
00219     }
00220     return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
00221   } catch (const CORBA::BAD_PARAM&) {
00222     if (OpenDDS::DCPS::DCPS_debug_level) {
00223       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_message - INVALID_PARAM\n"));
00224     }
00225     return_code = FACE::INVALID_PARAM;
00226   }
00227 }

Here is the call graph for this function:

template<typename Msg >
void OpenDDS::FaceTSS::register_callback ( FACE::CONNECTION_ID_TYPE  connection_id,
const FACE::WAITSET_TYPE  ,
void(*)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &)  callback,
FACE::MESSAGE_SIZE_TYPE  max_message_size,
FACE::RETURN_CODE_TYPE &  return_code 
) [inline]

Definition at line 363 of file FaceTSS.h.

References OpenDDS::FaceTSS::Listener< Msg >::add_callback(), DDS::DATA_AVAILABLE_STATUS, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, OpenDDS::FaceTSS::Entities::receivers_, status, and OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid.

00372 {
00373   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00374   if (!readers.count(connection_id)) {
00375     return_code = FACE::INVALID_PARAM;
00376     return;
00377   }
00378   if(!Entities::instance()->connections_.count(connection_id)) {
00379     return_code = FACE::INVALID_PARAM;
00380     return;
00381   }
00382   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00383     Entities::instance()->connections_[connection_id].connection_status;
00384   if (max_message_size < status.MAX_MESSAGE_SIZE) {
00385     return_code = FACE::INVALID_PARAM;
00386     return;
00387   }
00388   DDS::DataReaderListener_var existing_listener = readers[connection_id]->dr->get_listener();
00389   if (existing_listener.in()) {
00390     Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener.in());
00391     if (typedListener) {
00392       typedListener->add_callback(callback);
00393     } else {
00394       ACE_ERROR((LM_ERROR, "ERROR: register_callback - failed to obtain typed listener\n"));
00395       return_code = FACE::INVALID_PARAM;
00396       return;
00397     }
00398   } else {
00399     DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
00400     readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
00401   }
00402   if (readers[connection_id]->status_valid != FACE::VALID) {
00403     Entities::FaceReceiver* tmp = readers[connection_id];
00404     readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00405     delete tmp;
00406   }
00407   readers[connection_id]->status_valid = FACE::VALID;
00408 
00409   return_code = FACE::RC_NO_ERROR;
00410 }

Here is the call graph for this function:

template<typename Msg >
void OpenDDS::FaceTSS::send_message ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  timeout,
FACE::TRANSACTION_ID_TYPE &  ,
const Msg &  message,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
) [inline]

Definition at line 230 of file FaceTSS.h.

References convertDuration(), DataWriter, DDS::HANDLE_NIL, OpenDDS::FaceTSS::Entities::instance(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_BAD_PARAMETER, OpenDDS::FaceTSS::Entities::senders_, status, and update_status().

00236 {
00237   if(!Entities::instance()->connections_.count(connection_id)) {
00238     return_code = FACE::INVALID_PARAM;
00239     return;
00240   }
00241   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00242     Entities::instance()->connections_[connection_id].connection_status;
00243   if (message_size < status.MAX_MESSAGE_SIZE) {
00244     return_code = FACE::INVALID_PARAM;
00245     return;
00246   }
00247   Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00248   if (!writers.count(connection_id)) {
00249     return_code = FACE::INVALID_PARAM;
00250     return;
00251   }
00252 
00253   typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter;
00254   const typename DataWriter::_var_type typedWriter =
00255     DataWriter::_narrow(writers[connection_id].dw);
00256   if (!typedWriter) {
00257     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00258     return;
00259   }
00260   writers[connection_id].status_valid = FACE::VALID;
00261 
00262   DDS::DataWriterQos dw_qos;
00263   typedWriter->get_qos(dw_qos);
00264   FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
00265   if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS &&
00266       timeout != FACE::INF_TIME_VALUE &&
00267       ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
00268     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00269     return;
00270   }
00271 
00272   return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
00273 }

Here is the call graph for this function:

OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE OpenDDS::FaceTSS::update_status ( FACE::CONNECTION_ID_TYPE  connection_id,
DDS::ReturnCode_t  retcode 
)
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1