OpenDDS  Snapshot(2023/04/07-19:43)
Namespaces | Classes | Enumerations | Functions
OpenDDS::FaceTSS Namespace Reference

Namespaces

 config
 

Classes

class  Entities
 
class  Listener
 

Enumerations

enum  { NSEC_PER_SEC = 1000000000 }
 

Functions

FACE::RETURN_CODE_TYPE update_status (FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
 
DDS::Duration_t convertTimeout (FACE::TIMEOUT_TYPE timeout)
 
FACE::SYSTEM_TIME_TYPE convertDuration (const DDS::Duration_t &duration)
 
FACE::SYSTEM_TIME_TYPE convertTime (const DDS::Time_t &timestamp)
 
FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid (const OpenDDS::DCPS::GUID_t &pub, const CORBA::LongLong &orig_seq)
 
void populate_header_received (const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code)
 
template<typename Msg >
void receive_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &transaction_id, Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
 
template<typename Msg >
void send_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &, const Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
 
template<typename Msg >
void register_callback (FACE::CONNECTION_ID_TYPE connection_id, const FACE::WAITSET_TYPE, void(*callback)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &), FACE::MESSAGE_SIZE_TYPE max_message_size, FACE::RETURN_CODE_TYPE &return_code)
 

Enumeration Type Documentation

◆ anonymous enum

anonymous enum
Enumerator
NSEC_PER_SEC 

Definition at line 777 of file FaceTSS.cpp.

Function Documentation

◆ convertDuration()

OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertDuration ( const DDS::Duration_t duration)

Definition at line 792 of file FaceTSS.cpp.

References DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, DDS::Duration_t::nanosec, NSEC_PER_SEC, and DDS::Duration_t::sec.

Referenced by FACE::TS::Create_Connection(), and send_message().

793 {
794  if (duration.sec == DDS::DURATION_INFINITE_SEC
795  && duration.nanosec == DDS::DURATION_INFINITE_NSEC) {
796  return FACE::INF_TIME_VALUE;
797  }
798  return duration.nanosec +
799  duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
800 }
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unsigned long nanosec
Definition: DdsDcpsCore.idl:69

◆ convertTime()

OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertTime ( const DDS::Time_t timestamp)

Definition at line 802 of file FaceTSS.cpp.

References DDS::Time_t::nanosec, NSEC_PER_SEC, and DDS::Time_t::sec.

Referenced by populate_header_received().

803 {
804  return timestamp.nanosec +
805  timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
806 }
unsigned long nanosec

◆ convertTimeout()

OpenDDS_FACE_Export DDS::Duration_t OpenDDS::FaceTSS::convertTimeout ( FACE::TIMEOUT_TYPE  timeout)

Definition at line 779 of file FaceTSS.cpp.

References DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, and NSEC_PER_SEC.

Referenced by receive_message().

780 {
781  if (timeout == FACE::INF_TIME_VALUE) {
782  static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC,
784  return dds_inf;
785  }
786 
787  DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC),
788  static_cast<unsigned int>(timeout % NSEC_PER_SEC)};
789  return dur;
790 }
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73

◆ create_message_instance_guid()

OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID OpenDDS::FaceTSS::create_message_instance_guid ( const OpenDDS::DCPS::GUID_t pub,
const CORBA::LongLong orig_seq 
)

Definition at line 809 of file FaceTSS.cpp.

References ACE_DEBUG, ACE::crc32(), and LM_WARNING.

Referenced by populate_header_received().

