00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "DCPS_IR_Topic.h"
00011 #include "DCPS_IR_Domain.h"
00012
00013 #include "DCPS_IR_Subscription.h"
00014 #include "DCPS_IR_Publication.h"
00015 #include "DCPS_IR_Participant.h"
00016 #include "DCPS_IR_Topic_Description.h"
00017
00018 #include "dds/DCPS/DCPS_Utils.h"
00019 #include "dds/DCPS/RepoIdConverter.h"
00020 #include "dds/DCPS/Qos_Helper.h"
00021 #include "tao/debug.h"
00022
00023 DCPS_IR_Topic::DCPS_IR_Topic(const OpenDDS::DCPS::RepoId& id,
00024 const DDS::TopicQos& qos,
00025 DCPS_IR_Domain* domain,
00026 DCPS_IR_Participant* creator,
00027 DCPS_IR_Topic_Description* description)
00028 : id_(id),
00029 qos_(qos),
00030 domain_(domain),
00031 participant_(creator),
00032 description_(description),
00033 handle_(0),
00034 isBIT_(0),
00035 removed_(false)
00036 {
00037 }
00038
00039 DCPS_IR_Topic::~DCPS_IR_Topic()
00040 {
00041
00042 if (0 != publicationRefs_.size()) {
00043 DCPS_IR_Publication* pub = 0;
00044 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00045 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00046
00047 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00048 ACE_ERROR((LM_ERROR,
00049 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00050 ACE_TEXT("id %C has retained publications.\n"),
00051 std::string(topic_converter).c_str()));
00052
00053 while (iter != end) {
00054 pub = *iter;
00055 ++iter;
00056
00057 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00058 ACE_ERROR((LM_ERROR,
00059 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00060 ACE_TEXT("topic %C retains publication id %C.\n"),
00061 std::string(topic_converter).c_str(),
00062 std::string(pub_converter).c_str()));
00063 }
00064 }
00065
00066 if (0 != subscriptionRefs_.size()) {
00067 DCPS_IR_Subscription* sub = 0;
00068 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00069 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00070
00071 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00072 ACE_ERROR((LM_ERROR,
00073 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00074 ACE_TEXT("id %C has retained subscriptions.\n"),
00075 std::string(topic_converter).c_str()));
00076
00077 while (iter != end) {
00078 sub = *iter;
00079 ++iter;
00080
00081 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00082 ACE_ERROR((LM_ERROR,
00083 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00084 ACE_TEXT("topic %C retains subscription id %C.\n"),
00085 std::string(topic_converter).c_str(),
00086 std::string(sub_converter).c_str()));
00087 }
00088 }
00089 }
00090
00091 void DCPS_IR_Topic::release(bool removing)
00092 {
00093 if (removing) {
00094 this->removed_ = true;
00095
00096 if (publicationRefs_.size() == 0 && subscriptionRefs_.size() == 0) {
00097 this->domain_->remove_topic_id_mapping(this->id_);
00098 delete this;
00099 }
00100
00101 } else if (this->removed_) {
00102 if (publicationRefs_.size() == 0 && subscriptionRefs_.size() == 0) {
00103 this->domain_->remove_topic_id_mapping(this->id_);
00104 delete this;
00105 }
00106 }
00107 }
00108
00109 int DCPS_IR_Topic::add_publication_reference(DCPS_IR_Publication* publication
00110 , bool associate)
00111 {
00112 int status = publicationRefs_.insert(publication);
00113
00114 switch (status) {
00115 case 0:
00116
00117
00118 domain_->publish_publication_bit(publication);
00119
00120 if (associate) {
00121 description_->try_associate_publication(publication);
00122
00123
00124 }
00125
00126 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00127 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00128 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00129 ACE_DEBUG((LM_DEBUG,
00130 ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_publication_reference: ")
00131 ACE_TEXT("topic %C added publication %C at %x\n"),
00132 std::string(topic_converter).c_str(),
00133 std::string(pub_converter).c_str(),
00134 publication));
00135 }
00136
00137 break;
00138
00139 case 1:
00140
00141 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00142 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00143 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00144 ACE_DEBUG((LM_WARNING,
00145 ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic::add_publication_reference: ")
00146 ACE_TEXT("topic %C attempt to re-add publication %C.\n"),
00147 std::string(topic_converter).c_str(),
00148 std::string(pub_converter).c_str()));
00149 }
00150
00151 break;
00152
00153 case -1: {
00154 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00155 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00156 ACE_ERROR((LM_ERROR,
00157 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_publication_reference: ")
00158 ACE_TEXT("topic %C failed to add publication %C\n"),
00159 std::string(topic_converter).c_str(),
00160 std::string(pub_converter).c_str()));
00161 }
00162 };
00163
00164 return status;
00165 }
00166
00167 int DCPS_IR_Topic::remove_publication_reference(DCPS_IR_Publication* publication)
00168 {
00169 int status = publicationRefs_.remove(publication);
00170
00171 if (0 == status) {
00172 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00173 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00174 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00175 ACE_DEBUG((LM_DEBUG,
00176 ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_publication_reference: ")
00177 ACE_TEXT("topic %C removed publication %C.\n"),
00178 std::string(topic_converter).c_str(),
00179 std::string(pub_converter).c_str()));
00180 }
00181
00182 } else {
00183 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00184 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00185 ACE_ERROR((LM_ERROR,
00186 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_publication_reference: ")
00187 ACE_TEXT("topic %C failed to remove publication %C.\n"),
00188 std::string(topic_converter).c_str(),
00189 std::string(pub_converter).c_str()));
00190 }
00191
00192 return status;
00193 }
00194
00195 int DCPS_IR_Topic::add_subscription_reference(DCPS_IR_Subscription* subscription
00196 , bool associate)
00197 {
00198 int status = subscriptionRefs_.insert(subscription);
00199
00200 switch (status) {
00201 case 0:
00202
00203 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00204 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00205 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00206 ACE_DEBUG((LM_DEBUG,
00207 ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_subscription_reference: ")
00208 ACE_TEXT("topic %C added subscription %C at %x.\n"),
00209 std::string(topic_converter).c_str(),
00210 std::string(sub_converter).c_str(),
00211 subscription));
00212 }
00213
00214 status = this->description_->add_subscription_reference(subscription, associate);
00215 break;
00216
00217 case 1: {
00218 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00219 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00220 ACE_ERROR((LM_ERROR,
00221 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00222 ACE_TEXT("topic %C attempt to re-add subscription %C.\n"),
00223 std::string(topic_converter).c_str(),
00224 std::string(sub_converter).c_str()));
00225 }
00226 break;
00227
00228 case -1: {
00229 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00230 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00231 ACE_ERROR((LM_ERROR,
00232 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00233 ACE_TEXT("topic %C failed to add subscription %C.\n"),
00234 std::string(topic_converter).c_str(),
00235 std::string(sub_converter).c_str()));
00236 }
00237 };
00238
00239 return status;
00240 }
00241
00242 int DCPS_IR_Topic::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00243 {
00244 int status = subscriptionRefs_.remove(subscription);
00245
00246 if (0 == status) {
00247 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00248 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00249 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00250 ACE_DEBUG((LM_DEBUG,
00251 ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_subscription_reference: ")
00252 ACE_TEXT("topic %C removed subscription %C.\n"),
00253 std::string(topic_converter).c_str(),
00254 std::string(sub_converter).c_str()));
00255 }
00256
00257 this->description_->remove_subscription_reference(subscription);
00258
00259 } else {
00260 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00261 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00262 ACE_ERROR((LM_ERROR,
00263 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_subscription_reference: ")
00264 ACE_TEXT("topic %C failed to remove subscription %C.\n"),
00265 std::string(topic_converter).c_str(),
00266 std::string(sub_converter).c_str()));
00267 }
00268
00269 return status;
00270 }
00271
00272 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_id() const
00273 {
00274 return id_;
00275 }
00276
00277 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_participant_id() const
00278 {
00279 return participant_->get_id();
00280 }
00281
00282 DDS::TopicQos * DCPS_IR_Topic::get_topic_qos()
00283 {
00284 return &qos_;
00285 }
00286
00287 bool DCPS_IR_Topic::set_topic_qos(const DDS::TopicQos& qos)
00288 {
00289
00290
00291
00292 using OpenDDS::DCPS::operator==;
00293 bool pub_to_rd_wr = !(qos.topic_data == qos_.topic_data);
00294
00295 qos_ = qos;
00296 domain_->publish_topic_bit(this);
00297
00298 if (!pub_to_rd_wr)
00299 return true;
00300
00301
00302
00303
00304
00305
00306
00307 {
00308 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00309 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00310
00311 while (iter != end) {
00312 domain_->publish_publication_bit(*iter);
00313 ++iter;
00314 }
00315 }
00316
00317
00318
00319 {
00320 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00321 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00322
00323 while (iter != end) {
00324 domain_->publish_subscription_bit(*iter);
00325 ++iter;
00326 }
00327 }
00328
00329 return true;
00330 }
00331
00332 void DCPS_IR_Topic::try_associate(DCPS_IR_Subscription* subscription)
00333 {
00334
00335 if (participant_->is_subscription_ignored(subscription->get_id()) ||
00336 participant_->is_participant_ignored(subscription->get_participant_id()) ||
00337 participant_->is_topic_ignored(subscription->get_topic_id())) {
00338 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00339 OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00340 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00341 ACE_DEBUG((LM_DEBUG,
00342 ACE_TEXT("(%P|%t) DCPS_IR_Topic::try_associate: ")
00343 ACE_TEXT("topic %C ignoring subscription %C.\n"),
00344 std::string(topic_converter).c_str(),
00345 std::string(sub_converter).c_str()));
00346 }
00347
00348 } else {
00349
00350 DCPS_IR_Publication* pub = 0;
00351 OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00352
00353 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00354 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00355
00356 while (iter != end) {
00357 pub = *iter;
00358 ++iter;
00359 description_->try_associate(pub, subscription);
00360
00361 qosStatus = pub->get_incompatibleQosStatus();
00362
00363 if (0 < qosStatus->count_since_last_send) {
00364 pub->update_incompatible_qos();
00365 }
00366 }
00367
00368
00369
00370
00371 }
00372 }
00373
00374 DCPS_IR_Topic_Description* DCPS_IR_Topic::get_topic_description()
00375 {
00376 return description_;
00377 }
00378
00379 DDS::InstanceHandle_t DCPS_IR_Topic::get_handle()
00380 {
00381 return handle_;
00382 }
00383
00384 void DCPS_IR_Topic::set_handle(DDS::InstanceHandle_t handle)
00385 {
00386 handle_ = handle;
00387 }
00388
00389 CORBA::Boolean DCPS_IR_Topic::is_bit()
00390 {
00391 return isBIT_;
00392 }
00393
00394 void DCPS_IR_Topic::set_bit_status(CORBA::Boolean isBIT)
00395 {
00396 isBIT_ = isBIT;
00397 }
00398
00399 void DCPS_IR_Topic::reevaluate_associations(DCPS_IR_Subscription* subscription)
00400 {
00401 DCPS_IR_Publication * pub = 0;
00402 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00403 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00404
00405 while (iter != end) {
00406 pub = *iter;
00407 ++iter;
00408
00409 subscription->reevaluate_association(pub);
00410 pub->reevaluate_association(subscription);
00411 }
00412 }
00413
00414
00415 void DCPS_IR_Topic::reassociate_all_publications()
00416 {
00417 DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00418 DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00419
00420 for ( ; iter != end; ++iter)
00421 {
00422 description_->try_associate_publication(*iter);
00423 }
00424 }
00425
00426 std::string
00427 DCPS_IR_Topic::dump_to_string(const std::string& prefix, int depth) const
00428 {
00429 std::string str;
00430 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00431 OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00432
00433 for (int i=0; i < depth; i++)
00434 str += prefix;
00435 std::string indent = str + prefix;
00436 str += "DCPS_IR_Topic[";
00437 str += std::string(local_converter);
00438 str += "]";
00439 if (isBIT_)
00440 str += " (BIT)";
00441 str += "\n";
00442
00443 str += indent + "Publications:\n";
00444 for (DCPS_IR_Publication_Set::const_iterator pub = publicationRefs_.begin();
00445 pub != publicationRefs_.end();
00446 pub++)
00447 {
00448 OpenDDS::DCPS::RepoIdConverter pub_converter((*pub)->get_id());
00449 str += indent + std::string(pub_converter);
00450 str += "\n";
00451
00452 }
00453
00454 str += indent + "Subscriptions:\n";
00455 for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00456 sub != subscriptionRefs_.end();
00457 sub++)
00458 {
00459 OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00460 str += indent + std::string(sub_converter);
00461 str += "\n";
00462 }
00463 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00464 return str;
00465 }
00466
00467 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
00468
00469 template class ACE_Node<DCPS_IR_Publication*>;
00470 template class ACE_Unbounded_Set<DCPS_IR_Publication*>;
00471 template class ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>;
00472
00473 #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
00474
00475 #pragma instantiate ACE_Node<DCPS_IR_Publication*>
00476 #pragma instantiate ACE_Unbounded_Set<DCPS_IR_Publication*>
00477 #pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>
00478
00479 #endif