FACE::TS Namespace Reference


Functions

void find_or_create_dp (const DDS::DomainId_t &domainId, int participantId, const DDS::DomainParticipantFactory_var &dpf, DDS::DomainParticipant_var &dp)
void find_or_create_pub (const DDS::PublisherQos &qos, const DDS::DomainParticipant_var &dp, DDS::Publisher_var &pub)
void find_or_create_sub (const DDS::SubscriberQos &qos, const DDS::DomainParticipant_var &dp, DDS::Subscriber_var &sub)
bool cleanup_opendds_publisher (const DDS::Publisher_var pub)
bool cleanup_opendds_subscriber (const DDS::Subscriber_var sub)
void cleanup_opendds_participant (const DDS::DomainParticipant_var dp)
RETURN_CODE_TYPE create_opendds_entities (CONNECTION_ID_TYPE connectionId, int participantId, const DDS::DomainId_t domainId, const char *topic, const char *type, CONNECTION_DIRECTION_TYPE dir, QosSettings &qos, const char *transport)
void Initialize (const CONFIGURATION_RESOURCE configuration_file, RETURN_CODE_TYPE &return_code)
void Create_Connection (const CONNECTION_NAME_TYPE connection_name, MESSAGING_PATTERN_TYPE pattern, CONNECTION_ID_TYPE &connection_id, CONNECTION_DIRECTION_TYPE &connection_direction, MESSAGE_SIZE_TYPE &max_message_size, TIMEOUT_TYPE, RETURN_CODE_TYPE &return_code)
void Get_Connection_Parameters (CONNECTION_NAME_TYPE &connection_name, CONNECTION_ID_TYPE &connection_id, TRANSPORT_CONNECTION_STATUS_TYPE &status, RETURN_CODE_TYPE &return_code)
void Unregister_Callback (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code)
void Destroy_Connection (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code)
OpenDDS_FACE_Export void receive_header (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE, FACE::TRANSACTION_ID_TYPE &transaction_id, FACE::TS::MessageHeader &message_header, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
void Receive_Message (CONNECTION_ID_TYPE connection_id, TIMEOUT_TYPE timeout, TRANSACTION_ID_TYPE &transaction_id, MessageHeader &message_header, MESSAGE_SIZE_TYPE message_size, RETURN_CODE_TYPE &return_code)
void find_or_create_dp (const DDS::DomainId_t &domainId, int participantId, const DDS::DomainParticipantFactory_var &dpf, DDS::DomainParticipant_var &dp)
void find_or_create_pub (const DDS::PublisherQos &qos, const DDS::DomainParticipant_var &dp, DDS::Publisher_var &pub)
void find_or_create_sub (const DDS::SubscriberQos &qos, const DDS::DomainParticipant_var &dp, DDS::Subscriber_var &sub)
bool cleanup_opendds_publisher (const DDS::Publisher_var pub)
bool cleanup_opendds_subscriber (const DDS::Subscriber_var sub)
void cleanup_opendds_participant (const DDS::DomainParticipant_var dp)
RETURN_CODE_TYPE create_opendds_entities (CONNECTION_ID_TYPE connectionId, int participantId, const DDS::DomainId_t domainId, const char *topicName, const char *type, CONNECTION_DIRECTION_TYPE dir, QosSettings &qos_settings, const char *transport)

Variables

OpenDDS::FaceTSS::config::Parser parser


Function Documentation

void FACE::TS::@78::find_or_create_dp ( const DDS::DomainId_t domainId,
int  participantId,
const DDS::DomainParticipantFactory_var &  dpf,
DDS::DomainParticipant_var &  dp 
) [static]

Referenced by create_opendds_entities().

void FACE::TS::@78::find_or_create_pub ( const DDS::PublisherQos qos,
const DDS::DomainParticipant_var &  dp,
DDS::Publisher_var &  pub 
) [static]

Referenced by create_opendds_entities().

void FACE::TS::@78::find_or_create_sub ( const DDS::SubscriberQos qos,
const DDS::DomainParticipant_var &  dp,
DDS::Subscriber_var &  sub 
) [static]

Referenced by create_opendds_entities().

bool FACE::TS::@78::cleanup_opendds_publisher ( const DDS::Publisher_var  pub  )  [static]

Referenced by Destroy_Connection().

bool FACE::TS::@78::cleanup_opendds_subscriber ( const DDS::Subscriber_var  sub  )  [static]

Referenced by Destroy_Connection().

void FACE::TS::@78::cleanup_opendds_participant ( const DDS::DomainParticipant_var  dp  )  [static]

Referenced by Destroy_Connection().

RETURN_CODE_TYPE FACE::TS::@78::create_opendds_entities ( CONNECTION_ID_TYPE  connectionId,
int  participantId,
const DDS::DomainId_t  domainId,
const char *  topic,
const char *  type,
CONNECTION_DIRECTION_TYPE  dir,
QosSettings &  qos,
const char *  transport 
) [static]

Referenced by Create_Connection().

void FACE::TS::Initialize ( const CONFIGURATION_RESOURCE  configuration_file,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 69 of file FaceTSS.cpp.

References OpenDDS::FaceTSS::config::Parser::parse(), parser, and TheServiceParticipant.

00071 {
00072   int status = parser.parse(configuration_file);
00073   if (status != 0) {
00074     ACE_ERROR((LM_ERROR,
00075                ACE_TEXT("(%P|%t) ERROR: Initialize() ")
00076                ACE_TEXT("Parser::parse () returned %d\n"),
00077                status));
00078     return_code = INVALID_PARAM;
00079   } else {
00080     return_code = RC_NO_ERROR;
00081 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS
00082     TheServiceParticipant->configure_pool();
00083 #endif
00084   }
00085 }

void FACE::TS::Create_Connection ( const CONNECTION_NAME_TYPE  connection_name,
MESSAGING_PATTERN_TYPE  pattern,
CONNECTION_ID_TYPE &  connection_id,
CONNECTION_DIRECTION_TYPE &  connection_direction,
MESSAGE_SIZE_TYPE &  max_message_size,
TIMEOUT_TYPE  ,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 87 of file FaceTSS.cpp.

References OpenDDS::FaceTSS::config::ConnectionSettings::config_name_, OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::FaceTSS::convertDuration(), create_opendds_entities(), OpenDDS::FaceTSS::config::QosSettings::datawriter_qos(), OpenDDS::FaceTSS::config::ConnectionSettings::direction_, OpenDDS::FaceTSS::config::ConnectionSettings::domain_id_, OpenDDS::FaceTSS::config::Parser::find_connection(), OpenDDS::FaceTSS::config::Parser::find_qos(), OpenDDS::FaceTSS::config::Parser::find_topic(), DDS::DataWriterQos::lifespan, OpenDDS::FaceTSS::config::TopicSettings::max_message_size_, parser, OpenDDS::FaceTSS::config::ConnectionSettings::participant_id_, OpenDDS::FaceTSS::config::TopicSettings::platform_view_guid_, OpenDDS::FaceTSS::config::ConnectionSettings::topic_name_, and OpenDDS::FaceTSS::config::TopicSettings::type_name_.

00094 {
00095   return_code = RC_NO_ERROR;
00096 
00097   if (pattern != PUB_SUB) {
00098     return_code = INVALID_CONFIG;
00099     return;
00100   }
00101 
00102   ConnectionSettings connection;
00103   TopicSettings topic;
00104   QosSettings qos;
00105 
00106   // Find connection
00107   if (!parser.find_connection(connection_name, connection)) {
00108     // Find topic
00109     if (!parser.find_topic(connection.topic_name_, topic)) {
00110       // Find Qos by specified name(s)
00111       if (parser.find_qos(connection, qos)) {
00112         return_code = INVALID_CONFIG;
00113       }
00114     } else {
00115       return_code = INVALID_CONFIG;
00116     }
00117   } else {
00118     return_code = INVALID_CONFIG;
00119   }
00120 
00121   if (return_code != RC_NO_ERROR) {
00122     return;
00123   }
00124 
00125   connection_id = connection.connection_id_;
00126   connection_direction = connection.direction_;
00127   max_message_size = topic.max_message_size_;
00128 
00129   return_code = create_opendds_entities(connection_id,
00130                                         connection.participant_id_,
00131                                         connection.domain_id_,
00132                                         connection.topic_name_,
00133                                         topic.type_name_,
00134                                         connection_direction,
00135                                         qos,
00136                                         connection.config_name_);
00137   if (return_code != RC_NO_ERROR) {
00138     return;
00139   }
00140 
00141   const SYSTEM_TIME_TYPE refresh_period =
00142     (connection_direction == SOURCE) ?
00143     OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0;
00144 
00145   const TRANSPORT_CONNECTION_STATUS_TYPE status = {
00146     0, // MESSAGE currently set to 0 due to type mismatch in MESSAGE_RANGE_TYPE and MESSAGE_TYPE_GUID
00147     max_message_size, // MAX_MESSAGE with no transformations, set to config value
00148     max_message_size,
00149     connection_direction,
00150     0, // WAITING_PROCESSES_OR_MESSAGES
00151     refresh_period,
00152     INVALID,
00153   };
00154   Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id];
00155   conn_info.connection_name = connection_name;
00156   conn_info.connection_status = status;
00157   conn_info.platform_view_guid = topic.platform_view_guid_;
00158 }

void FACE::TS::Get_Connection_Parameters ( CONNECTION_NAME_TYPE &  connection_name,
CONNECTION_ID_TYPE &  connection_id,
TRANSPORT_CONNECTION_STATUS_TYPE &  status,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 160 of file FaceTSS.cpp.

References OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::config::Parser::find_connection(), OPENDDS_STRING, parser, OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::senders_.

00164 {
00165   // connection_name is optional, if absent populate from connection_id lookup
00166   // connection_id is also optional, if absent populate from connection_name lookup
00167   // if both provided, validate
00168   // if neither present, return error
00169   Entities& entities = *Entities::instance();
00170 
00171   if (connection_id != 0 && entities.connections_.count(connection_id)) {
00172     // connection_id was provided
00173     // if validated or populated, set return_code so status will be populated
00174     if (connection_name[0]) {
00175       // Validate provided connection_name
00176       OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name;
00177       if (std::strcmp(connection_name, conn_name.c_str()) == 0) {
00178         return_code = RC_NO_ERROR;
00179       } else {
00180         return_code = INVALID_PARAM;
00181       }
00182     } else {
00183       // connection_name not provided
00184       // so populate from connection_id lookup
00185       // and set return code so status will be populated
00186       entities.connections_[connection_id].connection_name.copy(connection_name,
00187                                                       sizeof(CONNECTION_NAME_TYPE));
00188       connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0;
00189       return_code = RC_NO_ERROR;
00190     }
00191 
00192   } else if (connection_name[0] && connection_id == 0) {
00193     // connection_id was not specified, but name was provided.
00194     // lookup connection_id and if found set return code to populate status
00195     ConnectionSettings settings;
00196     if (0 == parser.find_connection(connection_name, settings)) {
00197       connection_id = settings.connection_id_;
00198       return_code = RC_NO_ERROR;
00199     } else {
00200       // could not find connection for connection_name
00201       return_code = INVALID_PARAM;
00202     }
00203   } else {
00204     //Neither connection_id or connection_name provided
00205     // a valid connection
00206     return_code = INVALID_PARAM;
00207   }
00208   if (return_code == RC_NO_ERROR) {
00209     TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status;
00210     if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) {
00211       Entities::FaceReceiver& receiver = *entities.receivers_[connection_id];
00212       if (receiver.status_valid != FACE::VALID) {
00213         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00214           ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n"));
00215         }
00216         return_code = NOT_AVAILABLE;
00217         return;
00218       }
00219       if (receiver.total_msgs_recvd != 0) {
00220         cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency/receiver.total_msgs_recvd;
00221         cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity;
00222       } else {
00223         cur_status.REFRESH_PERIOD = 0;
00224       }
00225       WAITING_RANGE_TYPE num_waiting;
00226       if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) {
00227           cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting;
00228       } else {
00229         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00230           ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n"));
00231         }
00232         return_code = NOT_AVAILABLE;
00233         return;
00234       }
00235     } else {
00236       //DDS Only supports Destination/Source therefore
00237       // CONNECTION_DIRECTION == FACE::SOURCE
00238       Entities::FaceSender& sender = entities.senders_[connection_id];
00239       if (sender.status_valid != FACE::VALID) {
00240         return_code = NOT_AVAILABLE;
00241         return;
00242       }
00243       cur_status.REFRESH_PERIOD = 0;
00244       cur_status.WAITING_PROCESSES_OR_MESSAGES = 0;
00245     }
00246     status = cur_status;
00247   }
00248 }

