OpenDDS::FaceTSS Namespace Reference


Classes

class  Entities
class  Listener

Namespaces

namespace  config

Enumerations

enum  { NSEC_PER_SEC = 1000000000 }

Functions

FACE::RETURN_CODE_TYPE update_status (FACE::CONNECTION_ID_TYPE connection_id, DDS::ReturnCode_t retcode)
DDS::Duration_t convertTimeout (FACE::TIMEOUT_TYPE timeout)
FACE::SYSTEM_TIME_TYPE convertDuration (const DDS::Duration_t &duration)
FACE::SYSTEM_TIME_TYPE convertTime (const DDS::Time_t &timestamp)
FACE::MESSAGE_INSTANCE_GUID create_message_instance_guid (const OpenDDS::DCPS::RepoId &pub, const CORBA::LongLong &orig_seq)
void populate_header_received (const FACE::CONNECTION_ID_TYPE &connection_id, const DDS::DomainParticipant_var part, const DDS::SampleInfo &sinfo, FACE::RETURN_CODE_TYPE &return_code)
template<typename Msg>
void receive_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &transaction_id, Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
template<typename Msg>
void send_message (FACE::CONNECTION_ID_TYPE connection_id, FACE::TIMEOUT_TYPE timeout, FACE::TRANSACTION_ID_TYPE &, const Msg &message, FACE::MESSAGE_SIZE_TYPE message_size, FACE::RETURN_CODE_TYPE &return_code)
template<typename Msg>
void register_callback (FACE::CONNECTION_ID_TYPE connection_id, const FACE::WAITSET_TYPE, void(*callback)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &), FACE::MESSAGE_SIZE_TYPE max_message_size, FACE::RETURN_CODE_TYPE &return_code)


Enumeration Type Documentation

anonymous enum

Enumerator:
NSEC_PER_SEC 

Definition at line 711 of file FaceTSS.cpp.

00711 { NSEC_PER_SEC = 1000000000 };


Function Documentation

OpenDDS_FACE_Export FACE::RETURN_CODE_TYPE OpenDDS::FaceTSS::update_status ( FACE::CONNECTION_ID_TYPE  connection_id,
DDS::ReturnCode_t  retcode 
)

Definition at line 664 of file FaceTSS.cpp.

References OpenDDS::FaceTSS::Entities::instance(), DDS::RETCODE_ALREADY_DELETED, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_ILLEGAL_OPERATION, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_NO_DATA, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::RETCODE_TIMEOUT, and DDS::RETCODE_UNSUPPORTED.

Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), FACE::TS::receive_header(), receive_message(), and send_message().

00666 {
00667   FACE::TRANSPORT_CONNECTION_STATUS_TYPE& status =
00668     Entities::instance()->connections_[connection_id].connection_status;
00669   FACE::RETURN_CODE_TYPE rc = FACE::INVALID_PARAM;
00670 
00671   switch (retcode) {
00672   case DDS::RETCODE_OK:
00673     status.LAST_MSG_VALIDITY = FACE::VALID;
00674     return FACE::RC_NO_ERROR;
00675 
00676   case DDS::RETCODE_ERROR:
00677     rc = FACE::CONNECTION_CLOSED; break;
00678 
00679   case DDS::RETCODE_BAD_PARAMETER:
00680     rc = FACE::INVALID_PARAM; break;
00681 
00682   case DDS::RETCODE_OUT_OF_RESOURCES:
00683     rc = FACE::DATA_BUFFER_TOO_SMALL; break;
00684 
00685   case DDS::RETCODE_PRECONDITION_NOT_MET:
00686   case DDS::RETCODE_NOT_ENABLED:
00687     rc = FACE::INVALID_MODE; break;
00688 
00689   case DDS::RETCODE_IMMUTABLE_POLICY:
00690   case DDS::RETCODE_INCONSISTENT_POLICY:
00691     rc = FACE::INVALID_CONFIG; break;
00692 
00693   case DDS::RETCODE_ALREADY_DELETED:
00694     rc = FACE::CONNECTION_CLOSED; break;
00695 
00696   case DDS::RETCODE_TIMEOUT:
00697     rc = FACE::TIMED_OUT; break;
00698 
00699   case DDS::RETCODE_UNSUPPORTED:
00700   case DDS::RETCODE_NO_DATA:
00701     rc = FACE::NOT_AVAILABLE; break;
00702 
00703   case DDS::RETCODE_ILLEGAL_OPERATION:
00704     rc = FACE::PERMISSION_DENIED; break;
00705   }
00706 
00707   status.LAST_MSG_VALIDITY = FACE::INVALID;
00708   return rc;
00709 }

