TopicImpl.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TopicImpl.h"
00010 #include "Qos_Helper.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "Definitions.h"
00013 #include "Service_Participant.h"
00014 #include "DomainParticipantImpl.h"
00015 #include "MonitorFactory.h"
00016 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00017
00018 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00019
00020 namespace OpenDDS {
00021 namespace DCPS {
00022
00023 TopicImpl::TopicImpl(const RepoId topic_id,
00024 const char* topic_name,
00025 const char* type_name,
00026 OpenDDS::DCPS::TypeSupport_ptr type_support,
00027 const DDS::TopicQos & qos,
00028 DDS::TopicListener_ptr a_listener,
00029 const DDS::StatusMask & mask,
00030 DomainParticipantImpl* participant)
00031 : TopicDescriptionImpl(topic_name,
00032 type_name,
00033 type_support,
00034 participant),
00035 qos_(qos),
00036 listener_mask_(mask),
00037 listener_(DDS::TopicListener::_duplicate(a_listener)),
00038 id_(topic_id),
00039 monitor_(0)
00040 {
00041 inconsistent_topic_status_.total_count = 0;
00042 inconsistent_topic_status_.total_count_change = 0;
00043 monitor_ =
00044 TheServiceParticipant->monitor_factory_->create_topic_monitor(this);
00045 }
00046
00047 TopicImpl::~TopicImpl()
00048 {
00049 }
00050
00051 DDS::ReturnCode_t
00052 TopicImpl::set_qos(const DDS::TopicQos & qos)
00053 {
00054
00055 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00056 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00057 OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00058 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00059
00060 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00061 if (qos_ == qos)
00062 return DDS::RETCODE_OK;
00063
00064
00065 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00066 return DDS::RETCODE_IMMUTABLE_POLICY;
00067
00068 } else {
00069 qos_ = qos;
00070
00071 Discovery_rch disco =
00072 TheServiceParticipant->get_discovery(participant_->get_domain_id());
00073 const bool status =
00074 disco->update_topic_qos(this->id_, participant_->get_domain_id(),
00075 participant_->get_id(), qos_);
00076
00077 if (!status) {
00078 ACE_ERROR_RETURN((LM_ERROR,
00079 ACE_TEXT("(%P|%t) TopicImpl::set_qos, ")
00080 ACE_TEXT("failed on compatibility check. \n")),
00081 DDS::RETCODE_ERROR);
00082 }
00083 }
00084
00085 return DDS::RETCODE_OK;
00086
00087 } else {
00088 return DDS::RETCODE_INCONSISTENT_POLICY;
00089 }
00090 }
00091
00092 DDS::ReturnCode_t
00093 TopicImpl::get_qos(DDS::TopicQos& qos)
00094 {
00095 qos = qos_;
00096 return DDS::RETCODE_OK;
00097 }
00098
00099 DDS::ReturnCode_t
00100 TopicImpl::set_listener(DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
00101 {
00102 listener_mask_ = mask;
00103
00104 listener_ = DDS::TopicListener::_duplicate(a_listener);
00105 return DDS::RETCODE_OK;
00106 }
00107
00108 DDS::TopicListener_ptr
00109 TopicImpl::get_listener()
00110 {
00111 return DDS::TopicListener::_duplicate(listener_.in());
00112 }
00113
00114 DDS::ReturnCode_t
00115 TopicImpl::get_inconsistent_topic_status(DDS::InconsistentTopicStatus& a_status)
00116 {
00117 set_status_changed_flag(DDS::INCONSISTENT_TOPIC_STATUS, false);
00118 a_status = inconsistent_topic_status_;
00119 inconsistent_topic_status_.total_count_change = 0;
00120 return DDS::RETCODE_OK;
00121 }
00122
00123 DDS::ReturnCode_t
00124 TopicImpl::enable()
00125 {
00126
00127
00128
00129
00130
00131
00132 if (this->is_enabled()) {
00133 return DDS::RETCODE_OK;
00134 }
00135
00136 if (!this->participant_->is_enabled()) {
00137 return DDS::RETCODE_PRECONDITION_NOT_MET;
00138 }
00139
00140 if (id_ == GUID_UNKNOWN) {
00141 const DDS::DomainId_t dom_id = participant_->get_domain_id();
00142 Discovery_rch disco = TheServiceParticipant->get_discovery(dom_id);
00143 TopicStatus status = disco->assert_topic(id_,
00144 dom_id,
00145 participant_->get_id(),
00146 topic_name_.c_str(),
00147 type_name_.c_str(),
00148 qos_,
00149 type_support_->has_dcps_key());
00150 if (status != CREATED && status != FOUND) {
00151 ACE_ERROR((LM_ERROR,
00152 ACE_TEXT("(%P|%t) ERROR: TopicImpl::enable, ")
00153 ACE_TEXT("assert_topic failed with return value %d.\n"),
00154 status));
00155 return DDS::RETCODE_ERROR;
00156 }
00157 }
00158
00159 if (this->monitor_) {
00160 monitor_->report();
00161 }
00162 return this->set_enabled();
00163 }
00164
00165 RepoId
00166 TopicImpl::get_id() const
00167 {
00168 return id_;
00169 }
00170
00171 DDS::InstanceHandle_t
00172 TopicImpl::get_instance_handle()
00173 {
00174 return this->participant_->id_to_handle(this->id_);
00175 }
00176
00177 const char*
00178 TopicImpl::type_name() const
00179 {
00180 return this->type_name_.c_str();
00181 }
00182
00183 void
00184 TopicImpl::transport_config(const TransportConfig_rch&)
00185 {
00186 throw Transport::MiscProblem();
00187 }
00188
00189 void
00190 TopicImpl::inconsistent_topic()
00191 {
00192 ++inconsistent_topic_status_.total_count;
00193 ++inconsistent_topic_status_.total_count_change;
00194 set_status_changed_flag(DDS::INCONSISTENT_TOPIC_STATUS, true);
00195
00196 DDS::TopicListener_var listener = listener_;
00197 if (!listener || !(listener_mask_ & DDS::INCONSISTENT_TOPIC_STATUS)) {
00198 listener = participant_->listener_for(DDS::INCONSISTENT_TOPIC_STATUS);
00199 }
00200
00201 if (listener) {
00202 listener->on_inconsistent_topic(this, inconsistent_topic_status_);
00203 inconsistent_topic_status_.total_count_change = 0;
00204 }
00205
00206 notify_status_condition();
00207 }
00208
00209 }
00210 }
00211
00212 OPENDDS_END_VERSIONED_NAMESPACE_DECL