void FACE::TS::Unregister_Callback ( CONNECTION_ID_TYPE  connection_id,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 250 of file FaceTSS.cpp.

References OpenDDS::FaceTSS::Entities::receivers_.

00252 {
00253   Entities& entities = *Entities::instance();
00254   Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00255   if (readers.count(connection_id)) {
00256     readers[connection_id]->dr->set_listener(NULL, 0);
00257     return_code = RC_NO_ERROR;
00258     return;
00259   }
00260   return_code = INVALID_PARAM;
00261 }

void FACE::TS::Destroy_Connection ( CONNECTION_ID_TYPE  connection_id,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 263 of file FaceTSS.cpp.

References cleanup_opendds_participant(), cleanup_opendds_publisher(), cleanup_opendds_subscriber(), OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::senders_.

00265 {
00266   Entities& entities = *Entities::instance();
00267   Entities::ConnIdToSenderMap& writers = entities.senders_;
00268   Entities::ConnIdToReceiverMap& readers = entities.receivers_;
00269 
00270   DDS::DomainParticipant_var dp;
00271   bool try_cleanup_participant = false;
00272   if (writers.count(connection_id)) {
00273     const DDS::DataWriter_var datawriter = writers[connection_id].dw;
00274     const DDS::Publisher_var pub = datawriter->get_publisher();
00275     writers.erase(connection_id);
00276     pub->delete_datawriter(datawriter);
00277     dp = pub->get_participant();
00278     try_cleanup_participant = cleanup_opendds_publisher(pub);
00279 
00280   } else if (readers.count(connection_id)) {
00281     const DDS::DataReader_var datareader = readers[connection_id]->dr;
00282     const DDS::Subscriber_var sub = datareader->get_subscriber();
00283     delete readers[connection_id];
00284     readers.erase(connection_id);
00285     sub->delete_datareader(datareader);
00286     dp = sub->get_participant();
00287     try_cleanup_participant = cleanup_opendds_subscriber(sub);
00288   }
00289 
00290   if (!dp) {
00291     return_code = INVALID_PARAM;
00292     return;
00293   }
00294 
00295   if (try_cleanup_participant) {
00296     cleanup_opendds_participant(dp);
00297   }
00298 
00299   entities.connections_.erase(connection_id);
00300   return_code = RC_NO_ERROR;
00301 }

OpenDDS_FACE_Export void FACE::TS::receive_header ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  ,
FACE::TRANSACTION_ID_TYPE &  transaction_id,
FACE::TS::MessageHeader &  message_header,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 303 of file FaceTSS.cpp.

References OpenDDS::DCPS::DCPS_debug_level, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_OK, and OpenDDS::FaceTSS::update_status().

Referenced by Receive_Message().

00309 {
00310   Entities::ConnIdToReceiverMap& readers =
00311     Entities::instance()->receivers_;
00312   // transaction_id cannot be 0 due to initialization
00313   // of last_msg_tid to 0 before a msg has been received so
00314   // only valid transaction_ids are > 0.
00315   if (!readers.count(connection_id) || transaction_id == 0) {
00316     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00317       ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - "
00318           "could not find reader for connection_id: %d OR transaction id[%d] == 0 \n",
00319           connection_id,
00320           transaction_id));
00321     }
00322     return_code = FACE::INVALID_PARAM;
00323     return;
00324   }
00325 
00326   if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) {
00327     if (OpenDDS::DCPS::DCPS_debug_level) {
00328       ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d \n",
00329           message_size,
00330           sizeof(FACE::TS::MessageHeader)));
00331     }
00332     return_code = FACE::INVALID_PARAM;
00333     return;
00334   }
00335   if (transaction_id == readers[connection_id]->last_msg_tid) {
00336     message_header = readers[connection_id]->last_msg_header;
00337     return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK);
00338   } else {
00339     return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00340   }
00341 }