OpenDDS_FACE_Export DDS::Duration_t OpenDDS::FaceTSS::convertTimeout ( FACE::TIMEOUT_TYPE  timeout  ) 

Definition at line 713 of file FaceTSS.cpp.

References DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, and NSEC_PER_SEC.

Referenced by receive_message().

00714 {
00715   if (timeout == FACE::INF_TIME_VALUE) {
00716     static const DDS::Duration_t dds_inf = {DDS::DURATION_INFINITE_SEC,
00717                                             DDS::DURATION_INFINITE_NSEC};
00718     return dds_inf;
00719   }
00720 
00721   DDS::Duration_t dur = {static_cast<int>(timeout / NSEC_PER_SEC),
00722                          static_cast<unsigned int>(timeout % NSEC_PER_SEC)};
00723   return dur;
00724 }

OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertDuration ( const DDS::Duration_t &  duration  ) 

Definition at line 726 of file FaceTSS.cpp.

References DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, DDS::Duration_t::nanosec, NSEC_PER_SEC, and DDS::Duration_t::sec.

Referenced by FACE::TS::Create_Connection(), and send_message().

00727 {
00728   if (duration.sec == DDS::DURATION_INFINITE_SEC
00729       && duration.nanosec == DDS::DURATION_INFINITE_NSEC) {
00730     return FACE::INF_TIME_VALUE;
00731   }
00732   return duration.nanosec +
00733     duration.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00734 }

OpenDDS_FACE_Export FACE::SYSTEM_TIME_TYPE OpenDDS::FaceTSS::convertTime ( const DDS::Time_t &  timestamp  ) 

Definition at line 736 of file FaceTSS.cpp.

References DDS::Time_t::nanosec, NSEC_PER_SEC, and DDS::Time_t::sec.

Referenced by populate_header_received().

00737 {
00738   return timestamp.nanosec +
00739       timestamp.sec * static_cast<FACE::SYSTEM_TIME_TYPE>(NSEC_PER_SEC);
00740 }

OpenDDS_FACE_Export FACE::MESSAGE_INSTANCE_GUID OpenDDS::FaceTSS::create_message_instance_guid ( const OpenDDS::DCPS::RepoId &  pub,
const CORBA::LongLong &  orig_seq 
)

Definition at line 743 of file FaceTSS.cpp.

Referenced by populate_header_received().

