OpenDDS  Snapshot(2023/04/28-20:55)
FaceTSS.cpp
Go to the documentation of this file.
1 #include "FaceTSS.h"
2 #include "FACE/TS.hpp"
3 #include "config/Parser.h"
4 
12 #include "dds/DCPS/GuidConverter.h"
13 #include "dds/DCPS/Qos_Helper.h"
14 #include "dds/DdsDcpsCoreC.h"
17 
18 #include <cstring>
19 
20 #ifndef OPENDDS_SAFETY_PROFILE
21 using OpenDDS::DCPS::operator==;
22 #endif
23 
24 namespace FACE {
25 namespace TS {
26 
27 bool MessageHeader::operator==(const MessageHeader& rhs) const
28 {
29  return message_instance_guid == rhs.message_instance_guid
30  && platform_view_guid == rhs.platform_view_guid
31  && message_source_guid == rhs.message_source_guid
32  && message_timestamp == rhs.message_timestamp
33  && message_validity == rhs.message_validity;
34 }
35 
39 
40 namespace {
42 
43  void find_or_create_dp(const DDS::DomainId_t& domainId,
44  int participantId,
45  const DDS::DomainParticipantFactory_var& dpf,
46  DDS::DomainParticipant_var& dp);
47  void find_or_create_pub(const DDS::PublisherQos& qos,
48  const DDS::DomainParticipant_var& dp,
49  DDS::Publisher_var& pub);
50  void find_or_create_sub(const DDS::SubscriberQos& qos,
51  const DDS::DomainParticipant_var& dp,
52  DDS::Subscriber_var& sub);
53 
54  bool cleanup_opendds_publisher(const DDS::Publisher_var pub);
55  bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub);
56  void cleanup_opendds_participant(const DDS::DomainParticipant_var dp);
57 
58  RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
59  int participantId,
60  const DDS::DomainId_t domainId,
61  const char* topic,
62  const char* type,
63  CONNECTION_DIRECTION_TYPE dir,
64  QosSettings& qos,
65  const char* transport);
66 }
67 
69 
70 void Initialize(const CONFIGURATION_RESOURCE configuration_file,
71  RETURN_CODE_TYPE& return_code)
72 {
73  try {
74  int status = parser.parse(configuration_file);
75  if (status != 0) {
77  ACE_TEXT("(%P|%t) ERROR: Initialize() ")
78  ACE_TEXT("Parser::parse () returned %d\n"),
79  status));
80  return_code = INVALID_PARAM;
81  } else {
82  return_code = RC_NO_ERROR;
83 #if OPENDDS_POOL_ALLOCATOR
84  TheServiceParticipant->configure_pool();
85 #endif
86  }
87  } catch (const CORBA::BAD_PARAM& ) {
89  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Initialize - INVALID_PARAM\n"));
90  }
91  return_code = INVALID_PARAM;
92  }
93 }
94 
95 void Create_Connection(const CONNECTION_NAME_TYPE connection_name,
96  MESSAGING_PATTERN_TYPE pattern,
97  CONNECTION_ID_TYPE& connection_id,
98  CONNECTION_DIRECTION_TYPE& connection_direction,
99  MESSAGE_SIZE_TYPE& max_message_size,
100  TIMEOUT_TYPE,
101  RETURN_CODE_TYPE& return_code)
102 {
103  try {
104  return_code = RC_NO_ERROR;
105 
106  if (pattern != PUB_SUB) {
107  return_code = INVALID_CONFIG;
108  return;
109  }
110 
111  ConnectionSettings connection;
112  TopicSettings topic;
113  QosSettings qos;
114 
115  // Find connection
116  if (!parser.find_connection(connection_name, connection)) {
117  // Find topic
118  if (!parser.find_topic(connection.topic_name_, topic)) {
119  // Find Qos by specified name(s)
120  if (parser.find_qos(connection, qos)) {
121  return_code = INVALID_CONFIG;
122  }
123  } else {
124  return_code = INVALID_CONFIG;
125  }
126  } else {
127  return_code = INVALID_CONFIG;
128  }
129 
130  if (return_code != RC_NO_ERROR) {
131  return;
132  }
133 
134  connection_id = connection.connection_id_;
135  connection_direction = connection.direction_;
136  max_message_size = topic.max_message_size_;
137 
138  return_code = create_opendds_entities(connection_id,
139  connection.participant_id_,
140  connection.domain_id_,
141  connection.topic_name_,
142  topic.type_name_,
143  connection_direction,
144  qos,
145  connection.config_name_);
146  if (return_code != RC_NO_ERROR) {
147  return;
148  }
149 
150  const SYSTEM_TIME_TYPE refresh_period =
151  (connection_direction == SOURCE) ?
153 
154  const TRANSPORT_CONNECTION_STATUS_TYPE status = {
155  0, // MESSAGE currently set to 0 due to type mismatch in MESSAGE_RANGE_TYPE and MESSAGE_TYPE_GUID
156  max_message_size, // MAX_MESSAGE with no transformations, set to config value
157  max_message_size,
158  connection_direction,
159  0, // WAITING_PROCESSES_OR_MESSAGES
160  refresh_period,
161  INVALID,
162  };
163  Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id];
164  conn_info.connection_name = connection_name;
165  conn_info.connection_status = status;
166  conn_info.platform_view_guid = topic.platform_view_guid_;
167  } catch (const CORBA::BAD_PARAM&) {
169  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Create_Connection - INVALID_PARAM\n"));
170  }
171  return_code = INVALID_PARAM;
172  }
173 }
174 
175 void Get_Connection_Parameters(CONNECTION_NAME_TYPE& connection_name,
176  CONNECTION_ID_TYPE& connection_id /* 0 if an out param */,
177  TRANSPORT_CONNECTION_STATUS_TYPE& status,
178  RETURN_CODE_TYPE& return_code)
179 {
180  try {
181  // connection_name is optional, if absent populate from connection_id lookup
182  // connection_id is also optional, if absent populate from connection_name lookup
183  // if both provided, validate
184  // if neither present, return error
185  Entities& entities = *Entities::instance();
186 
187  if (connection_id != 0 && entities.connections_.count(connection_id)) {
188  // connection_id was provided
189  // if validated or populated, set return_code so status will be populated
190  if (connection_name[0]) {
191  // Validate provided connection_name
192  OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name;
193  if (std::strcmp(connection_name, conn_name.c_str()) == 0) {
194  return_code = RC_NO_ERROR;
195  } else {
196  return_code = INVALID_PARAM;
197  }
198  } else {
199  // connection_name not provided
200  // so populate from connection_id lookup
201  // and set return code so status will be populated
202  entities.connections_[connection_id].connection_name.copy(connection_name,
203  sizeof(CONNECTION_NAME_TYPE));
204  connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0;
205  return_code = RC_NO_ERROR;
206  }
207 
208  } else if (connection_name[0] && connection_id == 0) {
209  // connection_id was not specified, but name was provided.
210  // lookup connection_id and if found set return code to populate status
211  ConnectionSettings settings;
212  if (0 == parser.find_connection(connection_name, settings)) {
213  connection_id = settings.connection_id_;
214  return_code = RC_NO_ERROR;
215  } else {
216  // could not find connection for connection_name
217  return_code = INVALID_PARAM;
218  }
219  } else {
220  //Neither connection_id or connection_name provided
221  // a valid connection
222  return_code = INVALID_PARAM;
223  }
224  if (return_code == RC_NO_ERROR) {
225  TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status;
226  if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) {
227  Entities::FaceReceiver& receiver = *entities.receivers_[connection_id];
228  if (receiver.status_valid != FACE::VALID) {
230  ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n"));
231  }
232  return_code = NOT_AVAILABLE;
233  return;
234  }
235  if (receiver.total_msgs_recvd != 0) {
236  cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency / receiver.total_msgs_recvd;
237  cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity;
238  } else {
239  cur_status.REFRESH_PERIOD = 0;
240  }
241  WAITING_RANGE_TYPE num_waiting;
242  if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) {
243  cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting;
244  } else {
246  ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n"));
247  }
248  return_code = NOT_AVAILABLE;
249  return;
250  }
251  } else {
252  //DDS Only supports Destination/Source therefore
253  // CONNECTION_DIRECTION == FACE::SOURCE
254  Entities::FaceSender& sender = entities.senders_[connection_id];
255  if (sender.status_valid != FACE::VALID) {
256  return_code = NOT_AVAILABLE;
257  return;
258  }
259  cur_status.REFRESH_PERIOD = 0;
260  cur_status.WAITING_PROCESSES_OR_MESSAGES = 0;
261  }
262  status = cur_status;
263  }
264  } catch (const CORBA::BAD_PARAM&) {
266  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Get_Connection_Parameters - INVALID_PARAM\n"));
267  }
268  return_code = INVALID_PARAM;
269  }
270 }
271 
272 void Unregister_Callback(CONNECTION_ID_TYPE connection_id,
273  RETURN_CODE_TYPE& return_code)
274 {
275  try {
276  Entities& entities = *Entities::instance();
277  Entities::ConnIdToReceiverMap& readers = entities.receivers_;
278  if (readers.count(connection_id)) {
279  readers[connection_id]->dr->set_listener(NULL, 0);
280  return_code = RC_NO_ERROR;
281  return;
282  }
283  } catch (const CORBA::BAD_PARAM&) {
285  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Unregister_Callback - INVALID_PARAM\n"));
286  }
287  }
288 
289  return_code = INVALID_PARAM;
290 }
291 
292 void Destroy_Connection(CONNECTION_ID_TYPE connection_id,
293  RETURN_CODE_TYPE& return_code)
294 {
295  try {
296  Entities& entities = *Entities::instance();
297  Entities::ConnIdToSenderMap& writers = entities.senders_;
298  Entities::ConnIdToReceiverMap& readers = entities.receivers_;
299 
300  DDS::DomainParticipant_var dp;
301  bool try_cleanup_participant = false;
302  if (writers.count(connection_id)) {
303  const DDS::DataWriter_var datawriter = writers[connection_id].dw;
304  const DDS::Publisher_var pub = datawriter->get_publisher();
305  writers.erase(connection_id);
306  pub->delete_datawriter(datawriter);
307  dp = pub->get_participant();
308  try_cleanup_participant = cleanup_opendds_publisher(pub);
309 
310  } else if (readers.count(connection_id)) {
311  const DDS::DataReader_var datareader = readers[connection_id]->dr;
312  const DDS::Subscriber_var sub = datareader->get_subscriber();
313  delete readers[connection_id];
314  readers.erase(connection_id);
315  datareader->delete_contained_entities();
316  sub->delete_datareader(datareader);
317  dp = sub->get_participant();
318  try_cleanup_participant = cleanup_opendds_subscriber(sub);
319  }
320 
321  if (!dp) {
322  return_code = INVALID_PARAM;
323  return;
324  }
325 
326  if (try_cleanup_participant) {
327  cleanup_opendds_participant(dp);
328  }
329 
330  entities.connections_.erase(connection_id);
331  return_code = RC_NO_ERROR;
332  } catch (const CORBA::BAD_PARAM&) {
334  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Destroy_Connection - INVALID_PARAM\n"));
335  }
336  return_code = INVALID_PARAM;
337  }
338 }
339 
340 OpenDDS_FACE_Export
341 void receive_header(/*in*/ FACE::CONNECTION_ID_TYPE connection_id,
342  /*in*/ FACE::TIMEOUT_TYPE /*timeout*/,
343  /*inout*/ FACE::TRANSACTION_ID_TYPE& transaction_id,
344  /*inout*/ FACE::TS::MessageHeader& message_header,
345  /*in*/ FACE::MESSAGE_SIZE_TYPE message_size,
346  /*out*/ FACE::RETURN_CODE_TYPE& return_code)
347 {
348  try {
349  Entities::ConnIdToReceiverMap& readers =
351  // transaction_id cannot be 0 due to initialization
352  // of last_msg_tid to 0 before a msg has been received so
353  // only valid transaction_ids are > 0.
354  if (!readers.count(connection_id) || transaction_id == 0) {
356  ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - "
357  "could not find reader for connection_id: %d OR transaction id[%d] == 0\n",
358  connection_id,
359  transaction_id));
360  }
361  return_code = FACE::INVALID_PARAM;
362  return;
363  }
364 
365  if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) {
367  ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d\n",
368  message_size,
369  sizeof(FACE::TS::MessageHeader)));
370  }
371  return_code = FACE::INVALID_PARAM;
372  return;
373  }
374  if (transaction_id == readers[connection_id]->last_msg_tid) {
375  message_header = readers[connection_id]->last_msg_header;
376  return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK);
377  } else {
378  return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
379  }
380  } catch (const CORBA::BAD_PARAM&) {
382  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_header - INVALID_PARAM\n"));
383  }
384  return_code = INVALID_PARAM;
385  }
386 }
387 
389  /* in */ CONNECTION_ID_TYPE connection_id,
390  /* in */ TIMEOUT_TYPE timeout,
391  /* inout */ TRANSACTION_ID_TYPE& transaction_id,
392  /* out */ MessageHeader& message_header,
393  /* in */ MESSAGE_SIZE_TYPE message_size,
394  /* out */ RETURN_CODE_TYPE& return_code)
395 {
396  try {
397  receive_header(connection_id, timeout,
398  transaction_id, message_header,
399  message_size, return_code);
400  } catch (const CORBA::BAD_PARAM&) {
402  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Receive_Message - INVALID_PARAM\n"));
403  }
404  return_code = INVALID_PARAM;
405  }
406 }
407 
408 namespace {
409  void find_or_create_dp(const DDS::DomainId_t& domainId,
410  int participantId,
411  const DDS::DomainParticipantFactory_var& dpf,
412  DDS::DomainParticipant_var& dp) {
413  DDS::DomainParticipant_var temp_dp;
414 
415  temp_dp = dpf->lookup_participant(domainId);
416  if (!temp_dp) {
418  ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - created new participant for domainId: %d\n", domainId));
419  }
421  dpf->get_default_participant_qos(qos);
422  qos.user_data.value.length(6);
423  qos.user_data.value[0] = (participantId >> 0) & 0xFF;
424  qos.user_data.value[1] = (participantId >> 8) & 0xFF;
425  qos.user_data.value[2] = (participantId >> 16) & 0xFF;
426  qos.user_data.value[3] = (participantId >> 24) & 0xFF;
427  qos.user_data.value[4] = 0; // (participantId >> 32) & 0xFF;
428  qos.user_data.value[5] = 0; // (participantId >> 40) & 0xFF;
429  dp = dpf->create_participant(domainId, qos, 0, 0);
430  return;
431  }
433  ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - found existing participant for domainId: %d\n", domainId));
434  }
435  dp = temp_dp;
436  }
437 
438  void find_or_create_pub(const DDS::PublisherQos& qos,
439  const DDS::DomainParticipant_var& dp,
440  DDS::Publisher_var& pub) {
441  DDS::DomainParticipant_var temp_dp;
442  DDS::Publisher_var temp_pub;
443 
444  Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
445  Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
446 
447  while (wtrIter != writers.end()) {
448  temp_pub = wtrIter->second.dw->get_publisher();
449  temp_dp = temp_pub->get_participant();
450  DDS::PublisherQos temp_qos;
451  temp_pub->get_qos(temp_qos);
452  if (dp->get_domain_id() == temp_dp->get_domain_id() &&
453  temp_qos == qos) {
455  ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - found existing publisher in senders\n"));
456  }
457  pub = temp_pub;
458  return;
459  } else {
460  ++wtrIter;
461  }
462  }
464  ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - created new publisher\n"));
465  }
466  pub = dp->create_publisher(qos, 0, 0);
467  }
468 
469  void find_or_create_sub(const DDS::SubscriberQos& qos,
470  const DDS::DomainParticipant_var& dp,
471  DDS::Subscriber_var& sub)
472  {
473  DDS::DomainParticipant_var temp_dp;
474  DDS::Subscriber_var temp_sub;
475 
476  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
477  Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
478 
479  while (rdrIter != readers.end()) {
480  temp_sub = rdrIter->second->dr->get_subscriber();
481  temp_dp = temp_sub->get_participant();
482  DDS::SubscriberQos temp_qos;
483  temp_sub->get_qos(temp_qos);
484  if (dp->get_domain_id() == temp_dp->get_domain_id() &&
485  temp_qos == qos) {
487  ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - found existing subscriber in receivers\n"));
488  }
489  sub = temp_sub;
490  return;
491  } else {
492  ++rdrIter;
493  }
494  }
496  ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - created new subscriber\n"));
497  }
498  sub = dp->create_subscriber(qos, 0, 0);
499  }
500 
501  bool cleanup_opendds_publisher(const DDS::Publisher_var pub)
502  {
503  DDS::Publisher_var temp_pub;
504  DDS::PublisherQos pub_qos;
505  pub->get_qos(pub_qos);
506  DDS::DomainParticipant_var dp = pub->get_participant();
507  DDS::DomainParticipant_var temp_dp;
508  DDS::PublisherQos temp_qos;
509 
510  Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
511  Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
512  while (wtrIter != writers.end()) {
513  temp_pub = wtrIter->second.dw->get_publisher();
514  temp_dp = temp_pub->get_participant();
515  if (dp->get_domain_id() == temp_dp->get_domain_id()) {
516  temp_pub->get_qos(temp_qos);
517  if (pub_qos == temp_qos) {
519  ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher still in use by other writer\n"));
520  }
521  return false;
522  }
523  }
524  ++wtrIter;
525  }
527  ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher no longer in use, delete pub\n"));
528  }
529  dp->delete_publisher(pub);
530  return true;
531  }
532 
533  bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub)
534  {
535  DDS::Subscriber_var temp_sub;
536  DDS::SubscriberQos sub_qos;
537  sub->get_qos(sub_qos);
538  DDS::DomainParticipant_var dp = sub->get_participant();
539  DDS::DomainParticipant_var temp_dp;
540  DDS::SubscriberQos temp_qos;
541 
542 
543  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
544  Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
545  while (rdrIter != readers.end()) {
546  temp_sub = rdrIter->second->dr->get_subscriber();
547  temp_dp = temp_sub->get_participant();
548 
549  if (dp->get_domain_id() == temp_dp->get_domain_id()) {
550  temp_sub->get_qos(temp_qos);
551  if (sub_qos == temp_qos) {
553  ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber still in use by other reader\n"));
554  }
555  return false;
556  }
557  }
558  ++rdrIter;
559  }
561  ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber no longer in use, delete sub\n"));
562  }
563  sub->delete_contained_entities();
564  dp->delete_subscriber(sub);
565  return true;
566  }
567 
568  void cleanup_opendds_participant(const DDS::DomainParticipant_var dp)
569  {
570  DDS::DomainParticipant_var temp_dp;
571  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
572  Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
573  while (rdrIter != readers.end()) {
574  DDS::Subscriber_var sub = rdrIter->second->dr->get_subscriber();
575  temp_dp = sub->get_participant();
576  if (dp->get_domain_id() == temp_dp->get_domain_id()) {
578  ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by reader\n"));
579  }
580  return;
581  } else {
582  ++rdrIter;
583  }
584  }
585 
586  Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
587  Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
588 
589  while (wtrIter != writers.end()) {
590  DDS::Publisher_var publisher = wtrIter->second.dw->get_publisher();
591  temp_dp = publisher->get_participant();
592  if (dp->get_domain_id() == temp_dp->get_domain_id()) {
594  ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by writer\n"));
595  }
596  return;
597  } else {
598  ++wtrIter;
599  }
600  }
602  ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant for domain: %d no longer in use, delete entities and participant\n", dp->get_domain_id()));
603  }
604  dp->delete_contained_entities();
605  const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
606  dpf->delete_participant(dp);
607  }
608 
609  RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
610  int participantId,
611  const DDS::DomainId_t domainId,
612  const char* topicName,
613  const char* type,
614  CONNECTION_DIRECTION_TYPE dir,
615  QosSettings& qos_settings,
616  const char* transport)
617  {
618 #ifdef DEBUG_OPENDDS_FACETSS
621  TheServiceParticipant->set_BIT(false);
622 #endif
623 
624  const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
625  if (!dpf) return INVALID_PARAM;
626 
627  DDS::DomainParticipant_var dp;
628  find_or_create_dp(domainId, participantId, dpf, dp);
629  if (!dp) return INVALID_PARAM;
630 
632  OpenDDS::DCPS::TypeSupport_var ts =
633  Registered_Data_Types->lookup(dp, type);
634  if (!ts) {
635  ts = Registered_Data_Types->lookup(0, type);
636  if (!ts) return INVALID_PARAM;
637  Registered_Data_Types->register_type(dp, type, ts);
638  }
639 
640  const DDS::Topic_var topic =
641  dp->create_topic(topicName, type, TOPIC_QOS_DEFAULT, 0, 0);
642  if (!topic) return INVALID_PARAM;
643 
644  if (dir == SOURCE) {
645  DDS::PublisherQos publisher_qos;
646  qos_settings.apply_to(publisher_qos);
647 
648  DDS::Publisher_var pub;
649  find_or_create_pub(publisher_qos, dp, pub);
650  if (!pub) return INVALID_PARAM;
651 
652  if (transport && transport[0]) {
653  OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
654  if (config.is_nil()) return INVALID_PARAM;
655  try {
656  TheTransportRegistry->bind_config(config, pub);
657  } catch (const OpenDDS::DCPS::Transport::Exception&) {
658  return INVALID_PARAM;
659  }
660  }
661 
662  DDS::DataWriterQos datawriter_qos;
663  qos_settings.apply_to(datawriter_qos);
664 
665  // set up user data in DW qos
666  datawriter_qos.user_data.value.length (3);
667  datawriter_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
668  datawriter_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
669  datawriter_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
670 
671  const DDS::DataWriter_var dw =
672  pub->create_datawriter(topic, datawriter_qos, 0, 0);
673  if (!dw) return INVALID_PARAM;
674 
675  Entities::instance()->senders_[connectionId].dw = dw;
676 
677  } else { // dir == DESTINATION
678  DDS::SubscriberQos subscriber_qos;
679  qos_settings.apply_to(subscriber_qos);
680 
681  DDS::Subscriber_var sub;
682  find_or_create_sub(subscriber_qos, dp, sub);
683  if (!sub) return INVALID_PARAM;
684 
685  if (transport && transport[0]) {
686  OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
687  if (config.is_nil()) return INVALID_PARAM;
688  try {
689  TheTransportRegistry->bind_config(config, sub);
690  } catch (const OpenDDS::DCPS::Transport::Exception&) {
691  return INVALID_PARAM;
692  }
693  }
694 
695  DDS::DataReaderQos datareader_qos;
696  qos_settings.apply_to(datareader_qos);
697 
698  // set up user data in DR qos
699  datareader_qos.user_data.value.length (3);
700  datareader_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
701  datareader_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
702  datareader_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
703 
704  const DDS::DataReader_var dr =
705  sub->create_datareader(topic, datareader_qos, 0, 0);
706  if (!dr) return INVALID_PARAM;
707  Entities::instance()->receivers_[connectionId] = new Entities::FaceReceiver();
708  Entities::instance()->receivers_[connectionId]->dr = dr;
709  }
710 
711  return RC_NO_ERROR;
712  }
713 }
714 
715 }}
716 
718 
719 namespace OpenDDS {
720 namespace FaceTSS {
721 
724 
726 {
728 }
729 
730 FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
731  DDS::ReturnCode_t retcode)
732 {
733  FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status =
734  Entities::instance()->connections_[connection_id].connection_status;
735  FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM;
736 
737  switch (retcode) {
738  case DDS::RETCODE_OK:
739  status.LAST_MSG_VALIDITY = FACE::VALID;
740  return FACE::RC_NO_ERROR;
741 
742  case DDS::RETCODE_ERROR:
743  rc = FACE::CONNECTION_CLOSED; break;
744 
746  rc = FACE::INVALID_PARAM; break;
747 
749  rc = FACE::DATA_BUFFER_TOO_SMALL; break;
750 
753  rc = FACE::INVALID_MODE; break;
754 
757  rc = FACE::INVALID_CONFIG; break;
758 
760  rc = FACE::CONNECTION_CLOSED; break;
761 
763  rc = FACE::TIMED_OUT; break;
764 
767  rc = FACE::NOT_AVAILABLE; break;
768 
770  rc = FACE::PERMISSION_DENIED; break;
771  }
772 
773  status.LAST_MSG_VALIDITY = FACE::INVALID;
774  return rc;
775 }
776 
777 enum { NSEC_PER_SEC = 1000000000 };
778 
779 DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
780 {
781  if (timeout == FACE::INF_TIME_VALUE) {
782  static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC,
784  return dds_inf;
785  }
786 
787  DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC),
788  static_cast<unsigned int>(timeout % NSEC_PER_SEC)};
789  return dur;
790 }
791 
792 FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration)
793 {
794  if (duration.sec == DDS::DURATION_INFINITE_SEC
795  && duration.nanosec == DDS::DURATION_INFINITE_NSEC) {
796  return FACE::INF_TIME_VALUE;
797  }
798  return duration.nanosec +
799  duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
800 }
801 
802 FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp)
803 {
804  return timestamp.nanosec +
805  timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
806 }
807 
808 FACE::MESSAGE_INSTANCE_GUID
810 {
811  OpenDDS::DCPS::GuidConverter writer(pub);
812 
813  FACE::MESSAGE_INSTANCE_GUID message_instance_guid;
814  FACE::LongLong mig_low;
815  FACE::LongLong masked_seq;
816 
817  //Until MESSAGE_INSTANCE_GUID becomes 128 bit GUID, use checksum to represent Prefix
818  FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub));
819  masked_seq = orig_seq >> 32;
820 
821  if (masked_seq) {
822  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n"));
823  }
824  mig_low = orig_seq & 0xFFFFFFFF;
825  message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low);
826 /*
827  //TODO: This is initial work toward defining how a 128 bit guid could be created
828  //in the future for a Message Instance Guid when supported.
829  // 13 byte prefix contains identifying pieces of guid
830  // 3 byte seq - truncated sequence from the sample
831  typedef CORBA::Octet MsgInstGuidPrefix_t[13];
832  typedef CORBA::Octet MsgInstGuidSeq_t[3];
833  MsgInstGuidPrefix_t migPrefix;
834  MsgInstGuidSeq_t migSeq;
835  ACE_OS::memcpy(&migPrefix[0], &pub.guidPrefix[2], 10);
836  ACE_OS::memcpy(&migPrefix[10], &pub.entityId.entityKey[0], 3);
837  masked_seq = orig_seq >> 24;
838 
839  if (masked_seq) {
840  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in 3 bytes, truncating high bits to fit\n"));
841  }
842  FACE::LongLong masked = orig_seq & 0xFFFFFF;
843 
844 #ifdef ACE_BIG_ENDIAN
845  masked <<= 8 * (sizeof(FACE::LongLong)-3);
846 #endif
847  ACE_OS::memcpy(&migSeq[0], &masked, 3);
848 */
849  return message_instance_guid;
850 }
851 
852 void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
853  const DDS::DomainParticipant_var part,
854  const DDS::SampleInfo& sinfo,
855  FACE::RETURN_CODE_TYPE& return_code)
856 {
857  Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
858  if (!readers.count(connection_id)) {
859  return_code = FACE::INVALID_PARAM;
860  return;
861  }
862  FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header;
863 
864  header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid;
865 
866  DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber();
867  DDS::DomainParticipant_var temp_dp = temp_sub->get_participant();
869  if (!dpi) {
870  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) populate_header_received: ")
871  ACE_TEXT("failed to get DomainParticipantImpl.\n")));
872  return_code = FACE::NOT_AVAILABLE;
873  return;
874  }
875  const OpenDDS::DCPS::GUID_t pub = dpi->get_repoid(sinfo.publication_handle);
876  header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq);
877 
878  header.message_timestamp = convertTime(sinfo.source_timestamp);
880 
881  readers[connection_id]->sum_recvd_msgs_latency += (convertTime(now.to_dds_time()) - header.message_timestamp);
882  ++readers[connection_id]->total_msgs_recvd;
883 
885  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n",
886  readers[connection_id]->sum_recvd_msgs_latency,
887  readers[connection_id]->total_msgs_recvd,
888  readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd));
889  }
890 
891  DDS::UserDataQosPolicy qos_user_data;
892  DDS::LifespanQosPolicy qos_lifespan;
893  const OpenDDS::DCPS::GUID_t sub = dpi->get_id();
894 
895  // Test if the reader and writer share a participant
896  if (std::memcmp(pub.guidPrefix, sub.guidPrefix, sizeof(sub.guidPrefix)) == 0) {
897 
898  Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
899  Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
900 
901  // Search domain writers for the sending instance
902  while (wtrIter != writers.end()) {
903  if (wtrIter->second.dw->get_instance_handle() == sinfo.publication_handle) {
904  break;
905  }
906  ++wtrIter;
907  }
908 
909  if (wtrIter == writers.end()) {
910  ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to find matching DataWriter instance.\n"));
911  return_code = FACE::NOT_AVAILABLE;
912  return;
913  }
914 
915  DDS::DataWriterQos qos;
916  wtrIter->second.dw->get_qos(qos);
917  qos_lifespan = qos.lifespan;
918  qos_user_data = qos.user_data;
919 
920  } else {
921  ::DDS::Subscriber_var bit_subscriber
922  = part->get_builtin_subscriber () ;
923 
924  ::DDS::DataReader_var reader
925  = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ;
926  ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader
927  = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ());
928  if (CORBA::is_nil (pub_reader.in ())) {
929  ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n"));
930  return_code = FACE::NOT_AVAILABLE;
931  return;
932  }
933 
935  ::DDS::SampleInfoSeq pubinfos(1);
936  ::DDS::PublicationBuiltinTopicDataSeq pubdata(1);
937 
938  ret = pub_reader->read_instance(pubdata,
939  pubinfos,
940  1,
941  sinfo.publication_handle,
945 
946 
947  if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) {
949  "(%P|%t) populate_header_received: failed to read BIT publication data.\n"));
950  return_code = FACE::NOT_AVAILABLE;
951  return;
952  }
953 
954  const CORBA::ULong i = 0;
955  qos_lifespan = pubdata[i].lifespan;
956  qos_user_data = pubdata[i].user_data;
957  }
958 
959  header.message_source_guid =
960  (qos_user_data.value[0] << 0) |
961  (qos_user_data.value[1] << 8) |
962  (qos_user_data.value[2] << 16);
963 
964 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: DW lifespan qos value: sec: %d nanosec: %d\n",
965 // pubdata[i].lifespan.duration.sec, pubdata[i].lifespan.duration.nanosec));
966 
967  DDS::Duration_t lifespan = qos_lifespan.duration;
968  if (lifespan.sec != DDS::DURATION_INFINITE_SEC &&
969  lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) {
970  // Finite lifespan. Check if data has expired.
971 
972  const DDS::Time_t tmp = {
973  sinfo.source_timestamp.sec + lifespan.sec,
974  sinfo.source_timestamp.nanosec + lifespan.nanosec
975  };
976 
977  // We assume that the publisher host's clock and subscriber host's
978  // clock are synchronized (allowed by the spec).
979  const OpenDDS::DCPS::SystemTimePoint expiration_time(tmp);
980 
981  if (now >= expiration_time) {
982 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Last message expired, setting message_validity to INVALID\n"));
983  header.message_validity = FACE::INVALID;
984  return_code = FACE::RC_NO_ERROR;
985  return;
986  }
987  }
988 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Setting message_validity to VALID\n"));
989  header.message_validity = FACE::VALID;
990  return_code = FACE::RC_NO_ERROR;
991 }
992 }}
993 
UserDataQosPolicy user_data
#define TheTransportRegistry
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
void populate_header_received(const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:852
#define ACE_ERROR(X)
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid(const OpenDDS::DCPS::GUID_t &pub, const CORBA::LongLong &orig_seq)
Definition: FaceTSS.cpp:809
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
void Receive_Message(CONNECTION_ID_TYPE connection_id, TIMEOUT_TYPE timeout, TRANSACTION_ID_TYPE &transaction_id, MessageHeader &message_header, MESSAGE_SIZE_TYPE message_size, RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:388
ACE_CDR::LongLong LongLong
sequence< SampleInfo > SampleInfoSeq
const ReturnCode_t RETCODE_ALREADY_DELETED
void Destroy_Connection(CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:292
const SampleStateMask ANY_SAMPLE_STATE
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
ConnIdToReceiverMap receivers_
Definition: FaceTSS.h:70
OpenDDS_FACE_Export void receive_header(FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE, FACE::TRANSACTION_ID_TYPE &transaction_id, FACE::TS::MessageHeader &message_header, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:341
void Get_Connection_Parameters(CONNECTION_NAME_TYPE &connection_name, CONNECTION_ID_TYPE &connection_id, TRANSPORT_CONNECTION_STATUS_TYPE &status, RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:175
bool operator==(const DisjointSequence::OrderedRanges< T > &a, const DisjointSequence::OrderedRanges< T > &b)
GUID_t get_repoid(DDS::InstanceHandle_t id) const
FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t &duration)
Definition: FaceTSS.cpp:792
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
long long opendds_reserved_publication_seq
const char *const BUILT_IN_PUBLICATION_TOPIC
void Create_Connection(const CONNECTION_NAME_TYPE connection_name, MESSAGING_PATTERN_TYPE pattern, CONNECTION_ID_TYPE &connection_id, CONNECTION_DIRECTION_TYPE &connection_direction, MESSAGE_SIZE_TYPE &max_message_size, TIMEOUT_TYPE, RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:95
#define OPENDDS_STRING
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
OpenDDS_Dcps_Export void set_DCPS_debug_level(unsigned int lvl)
Definition: debug.cpp:98
DOMAINID_TYPE_NATIVE DomainId_t
LM_DEBUG
#define Registered_Data_Types
static OpenDDS_FACE_Export Entities * instance()
Definition: FaceTSS.cpp:725
void apply_to(DDS::PublisherQos &target) const
Definition: QosSettings.cpp:20
ACE_CDR::ULong ULong
InstanceHandle_t publication_handle
FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t &timestamp)
Definition: FaceTSS.cpp:802
FACE::CONNECTION_DIRECTION_TYPE direction_
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
const ReturnCode_t RETCODE_TIMEOUT
const ViewStateMask ANY_VIEW_STATE
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
const ReturnCode_t RETCODE_ILLEGAL_OPERATION
FACE::MESSAGE_SIZE_TYPE max_message_size_
Definition: TopicSettings.h:21
LM_WARNING
void Initialize(const CONFIGURATION_RESOURCE configuration_file, RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:70
FACE::SYSTEM_TIME_TYPE sum_recvd_msgs_latency
Definition: FaceTSS.h:52
FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
Definition: FaceTSS.cpp:730
ConnIdToSenderMap senders_
Definition: FaceTSS.h:69
ACE_UINT32 crc32(const char *str)
ACE_TEXT("TCP_Factory")
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
static TYPE * instance(void)
const ReturnCode_t RETCODE_NOT_ENABLED
unsigned long nanosec
const ReturnCode_t RETCODE_NO_DATA
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
FACE::TRANSPORT_CONNECTION_STATUS_TYPE connection_status
Definition: FaceTSS.h:74
FACE::TS::MessageHeader last_msg_header
Definition: FaceTSS.h:50
FACE::VALIDITY_TYPE status_valid
Definition: FaceTSS.h:29
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
LifespanQosPolicy lifespan
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
DDS::DataWriterQos & datawriter_qos()
Definition: QosSettings.h:33
#define TheParticipantFactory
virtual FACE::RETURN_CODE_TYPE messages_waiting(FACE::WAITING_RANGE_TYPE &)
Definition: FaceTSS.h:45
UserDataQosPolicy user_data
UserDataQosPolicy user_data
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
FACE::MESSAGE_TYPE_GUID platform_view_guid_
Definition: TopicSettings.h:20
FACE::MESSAGE_TYPE_GUID platform_view_guid
Definition: FaceTSS.h:75
Time_t source_timestamp
const InstanceStateKind ALIVE_INSTANCE_STATE
Boolean is_nil(T x)
DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
Definition: FaceTSS.cpp:779
void Unregister_Callback(CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code)
Definition: FaceTSS.cpp:272
#define TOPIC_QOS_DEFAULT
const ReturnCode_t RETCODE_BAD_PARAMETER
Definition: FaceTSS.cpp:24