810 {
811  OpenDDS::DCPS::GuidConverter writer(pub);
812 
813  FACE::MESSAGE_INSTANCE_GUID message_instance_guid;
814  FACE::LongLong mig_low;
815  FACE::LongLong masked_seq;
816 
817  //Until MESSAGE_INSTANCE_GUID becomes 128 bit GUID, use checksum to represent Prefix
818  FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub));
819  masked_seq = orig_seq >> 32;
820 
821  if (masked_seq) {
822  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n"));
823  }
824  mig_low = orig_seq & 0xFFFFFFFF;
825  message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low);
826 /*
827  //TODO: This is initial work toward defining how a 128 bit guid could be created
828  //in the future for a Message Instance Guid when supported.
829  // 13 byte prefix contains identifying pieces of guid
830  // 3 byte seq - truncated sequence from the sample
831  typedef CORBA::Octet MsgInstGuidPrefix_t[13];
832  typedef CORBA::Octet MsgInstGuidSeq_t[3];
833  MsgInstGuidPrefix_t migPrefix;
834  MsgInstGuidSeq_t migSeq;
835  ACE_OS::memcpy(&migPrefix[0], &pub.guidPrefix[2], 10);
836  ACE_OS::memcpy(&migPrefix[10], &pub.entityId.entityKey[0], 3);
837  masked_seq = orig_seq >> 24;
838 
839  if (masked_seq) {
840  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in 3 bytes, truncating high bits to fit\n"));
841  }
842  FACE::LongLong masked = orig_seq & 0xFFFFFF;
843 
844 #ifdef ACE_BIG_ENDIAN
845  masked <<= 8 * (sizeof(FACE::LongLong)-3);
846 #endif
847  ACE_OS::memcpy(&migSeq[0], &masked, 3);
848 */
849  return message_instance_guid;
850 }
#define ACE_DEBUG(X)
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
ACE_UINT32 crc32(const char *str)

◆ populate_header_received()

OpenDDS_FACE_Export void OpenDDS::FaceTSS::populate_header_received ( const FACE::CONNECTION_ID_TYPE &  connection_id,
const DDS::DomainParticipant_var  part,
const DDS::SampleInfo sinfo,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 852 of file FaceTSS.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, convertTime(), create_message_instance_guid(), OpenDDS::DCPS::DCPS_debug_level, DDS::LifespanQosPolicy::duration, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::DomainParticipantImpl::get_repoid(), OpenDDS::DCPS::GUID_t::guidPrefix, header, OpenDDS::FaceTSS::Entities::instance(), CORBA::is_nil(), DDS::DataWriterQos::lifespan, LM_DEBUG, LM_ERROR, DDS::Duration_t::nanosec, DDS::Time_t::nanosec, OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, DDS::SampleInfo::opendds_reserved_publication_seq, DDS::SampleInfo::publication_handle, OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::Duration_t::sec, DDS::Time_t::sec, OpenDDS::FaceTSS::Entities::senders_, DDS::SampleInfo::source_timestamp, OpenDDS::DCPS::TimePoint_T< AceClock >::to_dds_time(), DDS::DataWriterQos::user_data, and DDS::UserDataQosPolicy::value.

Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), and receive_message().