void FACE::TS::Receive_Message ( CONNECTION_ID_TYPE  connection_id,
TIMEOUT_TYPE  timeout,
TRANSACTION_ID_TYPE &  transaction_id,
MessageHeader &  message_header,
MESSAGE_SIZE_TYPE  message_size,
RETURN_CODE_TYPE &  return_code 
)

Definition at line 343 of file FaceTSS.cpp.

References receive_header().

00350 {
00351   receive_header(connection_id, timeout,
00352                  transaction_id, message_header,
00353                  message_size, return_code);
00354 }

void FACE::TS::@79::find_or_create_dp ( const DDS::DomainId_t domainId,
int  participantId,
const DDS::DomainParticipantFactory_var &  dpf,
DDS::DomainParticipant_var &  dp 
) [static]

Definition at line 357 of file FaceTSS.cpp.

References OpenDDS::DCPS::DCPS_debug_level, PARTICIPANT_QOS_DEFAULT, and DDS::DomainParticipantQos::user_data.

00360                                                        {
00361     DDS::DomainParticipant_var temp_dp;
00362 
00363     temp_dp = dpf->lookup_participant(domainId);
00364     if (!temp_dp) {
00365       if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00366         ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - created new participant for domainId: %d\n", domainId));
00367       }
00368       DDS::DomainParticipantQos qos(PARTICIPANT_QOS_DEFAULT);
00369       qos.user_data.value.length(6);
00370       qos.user_data.value[0] = (participantId >> 0) & 0xFF;
00371       qos.user_data.value[1] = (participantId >> 8) & 0xFF;
00372       qos.user_data.value[2] = (participantId >> 16) & 0xFF;
00373       qos.user_data.value[3] = (participantId >> 24) & 0xFF;
00374       qos.user_data.value[4] = 0; // (participantId >> 32) & 0xFF;
00375       qos.user_data.value[5] = 0; // (participantId >> 40) & 0xFF;
00376       dp = dpf->create_participant(domainId, qos, 0, 0);
00377       return;
00378     }
00379     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00380       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - found exiting participant for domainId: %d \n", domainId));
00381     }
00382     dp = temp_dp;
00383   }

