Namespaces | |
namespace | config |
Classes | |
class | Entities |
class | Listener |
Enumerations | |
enum | { NSEC_PER_SEC = 1000000000 } |
Functions | |
FACE::RETURN_CODE_TYPE | update_status (FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode) |
DDS::Duration_t | convertTimeout (FACE::TIMEOUT_TYPE timeout) |
FACE::SYSTEM_TIME_TYPE | convertDuration (const DDS::Duration_t &duration) |
FACE::SYSTEM_TIME_TYPE | convertTime (const DDS::Time_t ×tamp) |
FACE::MESSAGE_INSTANCE_GUID | create_message_instance_guid (const OpenDDS::DCPS::RepoId &pub, const CORBA::LongLong &orig_seq) |
void | populate_header_received (const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code) |
template<typename Msg > | |
void | receive_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &transaction_id, Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code) |
template<typename Msg > | |
void | send_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &, const Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code) |
template<typename Msg > | |
void | register_callback (FACE::CONNECTION_ID_TYPE connection_id, const FACE::WAITSET_TYPE, void(*callback)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &), FACE::MESSAGE_SIZE_TYPE max_message_size, FACE::RETURN_CODE_TYPE &return_code) |
anonymous enum |
Definition at line 775 of file FaceTSS.cpp.
00775 { NSEC_PER_SEC = 1000000000 };
OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertDuration | ( | const DDS::Duration_t & | duration | ) |
Referenced by FACE::TS::Create_Connection(), and send_message().
OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertTime | ( | const DDS::Time_t & | timestamp | ) |
OpenDDS_FACE_Export DDS::Duration_t OpenDDS::FaceTSS::convertTimeout | ( | FACE::TIMEOUT_TYPE | timeout | ) |
OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID OpenDDS::FaceTSS::create_message_instance_guid | ( | const OpenDDS::DCPS::RepoId & | pub, | |
const CORBA::LongLong & | orig_seq | |||
) |
OpenDDS_FACE_Export void OpenDDS::FaceTSS::populate_header_received | ( | const FACE::CONNECTION_ID_TYPE & | connection_id, | |
const DDS::DomainParticipant_var | part, | |||
const DDS::SampleInfo & | sinfo, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) |
Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), and receive_message().
void OpenDDS::FaceTSS::receive_message | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
FACE::TIMEOUT_TYPE | timeout, | |||
FACE::TRANSACTION_ID_TYPE & | transaction_id, | |||
Msg & | message, | |||
FACE::MESSAGE_SIZE_TYPE | message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) | [inline] |
Definition at line 145 of file FaceTSS.h.
References DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, convertTimeout(), DataReader, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, populate_header_received(), OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, status, OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid, and update_status().
00151 { 00152 try { 00153 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00154 if (!readers.count(connection_id)) { 00155 return_code = FACE::INVALID_PARAM; 00156 return; 00157 } 00158 if (!Entities::instance()->connections_.count(connection_id)) { 00159 return_code = FACE::INVALID_PARAM; 00160 return; 00161 } 00162 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status = 00163 Entities::instance()->connections_[connection_id].connection_status; 00164 if (message_size < status.MAX_MESSAGE_SIZE) { 00165 return_code = FACE::INVALID_PARAM; 00166 return; 00167 } 00168 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader; 00169 const typename DataReader::_var_type typedReader = 00170 DataReader::_narrow(readers[connection_id]->dr); 00171 if (!typedReader) { 00172 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00173 return; 00174 } 00175 if (readers[connection_id]->status_valid != FACE::VALID) { 00176 Entities::FaceReceiver* tmp = readers[connection_id]; 00177 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]); 00178 delete tmp; 00179 } 00180 readers[connection_id]->status_valid = FACE::VALID; 00181 00182 const DDS::ReadCondition_var rc = 00183 typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE, 00184 DDS::ANY_VIEW_STATE, 00185 DDS::ALIVE_INSTANCE_STATE); 00186 const DDS::WaitSet_var ws = new DDS::WaitSet; 00187 ws->attach_condition(rc); 00188 00189 DDS::ConditionSeq active; 00190 const DDS::Duration_t ddsTimeout = convertTimeout(timeout); 00191 DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout); 00192 ws->detach_condition(rc); 00193 00194 if (ret == DDS::RETCODE_TIMEOUT) { 00195 typedReader->delete_readcondition(rc); 00196 return_code = update_status(connection_id, ret); 00197 return; 00198 } 00199 00200 typename DCPS::DDSTraits<Msg>::MessageSequenceType seq; 00201 DDS::SampleInfoSeq sinfo; 00202 ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc); 00203 typedReader->delete_readcondition(rc); 00204 if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) { 00205 DDS::Subscriber_var subscriber = typedReader->get_subscriber(); 00206 DDS::DomainParticipant_var participant = subscriber->get_participant(); 00207 FACE::RETURN_CODE_TYPE ret_code; 00208 populate_header_received(connection_id, participant, sinfo[0], ret_code); 00209 if (ret_code != FACE::RC_NO_ERROR) { 00210 return_code = update_status(connection_id, ret_code); 00211 return; 00212 } 00213 00214 transaction_id = ++readers[connection_id]->last_msg_tid; 00215 00216 message = seq[0]; 00217 return_code = update_status(connection_id, ret); 00218 return; 00219 } 00220 return_code = update_status(connection_id, DDS::RETCODE_NO_DATA); 00221 } catch (const CORBA::BAD_PARAM&) { 00222 if (OpenDDS::DCPS::DCPS_debug_level) { 00223 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: receive_message - INVALID_PARAM\n")); 00224 } 00225 return_code = FACE::INVALID_PARAM; 00226 } 00227 }
void OpenDDS::FaceTSS::register_callback | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
const FACE::WAITSET_TYPE | , | |||
void(*)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &) | callback, | |||
FACE::MESSAGE_SIZE_TYPE | max_message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) | [inline] |
Definition at line 363 of file FaceTSS.h.
References OpenDDS::FaceTSS::Listener< Msg >::add_callback(), DDS::DATA_AVAILABLE_STATUS, OpenDDS::FaceTSS::Entities::instance(), LM_ERROR, OpenDDS::FaceTSS::Entities::receivers_, status, and OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid.
00372 { 00373 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00374 if (!readers.count(connection_id)) { 00375 return_code = FACE::INVALID_PARAM; 00376 return; 00377 } 00378 if(!Entities::instance()->connections_.count(connection_id)) { 00379 return_code = FACE::INVALID_PARAM; 00380 return; 00381 } 00382 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status = 00383 Entities::instance()->connections_[connection_id].connection_status; 00384 if (max_message_size < status.MAX_MESSAGE_SIZE) { 00385 return_code = FACE::INVALID_PARAM; 00386 return; 00387 } 00388 DDS::DataReaderListener_var existing_listener = readers[connection_id]->dr->get_listener(); 00389 if (existing_listener.in()) { 00390 Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener.in()); 00391 if (typedListener) { 00392 typedListener->add_callback(callback); 00393 } else { 00394 ACE_ERROR((LM_ERROR, "ERROR: register_callback - failed to obtain typed listener\n")); 00395 return_code = FACE::INVALID_PARAM; 00396 return; 00397 } 00398 } else { 00399 DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id); 00400 readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS); 00401 } 00402 if (readers[connection_id]->status_valid != FACE::VALID) { 00403 Entities::FaceReceiver* tmp = readers[connection_id]; 00404 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]); 00405 delete tmp; 00406 } 00407 readers[connection_id]->status_valid = FACE::VALID; 00408 00409 return_code = FACE::RC_NO_ERROR; 00410 }
void OpenDDS::FaceTSS::send_message | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
FACE::TIMEOUT_TYPE | timeout, | |||
FACE::TRANSACTION_ID_TYPE & | , | |||
const Msg & | message, | |||
FACE::MESSAGE_SIZE_TYPE | message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) | [inline] |
Definition at line 230 of file FaceTSS.h.
References convertDuration(), DataWriter, DDS::HANDLE_NIL, OpenDDS::FaceTSS::Entities::instance(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_BAD_PARAMETER, OpenDDS::FaceTSS::Entities::senders_, status, and update_status().
00236 { 00237 if(!Entities::instance()->connections_.count(connection_id)) { 00238 return_code = FACE::INVALID_PARAM; 00239 return; 00240 } 00241 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status = 00242 Entities::instance()->connections_[connection_id].connection_status; 00243 if (message_size < status.MAX_MESSAGE_SIZE) { 00244 return_code = FACE::INVALID_PARAM; 00245 return; 00246 } 00247 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_; 00248 if (!writers.count(connection_id)) { 00249 return_code = FACE::INVALID_PARAM; 00250 return; 00251 } 00252 00253 typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter; 00254 const typename DataWriter::_var_type typedWriter = 00255 DataWriter::_narrow(writers[connection_id].dw); 00256 if (!typedWriter) { 00257 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00258 return; 00259 } 00260 writers[connection_id].status_valid = FACE::VALID; 00261 00262 DDS::DataWriterQos dw_qos; 00263 typedWriter->get_qos(dw_qos); 00264 FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time); 00265 if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS && 00266 timeout != FACE::INF_TIME_VALUE && 00267 ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) { 00268 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00269 return; 00270 } 00271 00272 return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL)); 00273 }
OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE OpenDDS::FaceTSS::update_status | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
DDS::ReturnCode_t | retcode | |||
) |
Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), FACE::TS::receive_header(), receive_message(), and send_message().