00001 #include "FaceTSS.h"
00002 #include "FACE/TS.hpp"
00003 #include "config/Parser.h"
00004
00005 #include "dds/DCPS/Service_Participant.h"
00006 #include "dds/DCPS/DomainParticipantImpl.h"
00007 #include "dds/DCPS/Registered_Data_Types.h"
00008 #include "dds/DCPS/Marked_Default_Qos.h"
00009 #include "dds/DCPS/BuiltInTopicUtils.h"
00010 #include "dds/DCPS/SafetyProfileStreams.h"
00011 #include "dds/DCPS/SafetyProfilePool.h"
00012 #include "dds/DCPS/GuidConverter.h"
00013 #include "dds/DCPS/Qos_Helper.h"
00014 #include "dds/DdsDcpsCoreC.h"
00015 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00016 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00017
00018 #include <cstring>
00019
00020 #ifndef OPENDDS_SAFETY_PROFILE
00021 using OpenDDS::DCPS::operator==;
00022 #endif
00023
00024 namespace FACE {
00025 namespace TS {
00026
00027 bool MessageHeader::operator==(const MessageHeader& rhs) const
00028 {
00029 return message_instance_guid == rhs.message_instance_guid
00030 && platform_view_guid == rhs.platform_view_guid
00031 && message_source_guid == rhs.message_source_guid
00032 && message_timestamp == rhs.message_timestamp
00033 && message_validity == rhs.message_validity;
00034 }
00035
00036 using OpenDDS::FaceTSS::config::ConnectionSettings;
00037 using OpenDDS::FaceTSS::config::TopicSettings;
00038 using OpenDDS::FaceTSS::config::QosSettings;
00039
00040 namespace {
00041 OpenDDS::FaceTSS::config::Parser parser;
00042
00043 void find_or_create_dp(const DDS::DomainId_t& domainId,
00044 int participantId,
00045 const DDS::DomainParticipantFactory_var& dpf,
00046 DDS::DomainParticipant_var& dp);
00047 void find_or_create_pub(const DDS::PublisherQos& qos,
00048 const DDS::DomainParticipant_var& dp,
00049 DDS::Publisher_var& pub);
00050 void find_or_create_sub(const DDS::SubscriberQos& qos,
00051 const DDS::DomainParticipant_var& dp,
00052 DDS::Subscriber_var& sub);
00053
00054 bool cleanup_opendds_publisher(const DDS::Publisher_var pub);
00055 bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub);
00056 void cleanup_opendds_participant(const DDS::DomainParticipant_var dp);
00057
00058 RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00059 int participantId,
00060 const DDS::DomainId_t domainId,
00061 const char* topic,
00062 const char* type,
00063 CONNECTION_DIRECTION_TYPE dir,
00064 QosSettings& qos,
00065 const char* transport);
00066 }
00067
00068 using OpenDDS::FaceTSS::Entities;
00069
00070 void Initialize(const CONFIGURATION_RESOURCE configuration_file,
00071 RETURN_CODE_TYPE& return_code)
00072 {
00073 try {
00074 int status = parser.parse(configuration_file);
00075 if (status != 0) {
00076 ACE_ERROR((LM_ERROR,
00077 ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00078 ACE_TEXT("Parser::parse () returned %d\n"),
00079 status));
00080 return_code = INVALID_PARAM;
00081 } else {
00082 return_code = RC_NO_ERROR;
00083 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00084 TheServiceParticipant->configure_pool();
00085 #endif
00086 }
00087 } catch (const CORBA::BAD_PARAM& ) {
00088 if (OpenDDS::DCPS::DCPS_debug_level) {
00089 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Initialize - INVALID_PARAM\n"));
00090 }
00091 return_code = INVALID_PARAM;
00092 }
00093 }
00094
00095 void Create_Connection(const CONNECTION_NAME_TYPE connection_name,
00096 MESSAGING_PATTERN_TYPE pattern,
00097 CONNECTION_ID_TYPE& connection_id,
00098 CONNECTION_DIRECTION_TYPE& connection_direction,
00099 MESSAGE_SIZE_TYPE& max_message_size,
00100 TIMEOUT_TYPE,
00101 RETURN_CODE_TYPE& return_code)
00102 {
00103 try {
00104 return_code = RC_NO_ERROR;
00105
00106 if (pattern != PUB_SUB) {
00107 return_code = INVALID_CONFIG;
00108 return;
00109 }
00110
00111 ConnectionSettings connection;
00112 TopicSettings topic;
00113 QosSettings qos;
00114
00115
00116 if (!parser.find_connection(connection_name, connection)) {
00117
00118 if (!parser.find_topic(connection.topic_name_, topic)) {
00119
00120 if (parser.find_qos(connection, qos)) {
00121 return_code = INVALID_CONFIG;
00122 }
00123 } else {
00124 return_code = INVALID_CONFIG;
00125 }
00126 } else {
00127 return_code = INVALID_CONFIG;
00128 }
00129
00130 if (return_code != RC_NO_ERROR) {
00131 return;
00132 }
00133
00134 connection_id = connection.connection_id_;
00135 connection_direction = connection.direction_;
00136 max_message_size = topic.max_message_size_;
00137
00138 return_code = create_opendds_entities(connection_id,
00139 connection.participant_id_,
00140 connection.domain_id_,
00141 connection.topic_name_,
00142 topic.type_name_,
00143 connection_direction,
00144 qos,
00145 connection.config_name_);
00146 if (return_code != RC_NO_ERROR) {
00147 return;
00148 }
00149
00150 const SYSTEM_TIME_TYPE refresh_period =
00151 (connection_direction == SOURCE) ?
00152 OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0;
00153
00154 const TRANSPORT_CONNECTION_STATUS_TYPE status = {
00155 0,
00156 max_message_size,
00157 max_message_size,
00158 connection_direction,
00159 0,
00160 refresh_period,
00161 INVALID,
00162 };
00163 Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id];
00164 conn_info.connection_name = connection_name;
00165 conn_info.connection_status = status;
00166 conn_info.platform_view_guid = topic.platform_view_guid_;
00167 } catch (const CORBA::BAD_PARAM&) {
00168 if (OpenDDS::DCPS::DCPS_debug_level) {
00169 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Create_Connection - INVALID_PARAM\n"));
00170 }
00171 return_code = INVALID_PARAM;
00172 }
00173 }
00174
00175 void Get_Connection_Parameters(CONNECTION_NAME_TYPE& connection_name,
00176 CONNECTION_ID_TYPE& connection_id ,
00177 TRANSPORT_CONNECTION_STATUS_TYPE& status,
00178 RETURN_CODE_TYPE& return_code)
00179 {
00180 try {
00181
00182
00183
00184
00185 Entities& entities = *Entities::instance();
00186
00187 if (connection_id != 0 && entities.connections_.count(connection_id)) {
00188
00189
00190 if (connection_name[0]) {
00191
00192 OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name;
00193 if (std::strcmp(connection_name, conn_name.c_str()) == 0) {
00194 return_code = RC_NO_ERROR;
00195 } else {
00196 return_code = INVALID_PARAM;
00197 }
00198 } else {
00199
00200
00201
00202 entities.connections_[connection_id].connection_name.copy(connection_name,
00203 sizeof(CONNECTION_NAME_TYPE));
00204 connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0;
00205 return_code = RC_NO_ERROR;
00206 }
00207
00208 } else if (connection_name[0] && connection_id == 0) {
00209
00210
00211 ConnectionSettings settings;
00212 if (0 == parser.find_connection(connection_name, settings)) {
00213 connection_id = settings.connection_id_;
00214 return_code = RC_NO_ERROR;
00215 } else {
00216
00217 return_code = INVALID_PARAM;
00218 }
00219 } else {
00220
00221
00222 return_code = INVALID_PARAM;
00223 }
00224 if (return_code == RC_NO_ERROR) {
00225 TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status;
00226 if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) {
00227 Entities::FaceReceiver& receiver = *entities.receivers_[connection_id];
00228 if (receiver.status_valid != FACE::VALID) {
00229 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00230 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n"));
00231 }
00232 return_code = NOT_AVAILABLE;
00233 return;
00234 }
00235 if (receiver.total_msgs_recvd != 0) {
00236 cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency / receiver.total_msgs_recvd;
00237 cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity;
00238 } else {
00239 cur_status.REFRESH_PERIOD = 0;
00240 }
00241 WAITING_RANGE_TYPE num_waiting;
00242 if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) {
00243 cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting;
00244 } else {
00245 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00246 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n"));
00247 }
00248 return_code = NOT_AVAILABLE;
00249 return;
00250 }
00251 } else {
00252
00253
00254 Entities::FaceSender& sender = entities.senders_[connection_id];
00255 if (sender.status_valid != FACE::VALID) {
00256 return_code = NOT_AVAILABLE;
00257 return;
00258 }
00259 cur_status.REFRESH_PERIOD = 0;
00260 cur_status.WAITING_PROCESSES_OR_MESSAGES = 0;
00261 }
00262 status = cur_status;
00263 }
00264 } catch (const CORBA::BAD_PARAM&) {
00265 if (OpenDDS::DCPS::DCPS_debug_level) {
00266 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Get_Connection_Parameters - INVALID_PARAM\n"));
00267 }
00268 return_code = INVALID_PARAM;
00269 }
00270 }
00271
00272 void Unregister_Callback(CONNECTION_ID_TYPE connection_id,
00273 RETURN_CODE_TYPE& return_code)
00274 {
00275 try {
00276 Entities& entities = *Entities::instance();
00277 Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00278 if (readers.count(connection_id)) {
00279 readers[connection_id]->dr->set_listener(NULL, 0);
00280 return_code = RC_NO_ERROR;
00281 return;
00282 }
00283 } catch (const CORBA::BAD_PARAM&) {
00284 if (OpenDDS::DCPS::DCPS_debug_level) {
00285 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Unregister_Callback - INVALID_PARAM\n"));
00286 }
00287 }
00288
00289 return_code = INVALID_PARAM;
00290 }
00291
00292 void Destroy_Connection(CONNECTION_ID_TYPE connection_id,
00293 RETURN_CODE_TYPE& return_code)
00294 {
00295 try {
00296 Entities& entities = *Entities::instance();
00297 Entities::ConnIdToSenderMap& writers = entities.senders_;
00298 Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00299
00300 DDS::DomainParticipant_var dp;
00301 bool try_cleanup_participant = false;
00302 if (writers.count(connection_id)) {
00303 const DDS::DataWriter_var datawriter = writers[connection_id].dw;
00304 const DDS::Publisher_var pub = datawriter->get_publisher();
00305 writers.erase(connection_id);
00306 pub->delete_datawriter(datawriter);
00307 dp = pub->get_participant();
00308 try_cleanup_participant = cleanup_opendds_publisher(pub);
00309
00310 } else if (readers.count(connection_id)) {
00311 const DDS::DataReader_var datareader = readers[connection_id]->dr;
00312 const DDS::Subscriber_var sub = datareader->get_subscriber();
00313 delete readers[connection_id];
00314 readers.erase(connection_id);
00315 sub->delete_datareader(datareader);
00316 dp = sub->get_participant();
00317 try_cleanup_participant = cleanup_opendds_subscriber(sub);
00318 }
00319
00320 if (!dp) {
00321 return_code = INVALID_PARAM;
00322 return;
00323 }
00324
00325 if (try_cleanup_participant) {
00326 cleanup_opendds_participant(dp);
00327 }
00328
00329 entities.connections_.erase(connection_id);
00330 return_code = RC_NO_ERROR;
00331 } catch (const CORBA::BAD_PARAM&) {
00332 if (OpenDDS::DCPS::DCPS_debug_level) {
00333 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Destroy_Connection - INVALID_PARAM\n"));
00334 }
00335 return_code = INVALID_PARAM;
00336 }
00337 }
00338
00339 OpenDDS_FACE_Export
00340 void receive_header( FACE::CONNECTION_ID_TYPE connection_id,
00341 FACE::TIMEOUT_TYPE ,
00342 FACE::TRANSACTION_ID_TYPE& transaction_id,
00343 FACE::TS::MessageHeader& message_header,
00344 FACE::MESSAGE_SIZE_TYPE message_size,
00345 FACE::RETURN_CODE_TYPE& return_code)
00346 {
00347 try {
00348 Entities::ConnIdToReceiverMap& readers =
00349 Entities::instance()->receivers_;
00350
00351
00352
00353 if (!readers.count(connection_id) || transaction_id == 0) {
00354 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00355 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - "
00356 "could not find reader for connection_id: %d OR transaction id[%d] == 0 \n",
00357 connection_id,
00358 transaction_id));
00359 }
00360 return_code = FACE::INVALID_PARAM;
00361 return;
00362 }
00363
00364 if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) {
00365 if (OpenDDS::DCPS::DCPS_debug_level) {
00366 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d \n",
00367 message_size,
00368 sizeof(FACE::TS::MessageHeader)));
00369 }
00370 return_code = FACE::INVALID_PARAM;
00371 return;
00372 }
00373 if (transaction_id == readers[connection_id]->last_msg_tid) {
00374 message_header = readers[connection_id]->last_msg_header;
00375 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK);
00376 } else {
00377 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00378 }
00379 } catch (const CORBA::BAD_PARAM&) {
00380 if (OpenDDS::DCPS::DCPS_debug_level) {
00381 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_header - INVALID_PARAM\n"));
00382 }
00383 return_code = INVALID_PARAM;
00384 }
00385 }
00386
00387 void Receive_Message(
00388 CONNECTION_ID_TYPE connection_id,
00389 TIMEOUT_TYPE timeout,
00390 TRANSACTION_ID_TYPE& transaction_id,
00391 MessageHeader& message_header,
00392 MESSAGE_SIZE_TYPE message_size,
00393 RETURN_CODE_TYPE& return_code)
00394 {
00395 try {
00396 receive_header(connection_id, timeout,
00397 transaction_id, message_header,
00398 message_size, return_code);
00399 } catch (const CORBA::BAD_PARAM&) {
00400 if (OpenDDS::DCPS::DCPS_debug_level) {
00401 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Receive_Message - INVALID_PARAM\n"));
00402 }
00403 return_code = INVALID_PARAM;
00404 }
00405 }
00406
00407 namespace {
00408 void find_or_create_dp(const DDS::DomainId_t& domainId,
00409 int participantId,
00410 const DDS::DomainParticipantFactory_var& dpf,
00411 DDS::DomainParticipant_var& dp) {
00412 DDS::DomainParticipant_var temp_dp;
00413
00414 temp_dp = dpf->lookup_participant(domainId);
00415 if (!temp_dp) {
00416 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00417 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - created new participant for domainId: %d\n", domainId));
00418 }
00419 DDS::DomainParticipantQos qos;
00420 dpf->get_default_participant_qos(qos);
00421 qos.user_data.value.length(6);
00422 qos.user_data.value[0] = (participantId >> 0) & 0xFF;
00423 qos.user_data.value[1] = (participantId >> 8) & 0xFF;
00424 qos.user_data.value[2] = (participantId >> 16) & 0xFF;
00425 qos.user_data.value[3] = (participantId >> 24) & 0xFF;
00426 qos.user_data.value[4] = 0;
00427 qos.user_data.value[5] = 0;
00428 dp = dpf->create_participant(domainId, qos, 0, 0);
00429 return;
00430 }
00431 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00432 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - found existing participant for domainId: %d \n", domainId));
00433 }
00434 dp = temp_dp;
00435 }
00436
00437 void find_or_create_pub(const DDS::PublisherQos& qos,
00438 const DDS::DomainParticipant_var& dp,
00439 DDS::Publisher_var& pub) {
00440 DDS::DomainParticipant_var temp_dp;
00441 DDS::Publisher_var temp_pub;
00442
00443 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00444 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00445
00446 while (wtrIter != writers.end()) {
00447 temp_pub = wtrIter->second.dw->get_publisher();
00448 temp_dp = temp_pub->get_participant();
00449 DDS::PublisherQos temp_qos;
00450 temp_pub->get_qos(temp_qos);
00451 if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00452 temp_qos == qos) {
00453 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00454 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - found existing publisher in senders\n"));
00455 }
00456 pub = temp_pub;
00457 return;
00458 } else {
00459 ++wtrIter;
00460 }
00461 }
00462 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00463 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - created new publisher\n"));
00464 }
00465 pub = dp->create_publisher(qos, 0, 0);
00466 }
00467
00468 void find_or_create_sub(const DDS::SubscriberQos& qos,
00469 const DDS::DomainParticipant_var& dp,
00470 DDS::Subscriber_var& sub)
00471 {
00472 DDS::DomainParticipant_var temp_dp;
00473 DDS::Subscriber_var temp_sub;
00474
00475 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00476 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00477
00478 while (rdrIter != readers.end()) {
00479 temp_sub = rdrIter->second->dr->get_subscriber();
00480 temp_dp = temp_sub->get_participant();
00481 DDS::SubscriberQos temp_qos;
00482 temp_sub->get_qos(temp_qos);
00483 if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00484 temp_qos == qos) {
00485 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00486 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - found existing subscriber in receivers\n"));
00487 }
00488 sub = temp_sub;
00489 return;
00490 } else {
00491 ++rdrIter;
00492 }
00493 }
00494 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00495 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - created new subscriber\n"));
00496 }
00497 sub = dp->create_subscriber(qos, 0, 0);
00498 }
00499
00500 bool cleanup_opendds_publisher(const DDS::Publisher_var pub)
00501 {
00502 DDS::Publisher_var temp_pub;
00503 DDS::PublisherQos pub_qos;
00504 pub->get_qos(pub_qos);
00505 DDS::DomainParticipant_var dp = pub->get_participant();
00506 DDS::DomainParticipant_var temp_dp;
00507 DDS::PublisherQos temp_qos;
00508
00509 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00510 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00511 while (wtrIter != writers.end()) {
00512 temp_pub = wtrIter->second.dw->get_publisher();
00513 temp_dp = temp_pub->get_participant();
00514 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00515 temp_pub->get_qos(temp_qos);
00516 if (pub_qos == temp_qos) {
00517 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00518 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher still in use by other writer\n"));
00519 }
00520 return false;
00521 }
00522 }
00523 ++wtrIter;
00524 }
00525 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00526 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher no longer in use, delete pub\n"));
00527 }
00528 dp->delete_publisher(pub);
00529 return true;
00530 }
00531
00532 bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub)
00533 {
00534 DDS::Subscriber_var temp_sub;
00535 DDS::SubscriberQos sub_qos;
00536 sub->get_qos(sub_qos);
00537 DDS::DomainParticipant_var dp = sub->get_participant();
00538 DDS::DomainParticipant_var temp_dp;
00539 DDS::SubscriberQos temp_qos;
00540
00541
00542 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00543 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00544 while (rdrIter != readers.end()) {
00545 temp_sub = rdrIter->second->dr->get_subscriber();
00546 temp_dp = temp_sub->get_participant();
00547
00548 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00549 temp_sub->get_qos(temp_qos);
00550 if (sub_qos == temp_qos) {
00551 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00552 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber still in use by other reader\n"));
00553 }
00554 return false;
00555 }
00556 }
00557 ++rdrIter;
00558 }
00559 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00560 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber no longer in use, delete sub\n"));
00561 }
00562 dp->delete_subscriber(sub);
00563 return true;
00564 }
00565
00566 void cleanup_opendds_participant(const DDS::DomainParticipant_var dp)
00567 {
00568 DDS::DomainParticipant_var temp_dp;
00569 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00570 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00571 while (rdrIter != readers.end()) {
00572 DDS::Subscriber_var sub = rdrIter->second->dr->get_subscriber();
00573 temp_dp = sub->get_participant();
00574 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00575 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00576 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by reader\n"));
00577 }
00578 return;
00579 } else {
00580 ++rdrIter;
00581 }
00582 }
00583
00584 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00585 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00586
00587 while (wtrIter != writers.end()) {
00588 DDS::Publisher_var publisher = wtrIter->second.dw->get_publisher();
00589 temp_dp = publisher->get_participant();
00590 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00591 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00592 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by writer\n"));
00593 }
00594 return;
00595 } else {
00596 ++wtrIter;
00597 }
00598 }
00599 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00600 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant for domain: %d no longer in use, delete entities and participant\n", dp->get_domain_id()));
00601 }
00602 dp->delete_contained_entities();
00603 const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00604 dpf->delete_participant(dp);
00605 }
00606
00607 RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00608 int participantId,
00609 const DDS::DomainId_t domainId,
00610 const char* topicName,
00611 const char* type,
00612 CONNECTION_DIRECTION_TYPE dir,
00613 QosSettings& qos_settings,
00614 const char* transport)
00615 {
00616 #ifdef DEBUG_OPENDDS_FACETSS
00617 OpenDDS::DCPS::set_DCPS_debug_level(8);
00618 OpenDDS::DCPS::Transport_debug_level = 5;
00619 TheServiceParticipant->set_BIT(false);
00620 #endif
00621
00622 const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00623 if (!dpf) return INVALID_PARAM;
00624
00625 DDS::DomainParticipant_var dp;
00626 find_or_create_dp(domainId, participantId, dpf, dp);
00627 if (!dp) return INVALID_PARAM;
00628
00629 using OpenDDS::DCPS::Data_Types_Register;
00630 OpenDDS::DCPS::TypeSupport_var ts =
00631 Registered_Data_Types->lookup(dp, type);
00632 if (!ts) {
00633 ts = Registered_Data_Types->lookup(0, type);
00634 if (!ts) return INVALID_PARAM;
00635 Registered_Data_Types->register_type(dp, type, ts);
00636 }
00637
00638 const DDS::Topic_var topic =
00639 dp->create_topic(topicName, type, TOPIC_QOS_DEFAULT, 0, 0);
00640 if (!topic) return INVALID_PARAM;
00641
00642 if (dir == SOURCE) {
00643 DDS::PublisherQos publisher_qos;
00644 qos_settings.apply_to(publisher_qos);
00645
00646 DDS::Publisher_var pub;
00647 find_or_create_pub(publisher_qos, dp, pub);
00648 if (!pub) return INVALID_PARAM;
00649
00650 if (transport && transport[0]) {
00651 OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00652 if (config.is_nil()) return INVALID_PARAM;
00653 try {
00654 TheTransportRegistry->bind_config(config, pub);
00655 } catch (const OpenDDS::DCPS::Transport::Exception&) {
00656 return INVALID_PARAM;
00657 }
00658 }
00659
00660 DDS::DataWriterQos datawriter_qos;
00661 qos_settings.apply_to(datawriter_qos);
00662
00663
00664 datawriter_qos.user_data.value.length (3);
00665 datawriter_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00666 datawriter_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00667 datawriter_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00668
00669 const DDS::DataWriter_var dw =
00670 pub->create_datawriter(topic, datawriter_qos, 0, 0);
00671 if (!dw) return INVALID_PARAM;
00672
00673 Entities::instance()->senders_[connectionId].dw = dw;
00674
00675 } else {
00676 DDS::SubscriberQos subscriber_qos;
00677 qos_settings.apply_to(subscriber_qos);
00678
00679 DDS::Subscriber_var sub;
00680 find_or_create_sub(subscriber_qos, dp, sub);
00681 if (!sub) return INVALID_PARAM;
00682
00683 if (transport && transport[0]) {
00684 OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00685 if (config.is_nil()) return INVALID_PARAM;
00686 try {
00687 TheTransportRegistry->bind_config(config, sub);
00688 } catch (const OpenDDS::DCPS::Transport::Exception&) {
00689 return INVALID_PARAM;
00690 }
00691 }
00692
00693 DDS::DataReaderQos datareader_qos;
00694 qos_settings.apply_to(datareader_qos);
00695
00696
00697 datareader_qos.user_data.value.length (3);
00698 datareader_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00699 datareader_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00700 datareader_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00701
00702 const DDS::DataReader_var dr =
00703 sub->create_datareader(topic, datareader_qos, 0, 0);
00704 if (!dr) return INVALID_PARAM;
00705 Entities::instance()->receivers_[connectionId] = new Entities::FaceReceiver();
00706 Entities::instance()->receivers_[connectionId]->dr = dr;
00707 }
00708
00709 return RC_NO_ERROR;
00710 }
00711 }
00712
00713 }}
00714
00715 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00716
00717 namespace OpenDDS {
00718 namespace FaceTSS {
00719
00720 Entities::Entities() {}
00721 Entities::~Entities() {}
00722
00723 Entities* Entities::instance()
00724 {
00725 return ACE_Singleton<Entities, ACE_Thread_Mutex>::instance();
00726 }
00727
00728 FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00729 DDS::ReturnCode_t retcode)
00730 {
00731 FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status =
00732 Entities::instance()->connections_[connection_id].connection_status;
00733 FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM;
00734
00735 switch (retcode) {
00736 case DDS::RETCODE_OK:
00737 status.LAST_MSG_VALIDITY = FACE::VALID;
00738 return FACE::RC_NO_ERROR;
00739
00740 case DDS::RETCODE_ERROR:
00741 rc = FACE::CONNECTION_CLOSED; break;
00742
00743 case DDS::RETCODE_BAD_PARAMETER:
00744 rc = FACE::INVALID_PARAM; break;
00745
00746 case DDS::RETCODE_OUT_OF_RESOURCES:
00747 rc = FACE::DATA_BUFFER_TOO_SMALL; break;
00748
00749 case DDS::RETCODE_PRECONDITION_NOT_MET:
00750 case DDS::RETCODE_NOT_ENABLED:
00751 rc = FACE::INVALID_MODE; break;
00752
00753 case DDS::RETCODE_IMMUTABLE_POLICY:
00754 case DDS::RETCODE_INCONSISTENT_POLICY:
00755 rc = FACE::INVALID_CONFIG; break;
00756
00757 case DDS::RETCODE_ALREADY_DELETED:
00758 rc = FACE::CONNECTION_CLOSED; break;
00759
00760 case DDS::RETCODE_TIMEOUT:
00761 rc = FACE::TIMED_OUT; break;
00762
00763 case DDS::RETCODE_UNSUPPORTED:
00764 case DDS::RETCODE_NO_DATA:
00765 rc = FACE::NOT_AVAILABLE; break;
00766
00767 case DDS::RETCODE_ILLEGAL_OPERATION:
00768 rc = FACE::PERMISSION_DENIED; break;
00769 }
00770
00771 status.LAST_MSG_VALIDITY = FACE::INVALID;
00772 return rc;
00773 }
00774
00775 enum { NSEC_PER_SEC = 1000000000 };
00776
00777 DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
00778 {
00779 if (timeout == FACE::INF_TIME_VALUE) {
00780 static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC,
00781 DDS::DURATION_INFINITE_NSEC};
00782 return dds_inf;
00783 }
00784
00785 DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC),
00786 static_cast<unsigned int>(timeout % NSEC_PER_SEC)};
00787 return dur;
00788 }
00789
00790 FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration)
00791 {
00792 if (duration.sec == DDS::DURATION_INFINITE_SEC
00793 && duration.nanosec == DDS::DURATION_INFINITE_NSEC) {
00794 return FACE::INF_TIME_VALUE;
00795 }
00796 return duration.nanosec +
00797 duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00798 }
00799
00800 FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp)
00801 {
00802 return timestamp.nanosec +
00803 timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00804 }
00805
00806 FACE::MESSAGE_INSTANCE_GUID
00807 create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub, const CORBA::LongLong& orig_seq)
00808 {
00809 OpenDDS::DCPS::GuidConverter writer(pub);
00810
00811 FACE::MESSAGE_INSTANCE_GUID message_instance_guid;
00812 FACE::LongLong mig_low;
00813 FACE::LongLong masked_seq;
00814
00815
00816 FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub));
00817 masked_seq = orig_seq >> 32;
00818
00819 if (masked_seq) {
00820 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n"));
00821 }
00822 mig_low = orig_seq & 0xFFFFFFFF;
00823 message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low);
00824
00825
00826
00827
00828
00829
00830
00831
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841
00842
00843
00844
00845
00846
00847 return message_instance_guid;
00848 }
00849
00850 void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00851 const DDS::DomainParticipant_var part,
00852 const DDS::SampleInfo& sinfo,
00853 FACE::RETURN_CODE_TYPE& return_code)
00854 {
00855 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00856 if (!readers.count(connection_id)) {
00857 return_code = FACE::INVALID_PARAM;
00858 return;
00859 }
00860 FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header;
00861
00862 header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid;
00863
00864 DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber();
00865 DDS::DomainParticipant_var temp_dp = temp_sub->get_participant();
00866 OpenDDS::DCPS::DomainParticipantImpl* dpi = dynamic_cast<OpenDDS::DCPS::DomainParticipantImpl*>(temp_dp.in());
00867 if (!dpi) {
00868 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) populate_header_received: ")
00869 ACE_TEXT("failed to get DomainParticipantImpl.\n")));
00870 return_code = FACE::NOT_AVAILABLE;
00871 return;
00872 }
00873 const OpenDDS::DCPS::RepoId pub = dpi->get_repoid(sinfo.publication_handle);
00874 header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq);
00875
00876 header.message_timestamp = convertTime(sinfo.source_timestamp);
00877 ACE_Time_Value now(ACE_OS::gettimeofday());
00878
00879 readers[connection_id]->sum_recvd_msgs_latency += (convertTime(OpenDDS::DCPS::time_value_to_time(now)) - header.message_timestamp);
00880 ++readers[connection_id]->total_msgs_recvd;
00881
00882 if (OpenDDS::DCPS::DCPS_debug_level > 8) {
00883 ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n",
00884 readers[connection_id]->sum_recvd_msgs_latency,
00885 readers[connection_id]->total_msgs_recvd,
00886 readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd));
00887 }
00888
00889 DDS::UserDataQosPolicy qos_user_data;
00890 DDS::LifespanQosPolicy qos_lifespan;
00891 const OpenDDS::DCPS::RepoId sub = dpi->get_id();
00892
00893
00894 if (std::memcmp(pub.guidPrefix, sub.guidPrefix, sizeof(sub.guidPrefix)) == 0) {
00895
00896 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00897 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00898
00899
00900 while (wtrIter != writers.end()) {
00901 if (wtrIter->second.dw->get_instance_handle() == sinfo.publication_handle) {
00902 break;
00903 }
00904 ++wtrIter;
00905 }
00906
00907 if (wtrIter == writers.end()) {
00908 ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to find matching DataWriter instance.\n"));
00909 return_code = FACE::NOT_AVAILABLE;
00910 return;
00911 }
00912
00913 DDS::DataWriterQos qos;
00914 wtrIter->second.dw->get_qos(qos);
00915 qos_lifespan = qos.lifespan;
00916 qos_user_data = qos.user_data;
00917
00918 } else {
00919 ::DDS::Subscriber_var bit_subscriber
00920 = part->get_builtin_subscriber () ;
00921
00922 ::DDS::DataReader_var reader
00923 = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ;
00924 ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader
00925 = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ());
00926 if (CORBA::is_nil (pub_reader.in ())) {
00927 ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n"));
00928 return_code = FACE::NOT_AVAILABLE;
00929 return;
00930 }
00931
00932 ::DDS::ReturnCode_t ret;
00933 ::DDS::SampleInfoSeq pubinfos(1);
00934 ::DDS::PublicationBuiltinTopicDataSeq pubdata(1);
00935
00936 ret = pub_reader->read_instance(pubdata,
00937 pubinfos,
00938 1,
00939 sinfo.publication_handle,
00940 ::DDS::ANY_SAMPLE_STATE,
00941 ::DDS::ANY_VIEW_STATE,
00942 ::DDS::ALIVE_INSTANCE_STATE);
00943
00944
00945 if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) {
00946 ACE_ERROR((LM_ERROR,
00947 "(%P|%t) populate_header_received: failed to read BIT publication data.\n"));
00948 return_code = FACE::NOT_AVAILABLE;
00949 return;
00950 }
00951
00952 const CORBA::ULong i = 0;
00953 qos_lifespan = pubdata[i].lifespan;
00954 qos_user_data = pubdata[i].user_data;
00955 }
00956
00957 header.message_source_guid =
00958 (qos_user_data.value[0] << 0) |
00959 (qos_user_data.value[1] << 8) |
00960 (qos_user_data.value[2] << 16);
00961
00962
00963
00964
00965 DDS::Duration_t lifespan = qos_lifespan.duration;
00966 if (lifespan.sec != DDS::DURATION_INFINITE_SEC &&
00967 lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) {
00968
00969
00970 DDS::Time_t const tmp = {
00971 sinfo.source_timestamp.sec + lifespan.sec,
00972 sinfo.source_timestamp.nanosec + lifespan.nanosec
00973 };
00974
00975
00976
00977 ACE_Time_Value const expiration_time(
00978 OpenDDS::DCPS::time_to_time_value(tmp));
00979
00980 if (now >= expiration_time) {
00981
00982 header.message_validity = FACE::INVALID;
00983 return_code = FACE::RC_NO_ERROR;
00984 return;
00985 }
00986 }
00987
00988 header.message_validity = FACE::VALID;
00989 return_code = FACE::RC_NO_ERROR;
00990 }
00991 }}
00992
00993 OPENDDS_END_VERSIONED_NAMESPACE_DECL