Functions | |
void | Initialize (const CONFIGURATION_RESOURCE configuration_file, RETURN_CODE_TYPE &return_code) |
void | Create_Connection (const CONNECTION_NAME_TYPE connection_name, MESSAGING_PATTERN_TYPE pattern, CONNECTION_ID_TYPE &connection_id, CONNECTION_DIRECTION_TYPE &connection_direction, MESSAGE_SIZE_TYPE &max_message_size, TIMEOUT_TYPE, RETURN_CODE_TYPE &return_code) |
void | Get_Connection_Parameters (CONNECTION_NAME_TYPE &connection_name, CONNECTION_ID_TYPE &connection_id, TRANSPORT_CONNECTION_STATUS_TYPE &status, RETURN_CODE_TYPE &return_code) |
void | Unregister_Callback (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code) |
void | Destroy_Connection (CONNECTION_ID_TYPE connection_id, RETURN_CODE_TYPE &return_code) |
OpenDDS_FACE_Export void | receive_header (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE, FACE::TRANSACTION_ID_TYPE &transaction_id, FACE::TS::MessageHeader &message_header, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code) |
void | Receive_Message (CONNECTION_ID_TYPE connection_id, TIMEOUT_TYPE timeout, TRANSACTION_ID_TYPE &transaction_id, MessageHeader &message_header, MESSAGE_SIZE_TYPE message_size, RETURN_CODE_TYPE &return_code) |
void FACE::TS::Create_Connection | ( | const CONNECTION_NAME_TYPE | connection_name, | |
MESSAGING_PATTERN_TYPE | pattern, | |||
CONNECTION_ID_TYPE & | connection_id, | |||
CONNECTION_DIRECTION_TYPE & | connection_direction, | |||
MESSAGE_SIZE_TYPE & | max_message_size, | |||
TIMEOUT_TYPE | , | |||
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 95 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::config::ConnectionSettings::config_name_, OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::FaceTSS::Entities::ConnectionInfo::connection_name, OpenDDS::FaceTSS::Entities::ConnectionInfo::connection_status, OpenDDS::FaceTSS::convertDuration(), OpenDDS::FaceTSS::config::QosSettings::datawriter_qos(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::config::ConnectionSettings::direction_, OpenDDS::FaceTSS::config::ConnectionSettings::domain_id_, DDS::DataWriterQos::lifespan, LM_ERROR, OpenDDS::FaceTSS::config::TopicSettings::max_message_size_, parser, OpenDDS::FaceTSS::config::ConnectionSettings::participant_id_, OpenDDS::FaceTSS::Entities::ConnectionInfo::platform_view_guid, OpenDDS::FaceTSS::config::TopicSettings::platform_view_guid_, status, OpenDDS::FaceTSS::config::ConnectionSettings::topic_name_, and OpenDDS::FaceTSS::config::TopicSettings::type_name_.
00102 { 00103 try { 00104 return_code = RC_NO_ERROR; 00105 00106 if (pattern != PUB_SUB) { 00107 return_code = INVALID_CONFIG; 00108 return; 00109 } 00110 00111 ConnectionSettings connection; 00112 TopicSettings topic; 00113 QosSettings qos; 00114 00115 // Find connection 00116 if (!parser.find_connection(connection_name, connection)) { 00117 // Find topic 00118 if (!parser.find_topic(connection.topic_name_, topic)) { 00119 // Find Qos by specified name(s) 00120 if (parser.find_qos(connection, qos)) { 00121 return_code = INVALID_CONFIG; 00122 } 00123 } else { 00124 return_code = INVALID_CONFIG; 00125 } 00126 } else { 00127 return_code = INVALID_CONFIG; 00128 } 00129 00130 if (return_code != RC_NO_ERROR) { 00131 return; 00132 } 00133 00134 connection_id = connection.connection_id_; 00135 connection_direction = connection.direction_; 00136 max_message_size = topic.max_message_size_; 00137 00138 return_code = create_opendds_entities(connection_id, 00139 connection.participant_id_, 00140 connection.domain_id_, 00141 connection.topic_name_, 00142 topic.type_name_, 00143 connection_direction, 00144 qos, 00145 connection.config_name_); 00146 if (return_code != RC_NO_ERROR) { 00147 return; 00148 } 00149 00150 const SYSTEM_TIME_TYPE refresh_period = 00151 (connection_direction == SOURCE) ? 00152 OpenDDS::FaceTSS::convertDuration(qos.datawriter_qos().lifespan.duration) : 0; 00153 00154 const TRANSPORT_CONNECTION_STATUS_TYPE status = { 00155 0, // MESSAGE currently set to 0 due to type mismatch in MESSAGE_RANGE_TYPE and MESSAGE_TYPE_GUID 00156 max_message_size, // MAX_MESSAGE with no transformations, set to config value 00157 max_message_size, 00158 connection_direction, 00159 0, // WAITING_PROCESSES_OR_MESSAGES 00160 refresh_period, 00161 INVALID, 00162 }; 00163 Entities::ConnectionInfo& conn_info = Entities::instance()->connections_[connection_id]; 00164 conn_info.connection_name = connection_name; 00165 conn_info.connection_status = status; 00166 conn_info.platform_view_guid = topic.platform_view_guid_; 00167 } catch (const CORBA::BAD_PARAM&) { 00168 if (OpenDDS::DCPS::DCPS_debug_level) { 00169 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Create_Connection - INVALID_PARAM\n")); 00170 } 00171 return_code = INVALID_PARAM; 00172 } 00173 }
void FACE::TS::Destroy_Connection | ( | CONNECTION_ID_TYPE | connection_id, | |
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 292 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::senders_.
00294 { 00295 try { 00296 Entities& entities = *Entities::instance(); 00297 Entities::ConnIdToSenderMap& writers = entities.senders_; 00298 Entities::ConnIdToReceiverMap& readers = entities.receivers_; 00299 00300 DDS::DomainParticipant_var dp; 00301 bool try_cleanup_participant = false; 00302 if (writers.count(connection_id)) { 00303 const DDS::DataWriter_var datawriter = writers[connection_id].dw; 00304 const DDS::Publisher_var pub = datawriter->get_publisher(); 00305 writers.erase(connection_id); 00306 pub->delete_datawriter(datawriter); 00307 dp = pub->get_participant(); 00308 try_cleanup_participant = cleanup_opendds_publisher(pub); 00309 00310 } else if (readers.count(connection_id)) { 00311 const DDS::DataReader_var datareader = readers[connection_id]->dr; 00312 const DDS::Subscriber_var sub = datareader->get_subscriber(); 00313 delete readers[connection_id]; 00314 readers.erase(connection_id); 00315 sub->delete_datareader(datareader); 00316 dp = sub->get_participant(); 00317 try_cleanup_participant = cleanup_opendds_subscriber(sub); 00318 } 00319 00320 if (!dp) { 00321 return_code = INVALID_PARAM; 00322 return; 00323 } 00324 00325 if (try_cleanup_participant) { 00326 cleanup_opendds_participant(dp); 00327 } 00328 00329 entities.connections_.erase(connection_id); 00330 return_code = RC_NO_ERROR; 00331 } catch (const CORBA::BAD_PARAM&) { 00332 if (OpenDDS::DCPS::DCPS_debug_level) { 00333 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Destroy_Connection - INVALID_PARAM\n")); 00334 } 00335 return_code = INVALID_PARAM; 00336 } 00337 }
void FACE::TS::Get_Connection_Parameters | ( | CONNECTION_NAME_TYPE & | connection_name, | |
CONNECTION_ID_TYPE & | connection_id, | |||
TRANSPORT_CONNECTION_STATUS_TYPE & | status, | |||
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 175 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::config::ConnectionSettings::connection_id_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::FaceReceiver::last_msg_header, LM_DEBUG, LM_ERROR, OpenDDS::FaceTSS::Entities::FaceReceiver::messages_waiting(), OPENDDS_STRING, parser, OpenDDS::FaceTSS::Entities::receivers_, OpenDDS::FaceTSS::Entities::senders_, OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid, OpenDDS::FaceTSS::Entities::FaceReceiver::sum_recvd_msgs_latency, and OpenDDS::FaceTSS::Entities::FaceReceiver::total_msgs_recvd.
00179 { 00180 try { 00181 // connection_name is optional, if absent populate from connection_id lookup 00182 // connection_id is also optional, if absent populate from connection_name lookup 00183 // if both provided, validate 00184 // if neither present, return error 00185 Entities& entities = *Entities::instance(); 00186 00187 if (connection_id != 0 && entities.connections_.count(connection_id)) { 00188 // connection_id was provided 00189 // if validated or populated, set return_code so status will be populated 00190 if (connection_name[0]) { 00191 // Validate provided connection_name 00192 OPENDDS_STRING conn_name = entities.connections_[connection_id].connection_name; 00193 if (std::strcmp(connection_name, conn_name.c_str()) == 0) { 00194 return_code = RC_NO_ERROR; 00195 } else { 00196 return_code = INVALID_PARAM; 00197 } 00198 } else { 00199 // connection_name not provided 00200 // so populate from connection_id lookup 00201 // and set return code so status will be populated 00202 entities.connections_[connection_id].connection_name.copy(connection_name, 00203 sizeof(CONNECTION_NAME_TYPE)); 00204 connection_name[sizeof(CONNECTION_NAME_TYPE) - 1] = 0; 00205 return_code = RC_NO_ERROR; 00206 } 00207 00208 } else if (connection_name[0] && connection_id == 0) { 00209 // connection_id was not specified, but name was provided. 00210 // lookup connection_id and if found set return code to populate status 00211 ConnectionSettings settings; 00212 if (0 == parser.find_connection(connection_name, settings)) { 00213 connection_id = settings.connection_id_; 00214 return_code = RC_NO_ERROR; 00215 } else { 00216 // could not find connection for connection_name 00217 return_code = INVALID_PARAM; 00218 } 00219 } else { 00220 //Neither connection_id or connection_name provided 00221 // a valid connection 00222 return_code = INVALID_PARAM; 00223 } 00224 if (return_code == RC_NO_ERROR) { 00225 TRANSPORT_CONNECTION_STATUS_TYPE& cur_status = entities.connections_[connection_id].connection_status; 00226 if (cur_status.CONNECTION_DIRECTION == FACE::DESTINATION) { 00227 Entities::FaceReceiver& receiver = *entities.receivers_[connection_id]; 00228 if (receiver.status_valid != FACE::VALID) { 00229 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00230 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to receiver's status not valid\n")); 00231 } 00232 return_code = NOT_AVAILABLE; 00233 return; 00234 } 00235 if (receiver.total_msgs_recvd != 0) { 00236 cur_status.REFRESH_PERIOD = receiver.sum_recvd_msgs_latency / receiver.total_msgs_recvd; 00237 cur_status.LAST_MSG_VALIDITY = receiver.last_msg_header.message_validity; 00238 } else { 00239 cur_status.REFRESH_PERIOD = 0; 00240 } 00241 WAITING_RANGE_TYPE num_waiting; 00242 if (receiver.messages_waiting(num_waiting) == FACE::RC_NO_ERROR) { 00243 cur_status.WAITING_PROCESSES_OR_MESSAGES = num_waiting; 00244 } else { 00245 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00246 ACE_DEBUG((LM_DEBUG, "Get_Connection_Parameters: returning NOT_AVAILABLE due to messages_waiting\n")); 00247 } 00248 return_code = NOT_AVAILABLE; 00249 return; 00250 } 00251 } else { 00252 //DDS Only supports Destination/Source therefore 00253 // CONNECTION_DIRECTION == FACE::SOURCE 00254 Entities::FaceSender& sender = entities.senders_[connection_id]; 00255 if (sender.status_valid != FACE::VALID) { 00256 return_code = NOT_AVAILABLE; 00257 return; 00258 } 00259 cur_status.REFRESH_PERIOD = 0; 00260 cur_status.WAITING_PROCESSES_OR_MESSAGES = 0; 00261 } 00262 status = cur_status; 00263 } 00264 } catch (const CORBA::BAD_PARAM&) { 00265 if (OpenDDS::DCPS::DCPS_debug_level) { 00266 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Get_Connection_Parameters - INVALID_PARAM\n")); 00267 } 00268 return_code = INVALID_PARAM; 00269 } 00270 }
void FACE::TS::Initialize | ( | const CONFIGURATION_RESOURCE | configuration_file, | |
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 70 of file FaceTSS.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, ACEXML_Parser::parse(), parser, status, and TheServiceParticipant.
00072 { 00073 try { 00074 int status = parser.parse(configuration_file); 00075 if (status != 0) { 00076 ACE_ERROR((LM_ERROR, 00077 ACE_TEXT("(%P|%t) ERROR: Initialize() ") 00078 ACE_TEXT("Parser::parse () returned %d\n"), 00079 status)); 00080 return_code = INVALID_PARAM; 00081 } else { 00082 return_code = RC_NO_ERROR; 00083 #if defined OPENDDS_SAFETY_PROFILE && defined ACE_HAS_ALLOC_HOOKS 00084 TheServiceParticipant->configure_pool(); 00085 #endif 00086 } 00087 } catch (const CORBA::BAD_PARAM& ) { 00088 if (OpenDDS::DCPS::DCPS_debug_level) { 00089 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Initialize - INVALID_PARAM\n")); 00090 } 00091 return_code = INVALID_PARAM; 00092 } 00093 }
OpenDDS_FACE_Export void FACE::TS::receive_header | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
FACE::TIMEOUT_TYPE | , | |||
FACE::TRANSACTION_ID_TYPE & | transaction_id, | |||
FACE::TS::MessageHeader & | message_header, | |||
FACE::MESSAGE_SIZE_TYPE | message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 340 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, LM_ERROR, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_OK, and OpenDDS::FaceTSS::update_status().
Referenced by Receive_Message().
00346 { 00347 try { 00348 Entities::ConnIdToReceiverMap& readers = 00349 Entities::instance()->receivers_; 00350 // transaction_id cannot be 0 due to initialization 00351 // of last_msg_tid to 0 before a msg has been received so 00352 // only valid transaction_ids are > 0. 00353 if (!readers.count(connection_id) || transaction_id == 0) { 00354 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 00355 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - " 00356 "could not find reader for connection_id: %d OR transaction id[%d] == 0 \n", 00357 connection_id, 00358 transaction_id)); 00359 } 00360 return_code = FACE::INVALID_PARAM; 00361 return; 00362 } 00363 00364 if (message_size < 0 || (unsigned)message_size < sizeof(FACE::TS::MessageHeader)) { 00365 if (OpenDDS::DCPS::DCPS_debug_level) { 00366 ACE_DEBUG((LM_DEBUG, "(%P|%t) receive_header - INVALID_PARAM - message_size: %d is < %d \n", 00367 message_size, 00368 sizeof(FACE::TS::MessageHeader))); 00369 } 00370 return_code = FACE::INVALID_PARAM; 00371 return; 00372 } 00373 if (transaction_id == readers[connection_id]->last_msg_tid) { 00374 message_header = readers[connection_id]->last_msg_header; 00375 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_OK); 00376 } else { 00377 return_code = OpenDDS::FaceTSS::update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00378 } 00379 } catch (const CORBA::BAD_PARAM&) { 00380 if (OpenDDS::DCPS::DCPS_debug_level) { 00381 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_header - INVALID_PARAM\n")); 00382 } 00383 return_code = INVALID_PARAM; 00384 } 00385 }
void FACE::TS::Receive_Message | ( | CONNECTION_ID_TYPE | connection_id, | |
TIMEOUT_TYPE | timeout, | |||
TRANSACTION_ID_TYPE & | transaction_id, | |||
MessageHeader & | message_header, | |||
MESSAGE_SIZE_TYPE | message_size, | |||
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 387 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, and receive_header().
00394 { 00395 try { 00396 receive_header(connection_id, timeout, 00397 transaction_id, message_header, 00398 message_size, return_code); 00399 } catch (const CORBA::BAD_PARAM&) { 00400 if (OpenDDS::DCPS::DCPS_debug_level) { 00401 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Receive_Message - INVALID_PARAM\n")); 00402 } 00403 return_code = INVALID_PARAM; 00404 } 00405 }
void FACE::TS::Unregister_Callback | ( | CONNECTION_ID_TYPE | connection_id, | |
RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 272 of file FaceTSS.cpp.
References OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, and OpenDDS::FaceTSS::Entities::receivers_.
00274 { 00275 try { 00276 Entities& entities = *Entities::instance(); 00277 Entities::ConnIdToReceiverMap& readers = entities.receivers_; 00278 if (readers.count(connection_id)) { 00279 readers[connection_id]->dr->set_listener(NULL, 0); 00280 return_code = RC_NO_ERROR; 00281 return; 00282 } 00283 } catch (const CORBA::BAD_PARAM&) { 00284 if (OpenDDS::DCPS::DCPS_debug_level) { 00285 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Unregister_Callback - INVALID_PARAM\n")); 00286 } 00287 } 00288 00289 return_code = INVALID_PARAM; 00290 }