OpenDDS  Snapshot(2023/04/07-19:43)
Functions
FACE::TS Namespace Reference

Functions

void Initialize (const CONFIGURATION_RESOURCE configuration_file, RETURN_CODE_TYPE &return_code)
 
void Create_Connection (const CONNECTION_NAME_TYPE connection_name, MESSAGING_PATTERN_TYPE pattern, CONNECTION_ID_TYPE &connection_id, CONNECTION_DIRECTION_TYPE &connection_direction, MESSAGE_SIZE_TYPE &max_message_size, TIMEOUT_TYPE, RETURN_CODE_TYPE &return_code)
 
void Get_Connection_Parameters (CONNECTION_NAME_TYPE &connection_name, CONNECTION_ID_TYPE &connection_id, TRANSPORT_CONNECTION_STATUS_TYPE &status, RETURN_CODE_TYPE &return_code)
 
void Unregister_Callback (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code)
 
void Destroy_Connection (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code)
 
OpenDDS_FACE_Export void receive_header (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE, FACE::TRANSACTION_ID_TYPE &transaction_id, FACE::TS::MessageHeader &message_header, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
 
void Receive_Message (CONNECTION_ID_TYPE connection_id, TIMEOUT_TYPE timeout, TRANSACTION_ID_TYPE &transaction_id, MessageHeader &message_header, MESSAGE_SIZE_TYPE message_size, RETURN_CODE_TYPE &return_code)
 

Function Documentation

◆ Create_Connection()

void FACE::TS::Create_Connection ( const CONNECTION_NAME_TYPE  connection_name,
MESSAGING_PATTERN_TYPE  pattern,
CONNECTION_ID_TYPE &  connection_id,
CONNECTION_DIRECTION_TYPE &  connection_direction,
MESSAGE_SIZE_TYPE &  max_message_size,
TIMEOUT_TYPE  ,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 95 of file FaceTSS.cpp.

References ACE_ERROR, OpenDDS::FaceTSS::config::ConnectionSettings::config_name_, OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::FaceTSS::Entities::ConnectionInfo::connection_name, OpenDDS::FaceTSS::Entities::ConnectionInfo::connection_status, OpenDDS::FaceTSS::convertDuration(), OpenDDS::FaceTSS::config::QosSettings::datawriter_qos(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::config::ConnectionSettings::direction_, OpenDDS::FaceTSS::config::ConnectionSettings::domain_id_, DDS::LifespanQosPolicy::duration, OpenDDS::FaceTSS::Entities::instance(), DDS::DataWriterQos::lifespan, LM_ERROR, OpenDDS::FaceTSS::config::TopicSettings::max_message_size_, OpenDDS::FaceTSS::config::ConnectionSettings::participant_id_, OpenDDS::FaceTSS::Entities::ConnectionInfo::platform_view_guid, OpenDDS::FaceTSS::config::TopicSettings::platform_view_guid_, OpenDDS::FaceTSS::config::ConnectionSettings::topic_name_, and OpenDDS::FaceTSS::config::TopicSettings::type_name_.

102 {
103  try {
104  return_code = RC_NO_ERROR;
105 
106  if (pattern != PUB_SUB) {
107  return_code = INVALID_CONFIG;
108  return;
109  }
110 
111  ConnectionSettings connection;
112  TopicSettings topic;
113  QosSettings qos;
114 
115  // Find connection
116  if (!parser.find_connection(connection_name, connection)) {
117  // Find topic
118  if (!parser.find_topic(connection.topic_name_, topic)) {
119  // Find Qos by specified name(s)
120  if (parser.find_qos(connection, qos)) {
121  return_code = INVALID_CONFIG;
122  }
123  } else {
124  return_code = INVALID_CONFIG;
125  }
126  } else {
127  return_code = INVALID_CONFIG;
128  }
129 
130  if (return_code != RC_NO_ERROR) {
131  return;
132  }
133 
134  connection_id = connection.connection_id_;
135  connection_direction = connection.direction_;
136  max_message_size = topic.max_message_size_;
137 
138  return_code = create_opendds_entities(connection_id,
139  connection.participant_id_,
140  connection.domain_id_,
141  connection.topic_name_,
142  topic.type_name_,
143  connection_direction,
144  qos,
145  connection.config_name_);
146  if (return_code != RC_NO_ERROR) {
147  return;
148  }
149 
150  const SYSTEM_TIME_TYPE refresh_period =
151  (connection_direction == SOURCE) ?
152  OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0;
153 
154  const TRANSPORT_CONNECTION_STATUS_TYPE status = {
155  0, // MESSAGE currently set to 0 due to type mismatch in MESSAGE_RANGE_TYPE and MESSAGE_TYPE_GUID
156  max_message_size, // MAX_MESSAGE with no transformations, set to config value
157  max_message_size,
158  connection_direction,
159  0, // WAITING_PROCESSES_OR_MESSAGES
160  refresh_period,
161  INVALID,
162  };
163  Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id];
164  conn_info.connection_name = connection_name;
165  conn_info.connection_status = status;
166  conn_info.platform_view_guid = topic.platform_view_guid_;
167  } catch (const CORBA::BAD_PARAM&) {
168  if (OpenDDS::DCPS::DCPS_debug_level) {
169  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Create_Connection - INVALID_PARAM\n"));
170  }
171  return_code = INVALID_PARAM;
172  }
173 }
#define ACE_ERROR(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t &duration)
Definition: FaceTSS.cpp:792

◆ Destroy_Connection()

void FACE::TS::Destroy_Connection ( CONNECTION_ID_TYPE  connection_id,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 292 of file FaceTSS.cpp.

References ACE_ERROR, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::senders_.

294 {
295  try {
296  Entities& entities = *Entities::instance();
297  Entities::ConnIdToSenderMap& writers = entities.senders_;
298  Entities::ConnIdToReceiverMap& readers = entities.receivers_;
299 
300  DDS::DomainParticipant_var dp;
301  bool try_cleanup_participant = false;
302  if (writers.count(connection_id)) {
303  const DDS::DataWriter_var datawriter = writers[connection_id].dw;
304  const DDS::Publisher_var pub = datawriter->get_publisher();
305  writers.erase(connection_id);
306  pub->delete_datawriter(datawriter);
307  dp = pub->get_participant();
308  try_cleanup_participant = cleanup_opendds_publisher(pub);
309 
310  } else if (readers.count(connection_id)) {
311  const DDS::DataReader_var datareader = readers[connection_id]->dr;
312  const DDS::Subscriber_var sub = datareader->get_subscriber();
313  delete readers[connection_id];
314  readers.erase(connection_id);
315  datareader->delete_contained_entities();
316  sub->delete_datareader(datareader);
317  dp = sub->get_participant();
318  try_cleanup_participant = cleanup_opendds_subscriber(sub);
319  }
320 
321  if (!dp) {
322  return_code = INVALID_PARAM;
323  return;
324  }
325 
326  if (try_cleanup_participant) {
327  cleanup_opendds_participant(dp);
328  }
329 
330  entities.connections_.erase(connection_id);
331  return_code = RC_NO_ERROR;
332  } catch (const CORBA::BAD_PARAM&) {
333  if (OpenDDS::DCPS::DCPS_debug_level) {
334  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Destroy_Connection - INVALID_PARAM\n"));
335  }
336  return_code = INVALID_PARAM;
337  }
338 }
#define ACE_ERROR(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ Get_Connection_Parameters()

void FACE::TS::Get_Connection_Parameters ( CONNECTION_NAME_TYPE &  connection_name,
CONNECTION_ID_TYPE &  connection_id,
TRANSPORT_CONNECTION_STATUS_TYPE &  status,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 175 of file FaceTSS.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::instance(), OpenDDS::FaceTSS::Entities::FaceReceiver::last_msg_header, LM_DEBUG, LM_ERROR, OpenDDS::FaceTSS::Entities::FaceReceiver::messages_waiting(), OPENDDS_STRING, OpenDDS::FaceTSS::Entities::receivers_, OpenDDS::FaceTSS::Entities::senders_, OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid, OpenDDS::FaceTSS::Entities::FaceReceiver::sum_recvd_msgs_latency, and OpenDDS::FaceTSS::Entities::FaceReceiver::total_msgs_recvd.

179 {
180  try {
181  // connection_name is optional, if absent populate from connection_id lookup
182  // connection_id is also optional, if absent populate from connection_name lookup
183  // if both provided, validate
184  // if neither present, return error
185  Entities& entities = *Entities::instance();
186 
187  if (connection_id != 0 && entities.connections_.count(connection_id)) {
188  // connection_id was provided
189  // if validated or populated, set return_code so status will be populated
190  if (connection_name[0]) {
191  // Validate provided connection_name
192  OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name;
193  if (std::strcmp(connection_name, conn_name.c_str()) == 0) {
194  return_code = RC_NO_ERROR;
195  } else {
196  return_code = INVALID_PARAM;
197  }
198  } else {
199  // connection_name not provided
200  // so populate from connection_id lookup
201  // and set return code so status will be populated
202  entities.connections_[connection_id].connection_name.copy(connection_name,
203  sizeof(CONNECTION_NAME_TYPE));
204  connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0;
205  return_code = RC_NO_ERROR;
206  }
207 
208  } else if (connection_name[0] && connection_id == 0) {
209  // connection_id was not specified, but name was provided.
210  // lookup connection_id and if found set return code to populate status
211  ConnectionSettings settings;
212  if (0 == parser.find_connection(connection_name, settings)) {
213  connection_id = settings.connection_id_;
214  return_code = RC_NO_ERROR;
215  } else {
216  // could not find connection for connection_name
217  return_code = INVALID_PARAM;
218  }
219  } else {
220  //Neither connection_id or connection_name provided
221  // a valid connection
222  return_code = INVALID_PARAM;
223  }
224  if (return_code == RC_NO_ERROR) {
225  TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status;
226  if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) {
227  Entities::FaceReceiver& receiver = *entities.receivers_[connection_id];
228  if (receiver.status_valid != FACE::VALID) {
229  if (OpenDDS::DCPS::DCPS_debug_level > 3) {
230  ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n"));
231  }
232  return_code = NOT_AVAILABLE;
233  return;
234  }
235  if (receiver.total_msgs_recvd != 0) {
236  cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency / receiver.total_msgs_recvd;
237  cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity;
238  } else {
239  cur_status.REFRESH_PERIOD = 0;
240  }
241  WAITING_RANGE_TYPE num_waiting;
242  if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) {
243  cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting;
244  } else {
245  if (OpenDDS::DCPS::DCPS_debug_level > 3) {
246  ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n"));
247  }
248  return_code = NOT_AVAILABLE;
249  return;
250  }
251  } else {
252  //DDS Only supports Destination/Source therefore
253  // CONNECTION_DIRECTION == FACE::SOURCE
254  Entities::FaceSender& sender = entities.senders_[connection_id];
255  if (sender.status_valid != FACE::VALID) {
256  return_code = NOT_AVAILABLE;
257  return;
258  }
259  cur_status.REFRESH_PERIOD = 0;
260  cur_status.WAITING_PROCESSES_OR_MESSAGES = 0;
261  }
262  status = cur_status;
263  }
264  } catch (const CORBA::BAD_PARAM&) {
265  if (OpenDDS::DCPS::DCPS_debug_level) {
266  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Get_Connection_Parameters - INVALID_PARAM\n"));
267  }
268  return_code = INVALID_PARAM;
269  }
270 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define OPENDDS_STRING
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ Initialize()

void FACE::TS::Initialize ( const CONFIGURATION_RESOURCE  configuration_file,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 70 of file FaceTSS.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, and TheServiceParticipant.

Referenced by OpenDDS::Security::XmlUtils::get_parser(), and OpenDDS::DCPS::QOS_XML_String_Handler::QOS_XML_String_Handler().

72 {
73  try {
74  int status = parser.parse(configuration_file);
75  if (status != 0) {
76  ACE_ERROR((LM_ERROR,
77  ACE_TEXT("(%P|%t) ERROR: Initialize() ")
78  ACE_TEXT("Parser::parse () returned %d\n"),
79  status));
80  return_code = INVALID_PARAM;
81  } else {
82  return_code = RC_NO_ERROR;
83 #if OPENDDS_POOL_ALLOCATOR
84  TheServiceParticipant->configure_pool();
85 #endif
86  }
87  } catch (const CORBA::BAD_PARAM& ) {
88  if (OpenDDS::DCPS::DCPS_debug_level) {
89  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Initialize - INVALID_PARAM\n"));
90  }
91  return_code = INVALID_PARAM;
92  }
93 }
#define ACE_ERROR(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define TheServiceParticipant

◆ receive_header()

OpenDDS_FACE_Export void FACE::TS::receive_header ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  ,
FACE::TRANSACTION_ID_TYPE &  transaction_id,
FACE::TS::MessageHeader &  message_header,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 341 of file FaceTSS.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::instance(), LM_DEBUG, LM_ERROR, OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_OK, and OpenDDS::FaceTSS::update_status().

Referenced by Receive_Message().

347 {
348  try {
349  Entities::ConnIdToReceiverMap& readers =
350  Entities::instance()->receivers_;
351  // transaction_id cannot be 0 due to initialization
352  // of last_msg_tid to 0 before a msg has been received so
353  // only valid transaction_ids are > 0.
354  if (!readers.count(connection_id) || transaction_id == 0) {
355  if (OpenDDS::DCPS::DCPS_debug_level > 3) {
356  ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - "
357  "could not find reader for connection_id: %d OR transaction id[%d] == 0\n",
358  connection_id,
359  transaction_id));
360  }
361  return_code = FACE::INVALID_PARAM;
362  return;
363  }
364 
365  if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) {
366  if (OpenDDS::DCPS::DCPS_debug_level > 3) {
367  ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d\n",
368  message_size,
369  sizeof(FACE::TS::MessageHeader)));
370  }
371  return_code = FACE::INVALID_PARAM;
372  return;
373  }
374  if (transaction_id == readers[connection_id]->last_msg_tid) {
375  message_header = readers[connection_id]->last_msg_header;
376  return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK);
377  } else {
378  return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
379  }
380  } catch (const CORBA::BAD_PARAM&) {
381  if (OpenDDS::DCPS::DCPS_debug_level) {
382  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_header - INVALID_PARAM\n"));
383  }
384  return_code = INVALID_PARAM;
385  }
386 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_BAD_PARAMETER
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
Definition: FaceTSS.cpp:730

