Public Member Functions | |
Reader (const DCPS::RepoId &sub_id, Sedp &sedp) | |
virtual | ~Reader () |
bool | assoc (const DCPS::AssociationData &publication) |
void | data_received (const DCPS::ReceivedDataSample &sample) |
void | notify_subscription_disconnected (const DCPS::WriterIdSeq &) |
void | notify_subscription_reconnected (const DCPS::WriterIdSeq &) |
void | notify_subscription_lost (const DCPS::WriterIdSeq &) |
void | remove_associations (const DCPS::WriterIdSeq &, bool) |
Public Attributes | |
ACE_Atomic_Op< ACE_SYNCH_MUTEX, bool > | shutting_down_ |
Definition at line 362 of file Sedp.h.
OpenDDS::RTPS::Sedp::Reader::Reader(const DCPS::RepoId& sub_id, Sedp& sedp) [inline]
Definition at line 367 of file Sedp.h.
00368 : Endpoint(sub_id, sedp) 00369 , shutting_down_(false) 00370 {}
OpenDDS::RTPS::Sedp::Reader::~Reader() [virtual]
bool OpenDDS::RTPS::Sedp::Reader::assoc(const DCPS::AssociationData& publication)
Definition at line 2866 of file Sedp.cpp.
References OpenDDS::DCPS::TransportClient::associate().
02867 { 02868 return associate(publication, false); 02869 }
void OpenDDS::RTPS::Sedp::Reader::data_received(const DCPS::ReceivedDataSample& sample) [virtual]
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 2893 of file Sedp.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::decode_parameter_list(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, OpenDDS::RTPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::RTPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, OpenDDS::RTPS::ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER, OpenDDS::RTPS::ParameterListConverter::from_param_list(), OpenDDS::DCPS::ReceivedDataSample::header_, LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Sedp::Endpoint::sedp_, OpenDDS::RTPS::Sedp::set_inline_qos(), shutting_down_, OpenDDS::RTPS::Sedp::task_, OpenDDS::DCPS::UNREGISTER_INSTANCE, and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().
02894 { 02895 if (shutting_down_.value()) return; 02896 02897 switch (sample.header_.message_id_) { 02898 case DCPS::SAMPLE_DATA: 02899 case DCPS::DISPOSE_INSTANCE: 02900 case DCPS::UNREGISTER_INSTANCE: 02901 case DCPS::DISPOSE_UNREGISTER_INSTANCE: { 02902 const DCPS::MessageId id = 02903 static_cast<DCPS::MessageId>(sample.header_.message_id_); 02904 02905 DCPS::Serializer ser(sample.sample_.get(), 02906 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER, 02907 DCPS::Serializer::ALIGN_CDR); 02908 ACE_CDR::Octet encap, dummy; 02909 ACE_CDR::UShort options; 02910 const bool ok = (ser >> ACE_InputCDR::to_octet(dummy)) 02911 && (ser >> ACE_InputCDR::to_octet(encap)) 02912 && (ser >> options); 02913 if (!ok) { 02914 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 02915 ACE_TEXT("failed to deserialize encap and options\n"))); 02916 return; 02917 } 02918 02919 // Ignore the 'encap' byte order since we use sample.header_.byte_order_ 02920 // to determine whether or not to swap bytes. 
02921 02922 if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) { 02923 ParameterList data; 02924 if (!decode_parameter_list(sample, ser, encap, data)) { 02925 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 02926 ACE_TEXT("failed to deserialize data\n"))); 02927 return; 02928 } 02929 02930 DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata(new DCPS::DiscoveredWriterData); 02931 if (ParameterListConverter::from_param_list(data, *wdata) < 0) { 02932 ACE_ERROR((LM_ERROR, 02933 ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ") 02934 ACE_TEXT("failed to convert from ParameterList ") 02935 ACE_TEXT("to DiscoveredWriterData\n"))); 02936 return; 02937 } 02938 sedp_.task_.enqueue(id, move(wdata)); 02939 02940 #if defined(OPENDDS_SECURITY) 02941 } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) { 02942 ParameterList data; 02943 if (!decode_parameter_list(sample, ser, encap, data)) { 02944 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 02945 ACE_TEXT("failed to deserialize data\n"))); 02946 return; 02947 } 02948 02949 DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wdata_secure(new DiscoveredWriterData_SecurityWrapper); 02950 02951 if (ParameterListConverter::from_param_list(data, *wdata_secure) < 0) { 02952 ACE_ERROR((LM_ERROR, 02953 ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ") 02954 ACE_TEXT("failed to convert from ParameterList ") 02955 ACE_TEXT("to DiscoveredWriterData_SecurityWrapper\n"))); 02956 return; 02957 } 02958 sedp_.task_.enqueue(id, move(wdata_secure)); 02959 #endif 02960 02961 } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) { 02962 ParameterList data; 02963 if (!decode_parameter_list(sample, ser, encap, data)) { 02964 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 02965 ACE_TEXT("failed to deserialize data\n"))); 02966 
return; 02967 } 02968 02969 DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata(new DCPS::DiscoveredReaderData); 02970 if (ParameterListConverter::from_param_list(data, *rdata) < 0) { 02971 ACE_ERROR((LM_ERROR, 02972 ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ") 02973 ACE_TEXT("failed to convert from ParameterList ") 02974 ACE_TEXT("to DiscoveredReaderData\n"))); 02975 return; 02976 } 02977 if (rdata->readerProxy.expectsInlineQos) { 02978 set_inline_qos(rdata->readerProxy.allLocators); 02979 } 02980 sedp_.task_.enqueue(id, move(rdata)); 02981 02982 #if defined(OPENDDS_SECURITY) 02983 } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) { 02984 ParameterList data; 02985 if (!decode_parameter_list(sample, ser, encap, data)) { 02986 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 02987 ACE_TEXT("failed to deserialize data\n"))); 02988 return; 02989 } 02990 02991 DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> rdata(new DiscoveredReaderData_SecurityWrapper); 02992 02993 if (ParameterListConverter::from_param_list(data, *rdata) < 0) { 02994 ACE_ERROR((LM_ERROR, 02995 ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ") 02996 ACE_TEXT("failed to convert from ParameterList ") 02997 ACE_TEXT("to DiscoveredReaderData_SecurityWrapper\n"))); 02998 return; 02999 } 03000 if ((rdata->data).readerProxy.expectsInlineQos) { 03001 set_inline_qos((rdata->data).readerProxy.allLocators); 03002 } 03003 sedp_.task_.enqueue(id, move(rdata)); 03004 #endif 03005 03006 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER 03007 && !sample.header_.key_fields_only_) { 03008 DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData); 03009 if (!(ser >> *data)) { 03010 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 03011 ACE_TEXT("failed to deserialize data\n"))); 03012 return; 03013 } 03014 sedp_.task_.enqueue(id, 
move(data)); 03015 03016 #if defined(OPENDDS_SECURITY) 03017 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER 03018 && !sample.header_.key_fields_only_) { 03019 03020 DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData); 03021 03022 if (!(ser >> *data)) { 03023 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 03024 ACE_TEXT("failed to deserialize data\n"))); 03025 return; 03026 } 03027 sedp_.task_.enqueue_participant_message_secure(id, move(data)); 03028 03029 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER) { 03030 03031 DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data(new DDS::Security::ParticipantStatelessMessage); 03032 ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63 03033 if (!(ser >> *data)) { 03034 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 03035 ACE_TEXT("failed to deserialize data\n"))); 03036 return; 03037 } 03038 sedp_.task_.enqueue_stateless_message(id, move(data)); 03039 03040 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER) { 03041 03042 DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data(new DDS::Security::ParticipantVolatileMessageSecure); 03043 ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63 03044 if (!(ser >> *data)) { 03045 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 03046 ACE_TEXT("failed to deserialize data\n"))); 03047 return; 03048 } 03049 sedp_.task_.enqueue_volatile_message_secure(id, move(data)); 03050 03051 } else if (sample.header_.publication_id_.entityId == ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER) { 03052 03053 ParameterList data; 03054 if (!decode_parameter_list(sample, ser, encap, data)) { 03055 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ") 
03056 ACE_TEXT("failed to deserialize data\n"))); 03057 return; 03058 } 03059 03060 DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata(new Security::SPDPdiscoveredParticipantData); 03061 03062 if (ParameterListConverter::from_param_list(data, *pdata) < 0) { 03063 ACE_ERROR((LM_ERROR, 03064 ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ") 03065 ACE_TEXT("failed to convert from ParameterList ") 03066 ACE_TEXT("to Security::SPDPdiscoveredParticipantData\n"))); 03067 return; 03068 } 03069 sedp_.task_.enqueue(id, move(pdata)); 03070 #endif 03071 03072 } 03073 break; 03074 } 03075 03076 default: 03077 break; 03078 } 03079 }
void OpenDDS::RTPS::Sedp::Reader::notify_subscription_disconnected(const DCPS::WriterIdSeq&) [inline, virtual]
void OpenDDS::RTPS::Sedp::Reader::notify_subscription_lost(const DCPS::WriterIdSeq&) [inline, virtual]
void OpenDDS::RTPS::Sedp::Reader::notify_subscription_reconnected(const DCPS::WriterIdSeq&) [inline, virtual]
void OpenDDS::RTPS::Sedp::Reader::remove_associations(const DCPS::WriterIdSeq&, bool) [inline, virtual]
ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> OpenDDS::RTPS::Sedp::Reader::shutting_down_ |
Definition at line 385 of file Sedp.h.
Referenced by data_received().