856 {
857  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
858  if (!readers.count(connection_id)) {
859  return_code = FACE::INVALID_PARAM;
860  return;
861  }
862  FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header;
863 
864  header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid;
865 
866  DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber();
867  DDS::DomainParticipant_var temp_dp = temp_sub->get_participant();
869  if (!dpi) {
870  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) populate_header_received: ")
871  ACE_TEXT("failed to get DomainParticipantImpl.\n")));
872  return_code = FACE::NOT_AVAILABLE;
873  return;
874  }
875  const OpenDDS::DCPS::GUID_t pub = dpi->get_repoid(sinfo.publication_handle);
876  header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq);
877 
878  header.message_timestamp = convertTime(sinfo.source_timestamp);
880 
881  readers[connection_id]->sum_recvd_msgs_latency += (convertTime(now.to_dds_time()) - header.message_timestamp);
882  ++readers[connection_id]->total_msgs_recvd;
883 
885  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n",
886  readers[connection_id]->sum_recvd_msgs_latency,
887  readers[connection_id]->total_msgs_recvd,
888  readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd));
889  }
890 
891  DDS::UserDataQosPolicy qos_user_data;
892  DDS::LifespanQosPolicy qos_lifespan;
893  const OpenDDS::DCPS::GUID_t sub = dpi->get_id();
894 
895  // Test if the reader and writer share a participant
896  if (std::memcmp(pub.guidPrefix, sub.guidPrefix, sizeof(sub.guidPrefix)) == 0) {
897 
898  Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
899  Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
900 
901  // Search domain writers for the sending instance
902  while (wtrIter != writers.end()) {
903  if (wtrIter->second.dw->get_instance_handle() == sinfo.publication_handle) {
904  break;
905  }
906  ++wtrIter;
907  }
908 
909  if (wtrIter == writers.end()) {
910  ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to find matching DataWriter instance.\n"));
911  return_code = FACE::NOT_AVAILABLE;
912  return;
913  }
914 
915  DDS::DataWriterQos qos;
916  wtrIter->second.dw->get_qos(qos);
917  qos_lifespan = qos.lifespan;
918  qos_user_data = qos.user_data;
919 
920  } else {
921  ::DDS::Subscriber_var bit_subscriber
922  = part->get_builtin_subscriber () ;
923 
924  ::DDS::DataReader_var reader
925  = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ;
926  ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader
927  = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ());
928  if (CORBA::is_nil (pub_reader.in ())) {
929  ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n"));
930  return_code = FACE::NOT_AVAILABLE;
931  return;
932  }
933 
935  ::DDS::SampleInfoSeq pubinfos(1);
936  ::DDS::PublicationBuiltinTopicDataSeq pubdata(1);
937 
938  ret = pub_reader->read_instance(pubdata,
939  pubinfos,
940  1,
941  sinfo.publication_handle,
945 
946 
947  if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) {
948  ACE_ERROR((LM_ERROR,
949  "(%P|%t) populate_header_received: failed to read BIT publication data.\n"));
950  return_code = FACE::NOT_AVAILABLE;
951  return;
952  }
953 
954  const CORBA::ULong i = 0;
955  qos_lifespan = pubdata[i].lifespan;
956  qos_user_data = pubdata[i].user_data;
957  }
958 
959  header.message_source_guid =
960  (qos_user_data.value[0] << 0) |
961  (qos_user_data.value[1] << 8) |
962  (qos_user_data.value[2] << 16);
963 
964 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: DW lifespan qos value: sec: %d nanosec: %d\n",
965 // pubdata[i].lifespan.duration.sec, pubdata[i].lifespan.duration.nanosec));
966 
967  DDS::Duration_t lifespan = qos_lifespan.duration;
968  if (lifespan.sec != DDS::DURATION_INFINITE_SEC &&
969  lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) {
970  // Finite lifespan. Check if data has expired.
971 
972  const DDS::Time_t tmp = {
973  sinfo.source_timestamp.sec + lifespan.sec,
974  sinfo.source_timestamp.nanosec + lifespan.nanosec
975  };
976 
977  // We assume that the publisher host's clock and subscriber host's
978  // clock are synchronized (allowed by the spec).
979  const OpenDDS::DCPS::SystemTimePoint expiration_time(tmp);
980 
981  if (now >= expiration_time) {
982 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Last message expired, setting message_validity to INVALID\n"));
983  header.message_validity = FACE::INVALID;
984  return_code = FACE::RC_NO_ERROR;
985  return;
986  }
987  }
988 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Setting message_validity to VALID\n"));
989  header.message_validity = FACE::VALID;
990  return_code = FACE::RC_NO_ERROR;
991 }
FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::GUID_t &pub, const CORBA::LongLong &orig_seq)
Definition: FaceTSS.cpp:809
#define ACE_DEBUG(X)
unsigned long nanosec
#define ACE_ERROR(X)
GUID_t get_repoid(DDS::InstanceHandle_t id) const
InstanceHandle_t publication_handle
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
long long opendds_reserved_publication_seq
const ReturnCode_t RETCODE_OK
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
const ReturnCode_t RETCODE_NO_DATA
UserDataQosPolicy user_data
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
const InstanceStateKind ALIVE_INSTANCE_STATE
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
ACE_CDR::ULong ULong
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
const char *const BUILT_IN_PUBLICATION_TOPIC
Time_t source_timestamp
const SampleStateMask ANY_SAMPLE_STATE
LifespanQosPolicy lifespan
FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t &timestamp)
Definition: FaceTSS.cpp:802
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
const ViewStateMask ANY_VIEW_STATE
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
sequence< SampleInfo > SampleInfoSeq
Boolean is_nil(T x)

◆ receive_message()

template<typename Msg >
void OpenDDS::FaceTSS::receive_message ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  timeout,
FACE::TRANSACTION_ID_TYPE &  transaction_id,
Msg &  message,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 144 of file FaceTSS.h.