void FACE::TS::@79::find_or_create_pub ( const DDS::PublisherQos qos,
const DDS::DomainParticipant_var &  dp,
DDS::Publisher_var &  pub 
) [static]

Definition at line 385 of file FaceTSS.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

00387                                                  {
00388     DDS::DomainParticipant_var temp_dp;
00389     DDS::Publisher_var temp_pub;
00390 
00391     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00392     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00393 
00394     while (wtrIter != writers.end()) {
00395       temp_pub = wtrIter->second.dw->get_publisher();
00396       temp_dp = temp_pub->get_participant();
00397       DDS::PublisherQos temp_qos;
00398       temp_pub->get_qos(temp_qos);
00399       if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00400           temp_qos == qos) {
00401         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00402           ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - found exiting publisher in senders\n"));
00403         }
00404         pub = temp_pub;
00405         return;
00406       } else {
00407         ++wtrIter;
00408       }
00409     }
00410     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00411       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - created new publisher\n"));
00412     }
00413     pub = dp->create_publisher(qos, 0, 0);
00414   }

void FACE::TS::@79::find_or_create_sub ( const DDS::SubscriberQos qos,
const DDS::DomainParticipant_var &  dp,
DDS::Subscriber_var &  sub 
) [static]

Definition at line 416 of file FaceTSS.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

