Classes | |
class | Entities |
class | Listener |
Namespaces | |
namespace | config |
Enumerations | |
enum | { NSEC_PER_SEC = 1000000000 } |
Functions | |
FACE::RETURN_CODE_TYPE | update_status (FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode) |
DDS::Duration_t | convertTimeout (FACE::TIMEOUT_TYPE timeout) |
FACE::SYSTEM_TIME_TYPE | convertDuration (const DDS::Duration_t &duration) |
FACE::SYSTEM_TIME_TYPE | convertTime (const DDS::Time_t ×tamp) |
FACE::MESSAGE_INSTANCE_GUID | create_message_instance_guid (const OpenDDS::DCPS::RepoId &pub, const CORBA::LongLong &orig_seq) |
void | populate_header_received (const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code) |
template<typename Msg> | |
void | receive_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &transaction_id, Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code) |
template<typename Msg> | |
void | send_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &, const Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code) |
template<typename Msg> | |
void | register_callback (FACE::CONNECTION_ID_TYPE connection_id, const FACE::WAITSET_TYPE, void(*callback)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &), FACE::MESSAGE_SIZE_TYPE max_message_size, FACE::RETURN_CODE_TYPE &return_code) |
anonymous enum |
OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE OpenDDS::FaceTSS::update_status | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
DDS::ReturnCode_t | retcode | |||
) |
Definition at line 664 of file FaceTSS.cpp.
References OpenDDS::FaceTSS::Entities::instance(), DDS::RETCODE_ALREADY_DELETED, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_ILLEGAL_OPERATION, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_NO_DATA, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::RETCODE_TIMEOUT, and DDS::RETCODE_UNSUPPORTED.
Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), FACE::TS::receive_header(), receive_message(), and send_message().
00666 { 00667 FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status = 00668 Entities::instance()->connections_[connection_id].connection_status; 00669 FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM; 00670 00671 switch (retcode) { 00672 case DDS::RETCODE_OK: 00673 status.LAST_MSG_VALIDITY = FACE::VALID; 00674 return FACE::RC_NO_ERROR; 00675 00676 case DDS::RETCODE_ERROR: 00677 rc = FACE::CONNECTION_CLOSED; break; 00678 00679 case DDS::RETCODE_BAD_PARAMETER: 00680 rc = FACE::INVALID_PARAM; break; 00681 00682 case DDS::RETCODE_OUT_OF_RESOURCES: 00683 rc = FACE::DATA_BUFFER_TOO_SMALL; break; 00684 00685 case DDS::RETCODE_PRECONDITION_NOT_MET: 00686 case DDS::RETCODE_NOT_ENABLED: 00687 rc = FACE::INVALID_MODE; break; 00688 00689 case DDS::RETCODE_IMMUTABLE_POLICY: 00690 case DDS::RETCODE_INCONSISTENT_POLICY: 00691 rc = FACE::INVALID_CONFIG; break; 00692 00693 case DDS::RETCODE_ALREADY_DELETED: 00694 rc = FACE::CONNECTION_CLOSED; break; 00695 00696 case DDS::RETCODE_TIMEOUT: 00697 rc = FACE::TIMED_OUT; break; 00698 00699 case DDS::RETCODE_UNSUPPORTED: 00700 case DDS::RETCODE_NO_DATA: 00701 rc = FACE::NOT_AVAILABLE; break; 00702 00703 case DDS::RETCODE_ILLEGAL_OPERATION: 00704 rc = FACE::PERMISSION_DENIED; break; 00705 } 00706 00707 status.LAST_MSG_VALIDITY = FACE::INVALID; 00708 return rc; 00709 }
OpenDDS_FACE_Export DDS::Duration_t OpenDDS::FaceTSS::convertTimeout | ( | FACE::TIMEOUT_TYPE | timeout | ) |
Definition at line 713 of file FaceTSS.cpp.
References DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, and NSEC_PER_SEC.
Referenced by receive_message().
00714 { 00715 if (timeout == FACE::INF_TIME_VALUE) { 00716 static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC, 00717 DDS::DURATION_INFINITE_NSEC}; 00718 return dds_inf; 00719 } 00720 00721 DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC), 00722 static_cast<unsigned int>(timeout % NSEC_PER_SEC)}; 00723 return dur; 00724 }
OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertDuration | ( | const DDS::Duration_t & | duration | ) |
Definition at line 726 of file FaceTSS.cpp.
References DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, DDS::Duration_t::nanosec, NSEC_PER_SEC, and DDS::Duration_t::sec.
Referenced by FACE::TS::Create_Connection(), and send_message().
00727 { 00728 if (duration.sec == DDS::DURATION_INFINITE_SEC 00729 && duration.nanosec == DDS::DURATION_INFINITE_NSEC) { 00730 return FACE::INF_TIME_VALUE; 00731 } 00732 return duration.nanosec + 00733 duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC); 00734 }
OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertTime | ( | const DDS::Time_t & | timestamp | ) |
Definition at line 736 of file FaceTSS.cpp.
References DDS::Time_t::nanosec, NSEC_PER_SEC, and DDS::Time_t::sec.
Referenced by populate_header_received().
00737 { 00738 return timestamp.nanosec + 00739 timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC); 00740 }
OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID OpenDDS::FaceTSS::create_message_instance_guid | ( | const OpenDDS::DCPS::RepoId & | pub, | |
const CORBA::LongLong & | orig_seq | |||
) |
Definition at line 743 of file FaceTSS.cpp.
Referenced by populate_header_received().
00744 { 00745 OpenDDS::DCPS::GuidConverter writer(pub); 00746 00747 FACE::MESSAGE_INSTANCE_GUID message_instance_guid; 00748 FACE::LongLong mig_low; 00749 FACE::LongLong masked_seq; 00750 00751 //Until MESSAGE_INSTANCE_GUID becomes 128 bit GUID, use checksum to represent Prefix 00752 FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub)); 00753 masked_seq = orig_seq >> 32; 00754 00755 if (masked_seq) { 00756 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n")); 00757 } 00758 mig_low = orig_seq & 0xFFFFFFFF; 00759 message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low); 00760 /* 00761 //TODO: This is initial work toward defining how a 128 bit guid could be created 00762 //in the future for a Message Instance Guid when supported. 00763 // 13 byte prefix contains identifying pieces of guid 00764 // 3 byte seq - truncated sequence from the sample 00765 typedef CORBA::Octet MsgInstGuidPrefix_t[13]; 00766 typedef CORBA::Octet MsgInstGuidSeq_t[3]; 00767 MsgInstGuidPrefix_t migPrefix; 00768 MsgInstGuidSeq_t migSeq; 00769 ACE_OS::memcpy(&migPrefix[0], &pub.guidPrefix[2], 10); 00770 ACE_OS::memcpy(&migPrefix[10], &pub.entityId.entityKey[0], 3); 00771 masked_seq = orig_seq >> 24; 00772 00773 if (masked_seq) { 00774 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in 3 bytes, truncating high bits to fit\n")); 00775 } 00776 FACE::LongLong masked = orig_seq & 0xFFFFFF; 00777 00778 #ifdef ACE_BIG_ENDIAN 00779 masked <<= 8 * (sizeof(FACE::LongLong)-3); 00780 #endif 00781 ACE_OS::memcpy(&migSeq[0], &masked, 3); 00782 */ 00783 return message_instance_guid; 00784 }
OpenDDS_FACE_Export void OpenDDS::FaceTSS::populate_header_received | ( | const FACE::CONNECTION_ID_TYPE & | connection_id, | |
const DDS::DomainParticipant_var | part, | |||
const DDS::SampleInfo & | sinfo, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 786 of file FaceTSS.cpp.
References DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, convertTime(), create_message_instance_guid(), OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, header, OpenDDS::FaceTSS::Entities::instance(), DDS::Duration_t::nanosec, DDS::SampleInfo::opendds_reserved_publication_seq, DDS::SampleInfo::publication_handle, OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::Duration_t::sec, DDS::SampleInfo::source_timestamp, OpenDDS::DCPS::time_to_time_value(), and OpenDDS::DCPS::time_value_to_time().
Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), and receive_message().
00790 { 00791 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00792 if (!readers.count(connection_id)) { 00793 return_code = FACE::INVALID_PARAM; 00794 return; 00795 } 00796 FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header; 00797 00798 header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid; 00799 00800 DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber(); 00801 DDS::DomainParticipant_var temp_dp = temp_sub->get_participant(); 00802 const OpenDDS::DCPS::RepoId pub = dynamic_cast<OpenDDS::DCPS::DomainParticipantImpl*>(temp_dp.in())->get_repoid(sinfo.publication_handle); 00803 header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq); 00804 00805 header.message_timestamp = convertTime(sinfo.source_timestamp); 00806 ACE_Time_Value now(ACE_OS::gettimeofday()); 00807 00808 readers[connection_id]->sum_recvd_msgs_latency += (convertTime(OpenDDS::DCPS::time_value_to_time(now)) - header.message_timestamp); 00809 ++readers[connection_id]->total_msgs_recvd; 00810 00811 if (OpenDDS::DCPS::DCPS_debug_level > 8) { 00812 ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n", 00813 readers[connection_id]->sum_recvd_msgs_latency, 00814 readers[connection_id]->total_msgs_recvd, 00815 readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd)); 00816 } 00817 ::DDS::Subscriber_var bit_subscriber 00818 = part->get_builtin_subscriber () ; 00819 00820 ::DDS::DataReader_var reader 00821 = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ; 00822 ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader 00823 = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ()); 00824 if (CORBA::is_nil (pub_reader.in ())) { 00825 ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n")); 00826 return_code = FACE::NOT_AVAILABLE; 00827 return; 00828 } 00829 00830 ::DDS::ReturnCode_t ret; 00831 ::DDS::SampleInfoSeq pubinfos(1); 00832 ::DDS::PublicationBuiltinTopicDataSeq pubdata(1); 00833 ret = pub_reader->read_instance(pubdata, 00834 pubinfos, 00835 1, 00836 sinfo.publication_handle, 00837 ::DDS::ANY_SAMPLE_STATE, 00838 ::DDS::ANY_VIEW_STATE, 00839 ::DDS::ALIVE_INSTANCE_STATE); 00840 00841 if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) { 00842 ACE_ERROR((LM_ERROR, 00843 "(%P|%t) populate_header_received: failed to read BIT publication data.\n")); 00844 return_code = FACE::NOT_AVAILABLE; 00845 return; 00846 } 00847 00848 const CORBA::ULong i = 0; 00849 00850 header.message_source_guid = 00851 (pubdata[i].user_data.value[0] << 0) | 00852 (pubdata[i].user_data.value[1] << 8) | 00853 (pubdata[i].user_data.value[2] << 16); 00854 00855 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: DW lifespan qos value: sec: %d nanosec: %d\n", 00856 // pubdata[i].lifespan.duration.sec, pubdata[i].lifespan.duration.nanosec)); 00857 00858 DDS::Duration_t lifespan = pubdata[i].lifespan.duration; 00859 if (lifespan.sec != DDS::DURATION_INFINITE_SEC && 00860 lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) { 00861 // Finite lifespan. Check if data has expired. 00862 00863 DDS::Time_t const tmp = { 00864 sinfo.source_timestamp.sec + lifespan.sec, 00865 sinfo.source_timestamp.nanosec + lifespan.nanosec 00866 }; 00867 00868 // We assume that the publisher host's clock and subscriber host's 00869 // clock are synchronized (allowed by the spec). 00870 ACE_Time_Value const expiration_time( 00871 OpenDDS::DCPS::time_to_time_value(tmp)); 00872 00873 if (now >= expiration_time) { 00874 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Last message expired, setting message_validity to INVALID\n")); 00875 header.message_validity = FACE::INVALID; 00876 return_code = FACE::RC_NO_ERROR; 00877 return; 00878 } 00879 } 00880 // ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Setting message_validity to VALID\n")); 00881 header.message_validity = FACE::VALID; 00882 return_code = FACE::RC_NO_ERROR; 00883 }
void OpenDDS::FaceTSS::receive_message | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
FACE::TIMEOUT_TYPE | timeout, | |||
FACE::TRANSACTION_ID_TYPE & | transaction_id, | |||
Msg & | message, | |||
FACE::MESSAGE_SIZE_TYPE | message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 143 of file FaceTSS.h.
References DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, convertTimeout(), DataReader, OpenDDS::FaceTSS::Entities::instance(), populate_header_received(), OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid, and update_status().
00149 { 00150 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00151 if (!readers.count(connection_id)) { 00152 return_code = FACE::INVALID_PARAM; 00153 return; 00154 } 00155 if(!Entities::instance()->connections_.count(connection_id)) { 00156 return_code = FACE::INVALID_PARAM; 00157 return; 00158 } 00159 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status = 00160 Entities::instance()->connections_[connection_id].connection_status; 00161 if (message_size < status.MAX_MESSAGE_SIZE) { 00162 return_code = FACE::INVALID_PARAM; 00163 return; 00164 } 00165 typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader; 00166 const typename DataReader::_var_type typedReader = 00167 DataReader::_narrow(readers[connection_id]->dr); 00168 if (!typedReader) { 00169 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00170 return; 00171 } 00172 if (readers[connection_id]->status_valid != FACE::VALID) { 00173 Entities::FaceReceiver* tmp = readers[connection_id]; 00174 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]); 00175 delete tmp; 00176 } 00177 readers[connection_id]->status_valid = FACE::VALID; 00178 00179 const DDS::ReadCondition_var rc = 00180 typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE, 00181 DDS::ANY_VIEW_STATE, 00182 DDS::ALIVE_INSTANCE_STATE); 00183 const DDS::WaitSet_var ws = new DDS::WaitSet; 00184 ws->attach_condition(rc); 00185 00186 DDS::ConditionSeq active; 00187 const DDS::Duration_t ddsTimeout = convertTimeout(timeout); 00188 DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout); 00189 ws->detach_condition(rc); 00190 00191 if (ret == DDS::RETCODE_TIMEOUT) { 00192 return_code = update_status(connection_id, ret); 00193 return; 00194 } 00195 00196 typename DCPS::DDSTraits<Msg>::MessageSequenceType seq; 00197 DDS::SampleInfoSeq sinfo; 00198 ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc); 00199 if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) { 00200 DDS::DomainParticipant_var participant = typedReader->get_subscriber()->get_participant(); 00201 FACE::RETURN_CODE_TYPE ret_code; 00202 populate_header_received(connection_id, participant, sinfo[0], ret_code); 00203 if (ret_code != FACE::RC_NO_ERROR) { 00204 return_code = update_status(connection_id, ret_code); 00205 return; 00206 } 00207 00208 transaction_id = ++readers[connection_id]->last_msg_tid; 00209 00210 message = seq[0]; 00211 return_code = update_status(connection_id, ret); 00212 return; 00213 } 00214 return_code = update_status(connection_id, DDS::RETCODE_NO_DATA); 00215 }
void OpenDDS::FaceTSS::send_message | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
FACE::TIMEOUT_TYPE | timeout, | |||
FACE::TRANSACTION_ID_TYPE & | , | |||
const Msg & | message, | |||
FACE::MESSAGE_SIZE_TYPE | message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 218 of file FaceTSS.h.
References convertDuration(), DataWriter, DDS::HANDLE_NIL, OpenDDS::FaceTSS::Entities::instance(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_BAD_PARAMETER, OpenDDS::FaceTSS::Entities::senders_, and update_status().
00224 { 00225 if(!Entities::instance()->connections_.count(connection_id)) { 00226 return_code = FACE::INVALID_PARAM; 00227 return; 00228 } 00229 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status = 00230 Entities::instance()->connections_[connection_id].connection_status; 00231 if (message_size < status.MAX_MESSAGE_SIZE) { 00232 return_code = FACE::INVALID_PARAM; 00233 return; 00234 } 00235 Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_; 00236 if (!writers.count(connection_id)) { 00237 return_code = FACE::INVALID_PARAM; 00238 return; 00239 } 00240 00241 typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter; 00242 const typename DataWriter::_var_type typedWriter = 00243 DataWriter::_narrow(writers[connection_id].dw); 00244 if (!typedWriter) { 00245 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00246 return; 00247 } 00248 writers[connection_id].status_valid = FACE::VALID; 00249 00250 DDS::DataWriterQos dw_qos; 00251 typedWriter->get_qos(dw_qos); 00252 FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time); 00253 if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS && 00254 timeout != FACE::INF_TIME_VALUE && 00255 ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) { 00256 return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER); 00257 return; 00258 } 00259 00260 return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL)); 00261 }
void OpenDDS::FaceTSS::register_callback | ( | FACE::CONNECTION_ID_TYPE | connection_id, | |
const FACE::WAITSET_TYPE | , | |||
void(*)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &) | callback, | |||
FACE::MESSAGE_SIZE_TYPE | max_message_size, | |||
FACE::RETURN_CODE_TYPE & | return_code | |||
) |
Definition at line 350 of file FaceTSS.h.
References OpenDDS::FaceTSS::Listener< Msg >::add_callback(), DDS::DATA_AVAILABLE_STATUS, OpenDDS::FaceTSS::Entities::instance(), OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid.
00359 { 00360 Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_; 00361 if (!readers.count(connection_id)) { 00362 return_code = FACE::INVALID_PARAM; 00363 return; 00364 } 00365 if(!Entities::instance()->connections_.count(connection_id)) { 00366 return_code = FACE::INVALID_PARAM; 00367 return; 00368 } 00369 FACE::TRANSPORT_CONNECTION_STATUS_TYPE status = 00370 Entities::instance()->connections_[connection_id].connection_status; 00371 if (max_message_size < status.MAX_MESSAGE_SIZE) { 00372 return_code = FACE::INVALID_PARAM; 00373 return; 00374 } 00375 DDS::DataReaderListener_ptr existing_listener = readers[connection_id]->dr->get_listener(); 00376 if (existing_listener) { 00377 Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener); 00378 typedListener->add_callback(callback); 00379 } else { 00380 DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id); 00381 readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS); 00382 } 00383 if (readers[connection_id]->status_valid != FACE::VALID) { 00384 Entities::FaceReceiver* tmp = readers[connection_id]; 00385 readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]); 00386 delete tmp; 00387 } 00388 readers[connection_id]->status_valid = FACE::VALID; 00389 00390 return_code = FACE::RC_NO_ERROR; 00391 }