OpenDDS::RTPS::Sedp::Reader Class Reference

Inheritance diagram for OpenDDS::RTPS::Sedp::Reader:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Sedp::Reader:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 Reader (const DCPS::RepoId &sub_id, Sedp &sedp)
virtual ~Reader ()
bool assoc (const DCPS::AssociationData &publication)
void data_received (const DCPS::ReceivedDataSample &sample)
void notify_subscription_disconnected (const DCPS::WriterIdSeq &)
void notify_subscription_reconnected (const DCPS::WriterIdSeq &)
void notify_subscription_lost (const DCPS::WriterIdSeq &)
void notify_connection_deleted (const DCPS::RepoId &)
void remove_associations (const DCPS::WriterIdSeq &, bool)
void listener_add_ref ()
void listener_remove_ref ()

Public Attributes

ACE_Atomic_Op< ACE_SYNCH_MUTEX, long > shutting_down_

Detailed Description

Definition at line 208 of file Sedp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Sedp::Reader::Reader ( const DCPS::RepoId & sub_id,
Sedp & sedp 
) [inline]

Definition at line 214 of file Sedp.h.

00215       : Endpoint(sub_id, sedp)
00216       , shutting_down_(0)
00217     {}

OpenDDS::RTPS::Sedp::Reader::~Reader (  )  [virtual]

Definition at line 1641 of file Sedp.cpp.

01642 {}


Member Function Documentation

bool OpenDDS::RTPS::Sedp::Reader::assoc ( const DCPS::AssociationData & publication  ) 

Definition at line 1645 of file Sedp.cpp.

References OpenDDS::DCPS::TransportClient::associate().

01646 {
01647   return associate(publication, false);
01648 }

void OpenDDS::RTPS::Sedp::Reader::data_received ( const DCPS::ReceivedDataSample & sample  )  [virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 1673 of file Sedp.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::decode_parameter_list(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, dummy, OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, OpenDDS::RTPS::ParameterListConverter::from_param_list(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Sedp::Endpoint::sedp_, OpenDDS::RTPS::Sedp::set_inline_qos(), shutting_down_, OpenDDS::RTPS::Sedp::task_, and OpenDDS::DCPS::UNREGISTER_INSTANCE.

01674 {
01675   if (shutting_down_.value()) return;
01676 
01677   switch (sample.header_.message_id_) {
01678   case DCPS::SAMPLE_DATA:
01679   case DCPS::DISPOSE_INSTANCE:
01680   case DCPS::UNREGISTER_INSTANCE:
01681   case DCPS::DISPOSE_UNREGISTER_INSTANCE: {
01682     const DCPS::MessageId id =
01683       static_cast<DCPS::MessageId>(sample.header_.message_id_);
01684 
01685     DCPS::Serializer ser(sample.sample_,
01686                          sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
01687                          DCPS::Serializer::ALIGN_CDR);
01688     bool ok = true;
01689     ACE_CDR::Octet encap, dummy;
01690     ACE_CDR::UShort options;
01691     ok &= (ser >> ACE_InputCDR::to_octet(dummy))
01692       && (ser >> ACE_InputCDR::to_octet(encap))
01693       && (ser >> options);
01694 
01695     // Ignore the 'encap' byte order since we use sample.header_.byte_order_
01696     // to determine whether or not to swap bytes.
01697 
01698     if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
01699       ParameterList data;
01700       if (!decode_parameter_list(sample, ser, encap, data)) {
01701         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01702                    ACE_TEXT("failed to deserialize data\n")));
01703         return;
01704       }
01705 
01706       ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredWriterData> wdata(new OpenDDS::DCPS::DiscoveredWriterData);
01707       if (ParameterListConverter::from_param_list(data, *wdata) < 0) {
01708         ACE_DEBUG((LM_ERROR,
01709                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
01710                    ACE_TEXT("failed to convert from ParameterList ")
01711                    ACE_TEXT("to DiscoveredWriterData\n")));
01712         return;
01713       }
01714       sedp_.task_.enqueue(id, wdata.release());
01715 
01716     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
01717       ParameterList data;
01718       if (!decode_parameter_list(sample, ser, encap, data)) {
01719         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01720                    ACE_TEXT("failed to deserialize data\n")));
01721         return;
01722       }
01723 
01724       ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredReaderData> rdata(new OpenDDS::DCPS::DiscoveredReaderData);
01725       if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
01726         ACE_DEBUG((LM_ERROR,
01727                    ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
01728                    ACE_TEXT("failed to convert from ParameterList ")
01729                    ACE_TEXT("to DiscoveredReaderData\n")));
01730         return;
01731       }
01732       if (rdata->readerProxy.expectsInlineQos) {
01733         set_inline_qos(rdata->readerProxy.allLocators);
01734       }
01735       sedp_.task_.enqueue(id, rdata.release());
01736 
01737     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
01738                && !sample.header_.key_fields_only_) {
01739       ACE_Auto_Ptr<ParticipantMessageData> data(new ParticipantMessageData);
01740       if (!(ser >> *data)) {
01741         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01742                    ACE_TEXT("failed to deserialize data\n")));
01743         return;
01744       }
01745 
01746       sedp_.task_.enqueue(id, data.release());
01747 
01748     }
01749   }
01750     break;
01751 
01752   default:
01753     break;
01754   }
01755 }

void OpenDDS::RTPS::Sedp::Reader::listener_add_ref (  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 233 of file Sedp.h.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref().

00233 { _add_ref(); }

void OpenDDS::RTPS::Sedp::Reader::listener_remove_ref (  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 234 of file Sedp.h.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref().

00234 { _remove_ref(); }

void OpenDDS::RTPS::Sedp::Reader::notify_connection_deleted ( const DCPS::RepoId &  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 230 of file Sedp.h.

00230 {}

void OpenDDS::RTPS::Sedp::Reader::notify_subscription_disconnected ( const DCPS::WriterIdSeq &  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 227 of file Sedp.h.

00227 {}

void OpenDDS::RTPS::Sedp::Reader::notify_subscription_lost ( const DCPS::WriterIdSeq &  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 229 of file Sedp.h.

00229 {}

void OpenDDS::RTPS::Sedp::Reader::notify_subscription_reconnected ( const DCPS::WriterIdSeq &  )  [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 228 of file Sedp.h.

00228 {}

void OpenDDS::RTPS::Sedp::Reader::remove_associations ( const DCPS::WriterIdSeq & ,
bool   
) [inline, virtual]

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 231 of file Sedp.h.

00231 {}


Member Data Documentation

ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> OpenDDS::RTPS::Sedp::Reader::shutting_down_

Definition at line 236 of file Sedp.h.

Referenced by data_received().


The documentation for this class was generated from the following files:
Sedp.h
Sedp.cpp
Generated on Fri Feb 12 20:06:50 2016 for OpenDDS by  doxygen 1.4.7