References ACE_ERROR, DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, convertTimeout(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::FaceReceiver::dr, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, populate_header_received(), OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid, and update_status().

150 {
151  try {
152  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
153  if (!readers.count(connection_id)) {
154  return_code = FACE::INVALID_PARAM;
155  return;
156  }
157  if (!Entities::instance()->connections_.count(connection_id)) {
158  return_code = FACE::INVALID_PARAM;
159  return;
160  }
161  FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
162  Entities::instance()->connections_[connection_id].connection_status;
163  if (message_size < status.MAX_MESSAGE_SIZE) {
164  return_code = FACE::INVALID_PARAM;
165  return;
166  }
167  typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
168  const typename DataReader::_var_type typedReader =
169  DataReader::_narrow(readers[connection_id]->dr);
170  if (!typedReader) {
171  return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
172  return;
173  }
174  if (readers[connection_id]->status_valid != FACE::VALID) {
175  Entities::FaceReceiver* tmp = readers[connection_id];
176  readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
177  delete tmp;
178  }
179  readers[connection_id]->status_valid = FACE::VALID;
180 
181  const DDS::ReadCondition_var rc =
182  typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
185  const DDS::WaitSet_var ws = new DDS::WaitSet;
186  ws->attach_condition(rc);
187 
188  DDS::ConditionSeq active;
189  const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
190  DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
191  ws->detach_condition(rc);
192 
193  if (ret == DDS::RETCODE_TIMEOUT) {
194  typedReader->delete_readcondition(rc);
195  return_code = update_status(connection_id, ret);
196  return;
197  }
198 
199  typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
200  DDS::SampleInfoSeq sinfo;
201  ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc);
202  typedReader->delete_readcondition(rc);
203  if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
204  DDS::Subscriber_var subscriber = typedReader->get_subscriber();
205  DDS::DomainParticipant_var participant = subscriber->get_participant();
206  FACE::RETURN_CODE_TYPE ret_code;
207  populate_header_received(connection_id, participant, sinfo[0], ret_code);
208  if (ret_code != FACE::RC_NO_ERROR) {
209  return_code = update_status(connection_id, ret_code);
210  return;
211  }
212 
213  transaction_id = ++readers[connection_id]->last_msg_tid;
214 
215  message = seq[0];
216  return_code = update_status(connection_id, ret);
217  return;
218  }
219  return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
220  } catch (const CORBA::BAD_PARAM&) {
222  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_message - INVALID_PARAM\n"));
223  }
224  return_code = FACE::INVALID_PARAM;
225  }
226 }
#define ACE_ERROR(X)
DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
Definition: FaceTSS.cpp:779
const ReturnCode_t RETCODE_OK
sequence< Condition > ConditionSeq
const ReturnCode_t RETCODE_BAD_PARAMETER
void populate_header_received(const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:852
local interface<%TYPE%> DataReader
Definition: IDLTemplate.txt:66
const ReturnCode_t RETCODE_NO_DATA
const InstanceStateKind ALIVE_INSTANCE_STATE
const SampleStateMask ANY_SAMPLE_STATE
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const ViewStateMask ANY_VIEW_STATE
sequence< SampleInfo > SampleInfoSeq
const ReturnCode_t RETCODE_TIMEOUT
FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
Definition: FaceTSS.cpp:730

◆ register_callback()

template<typename Msg >
void OpenDDS::FaceTSS::register_callback ( FACE::CONNECTION_ID_TYPE  connection_id,
const FACE::WAITSET_TYPE  ,
void(*)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &)  callback,
FACE::MESSAGE_SIZE_TYPE  max_message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 362 of file FaceTSS.h.

References ACE_ERROR, OpenDDS::FaceTSS::Listener< Msg >::add_callback(), DDS::DATA_AVAILABLE_STATUS, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid.

371 {
372  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
373  if (!readers.count(connection_id)) {
374  return_code = FACE::INVALID_PARAM;
375  return;
376  }
377  if(!Entities::instance()->connections_.count(connection_id)) {
378  return_code = FACE::INVALID_PARAM;
379  return;
380  }
381  FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
382  Entities::instance()->connections_[connection_id].connection_status;
383  if (max_message_size < status.MAX_MESSAGE_SIZE) {
384  return_code = FACE::INVALID_PARAM;
385  return;
386  }
387  DDS::DataReaderListener_var existing_listener = readers[connection_id]->dr->get_listener();
388  if (existing_listener.in()) {
389  Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener.in());
390  if (typedListener) {
391  typedListener->add_callback(callback);
392  } else {
393  ACE_ERROR((LM_ERROR, "ERROR: register_callback - failed to obtain typed listener\n"));
394  return_code = FACE::INVALID_PARAM;
395  return;
396  }
397  } else {
398  DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
399  readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
400  }
401  if (readers[connection_id]->status_valid != FACE::VALID) {
402  Entities::FaceReceiver* tmp = readers[connection_id];
403  readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
404  delete tmp;
405  }
406  readers[connection_id]->status_valid = FACE::VALID;
407 
408  return_code = FACE::RC_NO_ERROR;
409 }
#define ACE_ERROR(X)
const StatusKind DATA_AVAILABLE_STATUS