◆ Receive_Message()

void FACE::TS::Receive_Message ( CONNECTION_ID_TYPE  connection_id,
TIMEOUT_TYPE  timeout,
TRANSACTION_ID_TYPE &  transaction_id,
MessageHeader &  message_header,
MESSAGE_SIZE_TYPE  message_size,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 388 of file FaceTSS.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::FaceTSS::config::QosSettings::apply_to(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::instance(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, LM_ERROR, receive_header(), OpenDDS::FaceTSS::Entities::receivers_, Registered_Data_Types, OpenDDS::FaceTSS::Entities::senders_, OpenDDS::DCPS::set_DCPS_debug_level(), TheParticipantFactory, TheServiceParticipant, TheTransportRegistry, TOPIC_QOS_DEFAULT, OpenDDS::DCPS::Transport_debug_level, DDS::DataWriterQos::user_data, DDS::DataReaderQos::user_data, DDS::DomainParticipantQos::user_data, and DDS::UserDataQosPolicy::value.

395 {
396  try {
397  receive_header(connection_id, timeout,
398  transaction_id, message_header,
399  message_size, return_code);
400  } catch (const CORBA::BAD_PARAM&) {
401  if (OpenDDS::DCPS::DCPS_debug_level) {
402  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Receive_Message - INVALID_PARAM\n"));
403  }
404  return_code = INVALID_PARAM;
405  }
406 }
#define ACE_ERROR(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS_FACE_Export void receive_header(FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE, FACE::TRANSACTION_ID_TYPE &transaction_id, FACE::TS::MessageHeader &message_header, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:341

◆ Unregister_Callback()

void FACE::TS::Unregister_Callback ( CONNECTION_ID_TYPE  connection_id,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 272 of file FaceTSS.cpp.

References ACE_ERROR, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, and OpenDDS::FaceTSS::Entities::receivers_.

274 {
275  try {
276  Entities& entities = *Entities::instance();
277  Entities::ConnIdToReceiverMap& readers = entities.receivers_;
278  if (readers.count(connection_id)) {
279  readers[connection_id]->dr->set_listener(NULL, 0);
280  return_code = RC_NO_ERROR;
281  return;
282  }
283  } catch (const CORBA::BAD_PARAM&) {
284  if (OpenDDS::DCPS::DCPS_debug_level) {
285  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Unregister_Callback - INVALID_PARAM\n"));
286  }
287  }
288 
289  return_code = INVALID_PARAM;
290 }
#define ACE_ERROR(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30