Public Member Functions | |
Reader (const DCPS::RepoId &sub_id, Sedp &sedp) | |
virtual | ~Reader () |
bool | assoc (const DCPS::AssociationData &publication) |
void | data_received (const DCPS::ReceivedDataSample &sample) |
void | notify_subscription_disconnected (const DCPS::WriterIdSeq &) |
void | notify_subscription_reconnected (const DCPS::WriterIdSeq &) |
void | notify_subscription_lost (const DCPS::WriterIdSeq &) |
void | notify_connection_deleted (const DCPS::RepoId &) |
void | remove_associations (const DCPS::WriterIdSeq &, bool) |
void | listener_add_ref () |
void | listener_remove_ref () |
Public Attributes | |
ACE_Atomic_Op< ACE_SYNCH_MUTEX, long > | shutting_down_ |
Definition at line 208 of file Sedp.h.
OpenDDS::RTPS::Sedp::Reader::Reader (const DCPS::RepoId &sub_id, Sedp &sedp) [inline]
Definition at line 214 of file Sedp.h.
00215     : Endpoint(sub_id, sedp)
00216     , shutting_down_(0)
00217   {}
OpenDDS::RTPS::Sedp::Reader::~Reader () [virtual]
bool OpenDDS::RTPS::Sedp::Reader::assoc (const DCPS::AssociationData &publication)
Definition at line 1645 of file Sedp.cpp.
References OpenDDS::DCPS::TransportClient::associate().
01646 {
01647   return associate(publication, false);
01648 }
void OpenDDS::RTPS::Sedp::Reader::data_received (const DCPS::ReceivedDataSample &sample) [virtual]
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 1673 of file Sedp.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::decode_parameter_list(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, dummy, OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, OpenDDS::RTPS::ParameterListConverter::from_param_list(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Sedp::Endpoint::sedp_, OpenDDS::RTPS::Sedp::set_inline_qos(), shutting_down_, OpenDDS::RTPS::Sedp::task_, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
01674 {
01675   if (shutting_down_.value()) return;
01676
01677   switch (sample.header_.message_id_) {
01678   case DCPS::SAMPLE_DATA:
01679   case DCPS::DISPOSE_INSTANCE:
01680   case DCPS::UNREGISTER_INSTANCE:
01681   case DCPS::DISPOSE_UNREGISTER_INSTANCE: {
01682     const DCPS::MessageId id =
01683       static_cast<DCPS::MessageId>(sample.header_.message_id_);
01684
01685     DCPS::Serializer ser(sample.sample_,
01686                          sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
01687                          DCPS::Serializer::ALIGN_CDR);
01688     bool ok = true;
01689     ACE_CDR::Octet encap, dummy;
01690     ACE_CDR::UShort options;
01691     ok &= (ser >> ACE_InputCDR::to_octet(dummy))
01692       && (ser >> ACE_InputCDR::to_octet(encap))
01693       && (ser >> options);
01694
01695     // Ignore the 'encap' byte order since we use sample.header_.byte_order_
01696     // to determine whether or not to swap bytes.
01697
01698     if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
01699       ParameterList data;
01700       if (!decode_parameter_list(sample, ser, encap, data)) {
01701         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01702           ACE_TEXT("failed to deserialize data\n")));
01703         return;
01704       }
01705
01706       ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredWriterData> wdata(new OpenDDS::DCPS::DiscoveredWriterData);
01707       if (ParameterListConverter::from_param_list(data, *wdata) < 0) {
01708         ACE_DEBUG((LM_ERROR,
01709           ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
01710           ACE_TEXT("failed to convert from ParameterList ")
01711           ACE_TEXT("to DiscoveredWriterData\n")));
01712         return;
01713       }
01714       sedp_.task_.enqueue(id, wdata.release());
01715
01716     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
01717       ParameterList data;
01718       if (!decode_parameter_list(sample, ser, encap, data)) {
01719         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01720           ACE_TEXT("failed to deserialize data\n")));
01721         return;
01722       }
01723
01724       ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredReaderData> rdata(new OpenDDS::DCPS::DiscoveredReaderData);
01725       if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
01726         ACE_DEBUG((LM_ERROR,
01727           ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
01728           ACE_TEXT("failed to convert from ParameterList ")
01729           ACE_TEXT("to DiscoveredReaderData\n")));
01730         return;
01731       }
01732       if (rdata->readerProxy.expectsInlineQos) {
01733         set_inline_qos(rdata->readerProxy.allLocators);
01734       }
01735       sedp_.task_.enqueue(id, rdata.release());
01736
01737     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
01738                && !sample.header_.key_fields_only_) {
01739       ACE_Auto_Ptr<ParticipantMessageData> data(new ParticipantMessageData);
01740       if (!(ser >> *data)) {
01741         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01742           ACE_TEXT("failed to deserialize data\n")));
01743         return;
01744       }
01745
01746       sedp_.task_.enqueue(id, data.release());
01747
01748     }
01749   }
01750   break;
01751
01752   default:
01753     break;
01754   }
01755 }
void OpenDDS::RTPS::Sedp::Reader::listener_add_ref () [inline, virtual]
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 233 of file Sedp.h.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref().
00233 { _add_ref(); }
void OpenDDS::RTPS::Sedp::Reader::listener_remove_ref () [inline, virtual]
Implements OpenDDS::DCPS::TransportReceiveListener.
Definition at line 234 of file Sedp.h.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref().
00234 { _remove_ref(); }
void OpenDDS::RTPS::Sedp::Reader::notify_connection_deleted (const DCPS::RepoId &) [inline, virtual]
void OpenDDS::RTPS::Sedp::Reader::notify_subscription_disconnected (const DCPS::WriterIdSeq &) [inline, virtual]
void OpenDDS::RTPS::Sedp::Reader::notify_subscription_lost (const DCPS::WriterIdSeq &) [inline, virtual]
void OpenDDS::RTPS::Sedp::Reader::notify_subscription_reconnected (const DCPS::WriterIdSeq &) [inline, virtual]
void OpenDDS::RTPS::Sedp::Reader::remove_associations (const DCPS::WriterIdSeq &, bool) [inline, virtual]
ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> OpenDDS::RTPS::Sedp::Reader::shutting_down_ |