◆ send_message()

template<typename Msg >
void OpenDDS::FaceTSS::send_message ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  timeout,
FACE::TRANSACTION_ID_TYPE &  ,
const Msg &  message,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 229 of file FaceTSS.h.

References convertDuration(), DataWriter, DDS::HANDLE_NIL, OpenDDS::FaceTSS::Entities::instance(), DDS::ReliabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::max_blocking_time, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_BAD_PARAMETER, OpenDDS::FaceTSS::Entities::senders_, and update_status().

235 {
236  if(!Entities::instance()->connections_.count(connection_id)) {
237  return_code = FACE::INVALID_PARAM;
238  return;
239  }
240  FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
241  Entities::instance()->connections_[connection_id].connection_status;
242  if (message_size < status.MAX_MESSAGE_SIZE) {
243  return_code = FACE::INVALID_PARAM;
244  return;
245  }
246  Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
247  if (!writers.count(connection_id)) {
248  return_code = FACE::INVALID_PARAM;
249  return;
250  }
251 
252  typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter;
253  const typename DataWriter::_var_type typedWriter =
254  DataWriter::_narrow(writers[connection_id].dw);
255  if (!typedWriter) {
256  return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
257  return;
258  }
259  writers[connection_id].status_valid = FACE::VALID;
260 
261  DDS::DataWriterQos dw_qos;
262  typedWriter->get_qos(dw_qos);
263  FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
265  timeout != FACE::INF_TIME_VALUE &&
266  ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
267  return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
268  return;
269  }
270 
271  return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
272 }
const ReturnCode_t RETCODE_BAD_PARAMETER
ReliabilityQosPolicyKind kind
const InstanceHandle_t HANDLE_NIL
ReliabilityQosPolicy reliability
local interface<%TYPE%> DataWriter
Definition: IDLTemplate.txt:21
FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
Definition: FaceTSS.cpp:730
FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t &duration)
Definition: FaceTSS.cpp:792

◆ update_status()

OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE OpenDDS::FaceTSS::update_status ( FACE::CONNECTION_ID_TYPE  connection_id,
DDS::ReturnCode_t  retcode 
)

Definition at line 730 of file FaceTSS.cpp.

References OpenDDS::FaceTSS::Entities::instance(), DDS::RETCODE_ALREADY_DELETED, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_ILLEGAL_OPERATION, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_NO_DATA, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::RETCODE_TIMEOUT, and DDS::RETCODE_UNSUPPORTED.

Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), FACE::TS::receive_header(), receive_message(), and send_message().

732 {
733  FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status =
734  Entities::instance()->connections_[connection_id].connection_status;
735  FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM;
736 
737  switch (retcode) {
738  case DDS::RETCODE_OK:
739  status.LAST_MSG_VALIDITY = FACE::VALID;
740  return FACE::RC_NO_ERROR;
741 
742  case DDS::RETCODE_ERROR:
743  rc = FACE::CONNECTION_CLOSED; break;
744 
746  rc = FACE::INVALID_PARAM; break;
747 
749  rc = FACE::DATA_BUFFER_TOO_SMALL; break;
750 
753  rc = FACE::INVALID_MODE; break;
754 
757  rc = FACE::INVALID_CONFIG; break;
758 
760  rc = FACE::CONNECTION_CLOSED; break;
761 
763  rc = FACE::TIMED_OUT; break;
764 
767  rc = FACE::NOT_AVAILABLE; break;
768 
770  rc = FACE::PERMISSION_DENIED; break;
771  }
772 
773  status.LAST_MSG_VALIDITY = FACE::INVALID;
774  return rc;
775 }
const ReturnCode_t RETCODE_ILLEGAL_OPERATION
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_BAD_PARAMETER
const ReturnCode_t RETCODE_NO_DATA
const ReturnCode_t RETCODE_ALREADY_DELETED
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
const ReturnCode_t RETCODE_UNSUPPORTED
const ReturnCode_t RETCODE_TIMEOUT
const ReturnCode_t RETCODE_NOT_ENABLED