FaceTSS.cpp

Go to the documentation of this file.
00001 #include "ace/config-macros.h"
00002 #include "FACE/TS.hpp"
00003 #include "FaceTSS.h"
00004 #include "config/Parser.h"
00005 
00006 #include "dds/DCPS/Service_Participant.h"
00007 #include "dds/DCPS/DomainParticipantImpl.h"
00008 #include "dds/DCPS/Registered_Data_Types.h"
00009 #include "dds/DCPS/Marked_Default_Qos.h"
00010 #include "dds/DCPS/BuiltInTopicUtils.h"
00011 #include "dds/DCPS/SafetyProfileStreams.h"
00012 #include "dds/DCPS/SafetyProfilePool.h"
00013 #include "dds/DCPS/Qos_Helper.h"
00014 #include "dds/DdsDcpsCoreC.h"
00015 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00016 
00017 #include <cstring>
00018 
00019 #ifndef OPENDDS_SAFETY_PROFILE
00020 using OpenDDS::DCPS::operator==;
00021 #endif
00022 
00023 namespace FACE {
00024 namespace TS {
00025 
00026 bool MessageHeader::operator==(const MessageHeader& rhs) const
00027 {
00028   return message_instance_guid == rhs.message_instance_guid
00029     && platform_view_guid == rhs.platform_view_guid
00030     && message_source_guid == rhs.message_source_guid
00031     && message_timestamp == rhs.message_timestamp
00032     && message_validity == rhs.message_validity;
00033 }
00034 
00035 using OpenDDS::FaceTSS::config::ConnectionSettings;
00036 using OpenDDS::FaceTSS::config::TopicSettings;
00037 using OpenDDS::FaceTSS::config::QosSettings;
00038 
00039 namespace {
00040   OpenDDS::FaceTSS::config::Parser parser;
00041 
00042   void find_or_create_dp(const DDS::DomainId_t& domainId,
00043                          int participantId,
00044                          const DDS::DomainParticipantFactory_var& dpf,
00045                          DDS::DomainParticipant_var& dp);
00046   void find_or_create_pub(const DDS::PublisherQos& qos,
00047                           const DDS::DomainParticipant_var& dp,
00048                           DDS::Publisher_var& pub);
00049   void find_or_create_sub(const DDS::SubscriberQos& qos,
00050                           const DDS::DomainParticipant_var& dp,
00051                           DDS::Subscriber_var& sub);
00052 
00053   bool cleanup_opendds_publisher(const DDS::Publisher_var pub);
00054   bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub);
00055   void cleanup_opendds_participant(const DDS::DomainParticipant_var dp);
00056 
00057   RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00058                                            int participantId,
00059                                            const DDS::DomainId_t domainId,
00060                                            const char* topic,
00061                                            const char* type,
00062                                            CONNECTION_DIRECTION_TYPE dir,
00063                                            QosSettings& qos,
00064                                            const char* transport);
00065 }
00066 
00067 using OpenDDS::FaceTSS::Entities;
00068 
00069 void Initialize(const CONFIGURATION_RESOURCE configuration_file,
00070                 RETURN_CODE_TYPE& return_code)
00071 {
00072   int status = parser.parse(configuration_file);
00073   if (status != 0) {
00074     ACE_ERROR((LM_ERROR,
00075                ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00076                ACE_TEXT("Parser::parse () returned %d\n"),
00077                status));
00078     return_code = INVALID_PARAM;
00079   } else {
00080     return_code = RC_NO_ERROR;
00081 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00082     TheServiceParticipant->configure_pool();
00083 #endif
00084   }
00085 }
00086 
00087 void Create_Connection(const CONNECTION_NAME_TYPE connection_name,
00088                        MESSAGING_PATTERN_TYPE pattern,
00089                        CONNECTION_ID_TYPE& connection_id,
00090                        CONNECTION_DIRECTION_TYPE& connection_direction,
00091                        MESSAGE_SIZE_TYPE& max_message_size,
00092                        TIMEOUT_TYPE,
00093                        RETURN_CODE_TYPE& return_code)
00094 {
00095   return_code = RC_NO_ERROR;
00096 
00097   if (pattern != PUB_SUB) {
00098     return_code = INVALID_CONFIG;
00099     return;
00100   }
00101 
00102   ConnectionSettings connection;
00103   TopicSettings topic;
00104   QosSettings qos;
00105 
00106   // Find connection
00107   if (!parser.find_connection(connection_name, connection)) {
00108     // Find topic
00109     if (!parser.find_topic(connection.topic_name_, topic)) {
00110       // Find Qos by specified name(s)
00111       if (parser.find_qos(connection, qos)) {
00112         return_code = INVALID_CONFIG;
00113       }
00114     } else {
00115       return_code = INVALID_CONFIG;
00116     }
00117   } else {
00118     return_code = INVALID_CONFIG;
00119   }
00120 
00121   if (return_code != RC_NO_ERROR) {
00122     return;
00123   }
00124 
00125   connection_id = connection.connection_id_;
00126   connection_direction = connection.direction_;
00127   max_message_size = topic.max_message_size_;
00128 
00129   return_code = create_opendds_entities(connection_id,
00130                                         connection.participant_id_,
00131                                         connection.domain_id_,
00132                                         connection.topic_name_,
00133                                         topic.type_name_,
00134                                         connection_direction,
00135                                         qos,
00136                                         connection.config_name_);
00137   if (return_code != RC_NO_ERROR) {
00138     return;
00139   }
00140 
00141   const SYSTEM_TIME_TYPE refresh_period =
00142     (connection_direction == SOURCE) ?
00143     OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0;
00144 
00145   const TRANSPORT_CONNECTION_STATUS_TYPE status = {
00146     0, // MESSAGE currently set to 0 due to type mismatch in MESSAGE_RANGE_TYPE and MESSAGE_TYPE_GUID
00147     max_message_size, // MAX_MESSAGE with no transformations, set to config value
00148     max_message_size,
00149     connection_direction,
00150     0, // WAITING_PROCESSES_OR_MESSAGES
00151     refresh_period,
00152     INVALID,
00153   };
00154   Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id];
00155   conn_info.connection_name = connection_name;
00156   conn_info.connection_status = status;
00157   conn_info.platform_view_guid = topic.platform_view_guid_;
00158 }
00159 
00160 void Get_Connection_Parameters(CONNECTION_NAME_TYPE& connection_name,
00161                                CONNECTION_ID_TYPE& connection_id /* 0 if an out param */,
00162                                TRANSPORT_CONNECTION_STATUS_TYPE& status,
00163                                RETURN_CODE_TYPE& return_code)
00164 {
00165   // connection_name is optional, if absent populate from connection_id lookup
00166   // connection_id is also optional, if absent populate from connection_name lookup
00167   // if both provided, validate
00168   // if neither present, return error
00169   Entities& entities = *Entities::instance();
00170 
00171   if (connection_id != 0 && entities.connections_.count(connection_id)) {
00172     // connection_id was provided
00173     // if validated or populated, set return_code so status will be populated
00174     if (connection_name[0]) {
00175       // Validate provided connection_name
00176       OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name;
00177       if (std::strcmp(connection_name, conn_name.c_str()) == 0) {
00178         return_code = RC_NO_ERROR;
00179       } else {
00180         return_code = INVALID_PARAM;
00181       }
00182     } else {
00183       // connection_name not provided
00184       // so populate from connection_id lookup
00185       // and set return code so status will be populated
00186       entities.connections_[connection_id].connection_name.copy(connection_name,
00187                                                       sizeof(CONNECTION_NAME_TYPE));
00188       connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0;
00189       return_code = RC_NO_ERROR;
00190     }
00191 
00192   } else if (connection_name[0] && connection_id == 0) {
00193     // connection_id was not specified, but name was provided.
00194     // lookup connection_id and if found set return code to populate status
00195     ConnectionSettings settings;
00196     if (0 == parser.find_connection(connection_name, settings)) {
00197       connection_id = settings.connection_id_;
00198       return_code = RC_NO_ERROR;
00199     } else {
00200       // could not find connection for connection_name
00201       return_code = INVALID_PARAM;
00202     }
00203   } else {
00204     //Neither connection_id or connection_name provided
00205     // a valid connection
00206     return_code = INVALID_PARAM;
00207   }
00208   if (return_code == RC_NO_ERROR) {
00209     TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status;
00210     if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) {
00211       Entities::FaceReceiver& receiver = *entities.receivers_[connection_id];
00212       if (receiver.status_valid != FACE::VALID) {
00213         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00214           ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n"));
00215         }
00216         return_code = NOT_AVAILABLE;
00217         return;
00218       }
00219       if (receiver.total_msgs_recvd != 0) {
00220         cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency/receiver.total_msgs_recvd;
00221         cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity;
00222       } else {
00223         cur_status.REFRESH_PERIOD = 0;
00224       }
00225       WAITING_RANGE_TYPE num_waiting;
00226       if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) {
00227           cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting;
00228       } else {
00229         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00230           ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n"));
00231         }
00232         return_code = NOT_AVAILABLE;
00233         return;
00234       }
00235     } else {
00236       //DDS Only supports Destination/Source therefore
00237       // CONNECTION_DIRECTION == FACE::SOURCE
00238       Entities::FaceSender& sender = entities.senders_[connection_id];
00239       if (sender.status_valid != FACE::VALID) {
00240         return_code = NOT_AVAILABLE;
00241         return;
00242       }
00243       cur_status.REFRESH_PERIOD = 0;
00244       cur_status.WAITING_PROCESSES_OR_MESSAGES = 0;
00245     }
00246     status = cur_status;
00247   }
00248 }
00249 
00250 void Unregister_Callback(CONNECTION_ID_TYPE connection_id,
00251                          RETURN_CODE_TYPE& return_code)
00252 {
00253   Entities& entities = *Entities::instance();
00254   Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00255   if (readers.count(connection_id)) {
00256     readers[connection_id]->dr->set_listener(NULL, 0);
00257     return_code = RC_NO_ERROR;
00258     return;
00259   }
00260   return_code = INVALID_PARAM;
00261 }
00262 
00263 void Destroy_Connection(CONNECTION_ID_TYPE connection_id,
00264                         RETURN_CODE_TYPE& return_code)
00265 {
00266   Entities& entities = *Entities::instance();
00267   Entities::ConnIdToSenderMap& writers = entities.senders_;
00268   Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00269 
00270   DDS::DomainParticipant_var dp;
00271   bool try_cleanup_participant = false;
00272   if (writers.count(connection_id)) {
00273     const DDS::DataWriter_var datawriter = writers[connection_id].dw;
00274     const DDS::Publisher_var pub = datawriter->get_publisher();
00275     writers.erase(connection_id);
00276     pub->delete_datawriter(datawriter);
00277     dp = pub->get_participant();
00278     try_cleanup_participant = cleanup_opendds_publisher(pub);
00279 
00280   } else if (readers.count(connection_id)) {
00281     const DDS::DataReader_var datareader = readers[connection_id]->dr;
00282     const DDS::Subscriber_var sub = datareader->get_subscriber();
00283     delete readers[connection_id];
00284     readers.erase(connection_id);
00285     sub->delete_datareader(datareader);
00286     dp = sub->get_participant();
00287     try_cleanup_participant = cleanup_opendds_subscriber(sub);
00288   }
00289 
00290   if (!dp) {
00291     return_code = INVALID_PARAM;
00292     return;
00293   }
00294 
00295   if (try_cleanup_participant) {
00296     cleanup_opendds_participant(dp);
00297   }
00298 
00299   entities.connections_.erase(connection_id);
00300   return_code = RC_NO_ERROR;
00301 }
00302 OpenDDS_FACE_Export
00303 void receive_header(/*in*/    FACE::CONNECTION_ID_TYPE connection_id,
00304                     /*in*/    FACE::TIMEOUT_TYPE /*timeout*/,
00305                     /*inout*/ FACE::TRANSACTION_ID_TYPE& transaction_id,
00306                     /*inout*/ FACE::TS::MessageHeader& message_header,
00307                     /*in*/    FACE::MESSAGE_SIZE_TYPE message_size,
00308                     /*out*/   FACE::RETURN_CODE_TYPE& return_code)
00309 {
00310   Entities::ConnIdToReceiverMap& readers =
00311     Entities::instance()->receivers_;
00312   // transaction_id cannot be 0 due to initialization
00313   // of last_msg_tid to 0 before a msg has been received so
00314   // only valid transaction_ids are > 0.
00315   if (!readers.count(connection_id) || transaction_id == 0) {
00316     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00317       ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - "
00318           "could not find reader for connection_id: %d OR transaction id[%d] == 0 \n",
00319           connection_id,
00320           transaction_id));
00321     }
00322     return_code = FACE::INVALID_PARAM;
00323     return;
00324   }
00325 
00326   if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) {
00327     if (OpenDDS::DCPS::DCPS_debug_level) {
00328       ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d \n",
00329           message_size,
00330           sizeof(FACE::TS::MessageHeader)));
00331     }
00332     return_code = FACE::INVALID_PARAM;
00333     return;
00334   }
00335   if (transaction_id == readers[connection_id]->last_msg_tid) {
00336     message_header = readers[connection_id]->last_msg_header;
00337     return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK);
00338   } else {
00339     return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00340   }
00341 }
00342 
00343 void Receive_Message(
00344   /* in */ CONNECTION_ID_TYPE connection_id,
00345   /* in */ TIMEOUT_TYPE timeout,
00346   /* inout */ TRANSACTION_ID_TYPE& transaction_id,
00347   /* out */ MessageHeader& message_header,
00348   /* in */ MESSAGE_SIZE_TYPE message_size,
00349   /* out */ RETURN_CODE_TYPE& return_code)
00350 {
00351   receive_header(connection_id, timeout,
00352                  transaction_id, message_header,
00353                  message_size, return_code);
00354 }
00355 
00356 namespace {
00357   void find_or_create_dp(const DDS::DomainId_t& domainId,
00358                          int participantId,
00359                          const DDS::DomainParticipantFactory_var& dpf,
00360                          DDS::DomainParticipant_var& dp) {
00361     DDS::DomainParticipant_var temp_dp;
00362 
00363     temp_dp = dpf->lookup_participant(domainId);
00364     if (!temp_dp) {
00365       if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00366         ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - created new participant for domainId: %d\n", domainId));
00367       }
00368       DDS::DomainParticipantQos qos(PARTICIPANT_QOS_DEFAULT);
00369       qos.user_data.value.length(6);
00370       qos.user_data.value[0] = (participantId >> 0) & 0xFF;
00371       qos.user_data.value[1] = (participantId >> 8) & 0xFF;
00372       qos.user_data.value[2] = (participantId >> 16) & 0xFF;
00373       qos.user_data.value[3] = (participantId >> 24) & 0xFF;
00374       qos.user_data.value[4] = 0; // (participantId >> 32) & 0xFF;
00375       qos.user_data.value[5] = 0; // (participantId >> 40) & 0xFF;
00376       dp = dpf->create_participant(domainId, qos, 0, 0);
00377       return;
00378     }
00379     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00380       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - found exiting participant for domainId: %d \n", domainId));
00381     }
00382     dp = temp_dp;
00383   }
00384 
00385   void find_or_create_pub(const DDS::PublisherQos& qos,
00386                           const DDS::DomainParticipant_var& dp,
00387                           DDS::Publisher_var& pub) {
00388     DDS::DomainParticipant_var temp_dp;
00389     DDS::Publisher_var temp_pub;
00390 
00391     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00392     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00393 
00394     while (wtrIter != writers.end()) {
00395       temp_pub = wtrIter->second.dw->get_publisher();
00396       temp_dp = temp_pub->get_participant();
00397       DDS::PublisherQos temp_qos;
00398       temp_pub->get_qos(temp_qos);
00399       if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00400           temp_qos == qos) {
00401         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00402           ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - found exiting publisher in senders\n"));
00403         }
00404         pub = temp_pub;
00405         return;
00406       } else {
00407         ++wtrIter;
00408       }
00409     }
00410     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00411       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - created new publisher\n"));
00412     }
00413     pub = dp->create_publisher(qos, 0, 0);
00414   }
00415 
00416   void find_or_create_sub(const DDS::SubscriberQos& qos,
00417                           const DDS::DomainParticipant_var& dp,
00418                           DDS::Subscriber_var& sub)
00419   {
00420     DDS::DomainParticipant_var temp_dp;
00421     DDS::Subscriber_var temp_sub;
00422 
00423     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00424     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00425 
00426     while (rdrIter != readers.end()) {
00427       temp_sub = rdrIter->second->dr->get_subscriber();
00428       temp_dp = temp_sub->get_participant();
00429       DDS::SubscriberQos temp_qos;
00430       temp_sub->get_qos(temp_qos);
00431       if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00432           temp_qos == qos) {
00433         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00434           ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - found exiting subscriber in receivers\n"));
00435         }
00436         sub = temp_sub;
00437         return;
00438       } else {
00439         ++rdrIter;
00440       }
00441     }
00442     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00443       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - created new subscriber\n"));
00444     }
00445     sub = dp->create_subscriber(qos, 0, 0);
00446   }
00447 
00448   bool cleanup_opendds_publisher(const DDS::Publisher_var pub)
00449   {
00450     DDS::Publisher_var temp_pub;
00451     DDS::PublisherQos pub_qos;
00452     pub->get_qos(pub_qos);
00453     DDS::DomainParticipant_var dp = pub->get_participant();
00454     DDS::DomainParticipant_var temp_dp;
00455     DDS::PublisherQos temp_qos;
00456 
00457     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00458     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00459     while (wtrIter != writers.end()) {
00460       temp_pub = wtrIter->second.dw->get_publisher();
00461       temp_dp = temp_pub->get_participant();
00462       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00463         temp_pub->get_qos(temp_qos);
00464         if (pub_qos == temp_qos) {
00465           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00466             ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher still in use by other writer\n"));
00467           }
00468           return false;
00469         }
00470       }
00471       ++wtrIter;
00472     }
00473     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00474       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher no longer in use, delete pub\n"));
00475     }
00476     dp->delete_publisher(pub);
00477     return true;
00478   }
00479 
00480   bool cleanup_opendds_subscriber(const DDS::Subscriber_var sub)
00481   {
00482     DDS::Subscriber_var temp_sub;
00483     DDS::SubscriberQos sub_qos;
00484     sub->get_qos(sub_qos);
00485     DDS::DomainParticipant_var dp = sub->get_participant();
00486     DDS::DomainParticipant_var temp_dp;
00487     DDS::SubscriberQos temp_qos;
00488 
00489 
00490     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00491     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00492     while (rdrIter != readers.end()) {
00493       temp_sub = rdrIter->second->dr->get_subscriber();
00494       temp_dp = temp_sub->get_participant();
00495 
00496       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00497         temp_sub->get_qos(temp_qos);
00498         if (sub_qos == temp_qos) {
00499           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00500             ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber still in use by other reader\n"));
00501           }
00502           return false;
00503         }
00504       }
00505       ++rdrIter;
00506     }
00507     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00508       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber no longer in use, delete sub\n"));
00509     }
00510     dp->delete_subscriber(sub);
00511     return true;
00512   }
00513 
00514   void cleanup_opendds_participant(const DDS::DomainParticipant_var dp)
00515   {
00516     DDS::DomainParticipant_var temp_dp;
00517     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00518     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00519     while (rdrIter != readers.end()) {
00520       temp_dp = rdrIter->second->dr->get_subscriber()->get_participant();
00521       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00522         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00523           ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by reader\n"));
00524         }
00525         return;
00526       } else {
00527         ++rdrIter;
00528       }
00529     }
00530 
00531     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00532     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00533 
00534     while (wtrIter != writers.end()) {
00535       temp_dp = wtrIter->second.dw->get_publisher()->get_participant();
00536       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00537         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00538           ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by writer\n"));
00539         }
00540         return;
00541       } else {
00542         ++wtrIter;
00543       }
00544     }
00545     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00546       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant for domain: %d no longer in use, delete entities and participant\n", dp->get_domain_id()));
00547     }
00548     dp->delete_contained_entities();
00549     const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00550     dpf->delete_participant(dp);
00551   }
00552 
00553   RETURN_CODE_TYPE create_opendds_entities(CONNECTION_ID_TYPE connectionId,
00554                                            int participantId,
00555                                            const DDS::DomainId_t domainId,
00556                                            const char* topicName,
00557                                            const char* type,
00558                                            CONNECTION_DIRECTION_TYPE dir,
00559                                            QosSettings& qos_settings,
00560                                            const char* transport)
00561   {
00562 #ifdef DEBUG_OPENDDS_FACETSS
00563     OpenDDS::DCPS::set_DCPS_debug_level(8);
00564     OpenDDS::DCPS::Transport_debug_level = 5;
00565     TheServiceParticipant->set_BIT(false);
00566 #endif
00567 
00568     const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00569     if (!dpf) return INVALID_PARAM;
00570 
00571     DDS::DomainParticipant_var dp;
00572     find_or_create_dp(domainId, participantId, dpf, dp);
00573     if (!dp) return INVALID_PARAM;
00574 
00575     using OpenDDS::DCPS::Data_Types_Register;
00576     OpenDDS::DCPS::TypeSupport* ts =
00577       Registered_Data_Types->lookup(dp, type);
00578     if (!ts) {
00579       ts = Registered_Data_Types->lookup(0, type);
00580       if (!ts) return INVALID_PARAM;
00581       Registered_Data_Types->register_type(dp, type, ts);
00582     }
00583 
00584     const DDS::Topic_var topic =
00585       dp->create_topic(topicName, type, TOPIC_QOS_DEFAULT, 0, 0);
00586     if (!topic) return INVALID_PARAM;
00587 
00588     if (dir == SOURCE) {
00589       DDS::PublisherQos publisher_qos;
00590       qos_settings.apply_to(publisher_qos);
00591 
00592       DDS::Publisher_var pub;
00593       find_or_create_pub(publisher_qos, dp, pub);
00594       if (!pub) return INVALID_PARAM;
00595 
00596       if (transport && transport[0]) {
00597         OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00598         if (config.is_nil()) return INVALID_PARAM;
00599         TheTransportRegistry->bind_config(config, pub);
00600       }
00601 
00602       DDS::DataWriterQos datawriter_qos;
00603       qos_settings.apply_to(datawriter_qos);
00604 
00605       // set up user data in DW qos
00606       datawriter_qos.user_data.value.length (3);
00607       datawriter_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00608       datawriter_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00609       datawriter_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00610 
00611       const DDS::DataWriter_var dw =
00612         pub->create_datawriter(topic, datawriter_qos, 0, 0);
00613       if (!dw) return INVALID_PARAM;
00614 
00615       Entities::instance()->senders_[connectionId].dw = dw;
00616 
00617     } else { // dir == DESTINATION
00618       DDS::SubscriberQos subscriber_qos;
00619       qos_settings.apply_to(subscriber_qos);
00620 
00621       DDS::Subscriber_var sub;
00622       find_or_create_sub(subscriber_qos, dp, sub);
00623       if (!sub) return INVALID_PARAM;
00624 
00625       if (transport && transport[0]) {
00626         OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00627         if (config.is_nil()) return INVALID_PARAM;
00628         TheTransportRegistry->bind_config(config, sub);
00629       }
00630 
00631       DDS::DataReaderQos datareader_qos;
00632       qos_settings.apply_to(datareader_qos);
00633 
00634       // set up user data in DR qos
00635       datareader_qos.user_data.value.length (3);
00636       datareader_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00637       datareader_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00638       datareader_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00639 
00640       const DDS::DataReader_var dr =
00641         sub->create_datareader(topic, datareader_qos, 0, 0);
00642       if (!dr) return INVALID_PARAM;
00643       Entities::instance()->receivers_[connectionId] = new Entities::FaceReceiver();
00644       Entities::instance()->receivers_[connectionId]->dr = dr;
00645     }
00646 
00647     return RC_NO_ERROR;
00648   }
00649 }
00650 
00651 }}
00652 
00653 namespace OpenDDS {
00654 namespace FaceTSS {
00655 
00656 Entities::Entities() {}
00657 Entities::~Entities() {}
00658 
00659 Entities* Entities::instance()
00660 {
00661   return ACE_Singleton<Entities, ACE_Thread_Mutex>::instance();
00662 }
00663 
00664 FACE::RETURN_CODE_TYPE update_status(FACE::CONNECTION_ID_TYPE connection_id,
00665   DDS::ReturnCode_t retcode)
00666 {
00667   FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status =
00668     Entities::instance()->connections_[connection_id].connection_status;
00669   FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM;
00670 
00671   switch (retcode) {
00672   case DDS::RETCODE_OK:
00673     status.LAST_MSG_VALIDITY = FACE::VALID;
00674     return FACE::RC_NO_ERROR;
00675 
00676   case DDS::RETCODE_ERROR:
00677     rc = FACE::CONNECTION_CLOSED; break;
00678 
00679   case DDS::RETCODE_BAD_PARAMETER:
00680     rc = FACE::INVALID_PARAM; break;
00681 
00682   case DDS::RETCODE_OUT_OF_RESOURCES:
00683     rc = FACE::DATA_BUFFER_TOO_SMALL; break;
00684 
00685   case DDS::RETCODE_PRECONDITION_NOT_MET:
00686   case DDS::RETCODE_NOT_ENABLED:
00687     rc = FACE::INVALID_MODE; break;
00688 
00689   case DDS::RETCODE_IMMUTABLE_POLICY:
00690   case DDS::RETCODE_INCONSISTENT_POLICY:
00691     rc = FACE::INVALID_CONFIG; break;
00692 
00693   case DDS::RETCODE_ALREADY_DELETED:
00694     rc = FACE::CONNECTION_CLOSED; break;
00695 
00696   case DDS::RETCODE_TIMEOUT:
00697     rc = FACE::TIMED_OUT; break;
00698 
00699   case DDS::RETCODE_UNSUPPORTED:
00700   case DDS::RETCODE_NO_DATA:
00701     rc = FACE::NOT_AVAILABLE; break;
00702 
00703   case DDS::RETCODE_ILLEGAL_OPERATION:
00704     rc = FACE::PERMISSION_DENIED; break;
00705   }
00706 
00707   status.LAST_MSG_VALIDITY = FACE::INVALID;
00708   return rc;
00709 }
00710 
// Nanoseconds in one second; used by the time conversion helpers below to
// split and combine (sec, nanosec) pairs.
enum { NSEC_PER_SEC = 1000000000 };
00712 
00713 DDS::Duration_t convertTimeout(FACE::TIMEOUT_TYPE timeout)
00714 {
00715   if (timeout == FACE::INF_TIME_VALUE) {
00716     static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC,
00717                                             DDS::DURATION_INFINITE_NSEC};
00718     return dds_inf;
00719   }
00720 
00721   DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC),
00722                          static_cast<unsigned int>(timeout % NSEC_PER_SEC)};
00723   return dur;
00724 }
00725 
00726 FACE::SYSTEM_TIME_TYPE convertDuration(const DDS::Duration_t& duration)
00727 {
00728   if (duration.sec == DDS::DURATION_INFINITE_SEC
00729       && duration.nanosec == DDS::DURATION_INFINITE_NSEC) {
00730     return FACE::INF_TIME_VALUE;
00731   }
00732   return duration.nanosec +
00733     duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00734 }
00735 
00736 FACE::SYSTEM_TIME_TYPE convertTime(const DDS::Time_t& timestamp)
00737 {
00738   return timestamp.nanosec +
00739       timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00740 }
00741 
00742 FACE::MESSAGE_INSTANCE_GUID
00743 create_message_instance_guid(const OpenDDS::DCPS::RepoId& pub, const CORBA::LongLong& orig_seq)
00744 {
00745   OpenDDS::DCPS::GuidConverter writer(pub);
00746 
00747   FACE::MESSAGE_INSTANCE_GUID message_instance_guid;
00748   FACE::LongLong mig_low;
00749   FACE::LongLong masked_seq;
00750 
00751   //Until MESSAGE_INSTANCE_GUID becomes 128 bit GUID, use checksum to represent Prefix
00752   FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub));
00753   masked_seq = orig_seq >> 32;
00754 
00755   if (masked_seq) {
00756     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n"));
00757   }
00758   mig_low = orig_seq & 0xFFFFFFFF;
00759   message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low);
00760 /*
00761   //TODO: This is initial work toward defining how a 128 bit guid could be created
00762   //in the future for a Message Instance Guid when supported.
00763   // 13 byte prefix contains identifying pieces of guid
00764   // 3 byte seq - truncated sequence from the sample
00765   typedef CORBA::Octet MsgInstGuidPrefix_t[13];
00766   typedef CORBA::Octet MsgInstGuidSeq_t[3];
00767   MsgInstGuidPrefix_t migPrefix;
00768   MsgInstGuidSeq_t migSeq;
00769   ACE_OS::memcpy(&migPrefix[0], &pub.guidPrefix[2], 10);
00770   ACE_OS::memcpy(&migPrefix[10], &pub.entityId.entityKey[0], 3);
00771   masked_seq = orig_seq >> 24;
00772 
00773   if (masked_seq) {
00774     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in 3 bytes, truncating high bits to fit\n"));
00775   }
00776   FACE::LongLong masked = orig_seq & 0xFFFFFF;
00777 
00778 #ifdef ACE_BIG_ENDIAN
00779   masked <<= 8 * (sizeof(FACE::LongLong)-3);
00780 #endif
00781   ACE_OS::memcpy(&migSeq[0], &masked, 3);
00782 */
00783   return message_instance_guid;
00784 }
00785 
00786 void populate_header_received(const FACE::CONNECTION_ID_TYPE& connection_id,
00787                               const DDS::DomainParticipant_var part,
00788                               const DDS::SampleInfo& sinfo,
00789                               FACE::RETURN_CODE_TYPE& return_code)
00790 {
00791   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00792   if (!readers.count(connection_id)) {
00793     return_code = FACE::INVALID_PARAM;
00794     return;
00795   }
00796   FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header;
00797 
00798   header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid;
00799 
00800   DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber();
00801   DDS::DomainParticipant_var temp_dp = temp_sub->get_participant();
00802   const OpenDDS::DCPS::RepoId pub = dynamic_cast<OpenDDS::DCPS::DomainParticipantImpl*>(temp_dp.in())->get_repoid(sinfo.publication_handle);
00803   header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq);
00804 
00805   header.message_timestamp = convertTime(sinfo.source_timestamp);
00806   ACE_Time_Value now(ACE_OS::gettimeofday());
00807 
00808   readers[connection_id]->sum_recvd_msgs_latency += (convertTime(OpenDDS::DCPS::time_value_to_time(now)) - header.message_timestamp);
00809   ++readers[connection_id]->total_msgs_recvd;
00810 
00811   if (OpenDDS::DCPS::DCPS_debug_level > 8) {
00812     ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n",
00813         readers[connection_id]->sum_recvd_msgs_latency,
00814         readers[connection_id]->total_msgs_recvd,
00815         readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd));
00816   }
00817   ::DDS::Subscriber_var bit_subscriber
00818    = part->get_builtin_subscriber () ;
00819 
00820   ::DDS::DataReader_var reader
00821    = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ;
00822   ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader
00823    = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ());
00824   if (CORBA::is_nil (pub_reader.in ())) {
00825     ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n"));
00826     return_code = FACE::NOT_AVAILABLE;
00827     return;
00828   }
00829 
00830   ::DDS::ReturnCode_t ret;
00831   ::DDS::SampleInfoSeq pubinfos(1);
00832   ::DDS::PublicationBuiltinTopicDataSeq pubdata(1);
00833   ret = pub_reader->read_instance(pubdata,
00834                                   pubinfos,
00835                                   1,
00836                                   sinfo.publication_handle,
00837                                   ::DDS::ANY_SAMPLE_STATE,
00838                                   ::DDS::ANY_VIEW_STATE,
00839                                   ::DDS::ALIVE_INSTANCE_STATE);
00840 
00841   if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) {
00842     ACE_ERROR((LM_ERROR,
00843         "(%P|%t) populate_header_received:  failed to read BIT publication data.\n"));
00844     return_code = FACE::NOT_AVAILABLE;
00845     return;
00846   }
00847 
00848   const CORBA::ULong i = 0;
00849 
00850   header.message_source_guid =
00851     (pubdata[i].user_data.value[0] << 0) |
00852     (pubdata[i].user_data.value[1] << 8) |
00853     (pubdata[i].user_data.value[2] << 16);
00854 
00855 //  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: DW lifespan qos value: sec: %d nanosec: %d\n",
00856 //                         pubdata[i].lifespan.duration.sec, pubdata[i].lifespan.duration.nanosec));
00857 
00858   DDS::Duration_t lifespan = pubdata[i].lifespan.duration;
00859   if (lifespan.sec != DDS::DURATION_INFINITE_SEC &&
00860       lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) {
00861     // Finite lifespan.  Check if data has expired.
00862 
00863     DDS::Time_t const tmp = {
00864       sinfo.source_timestamp.sec + lifespan.sec,
00865       sinfo.source_timestamp.nanosec + lifespan.nanosec
00866     };
00867 
00868     // We assume that the publisher host's clock and subscriber host's
00869     // clock are synchronized (allowed by the spec).
00870     ACE_Time_Value const expiration_time(
00871         OpenDDS::DCPS::time_to_time_value(tmp));
00872 
00873     if (now >= expiration_time) {
00874 //      ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Last message expired, setting message_validity to INVALID\n"));
00875       header.message_validity = FACE::INVALID;
00876       return_code = FACE::RC_NO_ERROR;
00877       return;
00878     }
00879   }
00880 //  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Setting message_validity to VALID\n"));
00881   header.message_validity = FACE::VALID;
00882   return_code = FACE::RC_NO_ERROR;
00883 }
00884 }}

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7