OpenDDS::RTPS::Sedp::Reader Class Reference

Inheritance diagram for OpenDDS::RTPS::Sedp::Reader:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Sedp::Reader:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 Reader (const DCPS::RepoId &sub_id, Sedp &sedp)
virtual ~Reader ()
bool assoc (const DCPS::AssociationData &publication)
void data_received (const DCPS::ReceivedDataSample &sample)
void notify_subscription_disconnected (const DCPS::WriterIdSeq &)
void notify_subscription_reconnected (const DCPS::WriterIdSeq &)
void notify_subscription_lost (const DCPS::WriterIdSeq &)
void remove_associations (const DCPS::WriterIdSeq &, bool)

Public Attributes

ACE_Atomic_Op< ACE_SYNCH_MUTEX, bool > shutting_down_

Detailed Description

Definition at line 362 of file Sedp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Sedp::Reader::Reader ( const DCPS::RepoId & sub_id,
Sedp & sedp 
) [inline]

Definition at line 367 of file Sedp.h.

00368       : Endpoint(sub_id, sedp)
00369       , shutting_down_(false)
00370     {}

OpenDDS::RTPS::Sedp::Reader::~Reader (  )  [virtual]

Definition at line 2862 of file Sedp.cpp.

02863 {}


Member Function Documentation

bool OpenDDS::RTPS::Sedp::Reader::assoc ( const DCPS::AssociationData & publication  ) 

Definition at line 2866 of file Sedp.cpp.

References OpenDDS::DCPS::TransportClient::associate().

02867 {
02868   return associate(publication, false);
02869 }

Here is the call graph for this function:

