Functions | |
void | find_or_create_dp (const DDS::DomainId_t &domainId, int participantId, const DDS::DomainParticipantFactory_var &dpf, DDS::DomainParticipant_var &dp) |
void | find_or_create_pub (const DDS::PublisherQos &qos, const DDS::DomainParticipant_var &dp, DDS::Publisher_var &pub) |
void | find_or_create_sub (const DDS::SubscriberQos &qos, const DDS::DomainParticipant_var &dp, DDS::Subscriber_var &sub) |
bool | cleanup_opendds_publisher (const DDS::Publisher_var pub) |
bool | cleanup_opendds_subscriber (const DDS::Subscriber_var sub) |
void | cleanup_opendds_participant (const DDS::DomainParticipant_var dp) |
RETURN_CODE_TYPE | create_opendds_entities (CONNECTION_ID_TYPE connectionId, int participantId, const DDS::DomainId_t domainId, const char *topic, const char *type, CONNECTION_DIRECTION_TYPE dir, QosSettings &qos, const char *transport) |
void | Initialize (const CONFIGURATION_RESOURCE configuration_file, RETURN_CODE_TYPE &return_code) |
void | Create_Connection (const CONNECTION_NAME_TYPE connection_name, MESSAGING_PATTERN_TYPE pattern, CONNECTION_ID_TYPE &connection_id, CONNECTION_DIRECTION_TYPE &connection_direction, MESSAGE_SIZE_TYPE &max_message_size, TIMEOUT_TYPE, RETURN_CODE_TYPE &return_code) |
void | Get_Connection_Parameters (CONNECTION_NAME_TYPE &connection_name, CONNECTION_ID_TYPE &connection_id, TRANSPORT_CONNECTION_STATUS_TYPE &status, RETURN_CODE_TYPE &return_code) |
void | Unregister_Callback (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code) |
void | Destroy_Connection (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code) |
OpenDDS_FACE_Export void | receive_header (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE, FACE::TRANSACTION_ID_TYPE &transaction_id, FACE::TS::MessageHeader &message_header, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code) |
void | Receive_Message (CONNECTION_ID_TYPE connection_id, TIMEOUT_TYPE timeout, TRANSACTION_ID_TYPE &transaction_id, MessageHeader &message_header, MESSAGE_SIZE_TYPE message_size, RETURN_CODE_TYPE &return_code) |
void | find_or_create_dp (const DDS::DomainId_t &domainId, int participantId, const DDS::DomainParticipantFactory_var &dpf, DDS::DomainParticipant_var &dp) |
void | find_or_create_pub (const DDS::PublisherQos &qos, const DDS::DomainParticipant_var &dp, DDS::Publisher_var &pub) |
void | find_or_create_sub (const DDS::SubscriberQos &qos, const DDS::DomainParticipant_var &dp, DDS::Subscriber_var &sub) |
bool | cleanup_opendds_publisher (const DDS::Publisher_var pub) |
bool | cleanup_opendds_subscriber (const DDS::Subscriber_var sub) |
void | cleanup_opendds_participant (const DDS::DomainParticipant_var dp) |
RETURN_CODE_TYPE | create_opendds_entities (CONNECTION_ID_TYPE connectionId, int participantId, const DDS::DomainId_t domainId, const char *topicName, const char *type, CONNECTION_DIRECTION_TYPE dir, QosSettings &qos_settings, const char *transport) |
Variables | |
OpenDDS::FaceTSS::config::Parser | parser |
void FACE::TS::@78::find_or_create_dp | ( | const DDS::DomainId_t & | domainId, | |
int | participantId, | |||
const DDS::DomainParticipantFactory_var & | dpf, | |||
DDS::DomainParticipant_var & | dp | |||
) | [static] |
Referenced by create_opendds_entities().
void FACE::TS::@78::find_or_create_pub | ( | const DDS::PublisherQos & | qos, | |
const DDS::DomainParticipant_var & | dp, | |||
DDS::Publisher_var & | pub | |||
) | [static] |
Referenced by create_opendds_entities().
void FACE::TS::@78::find_or_create_sub | ( | const DDS::SubscriberQos & | qos, | |
const DDS::DomainParticipant_var & | dp, | |||
DDS::Subscriber_var & | sub | |||
) | [static] |
Referenced by create_opendds_entities().
bool FACE::TS::@78::cleanup_opendds_publisher | ( | const DDS::Publisher_var | pub | ) | [static] |
Referenced by Destroy_Connection().
bool FACE::TS::@78::cleanup_opendds_subscriber | ( | const DDS::Subscriber_var | sub | ) | [static] |
Referenced by Destroy_Connection().
void FACE::TS::@78::cleanup_opendds_participant | ( | const DDS::DomainParticipant_var | dp | ) | [static] |
Referenced by Destroy_Connection().
RETURN_CODE_TYPE FACE::TS::@78::create_opendds_entities | ( | CONNECTION_ID_TYPE | connectionId, | |
int | participantId, | |||
const DDS::DomainId_t | domainId, | |||
const char * | topic, | |||
const char * | type, | |||
CONNECTION_DIRECTION_TYPE | dir, | |||
QosSettings & | qos, | |||
const char * | transport | |||
) | [static] |
Referenced by Create_Connection().
void FACE::TS::Initialize | ( | const CONFIGURATION_RESOURCE | configuration_file, | |
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 69 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::config::Parser::parse(), parser, and TheServiceParticipant.
00071 { 00072 int status = parser.parse(configuration_file); 00073 if (status != 0) { 00074 ACE_ERROR((LM_ERROR, 00075 ACE_TEXT("(%P|%t) ERROR: Initialize() ") 00076 ACE_TEXT("Parser::parse () returned %d\n"), 00077 status)); 00078 return_code = INVALID_PARAM; 00079 } else { 00080 return_code = RC_NO_ERROR; 00081 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS 00082 TheServiceParticipant->configure_pool(); 00083 #endif 00084 } 00085 }
void FACE::TS::Create_Connection | ( | const CONNECTION_NAME_TYPE | connection_name, | |
MESSAGING_PATTERN_TYPE | pattern, | |||
CONNECTION_ID_TYPE & | connection_id, | |||
CONNECTION_DIRECTION_TYPE & | connection_direction, | |||
MESSAGE_SIZE_TYPE & | max_message_size, | |||
TIMEOUT_TYPE | , | |||
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 87 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::config::ConnectionSettings::config_name_, OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::FaceTSS::convertDuration(), create_opendds_entities(), OpenDDS::FaceTSS::config::QosSettings::datawriter_qos(), OpenDDS::FaceTSS::config::ConnectionSettings::direction_, OpenDDS::FaceTSS::config::ConnectionSettings::domain_id_, OpenDDS::FaceTSS::config::Parser::find_connection(), OpenDDS::FaceTSS::config::Parser::find_qos(), OpenDDS::FaceTSS::config::Parser::find_topic(), DDS::DataWriterQos::lifespan, OpenDDS::FaceTSS::config::TopicSettings::max_message_size_, parser, OpenDDS::FaceTSS::config::ConnectionSettings::participant_id_, OpenDDS::FaceTSS::config::TopicSettings::platform_view_guid_, OpenDDS::FaceTSS::config::ConnectionSettings::topic_name_, and OpenDDS::FaceTSS::config::TopicSettings::type_name_.
00094 { 00095 return_code = RC_NO_ERROR; 00096 00097 if (pattern != PUB_SUB) { 00098 return_code = INVALID_CONFIG; 00099 return; 00100 } 00101 00102 ConnectionSettings connection; 00103 TopicSettings topic; 00104 QosSettings qos; 00105 00106 // Find connection 00107 if (!parser.find_connection(connection_name, connection)) { 00108 // Find topic 00109 if (!parser.find_topic(connection.topic_name_, topic)) { 00110 // Find Qos by specified name(s) 00111 if (parser.find_qos(connection, qos)) { 00112 return_code = INVALID_CONFIG; 00113 } 00114 } else { 00115 return_code = INVALID_CONFIG; 00116 } 00117 } else { 00118 return_code = INVALID_CONFIG; 00119 } 00120 00121 if (return_code != RC_NO_ERROR) { 00122 return; 00123 } 00124 00125 connection_id = connection.connection_id_; 00126 connection_direction = connection.direction_; 00127 max_message_size = topic.max_message_size_; 00128 00129 return_code = create_opendds_entities(connection_id, 00130 connection.participant_id_, 00131 connection.domain_id_, 00132 connection.topic_name_, 00133 topic.type_name_, 00134 connection_direction, 00135 qos, 00136 connection.config_name_); 00137 if (return_code != RC_NO_ERROR) { 00138 return; 00139 } 00140 00141 const SYSTEM_TIME_TYPE refresh_period = 00142 (connection_direction == SOURCE) ? 00143 OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0; 00144 00145 const TRANSPORT_CONNECTION_STATUS_TYPE status = { 00146 0, // MESSAGE currently set to 0 due to type mismatch in MESSAGE_RANGE_TYPE and MESSAGE_TYPE_GUID 00147 max_message_size, // MAX_MESSAGE with no transformations, set to config value 00148 max_message_size, 00149 connection_direction, 00150 0, // WAITING_PROCESSES_OR_MESSAGES 00151 refresh_period, 00152 INVALID, 00153 }; 00154 Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id]; 00155 conn_info.connection_name = connection_name; 00156 conn_info.connection_status = status; 00157 conn_info.platform_view_guid = topic.platform_view_guid_; 00158 }
void FACE::TS::Get_Connection_Parameters | ( | CONNECTION_NAME_TYPE & | connection_name, | |
CONNECTION_ID_TYPE & | connection_id, | |||
TRANSPORT_CONNECTION_STATUS_TYPE & | status, | |||
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 160 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::config::Parser::find_connection(), OPENDDS_STRING, parser, OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::senders_.
00164 { 00165 // connection_name is optional, if absent populate from connection_id lookup 00166 // connection_id is also optional, if absent populate from connection_name lookup 00167 // if both provided, validate 00168 // if neither present, return error 00169 Entities& entities = *Entities::instance(); 00170 00171 if (connection_id != 0 && entities.connections_.count(connection_id)) { 00172 // connection_id was provided 00173 // if validated or populated, set return_code so status will be populated 00174 if (connection_name[0]) { 00175 // Validate provided connection_name 00176 OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name; 00177 if (std::strcmp(connection_name, conn_name.c_str()) == 0) { 00178 return_code = RC_NO_ERROR; 00179 } else { 00180 return_code = INVALID_PARAM; 00181 } 00182 } else { 00183 // connection_name not provided 00184 // so populate from connection_id lookup 00185 // and set return code so status will be populated 00186 entities.connections_[connection_id].connection_name.copy(connection_name, 00187 sizeof(CONNECTION_NAME_TYPE)); 00188 connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0; 00189 return_code = RC_NO_ERROR; 00190 } 00191 00192 } else if (connection_name[0] && connection_id == 0) { 00193 // connection_id was not specified, but name was provided. 00194 // lookup connection_id and if found set return code to populate status 00195 ConnectionSettings settings; 00196 if (0 == parser.find_connection(connection_name, settings)) { 00197 connection_id = settings.connection_id_; 00198 return_code = RC_NO_ERROR; 00199 } else { 00200 // could not find connection for connection_name 00201 return_code = INVALID_PARAM; 00202 } 00203 } else { 00204 //Neither connection_id or connection_name provided 00205 // a valid connection 00206 return_code = INVALID_PARAM; 00207 } 00208 if (return_code == RC_NO_ERROR) { 00209 TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status; 00210 if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) { 00211 Entities::FaceReceiver& receiver = *entities.receivers_[connection_id]; 00212 if (receiver.status_valid != FACE::VALID) { 00213 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00214 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n")); 00215 } 00216 return_code = NOT_AVAILABLE; 00217 return; 00218 } 00219 if (receiver.total_msgs_recvd != 0) { 00220 cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency/receiver.total_msgs_recvd; 00221 cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity; 00222 } else { 00223 cur_status.REFRESH_PERIOD = 0; 00224 } 00225 WAITING_RANGE_TYPE num_waiting; 00226 if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) { 00227 cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting; 00228 } else { 00229 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00230 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n")); 00231 } 00232 return_code = NOT_AVAILABLE; 00233 return; 00234 } 00235 } else { 00236 //DDS Only supports Destination/Source therefore 00237 // CONNECTION_DIRECTION == FACE::SOURCE 00238 Entities::FaceSender& sender = entities.senders_[connection_id]; 00239 if (sender.status_valid != FACE::VALID) { 00240 return_code = NOT_AVAILABLE; 00241 return; 00242 } 00243 cur_status.REFRESH_PERIOD = 0; 00244 cur_status.WAITING_PROCESSES_OR_MESSAGES = 0; 00245 } 00246 status = cur_status; 00247 } 00248 }
void FACE::TS::Unregister_Callback | ( | CONNECTION_ID_TYPE | connection_id, | |
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 250 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::Entities::receivers_.
00252 { 00253 Entities& entities = *Entities::instance(); 00254 Entities::ConnIdToReceiverMap& readers = entities.receivers_; 00255 if (readers.count(connection_id)) { 00256 readers[connection_id]->dr->set_listener(NULL, 0); 00257 return_code = RC_NO_ERROR; 00258 return; 00259 } 00260 return_code = INVALID_PARAM; 00261 }
void FACE::TS::Destroy_Connection | ( | CONNECTION_ID_TYPE | connection_id, | |
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 263 of file FaceTSS.cpp.
References cleanup_opendds_participant(), cleanup_opendds_publisher(), cleanup_opendds_subscriber(), OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::senders_.
00265 { 00266 Entities& entities = *Entities::instance(); 00267 Entities::ConnIdToSenderMap& writers = entities.senders_; 00268 Entities::ConnIdToReceiverMap& readers = entities.receivers_; 00269 00270 DDS::DomainParticipant_var dp; 00271 bool try_cleanup_participant = false; 00272 if (writers.count(connection_id)) { 00273 const DDS::DataWriter_var datawriter = writers[connection_id].dw; 00274 const DDS::Publisher_var pub = datawriter->get_publisher(); 00275 writers.erase(connection_id); 00276 pub->delete_datawriter(datawriter); 00277 dp = pub->get_participant(); 00278 try_cleanup_participant = cleanup_opendds_publisher(pub); 00279 00280 } else if (readers.count(connection_id)) { 00281 const DDS::DataReader_var datareader = readers[connection_id]->dr; 00282 const DDS::Subscriber_var sub = datareader->get_subscriber(); 00283 delete readers[connection_id]; 00284 readers.erase(connection_id); 00285 sub->delete_datareader(datareader); 00286 dp = sub->get_participant(); 00287 try_cleanup_participant = cleanup_opendds_subscriber(sub); 00288 } 00289 00290 if (!dp) { 00291 return_code = INVALID_PARAM; 00292 return; 00293 } 00294 00295 if (try_cleanup_participant) { 00296 cleanup_opendds_participant(dp); 00297 } 00298 00299 entities.connections_.erase(connection_id); 00300 return_code = RC_NO_ERROR; 00301 }
OpenDDS_FACE_Export void FACE::TS::receive_header | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
FACE::TIMEOUT_TYPE | , | |||
FACE::TRANSACTION_ID_TYPE & | transaction_id, | |||
FACE::TS::MessageHeader & | message_header, | |||
FACE::MESSAGE_SIZE_TYPE | message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 303 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_OK, and OpenDDS::FaceTSS::update_status().
Referenced by Receive_Message().
00309 { 00310 Entities::ConnIdToReceiverMap& readers = 00311 Entities::instance()->receivers_; 00312 // transaction_id cannot be 0 due to initialization 00313 // of last_msg_tid to 0 before a msg has been received so 00314 // only valid transaction_ids are > 0. 00315 if (!readers.count(connection_id) || transaction_id == 0) { 00316 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00317 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - " 00318 "could not find reader for connection_id: %d OR transaction id[%d] == 0 \n", 00319 connection_id, 00320 transaction_id)); 00321 } 00322 return_code = FACE::INVALID_PARAM; 00323 return; 00324 } 00325 00326 if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) { 00327 if (OpenDDS::DCPS::DCPS_debug_level) { 00328 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d \n", 00329 message_size, 00330 sizeof(FACE::TS::MessageHeader))); 00331 } 00332 return_code = FACE::INVALID_PARAM; 00333 return; 00334 } 00335 if (transaction_id == readers[connection_id]->last_msg_tid) { 00336 message_header = readers[connection_id]->last_msg_header; 00337 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK); 00338 } else { 00339 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00340 } 00341 }
void FACE::TS::Receive_Message | ( | CONNECTION_ID_TYPE | connection_id, | |
TIMEOUT_TYPE | timeout, | |||
TRANSACTION_ID_TYPE & | transaction_id, | |||
MessageHeader & | message_header, | |||
MESSAGE_SIZE_TYPE | message_size, | |||
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 343 of file FaceTSS.cpp.
References receive_header().
00350 { 00351 receive_header(connection_id, timeout, 00352 transaction_id, message_header, 00353 message_size, return_code); 00354 }
void FACE::TS::@79::find_or_create_dp | ( | const DDS::DomainId_t & | domainId, | |
int | participantId, | |||
const DDS::DomainParticipantFactory_var & | dpf, | |||
DDS::DomainParticipant_var & | dp | |||
) | [static] |
Definition at line 357 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level, PARTICIPANT_QOS_DEFAULT, and DDS::DomainParticipantQos::user_data.
00360 { 00361 DDS::DomainParticipant_var temp_dp; 00362 00363 temp_dp = dpf->lookup_participant(domainId); 00364 if (!temp_dp) { 00365 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00366 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - created new participant for domainId: %d\n", domainId)); 00367 } 00368 DDS::DomainParticipantQos qos(PARTICIPANT_QOS_DEFAULT); 00369 qos.user_data.value.length(6); 00370 qos.user_data.value[0] = (participantId >> 0) & 0xFF; 00371 qos.user_data.value[1] = (participantId >> 8) & 0xFF; 00372 qos.user_data.value[2] = (participantId >> 16) & 0xFF; 00373 qos.user_data.value[3] = (participantId >> 24) & 0xFF; 00374 qos.user_data.value[4] = 0; // (participantId >> 32) & 0xFF; 00375 qos.user_data.value[5] = 0; // (participantId >> 40) & 0xFF; 00376 dp = dpf->create_participant(domainId, qos, 0, 0); 00377 return; 00378 } 00379 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00380 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_dp - found exiting participant for domainId: %d \n", domainId)); 00381 } 00382 dp = temp_dp; 00383 }
void FACE::TS::@79::find_or_create_pub | ( | const DDS::PublisherQos & | qos, | |
const DDS::DomainParticipant_var & | dp, | |||
DDS::Publisher_var & | pub | |||
) | [static] |
Definition at line 385 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00387 { 00388 DDS::DomainParticipant_var temp_dp; 00389 DDS::Publisher_var temp_pub; 00390 00391 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_; 00392 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin(); 00393 00394 while (wtrIter != writers.end()) { 00395 temp_pub = wtrIter->second.dw->get_publisher(); 00396 temp_dp = temp_pub->get_participant(); 00397 DDS::PublisherQos temp_qos; 00398 temp_pub->get_qos(temp_qos); 00399 if (dp->get_domain_id() == temp_dp->get_domain_id() && 00400 temp_qos == qos) { 00401 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00402 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - found exiting publisher in senders\n")); 00403 } 00404 pub = temp_pub; 00405 return; 00406 } else { 00407 ++wtrIter; 00408 } 00409 } 00410 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00411 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_pub - created new publisher\n")); 00412 } 00413 pub = dp->create_publisher(qos, 0, 0); 00414 }
void FACE::TS::@79::find_or_create_sub | ( | const DDS::SubscriberQos & | qos, | |
const DDS::DomainParticipant_var & | dp, | |||
DDS::Subscriber_var & | sub | |||
) | [static] |
Definition at line 416 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00419 { 00420 DDS::DomainParticipant_var temp_dp; 00421 DDS::Subscriber_var temp_sub; 00422 00423 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00424 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin(); 00425 00426 while (rdrIter != readers.end()) { 00427 temp_sub = rdrIter->second->dr->get_subscriber(); 00428 temp_dp = temp_sub->get_participant(); 00429 DDS::SubscriberQos temp_qos; 00430 temp_sub->get_qos(temp_qos); 00431 if (dp->get_domain_id() == temp_dp->get_domain_id() && 00432 temp_qos == qos) { 00433 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00434 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - found exiting subscriber in receivers\n")); 00435 } 00436 sub = temp_sub; 00437 return; 00438 } else { 00439 ++rdrIter; 00440 } 00441 } 00442 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00443 ACE_DEBUG((LM_DEBUG, "(%P|%t) find_or_create_sub - created new subscriber\n")); 00444 } 00445 sub = dp->create_subscriber(qos, 0, 0); 00446 }
bool FACE::TS::@79::cleanup_opendds_publisher | ( | const DDS::Publisher_var | pub | ) | [static] |
Definition at line 448 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00449 { 00450 DDS::Publisher_var temp_pub; 00451 DDS::PublisherQos pub_qos; 00452 pub->get_qos(pub_qos); 00453 DDS::DomainParticipant_var dp = pub->get_participant(); 00454 DDS::DomainParticipant_var temp_dp; 00455 DDS::PublisherQos temp_qos; 00456 00457 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_; 00458 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin(); 00459 while (wtrIter != writers.end()) { 00460 temp_pub = wtrIter->second.dw->get_publisher(); 00461 temp_dp = temp_pub->get_participant(); 00462 if (dp->get_domain_id() == temp_dp->get_domain_id()) { 00463 temp_pub->get_qos(temp_qos); 00464 if (pub_qos == temp_qos) { 00465 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00466 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher still in use by other writer\n")); 00467 } 00468 return false; 00469 } 00470 } 00471 ++wtrIter; 00472 } 00473 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00474 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_publisher - publisher no longer in use, delete pub\n")); 00475 } 00476 dp->delete_publisher(pub); 00477 return true; 00478 }
bool FACE::TS::@79::cleanup_opendds_subscriber | ( | const DDS::Subscriber_var | sub | ) | [static] |
Definition at line 480 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level.
00481 { 00482 DDS::Subscriber_var temp_sub; 00483 DDS::SubscriberQos sub_qos; 00484 sub->get_qos(sub_qos); 00485 DDS::DomainParticipant_var dp = sub->get_participant(); 00486 DDS::DomainParticipant_var temp_dp; 00487 DDS::SubscriberQos temp_qos; 00488 00489 00490 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00491 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin(); 00492 while (rdrIter != readers.end()) { 00493 temp_sub = rdrIter->second->dr->get_subscriber(); 00494 temp_dp = temp_sub->get_participant(); 00495 00496 if (dp->get_domain_id() == temp_dp->get_domain_id()) { 00497 temp_sub->get_qos(temp_qos); 00498 if (sub_qos == temp_qos) { 00499 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00500 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber still in use by other reader\n")); 00501 } 00502 return false; 00503 } 00504 } 00505 ++rdrIter; 00506 } 00507 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00508 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_subscriber - subscriber no longer in use, delete sub\n")); 00509 } 00510 dp->delete_subscriber(sub); 00511 return true; 00512 }
void FACE::TS::@79::cleanup_opendds_participant | ( | const DDS::DomainParticipant_var | dp | ) | [static] |
Definition at line 514 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and TheParticipantFactory.
00515 { 00516 DDS::DomainParticipant_var temp_dp; 00517 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00518 Entities::ConnIdToReceiverMap::iterator rdrIter = readers.begin(); 00519 while (rdrIter != readers.end()) { 00520 temp_dp = rdrIter->second->dr->get_subscriber()->get_participant(); 00521 if (dp->get_domain_id() == temp_dp->get_domain_id()) { 00522 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00523 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by reader\n")); 00524 } 00525 return; 00526 } else { 00527 ++rdrIter; 00528 } 00529 } 00530 00531 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_; 00532 Entities::ConnIdToSenderMap::iterator wtrIter = writers.begin(); 00533 00534 while (wtrIter != writers.end()) { 00535 temp_dp = wtrIter->second.dw->get_publisher()->get_participant(); 00536 if (dp->get_domain_id() == temp_dp->get_domain_id()) { 00537 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00538 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant still in use by writer\n")); 00539 } 00540 return; 00541 } else { 00542 ++wtrIter; 00543 } 00544 } 00545 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00546 ACE_DEBUG((LM_DEBUG, "(%P|%t) cleanup_opendds_participant - participant for domain: %d no longer in use, delete entities and participant\n", dp->get_domain_id())); 00547 } 00548 dp->delete_contained_entities(); 00549 const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory; 00550 dpf->delete_participant(dp); 00551 }
RETURN_CODE_TYPE FACE::TS::@79::create_opendds_entities | ( | CONNECTION_ID_TYPE | connectionId, | |
int | participantId, | |||
const DDS::DomainId_t | domainId, | |||
const char * | topicName, | |||
const char * | type, | |||
CONNECTION_DIRECTION_TYPE | dir, | |||
QosSettings & | qos_settings, | |||
const char * | transport | |||
) | [static] |
Definition at line 553 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::config::QosSettings::apply_to(), find_or_create_dp(), find_or_create_pub(), find_or_create_sub(), OpenDDS::DCPS::RcHandle< T >::is_nil(), Registered_Data_Types, OpenDDS::DCPS::set_DCPS_debug_level(), TheParticipantFactory, TheServiceParticipant, TheTransportRegistry, TOPIC_QOS_DEFAULT, OpenDDS::DCPS::Transport_debug_level, DDS::DataReaderQos::user_data, and DDS::DataWriterQos::user_data.
00561 { 00562 #ifdef DEBUG_OPENDDS_FACETSS 00563 OpenDDS::DCPS::set_DCPS_debug_level(8); 00564 OpenDDS::DCPS::Transport_debug_level = 5; 00565 TheServiceParticipant->set_BIT(false); 00566 #endif 00567 00568 const DDS::DomainParticipantFactory_var dpf = TheParticipantFactory; 00569 if (!dpf) return INVALID_PARAM; 00570 00571 DDS::DomainParticipant_var dp; 00572 find_or_create_dp(domainId, participantId, dpf, dp); 00573 if (!dp) return INVALID_PARAM; 00574 00575 using OpenDDS::DCPS::Data_Types_Register; 00576 OpenDDS::DCPS::TypeSupport* ts = 00577 Registered_Data_Types->lookup(dp, type); 00578 if (!ts) { 00579 ts = Registered_Data_Types->lookup(0, type); 00580 if (!ts) return INVALID_PARAM; 00581 Registered_Data_Types->register_type(dp, type, ts); 00582 } 00583 00584 const DDS::Topic_var topic = 00585 dp->create_topic(topicName, type, TOPIC_QOS_DEFAULT, 0, 0); 00586 if (!topic) return INVALID_PARAM; 00587 00588 if (dir == SOURCE) { 00589 DDS::PublisherQos publisher_qos; 00590 qos_settings.apply_to(publisher_qos); 00591 00592 DDS::Publisher_var pub; 00593 find_or_create_pub(publisher_qos, dp, pub); 00594 if (!pub) return INVALID_PARAM; 00595 00596 if (transport && transport[0]) { 00597 OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport); 00598 if (config.is_nil()) return INVALID_PARAM; 00599 TheTransportRegistry->bind_config(config, pub); 00600 } 00601 00602 DDS::DataWriterQos datawriter_qos; 00603 qos_settings.apply_to(datawriter_qos); 00604 00605 // set up user data in DW qos 00606 datawriter_qos.user_data.value.length (3); 00607 datawriter_qos.user_data.value[0] = (connectionId >> 0) & 0xFF; 00608 datawriter_qos.user_data.value[1] = (connectionId >> 8) & 0xFF; 00609 datawriter_qos.user_data.value[2] = (connectionId >> 16) & 0xFF; 00610 00611 const DDS::DataWriter_var dw = 00612 pub->create_datawriter(topic, datawriter_qos, 0, 0); 00613 if (!dw) return INVALID_PARAM; 00614 00615 Entities::instance()->senders_[connectionId].dw = dw; 00616 00617 } else { // dir == DESTINATION 00618 DDS::SubscriberQos subscriber_qos; 00619 qos_settings.apply_to(subscriber_qos); 00620 00621 DDS::Subscriber_var sub; 00622 find_or_create_sub(subscriber_qos, dp, sub); 00623 if (!sub) return INVALID_PARAM; 00624 00625 if (transport && transport[0]) { 00626 OpenDDS::DCPS::TransportConfig_rch config = TheTransportRegistry->get_config(transport); 00627 if (config.is_nil()) return INVALID_PARAM; 00628 TheTransportRegistry->bind_config(config, sub); 00629 } 00630 00631 DDS::DataReaderQos datareader_qos; 00632 qos_settings.apply_to(datareader_qos); 00633 00634 // set up user data in DR qos 00635 datareader_qos.user_data.value.length (3); 00636 datareader_qos.user_data.value[0] = (connectionId >> 0) & 0xFF; 00637 datareader_qos.user_data.value[1] = (connectionId >> 8) & 0xFF; 00638 datareader_qos.user_data.value[2] = (connectionId >> 16) & 0xFF; 00639 00640 const DDS::DataReader_var dr = 00641 sub->create_datareader(topic, datareader_qos, 0, 0); 00642 if (!dr) return INVALID_PARAM; 00643 Entities::instance()->receivers_[connectionId] = new Entities::FaceReceiver(); 00644 Entities::instance()->receivers_[connectionId]->dr = dr; 00645 } 00646 00647 return RC_NO_ERROR; 00648 }
Definition at line 40 of file FaceTSS.cpp.
Referenced by Create_Connection(), OpenDDS::DCPS::FilterEvaluator::FilterEvaluator(), Get_Connection_Parameters(), Initialize(), and OpenDDS::DCPS::MultiTopicImpl::MultiTopicImpl().