00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TopicImpl.h"
00010 #include "Qos_Helper.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "Definitions.h"
00013 #include "Service_Participant.h"
00014 #include "DomainParticipantImpl.h"
00015 #include "MonitorFactory.h"
00016 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 TopicImpl::TopicImpl(const RepoId topic_id,
00022 const char* topic_name,
00023 const char* type_name,
00024 OpenDDS::DCPS::TypeSupport_ptr type_support,
00025 const DDS::TopicQos & qos,
00026 DDS::TopicListener_ptr a_listener,
00027 const DDS::StatusMask & mask,
00028 DomainParticipantImpl* participant)
00029 : TopicDescriptionImpl(topic_name,
00030 type_name,
00031 type_support,
00032 participant),
00033 qos_(qos),
00034 listener_mask_(mask),
00035 listener_(DDS::TopicListener::_duplicate(a_listener)),
00036 id_(topic_id),
00037 entity_refs_(0),
00038 monitor_(0)
00039 {
00040 inconsistent_topic_status_.total_count = 0;
00041 inconsistent_topic_status_.total_count_change = 0;
00042 monitor_ =
00043 TheServiceParticipant->monitor_factory_->create_topic_monitor(this);
00044 }
00045
00046 TopicImpl::~TopicImpl()
00047 {
00048 }
00049
00050 DDS::ReturnCode_t
00051 TopicImpl::set_qos(const DDS::TopicQos & qos)
00052 {
00053
00054 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00055 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00056 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00057 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00058
00059 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00060 if (qos_ == qos)
00061 return DDS::RETCODE_OK;
00062
00063
00064 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00065 return DDS::RETCODE_IMMUTABLE_POLICY;
00066
00067 } else {
00068 qos_ = qos;
00069 DomainParticipantImpl* part =
00070 dynamic_cast<DomainParticipantImpl*>(this->participant_);
00071
00072 Discovery_rch disco =
00073 TheServiceParticipant->get_discovery(part->get_domain_id());
00074 const bool status =
00075 disco->update_topic_qos(this->id_, part->get_domain_id(),
00076 part->get_id(), qos_);
00077
00078 if (!status) {
00079 ACE_ERROR_RETURN((LM_ERROR,
00080 ACE_TEXT("(%P|%t) TopicImpl::set_qos, ")
00081 ACE_TEXT("failed on compatiblity check. \n")),
00082 DDS::RETCODE_ERROR);
00083 }
00084 }
00085
00086 return DDS::RETCODE_OK;
00087
00088 } else {
00089 return DDS::RETCODE_INCONSISTENT_POLICY;
00090 }
00091 }
00092
00093 DDS::ReturnCode_t
00094 TopicImpl::get_qos(DDS::TopicQos& qos)
00095 {
00096 qos = qos_;
00097 return DDS::RETCODE_OK;
00098 }
00099
00100 DDS::ReturnCode_t
00101 TopicImpl::set_listener(DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
00102 {
00103 listener_mask_ = mask;
00104
00105 listener_ = DDS::TopicListener::_duplicate(a_listener);
00106 return DDS::RETCODE_OK;
00107 }
00108
00109 DDS::TopicListener_ptr
00110 TopicImpl::get_listener()
00111 {
00112 return DDS::TopicListener::_duplicate(listener_.in());
00113 }
00114
00115 DDS::ReturnCode_t
00116 TopicImpl::get_inconsistent_topic_status(DDS::InconsistentTopicStatus& a_status)
00117 {
00118 set_status_changed_flag(DDS::INCONSISTENT_TOPIC_STATUS, false);
00119 a_status = inconsistent_topic_status_;
00120 inconsistent_topic_status_.total_count_change = 0;
00121 return DDS::RETCODE_OK;
00122 }
00123
00124 DDS::ReturnCode_t
00125 TopicImpl::enable()
00126 {
00127
00128
00129
00130
00131
00132
00133 if (this->is_enabled()) {
00134 return DDS::RETCODE_OK;
00135 }
00136
00137 DomainParticipantImpl* part =
00138 dynamic_cast<DomainParticipantImpl*>(this->participant_);
00139
00140 if (part->is_enabled() == false) {
00141 return DDS::RETCODE_PRECONDITION_NOT_MET;
00142 }
00143
00144 if (this->monitor_) {
00145 monitor_->report();
00146 }
00147 return this->set_enabled();
00148 }
00149
00150 RepoId
00151 TopicImpl::get_id() const
00152 {
00153 return id_;
00154 }
00155
00156 DDS::InstanceHandle_t
00157 TopicImpl::get_instance_handle()
00158 {
00159 return this->participant_->id_to_handle(this->id_);
00160 }
00161
00162 const char*
00163 TopicImpl::type_name() const
00164 {
00165 return this->type_name_.c_str();
00166 }
00167
00168 void
00169 TopicImpl::transport_config(const TransportConfig_rch&)
00170 {
00171 throw Transport::MiscProblem();
00172 }
00173
00174 void
00175 TopicImpl::inconsistent_topic()
00176 {
00177 ++inconsistent_topic_status_.total_count;
00178 ++inconsistent_topic_status_.total_count_change;
00179 set_status_changed_flag(DDS::INCONSISTENT_TOPIC_STATUS, true);
00180
00181 DDS::TopicListener_var listener = listener_;
00182 if (!listener || !(listener_mask_ & DDS::INCONSISTENT_TOPIC_STATUS)) {
00183 listener = participant_->listener_for(DDS::INCONSISTENT_TOPIC_STATUS);
00184 }
00185
00186 if (listener) {
00187 listener->on_inconsistent_topic(this, inconsistent_topic_status_);
00188 inconsistent_topic_status_.total_count_change = 0;
00189 }
00190
00191 notify_status_condition();
00192 }
00193
00194 }
00195 }