void OpenDDS::RTPS::Sedp::Reader::data_received ( const DCPS::ReceivedDataSample & sample  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2893 of file Sedp.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::decode_parameter_list(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, OpenDDS::RTPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::RTPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, OpenDDS::RTPS::ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER, OpenDDS::RTPS::ParameterListConverter::from_param_list(), OpenDDS::DCPS::ReceivedDataSample::header_, LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Sedp::Endpoint::sedp_, OpenDDS::RTPS::Sedp::set_inline_qos(), shutting_down_, OpenDDS::RTPS::Sedp::task_, OpenDDS::DCPS::UNREGISTER_INSTANCE, and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().

02894 {
02895   if (shutting_down_.value()) return;
02896 
02897   switch (sample.header_.message_id_) {
02898   case DCPS::SAMPLE_DATA:
02899   case DCPS::DISPOSE_INSTANCE:
02900   case DCPS::UNREGISTER_INSTANCE:
02901   case DCPS::DISPOSE_UNREGISTER_INSTANCE: {
02902     const DCPS::MessageId id =
02903       static_cast<DCPS::MessageId>(sample.header_.message_id_);
02904 
02905     DCPS::Serializer ser(sample.sample_.get(),
02906                          sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
02907                          DCPS::Serializer::ALIGN_CDR);
02908     ACE_CDR::Octet encap, dummy;
02909     ACE_CDR::UShort options;
02910     const bool ok = (ser >> ACE_InputCDR::to_octet(dummy))
02911               && (ser >> ACE_InputCDR::to_octet(encap))
02912               && (ser >> options);
02913     if (!ok) {
02914       ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02915                  ACE_TEXT("failed to deserialize encap and options\n")));
02916       return;
02917     }
02918 
02919     // Ignore the 'encap' byte order since we use sample.header_.byte_order_
02920     // to determine whether or not to swap bytes.
02921 
02922     if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
02923       ParameterList data;
02924       if (!decode_parameter_list(sample, ser, encap, data)) {
02925         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02926                    ACE_TEXT("failed to deserialize data\n")));
02927         return;
02928       }
02929 
02930       DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata(new DCPS::DiscoveredWriterData);
02931       if (ParameterListConverter::from_param_list(data, *wdata) < 0) {
02932         ACE_ERROR((LM_ERROR,
02933                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
02934                    ACE_TEXT("failed to convert from ParameterList ")
02935                    ACE_TEXT("to DiscoveredWriterData\n")));
02936         return;
02937       }
02938       sedp_.task_.enqueue(id, move(wdata));
02939 
02940 #if defined(OPENDDS_SECURITY)
02941     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) {
02942       ParameterList data;
02943       if (!decode_parameter_list(sample, ser, encap, data)) {
02944         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02945                    ACE_TEXT("failed to deserialize data\n")));
02946         return;
02947       }
02948 
02949       DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wdata_secure(new DiscoveredWriterData_SecurityWrapper);
02950 
02951       if (ParameterListConverter::from_param_list(data, *wdata_secure) < 0) {
02952         ACE_ERROR((LM_ERROR,
02953                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
02954                    ACE_TEXT("failed to convert from ParameterList ")
02955                    ACE_TEXT("to DiscoveredWriterData_SecurityWrapper\n")));
02956         return;
02957       }
02958       sedp_.task_.enqueue(id, move(wdata_secure));
02959 #endif
02960 
02961     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
02962       ParameterList data;
02963       if (!decode_parameter_list(sample, ser, encap, data)) {
02964         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02965                    ACE_TEXT("failed to deserialize data\n")));
02966         return;
02967       }
02968 
02969       DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata(new DCPS::DiscoveredReaderData);
02970       if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
02971         ACE_ERROR((LM_ERROR,
02972                    ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
02973                    ACE_TEXT("failed to convert from ParameterList ")
02974                    ACE_TEXT("to DiscoveredReaderData\n")));
02975         return;
02976       }
02977       if (rdata->readerProxy.expectsInlineQos) {
02978         set_inline_qos(rdata->readerProxy.allLocators);
02979       }
02980       sedp_.task_.enqueue(id, move(rdata));
02981 
02982 #if defined(OPENDDS_SECURITY)
02983     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) {
02984       ParameterList data;
02985       if (!decode_parameter_list(sample, ser, encap, data)) {
02986         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02987                    ACE_TEXT("failed to deserialize data\n")));
02988         return;
02989       }
02990 
02991       DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> rdata(new DiscoveredReaderData_SecurityWrapper);
02992 
02993       if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
02994         ACE_ERROR((LM_ERROR,
02995                    ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
02996                    ACE_TEXT("failed to convert from ParameterList ")
02997                    ACE_TEXT("to DiscoveredReaderData_SecurityWrapper\n")));
02998         return;
02999       }
03000       if ((rdata->data).readerProxy.expectsInlineQos) {
03001         set_inline_qos((rdata->data).readerProxy.allLocators);
03002       }
03003       sedp_.task_.enqueue(id, move(rdata));
03004 #endif
03005 
03006     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
03007                && !sample.header_.key_fields_only_) {
03008       DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData);
03009       if (!(ser >> *data)) {
03010         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03011                    ACE_TEXT("failed to deserialize data\n")));
03012         return;
03013       }
03014       sedp_.task_.enqueue(id, move(data));
03015 
03016 #if defined(OPENDDS_SECURITY)
03017     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER
03018                && !sample.header_.key_fields_only_) {
03019 
03020       DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData);
03021 
03022       if (!(ser >> *data)) {
03023         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03024                    ACE_TEXT("failed to deserialize data\n")));
03025         return;
03026       }
03027       sedp_.task_.enqueue_participant_message_secure(id, move(data));
03028 
03029     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER) {
03030 
03031       DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data(new DDS::Security::ParticipantStatelessMessage);
03032       ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63
03033       if (!(ser >> *data)) {
03034         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03035                    ACE_TEXT("failed to deserialize data\n")));
03036         return;
03037       }
03038       sedp_.task_.enqueue_stateless_message(id, move(data));
03039 
03040     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER) {
03041 
03042       DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data(new DDS::Security::ParticipantVolatileMessageSecure);
03043       ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63
03044       if (!(ser >> *data)) {
03045         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03046                    ACE_TEXT("failed to deserialize data\n")));
03047         return;
03048       }
03049       sedp_.task_.enqueue_volatile_message_secure(id, move(data));
03050 
03051     } else if (sample.header_.publication_id_.entityId == ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER) {
03052 
03053       ParameterList data;
03054       if (!decode_parameter_list(sample, ser, encap, data)) {
03055         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03056                    ACE_TEXT("failed to deserialize data\n")));
03057         return;
03058       }
03059 
03060       DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata(new Security::SPDPdiscoveredParticipantData);
03061 
03062       if (ParameterListConverter::from_param_list(data, *pdata) < 0) {
03063         ACE_ERROR((LM_ERROR,
03064                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
03065                    ACE_TEXT("failed to convert from ParameterList ")
03066                    ACE_TEXT("to Security::SPDPdiscoveredParticipantData\n")));
03067         return;
03068       }
03069       sedp_.task_.enqueue(id, move(pdata));
03070 #endif
03071 
03072     }
03073     break;
03074   }
03075 
03076   default:
03077     break;
03078   }
03079 }

Here is the call graph for this function:

void OpenDDS::RTPS::Sedp::Reader::notify_subscription_disconnected ( const DCPS::WriterIdSeq &  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 380 of file Sedp.h.

00380 {}

void OpenDDS::RTPS::Sedp::Reader::notify_subscription_lost ( const DCPS::WriterIdSeq &  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 382 of file Sedp.h.

00382 {}

void OpenDDS::RTPS::Sedp::Reader::notify_subscription_reconnected ( const DCPS::WriterIdSeq &  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 381 of file Sedp.h.

00381 {}

void OpenDDS::RTPS::Sedp::Reader::remove_associations ( const DCPS::WriterIdSeq & ,
bool   
) [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 383 of file Sedp.h.

00383 {}


Member Data Documentation

ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> OpenDDS::RTPS::Sedp::Reader::shutting_down_

Definition at line 385 of file Sedp.h.

Referenced by data_received().


The documentation for this class was generated from the following files:

  * Sedp.h
  * Sedp.cpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1