BitPubListenerImpl.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef DDS_HAS_MINIMUM_BIT
00011
00012 #include "BitPubListenerImpl.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "GuidConverter.h"
00015 #include "Discovery.h"
00016 #include "Service_Participant.h"
00017 #include "BuiltInTopicUtils.h"
00018 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00019
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 namespace OpenDDS {
00023 namespace DCPS {
00024
00025 BitPubListenerImpl::BitPubListenerImpl(DomainParticipantImpl* partipant)
00026 : partipant_ (partipant)
00027 {
00028 }
00029
00030 BitPubListenerImpl::~BitPubListenerImpl()
00031 {
00032 }
00033
00034 void BitPubListenerImpl::on_data_available(DDS::DataReader_ptr reader)
00035 {
00036 try {
00037 ::DDS::PublicationBuiltinTopicDataDataReader_var bit_dr =
00038 ::DDS::PublicationBuiltinTopicDataDataReader::_narrow(reader);
00039
00040 if (CORBA::is_nil(bit_dr.in())) {
00041 ACE_ERROR((LM_ERROR,
00042 ACE_TEXT("(%P|%t) ERROR: BitPubListenerImpl::on_data_available ")
00043 ACE_TEXT("_narrow failed!\n")));
00044 return;
00045 }
00046
00047 ::DDS::PublicationBuiltinTopicData data;
00048 DDS::SampleInfo si;
00049 DDS::ReturnCode_t status;
00050
00051 do {
00052 status = bit_dr->take_next_sample(data, si);
00053
00054 if (status == DDS::RETCODE_OK) {
00055 if (si.valid_data) {
00056 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00057 Discovery_rch disc =
00058 TheServiceParticipant->get_discovery(partipant_->get_domain_id());
00059 PublicationId pub_id =
00060 disc->bit_key_to_repo_id(partipant_, BUILT_IN_PUBLICATION_TOPIC, data.key);
00061 CORBA::Long const ownership_strength = data.ownership_strength.value;
00062 this->partipant_->update_ownership_strength(pub_id, ownership_strength);
00063 if (DCPS_debug_level > 4) {
00064 GuidConverter writer_converter(pub_id);
00065 ACE_DEBUG((LM_DEBUG,
00066 ACE_TEXT("(%P|%t) BitPubListenerImpl::on_data_available: %X ")
00067 ACE_TEXT("reset ownership strength %d for writer %C.\n"),
00068 this, ownership_strength, OPENDDS_STRING(writer_converter).c_str()));
00069 }
00070 #endif
00071 }
00072 else if (si.instance_state != DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE
00073 && si.instance_state != DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) {
00074 ACE_ERROR((LM_ERROR,
00075 ACE_TEXT("(%P|%t) ERROR: BitPubListenerImpl::on_data_available:")
00076 ACE_TEXT(" unknown instance state: %d\n"),
00077 si.instance_state));
00078 }
00079 } else if (status != DDS::RETCODE_NO_DATA) {
00080 ACE_ERROR((LM_ERROR,
00081 ACE_TEXT("(%P|%t) ERROR: BitPubListenerImpl::on_data_available:")
00082 ACE_TEXT(" unexpected status: %d\n"),
00083 status));
00084 }
00085 } while (status == DDS::RETCODE_OK);
00086
00087 } catch (const CORBA::Exception& e) {
00088 e._tao_print_exception("Exception caught in BitPubListenerImpl::on_data_available():");
00089 }
00090 }
00091
00092 void BitPubListenerImpl::on_requested_deadline_missed(
00093 DDS::DataReader_ptr,
00094 const DDS::RequestedDeadlineMissedStatus &)
00095 {
00096 }
00097
00098 void BitPubListenerImpl::on_requested_incompatible_qos(
00099 DDS::DataReader_ptr,
00100 const DDS::RequestedIncompatibleQosStatus &)
00101 {
00102 }
00103
00104 void BitPubListenerImpl::on_liveliness_changed(
00105 DDS::DataReader_ptr,
00106 const DDS::LivelinessChangedStatus &)
00107 {
00108 }
00109
00110 void BitPubListenerImpl::on_subscription_matched(
00111 DDS::DataReader_ptr,
00112 const DDS::SubscriptionMatchedStatus &)
00113 {
00114 }
00115
00116 void BitPubListenerImpl::on_sample_rejected(
00117 DDS::DataReader_ptr,
00118 const DDS::SampleRejectedStatus&)
00119 {
00120 }
00121
00122 void BitPubListenerImpl::on_sample_lost(
00123 DDS::DataReader_ptr,
00124 const DDS::SampleLostStatus&)
00125 {
00126 }
00127
00128 }
00129 }
00130
00131 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00132
00133 #endif // DDS_HAS_MINIMUM_BIT