00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef DDS_HAS_MINIMUM_BIT
00011
00012 #include "BitPubListenerImpl.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "GuidConverter.h"
00015 #include "Discovery.h"
00016 #include "Service_Participant.h"
00017 #include "BuiltInTopicUtils.h"
00018 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00019
00020 namespace OpenDDS {
00021 namespace DCPS {
00022
00023 BitPubListenerImpl::BitPubListenerImpl(DomainParticipantImpl* partipant)
00024 : partipant_ (partipant)
00025 {
00026 }
00027
00028 BitPubListenerImpl::~BitPubListenerImpl()
00029 {
00030 }
00031
00032 void BitPubListenerImpl::on_data_available(DDS::DataReader_ptr reader)
00033 {
00034 try {
00035 ::DDS::PublicationBuiltinTopicDataDataReader_var bit_dr =
00036 ::DDS::PublicationBuiltinTopicDataDataReader::_narrow(reader);
00037
00038 if (CORBA::is_nil(bit_dr.in())) {
00039 ACE_ERROR((LM_ERROR,
00040 ACE_TEXT("(%P|%t) ERROR: BitPubListenerImpl::on_data_available ")
00041 ACE_TEXT("_narrow failed!\n")));
00042 return;
00043 }
00044
00045 ::DDS::PublicationBuiltinTopicData data;
00046 DDS::SampleInfo si;
00047 DDS::ReturnCode_t status;
00048
00049 do {
00050 status = bit_dr->take_next_sample(data, si);
00051
00052 if (status == DDS::RETCODE_OK) {
00053 if (si.valid_data) {
00054 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00055 Discovery_rch disc =
00056 TheServiceParticipant->get_discovery(partipant_->get_domain_id());
00057 PublicationId pub_id =
00058 disc->bit_key_to_repo_id(partipant_, BUILT_IN_PUBLICATION_TOPIC, data.key);
00059 CORBA::Long const ownership_strength = data.ownership_strength.value;
00060 this->partipant_->update_ownership_strength(pub_id, ownership_strength);
00061 if (DCPS_debug_level > 4) {
00062 GuidConverter writer_converter(pub_id);
00063 ACE_DEBUG((LM_DEBUG,
00064 ACE_TEXT("(%P|%t) BitPubListenerImpl::on_data_available: %X ")
00065 ACE_TEXT("reset ownership strength %d for writer %C.\n"),
00066 this, ownership_strength, OPENDDS_STRING(writer_converter).c_str()));
00067 }
00068 #endif
00069 }
00070 else if (si.instance_state != DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE
00071 && si.instance_state != DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) {
00072 ACE_ERROR((LM_ERROR,
00073 ACE_TEXT("(%P|%t) ERROR: BitPubListenerImpl::on_data_available:")
00074 ACE_TEXT(" unknown instance state: %d\n"),
00075 si.instance_state));
00076 }
00077 } else if (status != DDS::RETCODE_NO_DATA) {
00078 ACE_ERROR((LM_ERROR,
00079 ACE_TEXT("(%P|%t) ERROR: BitPubListenerImpl::on_data_available:")
00080 ACE_TEXT(" unexpected status: %d\n"),
00081 status));
00082 }
00083 } while (status == DDS::RETCODE_OK);
00084
00085 } catch (const CORBA::Exception& e) {
00086 e._tao_print_exception("Exception caught in BitPubListenerImpl::on_data_available():");
00087 }
00088 }
00089
00090 void BitPubListenerImpl::on_requested_deadline_missed(
00091 DDS::DataReader_ptr,
00092 const DDS::RequestedDeadlineMissedStatus &)
00093 {
00094 }
00095
00096 void BitPubListenerImpl::on_requested_incompatible_qos(
00097 DDS::DataReader_ptr,
00098 const DDS::RequestedIncompatibleQosStatus &)
00099 {
00100 }
00101
00102 void BitPubListenerImpl::on_liveliness_changed(
00103 DDS::DataReader_ptr,
00104 const DDS::LivelinessChangedStatus &)
00105 {
00106 }
00107
00108 void BitPubListenerImpl::on_subscription_matched(
00109 DDS::DataReader_ptr,
00110 const DDS::SubscriptionMatchedStatus &)
00111 {
00112 }
00113
00114 void BitPubListenerImpl::on_sample_rejected(
00115 DDS::DataReader_ptr,
00116 const DDS::SampleRejectedStatus&)
00117 {
00118 }
00119
00120 void BitPubListenerImpl::on_sample_lost(
00121 DDS::DataReader_ptr,
00122 const DDS::SampleLostStatus&)
00123 {
00124 }
00125
00126 }
00127 }
00128
00129 #endif // DDS_HAS_MINIMUM_BIT