00744 {
00745   OpenDDS::DCPS::GuidConverter writer(pub);
00746 
00747   FACE::MESSAGE_INSTANCE_GUID message_instance_guid;
00748   FACE::LongLong mig_low;
00749   FACE::LongLong masked_seq;
00750 
00751   //Until MESSAGE_INSTANCE_GUID becomes 128 bit GUID, use checksum to represent Prefix
00752   FACE::Long prefix_representation = ACE::crc32(reinterpret_cast<const void*>(&pub), sizeof(pub));
00753   masked_seq = orig_seq >> 32;
00754 
00755   if (masked_seq) {
00756     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in FACE::Long, truncating high bits to fit\n"));
00757   }
00758   mig_low = orig_seq & 0xFFFFFFFF;
00759   message_instance_guid = (((FACE::LongLong) prefix_representation) << 32 ) | ((FACE::LongLong) mig_low);
00760 /*
00761   //TODO: This is initial work toward defining how a 128 bit guid could be created
00762   //in the future for a Message Instance Guid when supported.
00763   // 13 byte prefix contains identifying pieces of guid
00764   // 3 byte seq - truncated sequence from the sample
00765   typedef CORBA::Octet MsgInstGuidPrefix_t[13];
00766   typedef CORBA::Octet MsgInstGuidSeq_t[3];
00767   MsgInstGuidPrefix_t migPrefix;
00768   MsgInstGuidSeq_t migSeq;
00769   ACE_OS::memcpy(&migPrefix[0], &pub.guidPrefix[2], 10);
00770   ACE_OS::memcpy(&migPrefix[10], &pub.entityId.entityKey[0], 3);
00771   masked_seq = orig_seq >> 24;
00772 
00773   if (masked_seq) {
00774     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: create_message_instance_guid - seq does not fit in 3 bytes, truncating high bits to fit\n"));
00775   }
00776   FACE::LongLong masked = orig_seq & 0xFFFFFF;
00777 
00778 #ifdef ACE_BIG_ENDIAN
00779   masked <<= 8 * (sizeof(FACE::LongLong)-3);
00780 #endif
00781   ACE_OS::memcpy(&migSeq[0], &masked, 3);
00782 */
00783   return message_instance_guid;
00784 }