00419   {
00420     DDS::DomainParticipant_var temp_dp;
00421     DDS::Subscriber_var temp_sub;
00422 
00423     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00424     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00425 
00426     while (rdrIter != readers.end()) {
00427       temp_sub = rdrIter->second->dr->get_subscriber();
00428       temp_dp = temp_sub->get_participant();
00429       DDS::SubscriberQos temp_qos;
00430       temp_sub->get_qos(temp_qos);
00431       if (dp->get_domain_id() == temp_dp->get_domain_id() &&
00432           temp_qos == qos) {
00433         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00434           ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - found exiting subscriber in receivers\n"));
00435         }
00436         sub = temp_sub;
00437         return;
00438       } else {
00439         ++rdrIter;
00440       }
00441     }
00442     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00443       ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - created new subscriber\n"));
00444     }
00445     sub = dp->create_subscriber(qos, 0, 0);
00446   }

bool FACE::TS::@79::cleanup_opendds_publisher ( const DDS::Publisher_var  pub  )  [static]

Definition at line 448 of file FaceTSS.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

00449   {
00450     DDS::Publisher_var temp_pub;
00451     DDS::PublisherQos pub_qos;
00452     pub->get_qos(pub_qos);
00453     DDS::DomainParticipant_var dp = pub->get_participant();
00454     DDS::DomainParticipant_var temp_dp;
00455     DDS::PublisherQos temp_qos;
00456 
00457     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00458     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00459     while (wtrIter != writers.end()) {
00460       temp_pub = wtrIter->second.dw->get_publisher();
00461       temp_dp = temp_pub->get_participant();
00462       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00463         temp_pub->get_qos(temp_qos);
00464         if (pub_qos == temp_qos) {
00465           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00466             ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher still in use by other writer\n"));
00467           }
00468           return false;
00469         }
00470       }
00471       ++wtrIter;
00472     }
00473     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00474       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher no longer in use, delete pub\n"));
00475     }
00476     dp->delete_publisher(pub);
00477     return true;
00478   }

