00001 #include "ace/config-macros.h"
00002 #include "FACE/TS.hpp"
00003 #include "FaceTSS.h"
00004 #include "config/Parser.h"
00005
00006 #include "dds/DCPS/Service_Participant.h"
00007 #include "dds/DCPS/DomainParticipantImpl.h"
00008 #include "dds/DCPS/Registered_Data_Types.h"
00009 #include "dds/DCPS/Marked_Default_Qos.h"
00010 #include "dds/DCPS/BuiltInTopicUtils.h"
00011 #include "dds/DCPS/SafetyProfileStreams.h"
00012 #include "dds/DCPS/SafetyProfilePool.h"
00013 #include "dds/DCPS/Qos_Helper.h"
00014 #include "dds/DdsDcpsCoreC.h"
00015 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00016
00017 #include <cstring>
00018
00019 #ifndef OPENDDS_SAFETY_PROFILE
00020 using OpenDDS::DCPS::operator==;
00021 #endif
00022
00023 namespace FACE {
00024 namespace TS {
00025
00026 bool MessageHeader::operator==(const MessageHeader& rhs) const
00027 {
00028 return message_instance_guid == rhs.message_instance_guid
00029 && platform_view_guid == rhs.platform_view_guid
00030 && message_source_guid == rhs.message_source_guid
00031 && message_timestamp == rhs.message_timestamp
00032 && message_validity == rhs.message_validity;
00033 }
00034
00035 using OpenDDS::FaceTSS::config::ConnectionSettings;
00036 using OpenDDS::FaceTSS::config::TopicSettings;
00037 using OpenDDS::FaceTSS::config::QosSettings;
00038
00039 namespace {
00040 OpenDDS::FaceTSS::config::Parser parser;
00041
00042 void find_or_create_dp(const DDS::DomainId_t& domainId,
00043 int participantId,
00044 const DDS::DomainParticipantFactory_var& dpf,
00045 DDS::DomainParticipant_var& dp);
00046 void find_or_create_pub(const DDS::PublisherQos& qos,
00047 const DDS::DomainParticipant_var& dp,
00048 DDS::Publisher_var& pub);
00049 void find_or_create_sub(const DDS::SubscriberQos& qos,
00050 const DDS::DomainParticipant_var& dp,
00051 DDS::Subscriber_var& sub);
00052
00053 bool cleanup_opendds_publisher(const DDS::Publisher_var pub);
00054 bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub);
00055 void cleanup_opendds_participant(const DDS::DomainParticipant_var dp);
00056
00057 RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00058 int participantId,
00059 const DDS::DomainId_t domainId,
00060 const char* topic,
00061 const char* type,
00062 CONNECTION_DIRECTION_TYPE dir,
00063 QosSettings& qos,
00064 const char* transport);
00065 }
00066
00067 using OpenDDS::FaceTSS::Entities;
00068
00069 void Initialize(const CONFIGURATION_RESOURCE configuration_file,
00070 RETURN_CODE_TYPE& return_code)
00071 {
00072 int status = parser.parse(configuration_file);
00073 if (status != 0) {
00074 ACE_ERROR((LM_ERROR,
00075 ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00076 ACE_TEXT("Parser::parse () returned %d\n"),
00077 status));
00078 return_code = INVALID_PARAM;
00079 } else {
00080 return_code = RC_NO_ERROR;
00081 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00082 TheServiceParticipant->configure_pool();
00083 #endif
00084 }
00085 }
00086
00087 void Create_Connection(const CONNECTION_NAME_TYPE connection_name,
00088 MESSAGING_PATTERN_TYPE pattern,
00089 CONNECTION_ID_TYPE& connection_id,
00090 CONNECTION_DIRECTION_TYPE& connection_direction,
00091 MESSAGE_SIZE_TYPE& max_message_size,
00092 TIMEOUT_TYPE,
00093 RETURN_CODE_TYPE& return_code)
00094 {
00095 return_code = RC_NO_ERROR;
00096
00097 if (pattern != PUB_SUB) {
00098 return_code = INVALID_CONFIG;
00099 return;
00100 }
00101
00102 ConnectionSettings connection;
00103 TopicSettings topic;
00104 QosSettings qos;
00105
00106
00107 if (!parser.find_connection(connection_name, connection)) {
00108
00109 if (!parser.find_topic(connection.topic_name_, topic)) {
00110
00111 if (parser.find_qos(connection, qos)) {
00112 return_code = INVALID_CONFIG;
00113 }
00114 } else {
00115 return_code = INVALID_CONFIG;
00116 }
00117 } else {
00118 return_code = INVALID_CONFIG;
00119 }
00120
00121 if (return_code != RC_NO_ERROR) {
00122 return;
00123 }
00124
00125 connection_id = connection.connection_id_;
00126 connection_direction = connection.direction_;
00127 max_message_size = topic.max_message_size_;
00128
00129 return_code = create_opendds_entities(connection_id,
00130 connection.participant_id_,
00131 connection.domain_id_,
00132 connection.topic_name_,
00133 topic.type_name_,
00134 connection_direction,
00135 qos,
00136 connection.config_name_);
00137 if (return_code != RC_NO_ERROR) {
00138 return;
00139 }
00140
00141 const SYSTEM_TIME_TYPE refresh_period =
00142 (connection_direction == SOURCE) ?
00143 OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0;
00144
00145 const TRANSPORT_CONNECTION_STATUS_TYPE status = {
00146 0,
00147 max_message_size,
00148 max_message_size,
00149 connection_direction,
00150 0,
00151 refresh_period,
00152 INVALID,
00153 };
00154 Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id];
00155 conn_info.connection_name = connection_name;
00156 conn_info.connection_status = status;
00157 conn_info.platform_view_guid = topic.platform_view_guid_;
00158 }
00159
00160 void Get_Connection_Parameters(CONNECTION_NAME_TYPE& connection_name,
00161 CONNECTION_ID_TYPE& connection_id ,
00162 TRANSPORT_CONNECTION_STATUS_TYPE& status,
00163 RETURN_CODE_TYPE& return_code)
00164 {
00165
00166
00167
00168
00169 Entities& entities = *Entities::instance();
00170
00171 if (connection_id != 0 && entities.connections_.count(connection_id)) {
00172
00173
00174 if (connection_name[0]) {
00175
00176 OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name;
00177 if (std::strcmp(connection_name, conn_name.c_str()) == 0) {
00178 return_code = RC_NO_ERROR;
00179 } else {
00180 return_code = INVALID_PARAM;
00181 }
00182 } else {
00183
00184
00185
00186 entities.connections_[connection_id].connection_name.copy(connection_name,
00187 sizeof(CONNECTION_NAME_TYPE));
00188 connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0;
00189 return_code = RC_NO_ERROR;
00190 }
00191
00192 } else if (connection_name[0] && connection_id == 0) {
00193
00194
00195 ConnectionSettings settings;
00196 if (0 == parser.find_connection(connection_name, settings)) {
00197 connection_id = settings.connection_id_;
00198 return_code = RC_NO_ERROR;
00199 } else {
00200
00201 return_code = INVALID_PARAM;
00202 }
00203 } else {
00204
00205
00206 return_code = INVALID_PARAM;
00207 }
00208 if (return_code == RC_NO_ERROR) {
00209 TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status;
00210 if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) {
00211 Entities::FaceReceiver& receiver = *entities.receivers_[connection_id];
00212 if (receiver.status_valid != FACE::VALID) {
00213 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00214 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n"));
00215 }
00216 return_code = NOT_AVAILABLE;
00217 return;
00218 }
00219 if (receiver.total_msgs_recvd != 0) {
00220 cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency/receiver.total_msgs_recvd;
00221 cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity;
00222 } else {
00223 cur_status.REFRESH_PERIOD = 0;
00224 }
00225 WAITING_RANGE_TYPE num_waiting;
00226 if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) {
00227 cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting;
00228 } else {
00229 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00230 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n"));
00231 }
00232 return_code = NOT_AVAILABLE;
00233 return;
00234 }
00235 } else {
00236
00237
00238 Entities::FaceSender& sender = entities.senders_[connection_id];
00239 if (sender.status_valid != FACE::VALID) {
00240 return_code = NOT_AVAILABLE;
00241 return;
00242 }
00243 cur_status.REFRESH_PERIOD = 0;
00244 cur_status.WAITING_PROCESSES_OR_MESSAGES = 0;
00245 }
00246 status = cur_status;
00247 }
00248 }
00249
00250 void Unregister_Callback(CONNECTION_ID_TYPE connection_id,
00251 RETURN_CODE_TYPE& return_code)
00252 {
00253 Entities& entities = *Entities::instance();
00254 Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00255 if (readers.count(connection_id)) {
00256 readers[connection_id]->dr->set_listener(NULL, 0);
00257 return_code = RC_NO_ERROR;
00258 return;
00259 }
00260 return_code = INVALID_PARAM;
00261 }
00262
00263 void Destroy_Connection(CONNECTION_ID_TYPE connection_id,
00264 RETURN_CODE_TYPE& return_code)
00265 {
00266 Entities& entities = *Entities::instance();
00267 Entities::ConnIdToSenderMap& writers = entities.senders_;
00268 Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00269
00270 DDS::DomainParticipant_var dp;
00271 bool try_cleanup_participant = false;
00272 if (writers.count(connection_id)) {
00273 const DDS::DataWriter_var datawriter = writers[connection_id].dw;
00274 const DDS::Publisher_var pub = datawriter->get_publisher();
00275 writers.erase(connection_id);
00276 pub->delete_datawriter(datawriter);
00277 dp = pub->get_participant();
00278 try_cleanup_participant = cleanup_opendds_publisher(pub);
00279
00280 } else if (readers.count(connection_id)) {
00281 const DDS::DataReader_var datareader = readers[connection_id]->dr;
00282 const DDS::Subscriber_var sub = datareader->get_subscriber();
00283 delete readers[connection_id];
00284 readers.erase(connection_id);
00285 sub->delete_datareader(datareader);
00286 dp = sub->get_participant();
00287 try_cleanup_participant = cleanup_opendds_subscriber(sub);
00288 }
00289
00290 if (!dp) {
00291 return_code = INVALID_PARAM;
00292 return;
00293 }
00294
00295 if (try_cleanup_participant) {
00296 cleanup_opendds_participant(dp);
00297 }
00298
00299 entities.connections_.erase(connection_id);
00300 return_code = RC_NO_ERROR;
00301 }
00302 OpenDDS_FACE_Export
00303 void receive_header( FACE::CONNECTION_ID_TYPE connection_id,
00304 FACE::TIMEOUT_TYPE ,
00305 FACE::TRANSACTION_ID_TYPE& transaction_id,
00306 FACE::TS::MessageHeader& message_header,
00307 FACE::MESSAGE_SIZE_TYPE message_size,
00308 FACE::RETURN_CODE_TYPE& return_code)
00309 {
00310 Entities::ConnIdToReceiverMap& readers =
00311 Entities::instance()->receivers_;
00312
00313
00314
00315 if (!readers.count(connection_id) || transaction_id == 0) {
00316 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00317 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - "
00318 "could not find reader for connection_id: %d OR transaction id[%d] == 0 \n",
00319 connection_id,
00320 transaction_id));
00321 }
00322 return_code = FACE::INVALID_PARAM;
00323 return;
00324 }
00325
00326 if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) {
00327 if (OpenDDS::DCPS::DCPS_debug_level) {
00328 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d \n",
00329 message_size,
00330 sizeof(FACE::TS::MessageHeader)));
00331 }
00332 return_code = FACE::INVALID_PARAM;
00333 return;
00334 }
00335 if (transaction_id == readers[connection_id]->last_msg_tid) {
00336 message_header = readers[connection_id]->last_msg_header;
00337 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK);
00338 } else {
00339 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00340 }
00341 }
00342
00343 void Receive_Message(
00344 CONNECTION_ID_TYPE connection_id,
00345 TIMEOUT_TYPE timeout,
00346 TRANSACTION_ID_TYPE& transaction_id,
00347 MessageHeader& message_header,
00348 MESSAGE_SIZE_TYPE message_size,
00349 RETURN_CODE_TYPE& return_code)
00350 {
00351 receive_header(connection_id, timeout,
00352 transaction_id, message_header,
00353 message_size, return_code);
00354 }
00355
00356 namespace {
00357 void find_or_create_dp(const DDS::DomainId_t& domainId,
00358 int participantId,
00359 const DDS::DomainParticipantFactory_var& dpf,
00360 DDS::DomainParticipant_var& dp) {
00361 DDS::DomainParticipant_var temp_dp;
00362
00363 temp_dp = dpf->lookup_participant(domainId);
00364 if (!temp_dp) {
00365 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00366 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - created new participant for domainId: %d\n", domainId));
00367 }
00368 DDS::DomainParticipantQos qos(PARTICIPANT_QOS_DEFAULT);
00369 qos.user_data.value.length(6);
00370 qos.user_data.value[0] = (participantId >> 0) & 0xFF;
00371 qos.user_data.value[1] = (participantId >> 8) & 0xFF;
00372 qos.user_data.value[2] = (participantId >> 16) & 0xFF;
00373 qos.user_data.value[3] = (participantId >> 24) & 0xFF;
00374 qos.user_data.value[4] = 0;
00375 qos.user_data.value[5] = 0;
00376 dp = dpf->create_participant(domainId, qos, 0, 0);
00377 return;
00378 }
00379 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00380 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - found exiting participant for domainId: %d \n", domainId));
00381 }
00382 dp = temp_dp;
00383 }
00384
00385 void find_or_create_pub(const DDS::PublisherQos& qos,
00386 const DDS::DomainParticipant_var& dp,
00387 DDS::Publisher_var& pub) {
00388 DDS::DomainParticipant_var temp_dp;
00389 DDS::Publisher_var temp_pub;
00390
00391 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00392 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00393
00394 while (wtrIter != writers.end()) {
00395 temp_pub = wtrIter->second.dw->get_publisher();
00396 temp_dp = temp_pub->get_participant();
00397 DDS::PublisherQos temp_qos;
00398 temp_pub->get_qos(temp_qos);
00399 if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00400 temp_qos == qos) {
00401 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00402 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - found exiting publisher in senders\n"));
00403 }
00404 pub = temp_pub;
00405 return;
00406 } else {
00407 ++wtrIter;
00408 }
00409 }
00410 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00411 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - created new publisher\n"));
00412 }
00413 pub = dp->create_publisher(qos, 0, 0);
00414 }
00415
00416 void find_or_create_sub(const DDS::SubscriberQos& qos,
00417 const DDS::DomainParticipant_var& dp,
00418 DDS::Subscriber_var& sub)
00419 {
00420 DDS::DomainParticipant_var temp_dp;
00421 DDS::Subscriber_var temp_sub;
00422
00423 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00424 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00425
00426 while (rdrIter != readers.end()) {
00427 temp_sub = rdrIter->second->dr->get_subscriber();
00428 temp_dp = temp_sub->get_participant();
00429 DDS::SubscriberQos temp_qos;
00430 temp_sub->get_qos(temp_qos);
00431 if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00432 temp_qos == qos) {
00433 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00434 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - found exiting subscriber in receivers\n"));
00435 }
00436 sub = temp_sub;
00437 return;
00438 } else {
00439 ++rdrIter;
00440 }
00441 }
00442 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00443 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - created new subscriber\n"));
00444 }
00445 sub = dp->create_subscriber(qos, 0, 0);
00446 }
00447
00448 bool cleanup_opendds_publisher(const DDS::Publisher_var pub)
00449 {
00450 DDS::Publisher_var temp_pub;
00451 DDS::PublisherQos pub_qos;
00452 pub->get_qos(pub_qos);
00453 DDS::DomainParticipant_var dp = pub->get_participant();
00454 DDS::DomainParticipant_var temp_dp;
00455 DDS::PublisherQos temp_qos;
00456
00457 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00458 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00459 while (wtrIter != writers.end()) {
00460 temp_pub = wtrIter->second.dw->get_publisher();
00461 temp_dp = temp_pub->get_participant();
00462 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00463 temp_pub->get_qos(temp_qos);
00464 if (pub_qos == temp_qos) {
00465 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00466 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher still in use by other writer\n"));
00467 }
00468 return false;
00469 }
00470 }
00471 ++wtrIter;
00472 }
00473 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00474 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher no longer in use, delete pub\n"));
00475 }
00476 dp->delete_publisher(pub);
00477 return true;
00478 }
00479
00480 bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub)
00481 {
00482 DDS::Subscriber_var temp_sub;
00483 DDS::SubscriberQos sub_qos;
00484 sub->get_qos(sub_qos);
00485 DDS::DomainParticipant_var dp = sub->get_participant();
00486 DDS::DomainParticipant_var temp_dp;
00487 DDS::SubscriberQos temp_qos;
00488
00489
00490 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00491 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00492 while (rdrIter != readers.end()) {
00493 temp_sub = rdrIter->second->dr->get_subscriber();
00494 temp_dp = temp_sub->get_participant();
00495
00496 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00497 temp_sub->get_qos(temp_qos);
00498 if (sub_qos == temp_qos) {
00499 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00500 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber still in use by other reader\n"));
00501 }
00502 return false;
00503 }
00504 }
00505 ++rdrIter;
00506 }
00507 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00508 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber no longer in use, delete sub\n"));
00509 }
00510 dp->delete_subscriber(sub);
00511 return true;
00512 }
00513
00514 void cleanup_opendds_participant(const DDS::DomainParticipant_var dp)
00515 {
00516 DDS::DomainParticipant_var temp_dp;
00517 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00518 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00519 while (rdrIter != readers.end()) {
00520 temp_dp = rdrIter->second->dr->get_subscriber()->get_participant();
00521 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00522 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00523 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by reader\n"));
00524 }
00525 return;
00526 } else {
00527 ++rdrIter;
00528 }
00529 }
00530
00531 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00532 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00533
00534 while (wtrIter != writers.end()) {
00535 temp_dp = wtrIter->second.dw->get_publisher()->get_participant();
00536 if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00537 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00538 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by writer\n"));
00539 }
00540 return;
00541 } else {
00542 ++wtrIter;
00543 }
00544 }
00545 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00546 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant for domain: %d no longer in use, delete entities and participant\n", dp->get_domain_id()));
00547 }
00548 dp->delete_contained_entities();
00549 const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00550 dpf->delete_participant(dp);
00551 }
00552
00553 RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00554 int participantId,
00555 const DDS::DomainId_t domainId,
00556 const char* topicName,
00557 const char* type,
00558 CONNECTION_DIRECTION_TYPE dir,
00559 QosSettings& qos_settings,
00560 const char* transport)
00561 {
00562 #ifdef DEBUG_OPENDDS_FACETSS
00563 OpenDDS::DCPS::set_DCPS_debug_level(8);
00564 OpenDDS::DCPS::Transport_debug_level = 5;
00565 TheServiceParticipant->set_BIT(false);
00566 #endif
00567
00568 const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00569 if (!dpf) return INVALID_PARAM;
00570
00571 DDS::DomainParticipant_var dp;
00572 find_or_create_dp(domainId, participantId, dpf, dp);
00573 if (!dp) return INVALID_PARAM;
00574
00575 using OpenDDS::DCPS::Data_Types_Register;
00576 OpenDDS::DCPS::TypeSupport* ts =
00577 Registered_Data_Types->lookup(dp, type);
00578 if (!ts) {
00579 ts = Registered_Data_Types->lookup(0, type);
00580 if (!ts) return INVALID_PARAM;
00581 Registered_Data_Types->register_type(dp, type, ts);
00582 }
00583
00584 const DDS::Topic_var topic =
00585 dp->create_topic(topicName, type, TOPIC_QOS_DEFAULT, 0, 0);
00586 if (!topic) return INVALID_PARAM;
00587
00588 if (dir == SOURCE) {
00589 DDS::PublisherQos publisher_qos;
00590 qos_settings.apply_to(publisher_qos);
00591
00592 DDS::Publisher_var pub;
00593 find_or_create_pub(publisher_qos, dp, pub);
00594 if (!pub) return INVALID_PARAM;
00595
00596 if (transport && transport[0]) {
00597 OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00598 if (config.is_nil()) return INVALID_PARAM;
00599 TheTransportRegistry->bind_config(config, pub);
00600 }
00601
00602 DDS::DataWriterQos datawriter_qos;
00603 qos_settings.apply_to(datawriter_qos);
00604
00605
00606 datawriter_qos.user_data.value.length (3);
00607 datawriter_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00608 datawriter_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00609 datawriter_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00610
00611 const DDS::DataWriter_var dw =
00612 pub->create_datawriter(topic, datawriter_qos, 0, 0);
00613 if (!dw) return INVALID_PARAM;
00614
00615 Entities::instance()->senders_[connectionId].dw = dw;
00616
00617 } else {
00618 DDS::SubscriberQos subscriber_qos;
00619 qos_settings.apply_to(subscriber_qos);
00620
00621 DDS::Subscriber_var sub;
00622 find_or_create_sub(subscriber_qos, dp, sub);
00623 if (!sub) return INVALID_PARAM;
00624
00625 if (transport && transport[0]) {
00626 OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00627 if (config.is_nil()) return INVALID_PARAM;
00628 TheTransportRegistry->bind_config(config, sub);
00629 }
00630
00631 DDS::DataReaderQos datareader_qos;
00632 qos_settings.apply_to(datareader_qos);
00633
00634
00635 datareader_qos.user_data.value.length (3);
00636 datareader_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00637 datareader_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00638 datareader_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00639
00640 const DDS::DataReader_var dr =
00641 sub->create_datareader(topic, datareader_qos, 0, 0);
00642 if (!dr) return INVALID_PARAM;
00643 Entities::instance()->receivers_[connectionId] = new Entities::FaceReceiver();
00644 Entities::instance()->receivers_[connectionId]->dr = dr;
00645 }
00646
00647 return RC_NO_ERROR;
00648 }
00649 }
00650
00651 }}
00652
00653 namespace OpenDDS {
00654 namespace FaceTSS {
00655
00656 Entities::Entities() {}
00657 Entities::~Entities() {}
00658
00659 Entities* Entities::instance()
00660 {
00661 return ACE_Singleton<Entities, ACE_Thread_Mutex>::instance();
00662 }
00663
00664 FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00665 DDS::ReturnCode_t retcode)
00666 {
00667 FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status =
00668 Entities::instance()->connections_[connection_id].connection_status;
00669 FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM;
00670
00671 switch (retcode) {
00672 case DDS::RETCODE_OK:
00673 status.LAST_MSG_VALIDITY = FACE::VALID;
00674 return FACE::RC_NO_ERROR;
00675
00676 case DDS::RETCODE_ERROR:
00677 rc = FACE::CONNECTION_CLOSED; break;
00678
00679 case DDS::RETCODE_BAD_PARAMETER:
00680 rc = FACE::INVALID_PARAM; break;
00681
00682 case DDS::RETCODE_OUT_OF_RESOURCES:
00683 rc = FACE::DATA_BUFFER_TOO_SMALL; break;
00684
00685 case DDS::RETCODE_PRECONDITION_NOT_MET:
00686 case DDS::RETCODE_NOT_ENABLED:
00687 rc = FACE::INVALID_MODE; break;
00688
00689 case DDS::RETCODE_IMMUTABLE_POLICY:
00690 case DDS::RETCODE_INCONSISTENT_POLICY:
00691 rc = FACE::INVALID_CONFIG; break;
00692
00693 case DDS::RETCODE_ALREADY_DELETED:
00694 rc = FACE::CONNECTION_CLOSED; break;
00695
00696 case DDS::RETCODE_TIMEOUT:
00697 rc = FACE::TIMED_OUT; break;
00698
00699 case DDS::RETCODE_UNSUPPORTED:
00700 case DDS::RETCODE_NO_DATA:
00701 rc = FACE::NOT_AVAILABLE; break;
00702
00703 case DDS::RETCODE_ILLEGAL_OPERATION:
00704 rc = FACE::PERMISSION_DENIED; break;
00705 }
00706
00707 status.LAST_MSG_VALIDITY = FACE::INVALID;
00708 return rc;
00709 }
00710
00711 enum { NSEC_PER_SEC = 1000000000 };
00712
00713 DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
00714 {
00715 if (timeout == FACE::INF_TIME_VALUE) {
00716 static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC,
00717 DDS::DURATION_INFINITE_NSEC};
00718 return dds_inf;
00719 }
00720
00721 DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC),
00722 static_cast<unsigned int>(timeout % NSEC_PER_SEC)};
00723 return dur;
00724 }
00725
00726 FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration)
00727 {
00728 if (duration.sec == DDS::DURATION_INFINITE_SEC
00729 && duration.nanosec == DDS::DURATION_INFINITE_NSEC) {
00730 return FACE::INF_TIME_VALUE;
00731 }
00732 return duration.nanosec +
00733 duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00734 }
00735
00736 FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp)
00737 {
00738 return timestamp.nanosec +
00739 timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00740 }
00741
00742 FACE::MESSAGE_INSTANCE_GUID
00743 create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub, const CORBA::LongLong& orig_seq)
00744 {
00745 OpenDDS::DCPS::GuidConverter writer(pub);
00746
00747 FACE::MESSAGE_INSTANCE_GUID message_instance_guid;
00748 FACE::LongLong mig_low;
00749 FACE::LongLong masked_seq;
00750
00751
00752 FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub));
00753 masked_seq = orig_seq >> 32;
00754
00755 if (masked_seq) {
00756 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n"));
00757 }
00758 mig_low = orig_seq & 0xFFFFFFFF;
00759 message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low);
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783 return message_instance_guid;
00784 }
00785
00786 void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00787 const DDS::DomainParticipant_var part,
00788 const DDS::SampleInfo& sinfo,
00789 FACE::RETURN_CODE_TYPE& return_code)
00790 {
00791 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00792 if (!readers.count(connection_id)) {
00793 return_code = FACE::INVALID_PARAM;
00794 return;
00795 }
00796 FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header;
00797
00798 header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid;
00799
00800 DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber();
00801 DDS::DomainParticipant_var temp_dp = temp_sub->get_participant();
00802 const OpenDDS::DCPS::RepoId pub = dynamic_cast<OpenDDS::DCPS::DomainParticipantImpl*>(temp_dp.in())->get_repoid(sinfo.publication_handle);
00803 header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq);
00804
00805 header.message_timestamp = convertTime(sinfo.source_timestamp);
00806 ACE_Time_Value now(ACE_OS::gettimeofday());
00807
00808 readers[connection_id]->sum_recvd_msgs_latency += (convertTime(OpenDDS::DCPS::time_value_to_time(now)) - header.message_timestamp);
00809 ++readers[connection_id]->total_msgs_recvd;
00810
00811 if (OpenDDS::DCPS::DCPS_debug_level > 8) {
00812 ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n",
00813 readers[connection_id]->sum_recvd_msgs_latency,
00814 readers[connection_id]->total_msgs_recvd,
00815 readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd));
00816 }
00817 ::DDS::Subscriber_var bit_subscriber
00818 = part->get_builtin_subscriber () ;
00819
00820 ::DDS::DataReader_var reader
00821 = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ;
00822 ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader
00823 = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ());
00824 if (CORBA::is_nil (pub_reader.in ())) {
00825 ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n"));
00826 return_code = FACE::NOT_AVAILABLE;
00827 return;
00828 }
00829
00830 ::DDS::ReturnCode_t ret;
00831 ::DDS::SampleInfoSeq pubinfos(1);
00832 ::DDS::PublicationBuiltinTopicDataSeq pubdata(1);
00833 ret = pub_reader->read_instance(pubdata,
00834 pubinfos,
00835 1,
00836 sinfo.publication_handle,
00837 ::DDS::ANY_SAMPLE_STATE,
00838 ::DDS::ANY_VIEW_STATE,
00839 ::DDS::ALIVE_INSTANCE_STATE);
00840
00841 if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) {
00842 ACE_ERROR((LM_ERROR,
00843 "(%P|%t) populate_header_received: failed to read BIT publication data.\n"));
00844 return_code = FACE::NOT_AVAILABLE;
00845 return;
00846 }
00847
00848 const CORBA::ULong i = 0;
00849
00850 header.message_source_guid =
00851 (pubdata[i].user_data.value[0] << 0) |
00852 (pubdata[i].user_data.value[1] << 8) |
00853 (pubdata[i].user_data.value[2] << 16);
00854
00855
00856
00857
00858 DDS::Duration_t lifespan = pubdata[i].lifespan.duration;
00859 if (lifespan.sec != DDS::DURATION_INFINITE_SEC &&
00860 lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) {
00861
00862
00863 DDS::Time_t const tmp = {
00864 sinfo.source_timestamp.sec + lifespan.sec,
00865 sinfo.source_timestamp.nanosec + lifespan.nanosec
00866 };
00867
00868
00869
00870 ACE_Time_Value const expiration_time(
00871 OpenDDS::DCPS::time_to_time_value(tmp));
00872
00873 if (now >= expiration_time) {
00874
00875 header.message_validity = FACE::INVALID;
00876 return_code = FACE::RC_NO_ERROR;
00877 return;
00878 }
00879 }
00880
00881 header.message_validity = FACE::VALID;
00882 return_code = FACE::RC_NO_ERROR;
00883 }
00884 }}