OpenDDS_FACE_Export void OpenDDS::FaceTSS::populate_header_received ( const FACE::CONNECTION_ID_TYPE &  connection_id,
const DDS::DomainParticipant_var  part,
const DDS::SampleInfo &  sinfo,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 786 of file FaceTSS.cpp.

References DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, convertTime(), create_message_instance_guid(), OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, header, OpenDDS::FaceTSS::Entities::instance(), DDS::Duration_t::nanosec, DDS::SampleInfo::opendds_reserved_publication_seq, DDS::SampleInfo::publication_handle, OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::Duration_t::sec, DDS::SampleInfo::source_timestamp, OpenDDS::DCPS::time_to_time_value(), and OpenDDS::DCPS::time_value_to_time().

Referenced by OpenDDS::FaceTSS::Listener< Msg >::on_data_available(), and receive_message().

00790 {
00791   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00792   if (!readers.count(connection_id)) {
00793     return_code = FACE::INVALID_PARAM;
00794     return;
00795   }
00796   FACE::TS::MessageHeader& header = readers[connection_id]->last_msg_header;
00797 
00798   header.platform_view_guid = Entities::instance()->connections_[connection_id].platform_view_guid;
00799 
00800   DDS::Subscriber_var temp_sub = readers[connection_id]->dr->get_subscriber();
00801   DDS::DomainParticipant_var temp_dp = temp_sub->get_participant();
00802   const OpenDDS::DCPS::RepoId pub = dynamic_cast<OpenDDS::DCPS::DomainParticipantImpl*>(temp_dp.in())->get_repoid(sinfo.publication_handle);
00803   header.message_instance_guid = create_message_instance_guid(pub, sinfo.opendds_reserved_publication_seq);
00804 
00805   header.message_timestamp = convertTime(sinfo.source_timestamp);
00806   ACE_Time_Value now(ACE_OS::gettimeofday());
00807 
00808   readers[connection_id]->sum_recvd_msgs_latency += (convertTime(OpenDDS::DCPS::time_value_to_time(now)) - header.message_timestamp);
00809   ++readers[connection_id]->total_msgs_recvd;
00810 
00811   if (OpenDDS::DCPS::DCPS_debug_level > 8) {
00812     ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Latency is now (tot_latency %d / tot_msgs_recvd %d): %d\n",
00813         readers[connection_id]->sum_recvd_msgs_latency,
00814         readers[connection_id]->total_msgs_recvd,
00815         readers[connection_id]->sum_recvd_msgs_latency/readers[connection_id]->total_msgs_recvd));
00816   }
00817   ::DDS::Subscriber_var bit_subscriber
00818    = part->get_builtin_subscriber () ;
00819 
00820   ::DDS::DataReader_var reader
00821    = bit_subscriber->lookup_datareader (OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC) ;
00822   ::DDS::PublicationBuiltinTopicDataDataReader_var pub_reader
00823    = ::DDS::PublicationBuiltinTopicDataDataReader::_narrow (reader.in ());
00824   if (CORBA::is_nil (pub_reader.in ())) {
00825     ACE_ERROR((LM_ERROR, "(%P|%t) populate_header_received: failed to get BUILT_IN_PUBLICATION_TOPIC datareader.\n"));
00826     return_code = FACE::NOT_AVAILABLE;
00827     return;
00828   }
00829 
00830   ::DDS::ReturnCode_t ret;
00831   ::DDS::SampleInfoSeq pubinfos(1);
00832   ::DDS::PublicationBuiltinTopicDataSeq pubdata(1);
00833   ret = pub_reader->read_instance(pubdata,
00834                                   pubinfos,
00835                                   1,
00836                                   sinfo.publication_handle,
00837                                   ::DDS::ANY_SAMPLE_STATE,
00838                                   ::DDS::ANY_VIEW_STATE,
00839                                   ::DDS::ALIVE_INSTANCE_STATE);
00840 
00841   if (ret != ::DDS::RETCODE_OK && ret != ::DDS::RETCODE_NO_DATA) {
00842     ACE_ERROR((LM_ERROR,
00843         "(%P|%t) populate_header_received:  failed to read BIT publication data.\n"));
00844     return_code = FACE::NOT_AVAILABLE;
00845     return;
00846   }
00847 
00848   const CORBA::ULong i = 0;
00849 
00850   header.message_source_guid =
00851     (pubdata[i].user_data.value[0] << 0) |
00852     (pubdata[i].user_data.value[1] << 8) |
00853     (pubdata[i].user_data.value[2] << 16);
00854 
00855 //  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: DW lifespan qos value: sec: %d nanosec: %d\n",
00856 //                         pubdata[i].lifespan.duration.sec, pubdata[i].lifespan.duration.nanosec));
00857 
00858   DDS::Duration_t lifespan = pubdata[i].lifespan.duration;
00859   if (lifespan.sec != DDS::DURATION_INFINITE_SEC &&
00860       lifespan.nanosec != DDS::DURATION_INFINITE_NSEC) {
00861     // Finite lifespan.  Check if data has expired.
00862 
00863     DDS::Time_t const tmp = {
00864       sinfo.source_timestamp.sec + lifespan.sec,
00865       sinfo.source_timestamp.nanosec + lifespan.nanosec
00866     };
00867 
00868     // We assume that the publisher host's clock and subscriber host's
00869     // clock are synchronized (allowed by the spec).
00870     ACE_Time_Value const expiration_time(
00871         OpenDDS::DCPS::time_to_time_value(tmp));
00872 
00873     if (now >= expiration_time) {
00874 //      ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Last message expired, setting message_validity to INVALID\n"));
00875       header.message_validity = FACE::INVALID;
00876       return_code = FACE::RC_NO_ERROR;
00877       return;
00878     }
00879   }
00880 //  ACE_DEBUG((LM_DEBUG, "(%P|%t) populate_header_received: Setting message_validity to VALID\n"));
00881   header.message_validity = FACE::VALID;
00882   return_code = FACE::RC_NO_ERROR;
00883 }

template<typename Msg>
void OpenDDS::FaceTSS::receive_message ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  timeout,
FACE::TRANSACTION_ID_TYPE &  transaction_id,
Msg &  message,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 143 of file FaceTSS.h.

References DDS::ALIVE_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, convertTimeout(), DataReader, OpenDDS::FaceTSS::Entities::instance(), populate_header_received(), OpenDDS::FaceTSS::Entities::receivers_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid, and update_status().