bool FACE::TS::@79::cleanup_opendds_subscriber ( const DDS::Subscriber_var  sub  )  [static]

Definition at line 480 of file FaceTSS.cpp.

References OpenDDS::DCPS::DCPS_debug_level.

00481   {
00482     DDS::Subscriber_var temp_sub;
00483     DDS::SubscriberQos sub_qos;
00484     sub->get_qos(sub_qos);
00485     DDS::DomainParticipant_var dp = sub->get_participant();
00486     DDS::DomainParticipant_var temp_dp;
00487     DDS::SubscriberQos temp_qos;
00488 
00489 
00490     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00491     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00492     while (rdrIter != readers.end()) {
00493       temp_sub = rdrIter->second->dr->get_subscriber();
00494       temp_dp = temp_sub->get_participant();
00495 
00496       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00497         temp_sub->get_qos(temp_qos);
00498         if (sub_qos == temp_qos) {
00499           if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00500             ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber still in use by other reader\n"));
00501           }
00502           return false;
00503         }
00504       }
00505       ++rdrIter;
00506     }
00507     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00508       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber no longer in use, delete sub\n"));
00509     }
00510     dp->delete_subscriber(sub);
00511     return true;
00512   }

void FACE::TS::@79::cleanup_opendds_participant ( const DDS::DomainParticipant_var  dp  )  [static]

Definition at line 514 of file FaceTSS.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and TheParticipantFactory.

00515   {
00516     DDS::DomainParticipant_var temp_dp;
00517     Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00518     Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin();
00519     while (rdrIter != readers.end()) {
00520       temp_dp = rdrIter->second->dr->get_subscriber()->get_participant();
00521       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00522         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00523           ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by reader\n"));
00524         }
00525         return;
00526       } else {
00527         ++rdrIter;
00528       }
00529     }
00530 
00531     Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00532     Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin();
00533 
00534     while (wtrIter != writers.end()) {
00535       temp_dp = wtrIter->second.dw->get_publisher()->get_participant();
00536       if (dp->get_domain_id() == temp_dp->get_domain_id()) {
00537         if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00538           ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by writer\n"));
00539         }
00540         return;
00541       } else {
00542         ++wtrIter;
00543       }
00544     }
00545     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
00546       ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant for domain: %d no longer in use, delete entities and participant\n", dp->get_domain_id()));
00547     }
00548     dp->delete_contained_entities();
00549     const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00550     dpf->delete_participant(dp);
00551   }

RETURN_CODE_TYPE FACE::TS::@79::create_opendds_entities ( CONNECTION_ID_TYPE  connectionId,
int  participantId,
const DDS::DomainId_t  domainId,
const char *  topicName,
const char *  type,
CONNECTION_DIRECTION_TYPE  dir,
QosSettings &  qos_settings,
const char *  transport 
) [static]

Definition at line 553 of file FaceTSS.cpp.

References OpenDDS::FaceTSS::config::QosSettings::apply_to(), find_or_create_dp(), find_or_create_pub(), find_or_create_sub(), OpenDDS::DCPS::RcHandle< T >::is_nil(), Registered_Data_Types, OpenDDS::DCPS::set_DCPS_debug_level(), TheParticipantFactory, TheServiceParticipant, TheTransportRegistry, TOPIC_QOS_DEFAULT, OpenDDS::DCPS::Transport_debug_level, DDS::DataReaderQos::user_data, and DDS::DataWriterQos::user_data.

