00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "DCPS_IR_Topic.h"
00011 #include "DCPS_IR_Domain.h"
00012
00013 #include "DCPS_IR_Subscription.h"
00014 #include "DCPS_IR_Publication.h"
00015 #include "DCPS_IR_Participant.h"
00016 #include "DCPS_IR_Topic_Description.h"
00017
00018 #include "dds/DCPS/DCPS_Utils.h"
00019 #include "dds/DCPS/RepoIdConverter.h"
00020 #include "dds/DCPS/Qos_Helper.h"
00021 #include "tao/debug.h"
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 DCPS_IR_Topic::DCPS_IR_Topic(const OpenDDS::DCPS::RepoId& id,
00026 const DDS::TopicQos& qos,
00027 DCPS_IR_Domain* domain,
00028 DCPS_IR_Participant* creator,
00029 DCPS_IR_Topic_Description* description,
00030 bool isBIT)
00031 : id_(id),
00032 qos_(qos),
00033 domain_(domain),
00034 participant_(creator),
00035 description_(description),
00036 handle_(0),
00037 isBIT_(isBIT),
00038 removed_(false)
00039 {
00040 }
00041
00042 DCPS_IR_Topic::~DCPS_IR_Topic()
00043 {
00044 }
00045
00046 void DCPS_IR_Topic::release(bool removing)
00047 {
00048 if (removing || this->removed_) {
00049 this->removed_ = true;
00050
00051 if (publicationRefs_.size() == 0 && subscriptionRefs_.size() == 0) {
00052 this->domain_->remove_topic_id_mapping(this->id_);
00053 }
00054 }
00055 }
00056
00057 int DCPS_IR_Topic::add_publication_reference(DCPS_IR_Publication* publication
00058 , bool associate)
00059 {
00060 int status = publicationRefs_.insert(publication);
00061
00062 switch (status) {
00063 case 0:
00064
00065
00066 domain_->publish_publication_bit(publication);
00067
00068 if (associate) {
00069 description_->try_associate_publication(publication);
00070
00071
00072 }
00073
00074 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00075 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00076 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00077 ACE_DEBUG((LM_DEBUG,
00078 ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_publication_reference: ")
00079 ACE_TEXT("topic %C added publication %C at %x\n"),
00080 std::string(topic_converter).c_str(),
00081 std::string(pub_converter).c_str(),
00082 publication));
00083 }
00084
00085 break;
00086
00087 case 1:
00088
00089 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00090 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00091 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00092 ACE_DEBUG((LM_WARNING,
00093 ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic::add_publication_reference: ")
00094 ACE_TEXT("topic %C attempt to re-add publication %C.\n"),
00095 std::string(topic_converter).c_str(),
00096 std::string(pub_converter).c_str()));
00097 }
00098
00099 break;
00100
00101 case -1: {
00102 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00103 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00104 ACE_ERROR((LM_ERROR,
00105 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_publication_reference: ")
00106 ACE_TEXT("topic %C failed to add publication %C\n"),
00107 std::string(topic_converter).c_str(),
00108 std::string(pub_converter).c_str()));
00109 }
00110 };
00111
00112 return status;
00113 }
00114
00115 int DCPS_IR_Topic::remove_publication_reference(DCPS_IR_Publication* publication)
00116 {
00117 int status = publicationRefs_.remove(publication);
00118
00119 if (0 == status) {
00120 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00121 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00122 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00123 ACE_DEBUG((LM_DEBUG,
00124 ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_publication_reference: ")
00125 ACE_TEXT("topic %C removed publication %C.\n"),
00126 std::string(topic_converter).c_str(),
00127 std::string(pub_converter).c_str()));
00128 }
00129
00130 } else {
00131 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00132 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00133 ACE_ERROR((LM_ERROR,
00134 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_publication_reference: ")
00135 ACE_TEXT("topic %C failed to remove publication %C.\n"),
00136 std::string(topic_converter).c_str(),
00137 std::string(pub_converter).c_str()));
00138 }
00139
00140 return status;
00141 }
00142
00143 int DCPS_IR_Topic::add_subscription_reference(DCPS_IR_Subscription* subscription
00144 , bool associate)
00145 {
00146 int status = subscriptionRefs_.insert(subscription);
00147
00148 switch (status) {
00149 case 0:
00150
00151 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00152 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00153 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00154 ACE_DEBUG((LM_DEBUG,
00155 ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_subscription_reference: ")
00156 ACE_TEXT("topic %C added subscription %C at %x.\n"),
00157 std::string(topic_converter).c_str(),
00158 std::string(sub_converter).c_str(),
00159 subscription));
00160 }
00161
00162 status = this->description_->add_subscription_reference(subscription, associate);
00163 break;
00164
00165 case 1: {
00166 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00167 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00168 ACE_ERROR((LM_ERROR,
00169 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00170 ACE_TEXT("topic %C attempt to re-add subscription %C.\n"),
00171 std::string(topic_converter).c_str(),
00172 std::string(sub_converter).c_str()));
00173 }
00174 break;
00175
00176 case -1: {
00177 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00178 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00179 ACE_ERROR((LM_ERROR,
00180 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00181 ACE_TEXT("topic %C failed to add subscription %C.\n"),
00182 std::string(topic_converter).c_str(),
00183 std::string(sub_converter).c_str()));
00184 }
00185 };
00186
00187 return status;
00188 }
00189
00190 int DCPS_IR_Topic::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00191 {
00192 int status = subscriptionRefs_.remove(subscription);
00193
00194 if (0 == status) {
00195 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00196 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00197 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00198 ACE_DEBUG((LM_DEBUG,
00199 ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_subscription_reference: ")
00200 ACE_TEXT("topic %C removed subscription %C.\n"),
00201 std::string(topic_converter).c_str(),
00202 std::string(sub_converter).c_str()));
00203 }
00204
00205 this->description_->remove_subscription_reference(subscription);
00206
00207 } else {
00208 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00209 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00210 ACE_ERROR((LM_ERROR,
00211 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_subscription_reference: ")
00212 ACE_TEXT("topic %C failed to remove subscription %C.\n"),
00213 std::string(topic_converter).c_str(),
00214 std::string(sub_converter).c_str()));
00215 }
00216
00217 return status;
00218 }
00219
00220 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_id() const
00221 {
00222 return id_;
00223 }
00224
00225 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_participant_id() const
00226 {
00227 return participant_->get_id();
00228 }
00229
00230 DDS::TopicQos * DCPS_IR_Topic::get_topic_qos()
00231 {
00232 return &qos_;
00233 }
00234
00235 bool DCPS_IR_Topic::set_topic_qos(const DDS::TopicQos& qos)
00236 {
00237
00238
00239
00240 using OpenDDS::DCPS::operator==;
00241 bool pub_to_rd_wr = !(qos.topic_data == qos_.topic_data);
00242
00243 qos_ = qos;
00244 domain_->publish_topic_bit(this);
00245
00246 if (!pub_to_rd_wr)
00247 return true;
00248
00249
00250
00251
00252
00253
00254
00255 {
00256 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00257 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00258
00259 while (iter != end) {
00260 domain_->publish_publication_bit(*iter);
00261 ++iter;
00262 }
00263 }
00264
00265
00266
00267 {
00268 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00269 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00270
00271 while (iter != end) {
00272 domain_->publish_subscription_bit(*iter);
00273 ++iter;
00274 }
00275 }
00276
00277 return true;
00278 }
00279
00280 void DCPS_IR_Topic::try_associate(DCPS_IR_Subscription* subscription)
00281 {
00282
00283 if (participant_->is_subscription_ignored(subscription->get_id()) ||
00284 participant_->is_participant_ignored(subscription->get_participant_id()) ||
00285 participant_->is_topic_ignored(subscription->get_topic_id())) {
00286 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00287 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00288 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00289 ACE_DEBUG((LM_DEBUG,
00290 ACE_TEXT("(%P|%t) DCPS_IR_Topic::try_associate: ")
00291 ACE_TEXT("topic %C ignoring subscription %C.\n"),
00292 std::string(topic_converter).c_str(),
00293 std::string(sub_converter).c_str()));
00294 }
00295
00296 } else {
00297
00298 DCPS_IR_Publication* pub = 0;
00299 OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00300
00301 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00302 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00303
00304 while (iter != end) {
00305 pub = *iter;
00306 ++iter;
00307 description_->try_associate(pub, subscription);
00308
00309 qosStatus = pub->get_incompatibleQosStatus();
00310
00311 if (0 < qosStatus->count_since_last_send) {
00312 pub->update_incompatible_qos();
00313 }
00314 }
00315
00316
00317
00318
00319 }
00320 }
00321
00322 DCPS_IR_Topic_Description* DCPS_IR_Topic::get_topic_description()
00323 {
00324 return description_;
00325 }
00326
00327 DDS::InstanceHandle_t DCPS_IR_Topic::get_handle()
00328 {
00329 return handle_;
00330 }
00331
00332 void DCPS_IR_Topic::set_handle(DDS::InstanceHandle_t handle)
00333 {
00334 handle_ = handle;
00335 }
00336
00337 CORBA::Boolean DCPS_IR_Topic::is_bit()
00338 {
00339 return isBIT_;
00340 }
00341
00342 void DCPS_IR_Topic::set_bit_status(CORBA::Boolean isBIT)
00343 {
00344 isBIT_ = isBIT;
00345 }
00346
00347 void DCPS_IR_Topic::reevaluate_associations(DCPS_IR_Subscription* subscription)
00348 {
00349 DCPS_IR_Publication * pub = 0;
00350 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00351 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00352
00353 while (iter != end) {
00354 pub = *iter;
00355 ++iter;
00356
00357 subscription->reevaluate_association(pub);
00358 pub->reevaluate_association(subscription);
00359 }
00360 }
00361
00362
00363 void DCPS_IR_Topic::reassociate_all_publications()
00364 {
00365 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00366 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00367
00368 for ( ; iter != end; ++iter)
00369 {
00370 description_->try_associate_publication(*iter);
00371 }
00372 }
00373
00374 std::string
00375 DCPS_IR_Topic::dump_to_string(const std::string& prefix, int depth) const
00376 {
00377 std::string str;
00378 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00379 OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00380
00381 for (int i=0; i < depth; i++)
00382 str += prefix;
00383 std::string indent = str + prefix;
00384 str += "DCPS_IR_Topic[";
00385 str += std::string(local_converter);
00386 str += "]";
00387 if (isBIT_)
00388 str += " (BIT)";
00389 str += "\n";
00390
00391 str += indent + "Publications:\n";
00392 for (DCPS_IR_Publication_Set::const_iterator pub = publicationRefs_.begin();
00393 pub != publicationRefs_.end();
00394 pub++)
00395 {
00396 OpenDDS::DCPS::RepoIdConverter pub_converter((*pub)->get_id());
00397 str += indent + std::string(pub_converter);
00398 str += "\n";
00399
00400 }
00401
00402 str += indent + "Subscriptions:\n";
00403 for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00404 sub != subscriptionRefs_.end();
00405 sub++)
00406 {
00407 OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00408 str += indent + std::string(sub_converter);
00409 str += "\n";
00410 }
00411 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00412 return str;
00413 }
00414
00415 OPENDDS_END_VERSIONED_NAMESPACE_DECL