00149 {
00150   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00151   if (!readers.count(connection_id)) {
00152     return_code = FACE::INVALID_PARAM;
00153     return;
00154   }
00155   if(!Entities::instance()->connections_.count(connection_id)) {
00156     return_code = FACE::INVALID_PARAM;
00157     return;
00158   }
00159   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00160     Entities::instance()->connections_[connection_id].connection_status;
00161   if (message_size < status.MAX_MESSAGE_SIZE) {
00162     return_code = FACE::INVALID_PARAM;
00163     return;
00164   }
00165   typedef typename DCPS::DDSTraits<Msg>::DataReaderType DataReader;
00166   const typename DataReader::_var_type typedReader =
00167     DataReader::_narrow(readers[connection_id]->dr);
00168   if (!typedReader) {
00169     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00170     return;
00171   }
00172   if (readers[connection_id]->status_valid != FACE::VALID) {
00173     Entities::FaceReceiver* tmp = readers[connection_id];
00174     readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00175     delete tmp;
00176   }
00177   readers[connection_id]->status_valid = FACE::VALID;
00178 
00179   const DDS::ReadCondition_var rc =
00180     typedReader->create_readcondition(DDS::ANY_SAMPLE_STATE,
00181                                       DDS::ANY_VIEW_STATE,
00182                                       DDS::ALIVE_INSTANCE_STATE);
00183   const DDS::WaitSet_var ws = new DDS::WaitSet;
00184   ws->attach_condition(rc);
00185 
00186   DDS::ConditionSeq active;
00187   const DDS::Duration_t ddsTimeout = convertTimeout(timeout);
00188   DDS::ReturnCode_t ret = ws->wait(active, ddsTimeout);
00189   ws->detach_condition(rc);
00190 
00191   if (ret == DDS::RETCODE_TIMEOUT) {
00192     return_code = update_status(connection_id, ret);
00193     return;
00194   }
00195 
00196   typename DCPS::DDSTraits<Msg>::MessageSequenceType seq;
00197   DDS::SampleInfoSeq sinfo;
00198   ret = typedReader->take_w_condition(seq, sinfo, 1 /*max*/, rc);
00199   if (ret == DDS::RETCODE_OK && sinfo[0].valid_data) {
00200     DDS::DomainParticipant_var participant = typedReader->get_subscriber()->get_participant();
00201     FACE::RETURN_CODE_TYPE ret_code;
00202     populate_header_received(connection_id, participant, sinfo[0], ret_code);
00203     if (ret_code != FACE::RC_NO_ERROR) {
00204       return_code = update_status(connection_id, ret_code);
00205       return;
00206     }
00207 
00208     transaction_id = ++readers[connection_id]->last_msg_tid;
00209 
00210     message = seq[0];
00211     return_code = update_status(connection_id, ret);
00212     return;
00213   }
00214   return_code = update_status(connection_id, DDS::RETCODE_NO_DATA);
00215 }