00561   {
00562 #ifdef DEBUG_OPENDDS_FACETSS
00563     OpenDDS::DCPS::set_DCPS_debug_level(8);
00564     OpenDDS::DCPS::Transport_debug_level = 5;
00565     TheServiceParticipant->set_BIT(false);
00566 #endif
00567 
00568     const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory;
00569     if (!dpf) return INVALID_PARAM;
00570 
00571     DDS::DomainParticipant_var dp;
00572     find_or_create_dp(domainId, participantId, dpf, dp);
00573     if (!dp) return INVALID_PARAM;
00574 
00575     using OpenDDS::DCPS::Data_Types_Register;
00576     OpenDDS::DCPS::TypeSupport* ts =
00577       Registered_Data_Types->lookup(dp, type);
00578     if (!ts) {
00579       ts = Registered_Data_Types->lookup(0, type);
00580       if (!ts) return INVALID_PARAM;
00581       Registered_Data_Types->register_type(dp, type, ts);
00582     }
00583 
00584     const DDS::Topic_var topic =
00585       dp->create_topic(topicName, type, TOPIC_QOS_DEFAULT, 0, 0);
00586     if (!topic) return INVALID_PARAM;
00587 
00588     if (dir == SOURCE) {
00589       DDS::PublisherQos publisher_qos;
00590       qos_settings.apply_to(publisher_qos);
00591 
00592       DDS::Publisher_var pub;
00593       find_or_create_pub(publisher_qos, dp, pub);
00594       if (!pub) return INVALID_PARAM;
00595 
00596       if (transport && transport[0]) {
00597         OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00598         if (config.is_nil()) return INVALID_PARAM;
00599         TheTransportRegistry->bind_config(config, pub);
00600       }
00601 
00602       DDS::DataWriterQos datawriter_qos;
00603       qos_settings.apply_to(datawriter_qos);
00604 
00605       // set up user data in DW qos
00606       datawriter_qos.user_data.value.length (3);
00607       datawriter_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00608       datawriter_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00609       datawriter_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00610 
00611       const DDS::DataWriter_var dw =
00612         pub->create_datawriter(topic, datawriter_qos, 0, 0);
00613       if (!dw) return INVALID_PARAM;
00614 
00615       Entities::instance()->senders_[connectionId].dw = dw;
00616 
00617     } else { // dir == DESTINATION
00618       DDS::SubscriberQos subscriber_qos;
00619       qos_settings.apply_to(subscriber_qos);
00620 
00621       DDS::Subscriber_var sub;
00622       find_or_create_sub(subscriber_qos, dp, sub);
00623       if (!sub) return INVALID_PARAM;
00624 
00625       if (transport && transport[0]) {
00626         OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport);
00627         if (config.is_nil()) return INVALID_PARAM;
00628         TheTransportRegistry->bind_config(config, sub);
00629       }
00630 
00631       DDS::DataReaderQos datareader_qos;
00632       qos_settings.apply_to(datareader_qos);
00633 
00634       // set up user data in DR qos
00635       datareader_qos.user_data.value.length (3);
00636       datareader_qos.user_data.value[0] = (connectionId >> 0) & 0xFF;
00637       datareader_qos.user_data.value[1] = (connectionId >> 8) & 0xFF;
00638       datareader_qos.user_data.value[2] = (connectionId >> 16) & 0xFF;
00639 
00640       const DDS::DataReader_var dr =
00641         sub->create_datareader(topic, datareader_qos, 0, 0);
00642       if (!dr) return INVALID_PARAM;
00643       Entities::instance()->receivers_[connectionId] = new Entities::FaceReceiver();
00644       Entities::instance()->receivers_[connectionId]->dr = dr;
00645     }
00646 
00647     return RC_NO_ERROR;
00648   }


Variable Documentation

OpenDDS::FaceTSS::config::Parser FACE::TS::parser [static]

Definition at line 40 of file FaceTSS.cpp.

Referenced by Create_Connection(), OpenDDS::DCPS::FilterEvaluator::FilterEvaluator(), Get_Connection_Parameters(), Initialize(), and OpenDDS::DCPS::MultiTopicImpl::MultiTopicImpl().


Generated on Fri Feb 12 20:06:09 2016 for OpenDDS by  doxygen 1.4.7