FaceTSS.cpp

Go to the documentation of this file.
00001 #include "FaceTSS.h"
00002 #include "FACE/TS.hpp"
00003 #include "config/Parser.h"
00004 
00005 #include "dds/DCPS/Service_Participant.h"
00006 #include "dds/DCPS/DomainParticipantImpl.h"
00007 #include "dds/DCPS/Registered_Data_Types.h"
00008 #include "dds/DCPS/Marked_Default_Qos.h"
00009 #include "dds/DCPS/BuiltInTopicUtils.h"
00010 #include "dds/DCPS/SafetyProfileStreams.h"
00011 #include "dds/DCPS/SafetyProfilePool.h"
00012 #include "dds/DCPS/GuidConverter.h"
00013 #include "dds/DCPS/Qos_Helper.h"
00014 #include "dds/DdsDcpsCoreC.h"
00015 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00016 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00017 
00018 #include <cstring>
00019 
00020 #ifndef OPENDDS_SAFETY_PROFILE
00021 using OpenDDS::DCPS::operator==;
00022 #endif
00023 
00024 namespace FACE {
00025 namespace TS {
00026 
00027 bool MessageHeader::operator==(const MessageHeader& rhs) const
00028 {
00029   return message_instance_guid == rhs.message_instance_guid
00030     && platform_view_guid == rhs.platform_view_guid
00031     && message_source_guid == rhs.message_source_guid
00032     && message_timestamp == rhs.message_timestamp
00033     && message_validity == rhs.message_validity;
00034 }
00035 
00036 using OpenDDS::FaceTSS::config::ConnectionSettings;
00037 using OpenDDS::FaceTSS::config::TopicSettings;
00038 using OpenDDS::FaceTSS::config::QosSettings;
00039 
00040 namespace {
00041   OpenDDS::FaceTSS::config::Parser parser;
00042 
00043   void find_or_create_dp(const DDS::DomainId_t& domainId,
00044                          int participantId,
00045                          const DDS::DomainParticipantFactory_var& dpf,
00046                          DDS::DomainParticipant_var& dp);
00047   void find_or_create_pub(const DDS::PublisherQos& qos,
00048                           const DDS::DomainParticipant_var& dp,
00049                           DDS::Publisher_var& pub);
00050   void find_or_create_sub(const DDS::SubscriberQos& qos,
00051                           const DDS::DomainParticipant_var& dp,
00052                           DDS::Subscriber_var& sub);
00053 
00054   bool cleanup_opendds_publisher(const DDS::Publisher_var pub);
00055   bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub);
00056   void cleanup_opendds_participant(const DDS::DomainParticipant_var dp);
00057 
00058   RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00059                                            int participantId,
00060                                            const DDS::DomainId_t domainId,
00061                                            const char* topic,
00062                                            const char* type,
00063                                            CONNECTION_DIRECTION_TYPE dir,
00064                                            QosSettings& qos,
00065                                            const char* transport);
00066 }
00067 
00068 using OpenDDS::FaceTSS::Entities;
00069 
00070 void Initialize(const CONFIGURATION_RESOURCE configuration_file,
00071                 RETURN_CODE_TYPE& return_code)
00072 {
00073   try {
00074     int status = parser.parse(configuration_file);
00075     if (status != 0) {
00076       ACE_ERROR((LM_ERROR,
00077         ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00078         ACE_TEXT("Parser::parse () returned %d\n"),
00079         status));
00080       return_code = INVALID_PARAM;
00081     } else {
00082       return_code = RC_NO_ERROR;
00083 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00084       TheServiceParticipant->configure_pool();
00085 #endif
00086     }
00087   } catch (const CORBA::BAD_PARAM& ) {
00088     if (OpenDDS::DCPS::DCPS_debug_level) {
00089       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Initialize - INVALID_PARAM\n"));
00090     }
00091     return_code = INVALID_PARAM;
00092   }
00093 }
00094 
00095 void Create_Connection(const CONNECTION_NAME_TYPE connection_name,
00096                        MESSAGING_PATTERN_TYPE pattern,
00097                        CONNECTION_ID_TYPE& connection_id,
00098                        CONNECTION_DIRECTION_TYPE& connection_direction,
00099                        MESSAGE_SIZE_TYPE& max_message_size,
00100                        TIMEOUT_TYPE,
00101                        RETURN_CODE_TYPE& return_code)
00102 {
00103   try {
00104     return_code = RC_NO_ERROR;
00105 
00106     if (pattern != PUB_SUB) {
00107       return_code = INVALID_CONFIG;
00108       return;
00109     }
00110 
00111     ConnectionSettings connection;
00112     TopicSettings topic;
00113     QosSettings qos;
00114 
00115     // Find connection
00116     if (!parser.find_connection(connection_name, connection)) {
00117       // Find topic
00118       if (!parser.find_topic(connection.topic_name_, topic)) {
00119         // Find Qos by specified name(s)
00120         if (parser.find_qos(connection, qos)) {
00121           return_code = INVALID_CONFIG;
00122         }
00123       } else {
00124         return_code = INVALID_CONFIG;
00125       }
00126     } else {
00127       return_code = INVALID_CONFIG;
00128     }
00129 
00130     if (return_code != RC_NO_ERROR) {
00131       return;
00132     }
00133 
00134     connection_id = connection.connection_id_;
00135     connection_direction = connection.direction_;
00136     max_message_size = topic.max_message_size_;
00137 
00138     return_code = create_opendds_entities(connection_id,
00139       connection.participant_id_,
00140       connection.domain_id_,
00141       connection.topic_name_,
00142       topic.type_name_,
00143       connection_direction,
00144       qos,
00145       connection.config_name_);
00146     if (return_code != RC_NO_ERROR) {
00147       return;
00148     }
00149 
00150     const SYSTEM_TIME_TYPE refresh_period =
00151       (connection_direction == SOURCE) ?
00152       OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0;
00153 
00154     const TRANSPORT_CONNECTION_STATUS_TYPE status = {
00155       0, // MESSAGE currently set to 0 due to type mismatch in MESSAGE_RANGE_TYPE and MESSAGE_TYPE_GUID
00156       max_message_size, // MAX_MESSAGE with no transformations, set to config value
00157       max_message_size,
00158       connection_direction,
00159       0, // WAITING_PROCESSES_OR_MESSAGES
00160       refresh_period,
00161       INVALID,
00162     };
00163     Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id];
00164     conn_info.connection_name = connection_name;
00165     conn_info.connection_status = status;
00166     conn_info.platform_view_guid = topic.platform_view_guid_;
00167   } catch (const CORBA::BAD_PARAM&) {
00168     if (OpenDDS::DCPS::DCPS_debug_level) {
00169       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Create_Connection - INVALID_PARAM\n"));
00170     }
00171     return_code = INVALID_PARAM;
00172   }
00173 }
00174 
00175 void Get_Connection_Parameters(CONNECTION_NAME_TYPE& connection_name,
00176                                CONNECTION_ID_TYPE& connection_id /* 0 if an out param */,
00177                                TRANSPORT_CONNECTION_STATUS_TYPE& status,
00178                                RETURN_CODE_TYPE& return_code)
00179 {
00180   try {
00181     // connection_name is optional, if absent populate from connection_id lookup
00182     // connection_id is also optional, if absent populate from connection_name lookup
00183     // if both provided, validate
00184     // if neither present, return error
00185     Entities& entities = *Entities::instance();
00186 
00187     if (connection_id != 0 && entities.connections_.count(connection_id)) {
00188       // connection_id was provided
00189       // if validated or populated, set return_code so status will be populated
00190       if (connection_name[0]) {
00191         // Validate provided connection_name
00192         OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name;
00193         if (std::strcmp(connection_name, conn_name.c_str()) == 0) {
00194           return_code = RC_NO_ERROR;
00195         } else {
00196           return_code = INVALID_PARAM;
00197         }
00198       } else {
00199         // connection_name not provided
00200         // so populate from connection_id lookup
00201         // and set return code so status will be populated
00202         entities.connections_[connection_id].connection_name.copy(connection_name,
00203           sizeof(CONNECTION_NAME_TYPE));
00204         connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0;
00205         return_code = RC_NO_ERROR;
00206       }
00207 
00208     } else if (connection_name[0] && connection_id == 0) {
00209       // connection_id was not specified, but name was provided.
00210       // lookup connection_id and if found set return code to populate status
00211       ConnectionSettings settings;
00212       if (0 == parser.find_connection(connection_name, settings)) {
00213         connection_id = settings.connection_id_;
00214         return_code = RC_NO_ERROR;
00215       } else {
00216         // could not find connection for connection_name
00217         return_code = INVALID_PARAM;
00218       }
00219     } else {
00220       //Neither connection_id or connection_name provided
00221       // a valid connection
00222       return_code = INVALID_PARAM;
00223     }
00224     if (return_code == RC_NO_ERROR) {
00225       TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status;
00226       if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) {
00227         Entities::FaceReceiver& receiver = *entities.receivers_[connection_id];
00228         if (receiver.status_valid != FACE::VALID) {
00229           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00230             ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n"));
00231           }
00232           return_code = NOT_AVAILABLE;
00233           return;
00234         }
00235         if (receiver.total_msgs_recvd != 0) {
00236           cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency / receiver.total_msgs_recvd;
00237           cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity;
00238         } else {
00239           cur_status.REFRESH_PERIOD = 0;
00240         }
00241         WAITING_RANGE_TYPE num_waiting;
00242         if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) {
00243           cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting;
00244         } else {
00245           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00246             ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n"));
00247           }
00248           return_code = NOT_AVAILABLE;
00249           return;
00250         }
00251       } else {
00252         //DDS Only supports Destination/Source therefore
00253         // CONNECTION_DIRECTION == FACE::SOURCE
00254         Entities::FaceSender& sender = entities.senders_[connection_id];
00255         if (sender.status_valid != FACE::VALID) {
00256           return_code = NOT_AVAILABLE;
00257           return;
00258         }
00259         cur_status.REFRESH_PERIOD = 0;
00260         cur_status.WAITING_PROCESSES_OR_MESSAGES = 0;
00261       }
00262       status = cur_status;
00263     }
00264   } catch (const CORBA::BAD_PARAM&) {
00265     if (OpenDDS::DCPS::DCPS_debug_level) {
00266       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Get_Connection_Parameters - INVALID_PARAM\n"));
00267     }
00268     return_code = INVALID_PARAM;
00269   }
00270 }
00271 
00272 void Unregister_Callback(CONNECTION_ID_TYPE connection_id,
00273                          RETURN_CODE_TYPE& return_code)
00274 {
00275   try {
00276     Entities& entities = *Entities::instance();
00277     Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00278     if (readers.count(connection_id)) {
00279       readers[connection_id]->dr->set_listener(NULL, 0);
00280       return_code = RC_NO_ERROR;
00281       return;
00282     }
00283   } catch (const CORBA::BAD_PARAM&) {
00284     if (OpenDDS::DCPS::DCPS_debug_level) {
00285       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Unregister_Callback - INVALID_PARAM\n"));
00286     }
00287   }
00288 
00289   return_code = INVALID_PARAM;
00290 }
00291 
00292 void Destroy_Connection(CONNECTION_ID_TYPE connection_id,
00293                         RETURN_CODE_TYPE& return_code)
00294 {
00295   try {
00296     Entities& entities = *Entities::instance();
00297     Entities::ConnIdToSenderMap& writers = entities.senders_;
00298     Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00299 
00300     DDS::DomainParticipant_var dp;
00301     bool try_cleanup_participant = false;
00302     if (writers.count(connection_id)) {
00303       const DDS::DataWriter_var datawriter = writers[connection_id].dw;
00304       const DDS::Publisher_var pub = datawriter->get_publisher();
00305       writers.erase(connection_id);
00306       pub->delete_datawriter(datawriter);
00307       dp = pub->get_participant();
00308       try_cleanup_participant = cleanup_opendds_publisher(pub);
00309 
00310     } else if (readers.count(connection_id)) {
00311       const DDS::DataReader_var datareader = readers[connection_id]->dr;
00312       const DDS::Subscriber_var sub = datareader->get_subscriber();
00313       delete readers[connection_id];
00314       readers.erase(connection_id);
00315       sub->delete_datareader(datareader);
00316       dp = sub->get_participant();
00317       try_cleanup_participant = cleanup_opendds_subscriber(sub);
00318     }
00319 
00320     if (!dp) {
00321       return_code = INVALID_PARAM;
00322       return;
00323     }
00324 
00325     if (try_cleanup_participant) {
00326       cleanup_opendds_participant(dp);
00327     }
00328 
00329     entities.connections_.erase(connection_id);
00330     return_code = RC_NO_ERROR;
00331   } catch (const CORBA::BAD_PARAM&) {
00332     if (OpenDDS::DCPS::DCPS_debug_level) {
00333       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Destroy_Connection - INVALID_PARAM\n"));
00334     }
00335     return_code = INVALID_PARAM;
00336   }
00337 }
00338 
00339 OpenDDS_FACE_Export
00340 void receive_header(/*in*/    FACE::CONNECTION_ID_TYPE connection_id,
00341                     /*in*/    FACE::TIMEOUT_TYPE /*timeout*/,
00342                     /*inout*/ FACE::TRANSACTION_ID_TYPE& transaction_id,
00343                     /*inout*/ FACE::TS::MessageHeader& message_header,
00344                     /*in*/    FACE::MESSAGE_SIZE_TYPE message_size,
00345                     /*out*/   FACE::RETURN_CODE_TYPE& return_code)
00346 {
00347   try {
00348     Entities::ConnIdToReceiverMap& readers =
00349       Entities::instance()->receivers_;
00350     // transaction_id cannot be 0 due to initialization
00351     // of last_msg_tid to 0 before a msg has been received so
00352     // only valid transaction_ids are > 0.
00353     if (!readers.count(connection_id) || transaction_id == 0) {
00354       if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00355         ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - "
00356           "could not find reader for connection_id: %d OR transaction id[%d] == 0 \n",
00357           connection_id,
00358           transaction_id));
00359       }
00360       return_code = FACE::INVALID_PARAM;
00361       return;
00362     }
00363 
00364     if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) {
00365       if (OpenDDS::DCPS::DCPS_debug_level) {
00366         ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d \n",
00367           message_size,
00368           sizeof(FACE::TS::MessageHeader)));
00369       }
00370       return_code = FACE::INVALID_PARAM;
00371       return;
00372     }
00373     if (transaction_id == readers[connection_id]->last_msg_tid) {
00374       message_header = readers[connection_id]->last_msg_header;
00375       return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK);
00376     } else {
00377       return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00378     }
00379   } catch (const CORBA::BAD_PARAM&) {
00380     if (OpenDDS::DCPS::DCPS_debug_level) {
00381       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_header - INVALID_PARAM\n"));
00382     }
00383     return_code = INVALID_PARAM;
00384   }
00385 }
00386 
00387 void Receive_Message(
00388   /* in */ CONNECTION_ID_TYPE connection_id,
00389   /* in */ TIMEOUT_TYPE timeout,
00390   /* inout */ TRANSACTION_ID_TYPE& transaction_id,
00391   /* out */ MessageHeader& message_header,
00392   /* in */ MESSAGE_SIZE_TYPE message_size,
00393   /* out */ RETURN_CODE_TYPE& return_code)
00394 {
00395   try {
00396     receive_header(connection_id, timeout,
00397       transaction_id, message_header,
00398       message_size, return_code);
00399   } catch (const CORBA::BAD_PARAM&) {
00400     if (OpenDDS::DCPS::DCPS_debug_level) {
00401       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Receive_Message - INVALID_PARAM\n"));
00402     }
00403     return_code = INVALID_PARAM;
00404   }
00405 }
00406 
00407 namespace {
00408   void find_or_create_dp(const DDS::DomainId_t& domainId,
00409                          int participantId,
00410                          const DDS::DomainParticipantFactory_var& dpf,
00411                          DDS::DomainParticipant_var& dp) {
00412     DDS::DomainParticipant_var temp_dp;
00413 
00414     temp_dp = dpf->lookup_participant(domainId);
00415     if (!temp_dp) {
00416       if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00417         ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - created new participant for domainId: %d\n", domainId));
00418       }
00419       DDS::DomainParticipantQos qos;
00420       dpf->get_default_participant_qos(qos);
00421       qos.user_data.value.length(6);
00422       qos.user_data.value[0] = (participantId >> 0) & 0xFF;
00423       qos.user_data.value[1] = (participantId >> 8) & 0xFF;
00424       qos.user_data.value[2] = (participantId >> 16) & 0xFF;
00425       qos.user_data.value[3] = (participantId >> 24) & 0xFF;
00426       qos.user_data.value[4] = 0; // (participantId >> 32) & 0xFF;
00427       qos.user_data.value[5] = 0; // (participantId >> 40) & 0xFF;
00428       dp = dpf->create_participant(domainId, qos, 0, 0);
00429       return;
00430     }
00431     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00432       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - found existing participant for domainId: %d \n", domainId));
00433     }
00434     dp = temp_dp;
00435   }
00436 
00437   void find_or_create_pub(const DDS::PublisherQos& qos,
00438                           const DDS::DomainParticipant_var& dp,
00439                           DDS::Publisher_var& pub) {
00440     DDS::DomainParticipant_var temp_dp;
00441     DDS::Publisher_var temp_pub;
00442 
00443     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00444     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00445 
00446     while (wtrIter != writers.end()) {
00447       temp_pub = wtrIter->second.dw->get_publisher();
00448       temp_dp = temp_pub->get_participant();
00449       DDS::PublisherQos temp_qos;
00450       temp_pub->get_qos(temp_qos);
00451       if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00452           temp_qos == qos) {
00453         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00454           ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - found existing publisher in senders\n"));
00455         }
00456         pub = temp_pub;
00457         return;
00458       } else {
00459         ++wtrIter;
00460       }
00461     }
00462     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00463       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - created new publisher\n"));
00464     }
00465     pub = dp->create_publisher(qos, 0, 0);
00466   }
00467 
00468   void find_or_create_sub(const DDS::SubscriberQos& qos,
00469                           const DDS::DomainParticipant_var& dp,
00470                           DDS::Subscriber_var& sub)
00471   {
00472     DDS::DomainParticipant_var temp_dp;
00473     DDS::Subscriber_var temp_sub;
00474 
00475     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00476     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00477 
00478     while (rdrIter != readers.end()) {
00479       temp_sub = rdrIter->second->dr->get_subscriber();
00480       temp_dp = temp_sub->get_participant();
00481       DDS::SubscriberQos temp_qos;
00482       temp_sub->get_qos(temp_qos);
00483       if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00484           temp_qos == qos) {
00485         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00486           ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - found existing subscriber in receivers\n"));
00487         }
00488         sub = temp_sub;
00489         return;
00490       } else {
00491         ++rdrIter;
00492       }
00493     }
00494     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00495       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - created new subscriber\n"));
00496     }
00497     sub = dp->create_subscriber(qos, 0, 0);
00498   }
00499 
00500   bool cleanup_opendds_publisher(const DDS::Publisher_var pub)
00501   {
00502     DDS::Publisher_var temp_pub;
00503     DDS::PublisherQos pub_qos;
00504     pub->get_qos(pub_qos);
00505     DDS::DomainParticipant_var dp = pub->get_participant();
00506     DDS::DomainParticipant_var temp_dp;
00507     DDS::PublisherQos temp_qos;
00508 
00509     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00510     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00511     while (wtrIter != writers.end()) {
00512       temp_pub = wtrIter->second.dw->get_publisher();
00513       temp_dp = temp_pub->get_participant();
00514       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00515         temp_pub->get_qos(temp_qos);
00516         if (pub_qos == temp_qos) {
00517           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00518             ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher still in use by other writer\n"));
00519           }
00520           return false;
00521         }
00522       }
00523       ++wtrIter;
00524     }
00525     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00526       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher no longer in use, delete pub\n"));
00527     }
00528     dp->delete_publisher(pub);
00529     return true;
00530   }
00531 
00532   bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub)
00533   {
00534     DDS::Subscriber_var temp_sub;
00535     DDS::SubscriberQos sub_qos;
00536     sub->get_qos(sub_qos);
00537     DDS::DomainParticipant_var dp = sub->get_participant();
00538     DDS::DomainParticipant_var temp_dp;
00539     DDS::SubscriberQos temp_qos;
00540 
00541 
00542     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00543     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00544     while (rdrIter != readers.end()) {
00545       temp_sub = rdrIter->second->dr->get_subscriber();
00546       temp_dp = temp_sub->get_participant();
00547 
00548       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00549         temp_sub->get_qos(temp_qos);
00550         if (sub_qos == temp_qos) {
00551           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00552             ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber still in use by other reader\n"));
00553           }
00554           return false;
00555         }
00556       }
00557       ++rdrIter;
00558     }
00559     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00560       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber no longer in use, delete sub\n"));
00561     }
00562     dp->delete_subscriber(sub);
00563     return true;
00564   }
00565 
00566   void cleanup_opendds_participant(const DDS::DomainParticipant_var dp)
00567   {
00568     DDS::DomainParticipant_var temp_dp;
00569     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00570     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00571     while (rdrIter != readers.end()) {
00572       DDS::Subscriber_var sub = rdrIter->second->dr->get_subscriber();
00573       temp_dp = sub->get_participant();
00574       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00575         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00576           ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by reader\n"));
00577         }
00578         return;
00579       } else {
00580         ++rdrIter;
00581       }
00582     }
00583 
00584     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00585     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00586 
00587     while (wtrIter != writers.end()) {
00588       DDS::Publisher_var publisher = wtrIter->second.dw->get_publisher();
00589       temp_dp = publisher->get_participant();
00590       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00591         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00592           ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by writer\n"));
00593         }
00594         return;
00595       } else {
00596         ++wtrIter;
00597       }
00598     }
00599     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00600       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant for domain: %d no longer in use, delete entities and participant\n", dp->get_domain_id()));
00601     }
00602     dp->delete_contained_entities();
00603     const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00604     dpf->delete_participant(dp);
00605   }
00606 
00607   RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00608                                            int participantId,
00609                                            const DDS::DomainId_t domainId,
00610                                            const char* topicName,
00611                                            const char* type,
00612                                            CONNECTION_DIRECTION_TYPE dir,
00613                                            QosSettings& qos_settings,
00614                                            const char* transport)
00615   {
00616 #ifdef DEBUG_OPENDDS_FACETSS
00617     OpenDDS::DCPS::set_DCPS_debug_level(8);
00618     OpenDDS::DCPS::Transport_debug_level = 5;
00619     TheServiceParticipant->set_BIT(false);
00620 #endif
00621 
00622     const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00623     if (!dpf) return INVALID_PARAM;
00624 
00625     DDS::DomainParticipant_var dp;
00626     find_or_create_dp(domainId, participantId, dpf, dp);
00627     if (!dp) return INVALID_PARAM;
00628 
00629     using OpenDDS::DCPS::Data_Types_Register;
00630     OpenDDS::DCPS::TypeSupport_var ts =
00631       Registered_Data_Types->lookup(dp, type);
00632     if (!ts) {
00633       ts = Registered_Data_Types->lookup(0, type);
00634       if (!ts) return INVALID_PARAM;
00635       Registered_Data_Types->register_type(dp, type, ts);
00636     }
00637 
00638     const DDS::Topic_var topic =
00639       dp->create_topic(topicName, type, TOPIC_QOS_DEFAULT, 0, 0);
00640     if (!topic) return INVALID_PARAM;
00641 
00642     if (dir == SOURCE) {
00643       DDS::PublisherQos publisher_qos;
00644       qos_settings.apply_to(publisher_qos);
00645 
00646       DDS::Publisher_var pub;
00647       find_or_create_pub(publisher_qos, dp, pub);
00648       if (!pub) return INVALID_PARAM;
00649 
00650       if (transport && transport[0]) {
00651         OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00652         if (config.is_nil()) return INVALID_PARAM;
00653         try {
00654           TheTransportRegistry->bind_config(config, pub);
00655         } catch (const OpenDDS::DCPS::Transport::Exception&) {
00656           return INVALID_PARAM;
00657         }
00658       }
00659 
00660       DDS::DataWriterQos datawriter_qos;
00661       qos_settings.apply_to(datawriter_qos);
00662 
00663       // set up user data in DW qos
00664       datawriter_qos.user_data.value.length (3);
00665       datawriter_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00666       datawriter_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00667       datawriter_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00668 
00669       const DDS::DataWriter_var dw =
00670         pub->create_datawriter(topic, datawriter_qos, 0, 0);
00671       if (!dw) return INVALID_PARAM;
00672 
00673       Entities::instance()->senders_[connectionId].dw = dw;
00674 
00675     } else { // dir == DESTINATION
00676       DDS::SubscriberQos subscriber_qos;
00677       qos_settings.apply_to(subscriber_qos);
00678 
00679       DDS::Subscriber_var sub;
00680       find_or_create_sub(subscriber_qos, dp, sub);
00681       if (!sub) return INVALID_PARAM;
00682 
00683       if (transport && transport[0]) {
00684         OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00685         if (config.is_nil()) return INVALID_PARAM;
00686         try {
00687           TheTransportRegistry->bind_config(config, sub);
00688         } catch (const OpenDDS::DCPS::Transport::Exception&) {
00689           return INVALID_PARAM;
00690         }
00691       }
00692 
00693       DDS::DataReaderQos datareader_qos;
00694       qos_settings.apply_to(datareader_qos);
00695 
00696       // set up user data in DR qos
00697       datareader_qos.user_data.value.length (3);
00698       datareader_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00699       datareader_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00700       datareader_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00701 
00702       const DDS::DataReader_var dr =
00703         sub->create_datareader(topic, datareader_qos, 0, 0);
00704       if (!dr) return INVALID_PARAM;
00705       Entities::instance()->receivers_[connectionId] = new Entities::FaceReceiver();
00706       Entities::instance()->receivers_[connectionId]->dr = dr;
00707     }
00708 
00709     return RC_NO_ERROR;
00710   }
00711 }
00712 
00713 }}
00714 
00715 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00716 
00717 namespace OpenDDS {
00718 namespace FaceTSS {
00719 
00720 Entities::Entities() {}
00721 Entities::~Entities() {}
00722 
00723 Entities* Entities::instance()
00724 {
00725   return ACE_Singleton<Entities, ACE_Thread_Mutex>::instance();
00726 }
00727 
00728 FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00729   DDS::ReturnCode_t retcode)
00730 {
00731   FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status =
00732     Entities::instance()->connections_[connection_id].connection_status;
00733   FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM;
00734 
00735   switch (retcode) {
00736   case DDS::RETCODE_OK:
00737     status.LAST_MSG_VALIDITY = FACE::VALID;
00738     return FACE::RC_NO_ERROR;
00739 
00740   case DDS::RETCODE_ERROR:
00741     rc = FACE::CONNECTION_CLOSED; break;
00742 
00743   case DDS::RETCODE_BAD_PARAMETER:
00744     rc = FACE::INVALID_PARAM; break;
00745 
00746   case DDS::RETCODE_OUT_OF_RESOURCES:
00747     rc = FACE::DATA_BUFFER_TOO_SMALL; break;
00748 
00749   case DDS::RETCODE_PRECONDITION_NOT_MET:
00750   case DDS::RETCODE_NOT_ENABLED:
00751     rc = FACE::INVALID_MODE; break;
00752 
00753   case DDS::RETCODE_IMMUTABLE_POLICY:
00754   case DDS::RETCODE_INCONSISTENT_POLICY:
00755     rc = FACE::INVALID_CONFIG; break;
00756 
00757   case DDS::RETCODE_ALREADY_DELETED:
00758     rc = FACE::CONNECTION_CLOSED; break;
00759 
00760   case DDS::RETCODE_TIMEOUT:
00761     rc = FACE::TIMED_OUT; break;
00762 
00763   case DDS::RETCODE_UNSUPPORTED:
00764   case DDS::RETCODE_NO_DATA:
00765     rc = FACE::NOT_AVAILABLE; break;
00766 
00767   case DDS::RETCODE_ILLEGAL_OPERATION:
00768     rc = FACE::PERMISSION_DENIED; break;
00769   }
00770 
00771   status.LAST_MSG_VALIDITY = FACE::INVALID;
00772   return rc;
00773 }
00774 
00775 enum { NSEC_PER_SEC = 1000000000 };
00776 
00777 DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
00778 {
00779   if (timeout == FACE::INF_TIME_VALUE) {
00780     static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC,
00781                                             DDS::DURATION_INFINITE_NSEC};
00782     return dds_inf;
00783   }
00784 
00785   DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC),
00786                          static_cast<unsigned int>(timeout % NSEC_PER_SEC)};
00787   return dur;
00788 }
00789 
00790 FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration)
00791 {
00792   if (duration.sec == DDS::DURATION_INFINITE_SEC
00793       && duration.nanosec == DDS::DURATION_INFINITE_NSEC) {
00794     return FACE::INF_TIME_VALUE;
00795   }
00796   return duration.nanosec +
00797     duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00798 }
00799 
00800 FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp)
00801 {
00802   return timestamp.nanosec +
00803       timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00804 }
00805 
00806 FACE::MESSAGE_INSTANCE_GUID
00807 create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub, const CORBA::LongLong& orig_seq)
00808 {
00809   OpenDDS::DCPS::GuidConverter writer(pub);
00810 
00811   FACE::MESSAGE_INSTANCE_GUID message_instance_guid;
00812   FACE::LongLong mig_low;
00813   FACE::LongLong masked_seq;
00814 
00815   //Until MESSAGE_INSTANCE_GUID becomes 128 bit GUID, use checksum to represent Prefix
00816   FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub));
00817   masked_seq = orig_seq >> 32;
00818 
00819   if (masked_seq) {
00820     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n"));
00821   }
00822   mig_low = orig_seq & 0xFFFFFFFF;
00823   message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low);
00824 /*
00825   //TODO: This is initial work toward defining how a 128 bit guid could be created
00826   //in the future for a Message Instance Guid when supported.
00827   // 13 byte prefix contains identifying pieces of guid
00828   // 3 byte seq - truncated sequence from the sample
00829   typedef CORBA::Octet MsgInstGuidPrefix_t[13];
00830   typedef CORBA::Octet MsgInstGuidSeq_t[3];
00831   MsgInstGuidPrefix_t migPrefix;
00832   MsgInstGuidSeq_t migSeq;
00833   ACE_OS::memcpy(&migPrefix[0], &pub.guidPrefix[2], 10);
00834   ACE_OS::memcpy(&migPrefix[10], &pub.entityId.entityKey[0], 3);
00835   masked_seq = orig_seq >> 24;
00836 
00837   if (masked_seq) {
00838     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in 3 bytes, truncating high bits to fit\n"));
00839   }
00840   FACE::LongLong masked = orig_seq & 0xFFFFFF;
00841 
00842 #ifdef ACE_BIG_ENDIAN
00843   masked <<= 8 * (sizeof(FACE::LongLong)-3);
00844 #endif
00845   ACE_OS::memcpy(&migSeq[0], &masked, 3);
00846 */
00847   return message_instance_guid;
00848 }
00849 
00850 void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00851                               const DDS::DomainParticipant_var part,
00852                               const DDS::SampleInfo& sinfo,
00853                               FACE::RETURN_CODE_TYPE& return_code)
00854 {
00855   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00856   if (!readers.count(connection_id)) {
00857     return_code = FACE::INVALID_PARAM;
00858     return;
00859   }
00860   FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header;
00861 
00862   header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid;
00863 
00864   DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber();
00865   DDS::DomainParticipant_var temp_dp = temp_sub->get_participant();
00866   OpenDDS::DCPS::DomainParticipantImpl* dpi = dynamic_cast<OpenDDS::DCPS::DomainParticipantImpl*>(temp_dp.in());
00867   if (!dpi) {
00868     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) populate_header_received: ")
00869       ACE_TEXT("failed to get DomainParticipantImpl.\n")));
00870     return_code = FACE::NOT_AVAILABLE;
00871     return;
00872   }
00873   const OpenDDS::DCPS::RepoId pub = dpi->get_repoid(sinfo.publication_handle);
00874   header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq);
00875 
00876   header.message_timestamp = convertTime(sinfo.source_timestamp);
00877   ACE_Time_Value now(ACE_OS::gettimeofday());
00878 
00879   readers[connection_id]->sum_recvd_msgs_latency += (convertTime(OpenDDS::DCPS::time_value_to_time(now)) - header.message_timestamp);
00880   ++readers[connection_id]->total_msgs_recvd;
00881 
00882   if (OpenDDS::DCPS::DCPS_debug_level > 8) {
00883     ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n",
00884         readers[connection_id]->sum_recvd_msgs_latency,
00885         readers[connection_id]->total_msgs_recvd,
00886         readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd));
00887   }
00888 
00889   DDS::UserDataQosPolicy qos_user_data;
00890   DDS::LifespanQosPolicy qos_lifespan;
00891   const OpenDDS::DCPS::RepoId sub = dpi->get_id();
00892 
00893   // Test if the reader and writer share a participant
00894   if (std::memcmp(pub.guidPrefix, sub.guidPrefix, sizeof(sub.guidPrefix)) == 0) {
00895 
00896     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00897     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00898 
00899     // Search domain writers for the sending instance
00900     while (wtrIter != writers.end()) {
00901       if (wtrIter->second.dw->get_instance_handle() == sinfo.publication_handle) {
00902         break;
00903       }
00904       ++wtrIter;
00905     }
00906 
00907     if (wtrIter == writers.end()) {
00908       ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to find matching DataWriter instance.\n"));
00909       return_code = FACE::NOT_AVAILABLE;
00910       return;
00911     }
00912 
00913     DDS::DataWriterQos qos;
00914     wtrIter->second.dw->get_qos(qos);
00915     qos_lifespan = qos.lifespan;
00916     qos_user_data = qos.user_data;
00917 
00918   } else {
00919     ::DDS::Subscriber_var bit_subscriber
00920      = part->get_builtin_subscriber () ;
00921 
00922     ::DDS::DataReader_var reader
00923      = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ;
00924     ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader
00925      = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ());
00926     if (CORBA::is_nil (pub_reader.in ())) {
00927       ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n"));
00928       return_code = FACE::NOT_AVAILABLE;
00929       return;
00930     }
00931 
00932     ::DDS::ReturnCode_t ret;
00933     ::DDS::SampleInfoSeq pubinfos(1);
00934     ::DDS::PublicationBuiltinTopicDataSeq pubdata(1);
00935 
00936     ret = pub_reader->read_instance(pubdata,
00937                                     pubinfos,
00938                                     1,
00939                                     sinfo.publication_handle,
00940                                     ::DDS::ANY_SAMPLE_STATE,
00941                                     ::DDS::ANY_VIEW_STATE,
00942                                     ::DDS::ALIVE_INSTANCE_STATE);
00943 
00944 
00945     if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) {
00946       ACE_ERROR((LM_ERROR,
00947           "(%P|%t) populate_header_received:  failed to read BIT publication data.\n"));
00948       return_code = FACE::NOT_AVAILABLE;
00949       return;
00950     }
00951 
00952     const CORBA::ULong i = 0;
00953     qos_lifespan = pubdata[i].lifespan;
00954     qos_user_data = pubdata[i].user_data;
00955   }
00956 
00957   header.message_source_guid =
00958     (qos_user_data.value[0] << 0) |
00959     (qos_user_data.value[1] << 8) |
00960     (qos_user_data.value[2] << 16);
00961 
00962 //  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: DW lifespan qos value: sec: %d nanosec: %d\n",
00963 //                         pubdata[i].lifespan.duration.sec, pubdata[i].lifespan.duration.nanosec));
00964 
00965   DDS::Duration_t lifespan = qos_lifespan.duration;
00966   if (lifespan.sec != DDS::DURATION_INFINITE_SEC &&
00967       lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) {
00968     // Finite lifespan.  Check if data has expired.
00969 
00970     DDS::Time_t const tmp = {
00971       sinfo.source_timestamp.sec + lifespan.sec,
00972       sinfo.source_timestamp.nanosec + lifespan.nanosec
00973     };
00974 
00975     // We assume that the publisher host's clock and subscriber host's
00976     // clock are synchronized (allowed by the spec).
00977     ACE_Time_Value const expiration_time(
00978         OpenDDS::DCPS::time_to_time_value(tmp));
00979 
00980     if (now >= expiration_time) {
00981 //      ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Last message expired, setting message_validity to INVALID\n"));
00982       header.message_validity = FACE::INVALID;
00983       return_code = FACE::RC_NO_ERROR;
00984       return;
00985     }
00986   }
00987 //  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Setting message_validity to VALID\n"));
00988   header.message_validity = FACE::VALID;
00989   return_code = FACE::RC_NO_ERROR;
00990 }
00991 }}
00992 
00993 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1