template<typename Msg>
void OpenDDS::FaceTSS::send_message ( FACE::CONNECTION_ID_TYPE  connection_id,
FACE::TIMEOUT_TYPE  timeout,
FACE::TRANSACTION_ID_TYPE &  ,
const Msg &  message,
FACE::MESSAGE_SIZE_TYPE  message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 218 of file FaceTSS.h.

References convertDuration(), DataWriter, DDS::HANDLE_NIL, OpenDDS::FaceTSS::Entities::instance(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_BAD_PARAMETER, OpenDDS::FaceTSS::Entities::senders_, and update_status().

00224 {
00225   if(!Entities::instance()->connections_.count(connection_id)) {
00226     return_code = FACE::INVALID_PARAM;
00227     return;
00228   }
00229   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00230     Entities::instance()->connections_[connection_id].connection_status;
00231   if (message_size < status.MAX_MESSAGE_SIZE) {
00232     return_code = FACE::INVALID_PARAM;
00233     return;
00234   }
00235   Entities::ConnIdToSenderMap& writers = Entities::instance()->senders_;
00236   if (!writers.count(connection_id)) {
00237     return_code = FACE::INVALID_PARAM;
00238     return;
00239   }
00240 
00241   typedef typename DCPS::DDSTraits<Msg>::DataWriterType DataWriter;
00242   const typename DataWriter::_var_type typedWriter =
00243     DataWriter::_narrow(writers[connection_id].dw);
00244   if (!typedWriter) {
00245     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00246     return;
00247   }
00248   writers[connection_id].status_valid = FACE::VALID;
00249 
00250   DDS::DataWriterQos dw_qos;
00251   typedWriter->get_qos(dw_qos);
00252   FACE::SYSTEM_TIME_TYPE max_blocking_time = convertDuration(dw_qos.reliability.max_blocking_time);
00253   if (dw_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS &&
00254       timeout != FACE::INF_TIME_VALUE &&
00255       ((max_blocking_time == FACE::INF_TIME_VALUE) || (timeout < max_blocking_time))) {
00256     return_code = update_status(connection_id, DDS::RETCODE_BAD_PARAMETER);
00257     return;
00258   }
00259 
00260   return_code = update_status(connection_id, typedWriter->write(message, DDS::HANDLE_NIL));
00261 }

template<typename Msg>
void OpenDDS::FaceTSS::register_callback ( FACE::CONNECTION_ID_TYPE  connection_id,
const FACE::WAITSET_TYPE  ,
void(*)(FACE::TRANSACTION_ID_TYPE, Msg &, FACE::MESSAGE_TYPE_GUID, FACE::MESSAGE_SIZE_TYPE, const FACE::WAITSET_TYPE, FACE::RETURN_CODE_TYPE &)  callback,
FACE::MESSAGE_SIZE_TYPE  max_message_size,
FACE::RETURN_CODE_TYPE &  return_code 
)

Definition at line 350 of file FaceTSS.h.

References OpenDDS::FaceTSS::Listener< Msg >::add_callback(), DDS::DATA_AVAILABLE_STATUS, OpenDDS::FaceTSS::Entities::instance(), OpenDDS::FaceTSS::Entities::receivers_, and OpenDDS::FaceTSS::Entities::DDSAdapter::status_valid.

00359 {
00360   Entities::ConnIdToReceiverMap& readers = Entities::instance()->receivers_;
00361   if (!readers.count(connection_id)) {
00362     return_code = FACE::INVALID_PARAM;
00363     return;
00364   }
00365   if(!Entities::instance()->connections_.count(connection_id)) {
00366     return_code = FACE::INVALID_PARAM;
00367     return;
00368   }
00369   FACE::TRANSPORT_CONNECTION_STATUS_TYPE status =
00370     Entities::instance()->connections_[connection_id].connection_status;
00371   if (max_message_size < status.MAX_MESSAGE_SIZE) {
00372     return_code = FACE::INVALID_PARAM;
00373     return;
00374   }
00375   DDS::DataReaderListener_ptr existing_listener = readers[connection_id]->dr->get_listener();
00376   if (existing_listener) {
00377     Listener<Msg>* typedListener = dynamic_cast<Listener<Msg>*>(existing_listener);
00378     typedListener->add_callback(callback);
00379   } else {
00380     DDS::DataReaderListener_var listener = new Listener<Msg>(callback, connection_id);
00381     readers[connection_id]->dr->set_listener(listener, DDS::DATA_AVAILABLE_STATUS);
00382   }
00383   if (readers[connection_id]->status_valid != FACE::VALID) {
00384     Entities::FaceReceiver* tmp = readers[connection_id];
00385     readers[connection_id] = new Entities::DDSTypedAdapter<Msg>(*readers[connection_id]);
00386     delete tmp;
00387   }
00388   readers[connection_id]->status_valid = FACE::VALID;
00389 
00390   return_code = FACE::RC_NO_ERROR;
00391 }


Generated on Fri Feb 12 20:06:45 2016 for OpenDDS by  doxygen 1.4.7