00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "DCPS_IR_Topic_Description.h"
00010
00011 #include "DCPS_IR_Subscription.h"
00012 #include "DCPS_IR_Publication.h"
00013
00014 #include "DCPS_IR_Topic.h"
00015 #include "DCPS_IR_Domain.h"
00016
00017 #include "dds/DCPS/DCPS_Utils.h"
00018
00019 #include "tao/debug.h"
00020
00021 #include "dds/DCPS/RepoIdConverter.h"
00022
00023 DCPS_IR_Topic_Description::DCPS_IR_Topic_Description(DCPS_IR_Domain* domain,
00024 const char* name,
00025 const char* dataTypeName)
00026 : name_(name),
00027 dataTypeName_(dataTypeName),
00028 domain_(domain)
00029 {
00030 }
00031
00032 DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description()
00033 {
00034
00035 if (0 != subscriptionRefs_.size()) {
00036 DCPS_IR_Subscription* subscription = 0;
00037 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00038 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00039
00040 ACE_ERROR((LM_ERROR,
00041 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00042 ACE_TEXT("topic description name %C data type name %C has retained subscriptions.\n"),
00043 name_.c_str(),
00044 dataTypeName_.c_str()));
00045
00046 while (iter != end) {
00047 subscription = *iter;
00048 ++iter;
00049
00050 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00051 ACE_ERROR((LM_ERROR,
00052 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00053 ACE_TEXT("topic description %C retains subscription %C.\n"),
00054 this->name_.c_str(),
00055 std::string(converter).c_str()));
00056 }
00057 }
00058
00059 if (0 != topics_.size()) {
00060 DCPS_IR_Topic* topic = 0;
00061 DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00062 DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00063
00064 ACE_ERROR((LM_ERROR,
00065 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00066 ACE_TEXT("topic description name %C data type name %C has retained topics.\n"),
00067 name_.c_str(),
00068 dataTypeName_.c_str()));
00069
00070 while (iter != end) {
00071 topic = *iter;
00072 ++iter;
00073
00074 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00075 ACE_ERROR((LM_ERROR,
00076 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00077 ACE_TEXT("topic description %C retains topic %C.\n"),
00078 this->name_.c_str(),
00079 std::string(converter).c_str()));
00080 }
00081 }
00082 }
00083
00084 int DCPS_IR_Topic_Description::add_subscription_reference(DCPS_IR_Subscription* subscription
00085 , bool associate)
00086 {
00087 int status = subscriptionRefs_.insert(subscription);
00088
00089 switch (status) {
00090 case 0:
00091
00092
00093 domain_->publish_subscription_bit(subscription);
00094
00095 if (associate) {
00096 try_associate_subscription(subscription);
00097
00098
00099 }
00100
00101 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00102 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00103 ACE_DEBUG((LM_DEBUG,
00104 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_subscription_reference: ")
00105 ACE_TEXT("topic description %C added subscription %C at %x\n"),
00106 this->name_.c_str(),
00107 std::string(converter).c_str(),
00108 subscription));
00109 }
00110
00111 break;
00112
00113 case 1: {
00114 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00115 ACE_ERROR((LM_ERROR,
00116 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00117 ACE_TEXT("topic description %C attempt to re-add subscription %C.\n"),
00118 this->name_.c_str(),
00119 std::string(converter).c_str()));
00120 }
00121 break;
00122
00123 case -1: {
00124 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00125 ACE_ERROR((LM_ERROR,
00126 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00127 ACE_TEXT("topic description %C failed to add subscription %C.\n"),
00128 this->name_.c_str(),
00129 std::string(converter).c_str()));
00130 }
00131 };
00132
00133 return status;
00134 }
00135
00136 int DCPS_IR_Topic_Description::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00137 {
00138 int status = subscriptionRefs_.remove(subscription);
00139
00140 if (0 == status) {
00141 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00142 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00143 ACE_DEBUG((LM_DEBUG,
00144 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_subscription_reference: ")
00145 ACE_TEXT("topic description %C removed subscription %C.\n"),
00146 this->name_.c_str(),
00147 std::string(converter).c_str()));
00148 }
00149
00150 } else {
00151 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00152 ACE_ERROR((LM_ERROR,
00153 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_subscription_reference: ")
00154 ACE_TEXT("topic description %C failed to remove subscription %C.\n"),
00155 this->name_.c_str(),
00156 std::string(converter).c_str()));
00157 }
00158
00159 return status;
00160 }
00161
00162 int DCPS_IR_Topic_Description::add_topic(DCPS_IR_Topic* topic)
00163 {
00164 int status = topics_.insert(topic);
00165
00166 switch (status) {
00167 case 0:
00168
00169 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00170 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00171 ACE_DEBUG((LM_DEBUG,
00172 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_topic: ")
00173 ACE_TEXT("topic description %C added topic %C at %x.\n"),
00174 this->name_.c_str(),
00175 std::string(converter).c_str(),
00176 topic));
00177 }
00178
00179 break;
00180 case 1:
00181
00182 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00183 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00184 ACE_DEBUG((LM_WARNING,
00185 ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic_Description::add_topic: ")
00186 ACE_TEXT("topic description %C attempt to re-add topic %C.\n"),
00187 this->name_.c_str(),
00188 std::string(converter).c_str()));
00189 }
00190
00191 break;
00192 case -1: {
00193 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00194 ACE_ERROR((LM_ERROR,
00195 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_topic: ")
00196 ACE_TEXT("topic description %C failed to add topic %C.\n"),
00197 this->name_.c_str(),
00198 std::string(converter).c_str()));
00199 }
00200 break;
00201 };
00202
00203 return status;
00204 }
00205
00206 int DCPS_IR_Topic_Description::remove_topic(DCPS_IR_Topic* topic)
00207 {
00208 int status = topics_.remove(topic);
00209
00210 if (0 == status) {
00211 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00212 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00213 ACE_DEBUG((LM_DEBUG,
00214 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_topic: ")
00215 ACE_TEXT("topic description %C removed topic %C.\n"),
00216 this->name_.c_str(),
00217 std::string(converter).c_str()));
00218 }
00219
00220 } else {
00221 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00222 ACE_ERROR((LM_ERROR,
00223 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_topic: ")
00224 ACE_TEXT("topic description failed to remove topic %C.\n"),
00225 this->name_.c_str(),
00226 std::string(converter).c_str()));
00227 }
00228
00229 return status;
00230 }
00231
00232 DCPS_IR_Topic* DCPS_IR_Topic_Description::get_first_topic()
00233 {
00234 DCPS_IR_Topic* topic = 0;
00235
00236 if (0 < topics_.size()) {
00237 DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00238 topic = *iter;
00239
00240 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00241 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00242 ACE_DEBUG((LM_DEBUG,
00243 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::get_first_topic: ")
00244 ACE_TEXT("topic description %C first topic %C.\n"),
00245 this->name_.c_str(),
00246 std::string(converter).c_str()));
00247 }
00248 }
00249
00250 return topic;
00251 }
00252
00253 void DCPS_IR_Topic_Description::try_associate_publication(DCPS_IR_Publication* publication)
00254 {
00255
00256 DCPS_IR_Subscription* subscription = 0;
00257 OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00258
00259 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00260 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00261
00262 while (iter != end) {
00263 subscription = *iter;
00264 ++iter;
00265 try_associate(publication, subscription);
00266
00267
00268 qosStatus = subscription->get_incompatibleQosStatus();
00269
00270 if (0 < qosStatus->count_since_last_send) {
00271 subscription->update_incompatible_qos();
00272 }
00273 }
00274
00275
00276 qosStatus = publication->get_incompatibleQosStatus();
00277
00278 if (0 < qosStatus->count_since_last_send) {
00279 publication->update_incompatible_qos();
00280 }
00281 }
00282
00283 void DCPS_IR_Topic_Description::try_associate_subscription(DCPS_IR_Subscription* subscription)
00284 {
00285
00286
00287 DCPS_IR_Topic* topic = 0;
00288
00289 DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00290 DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00291
00292 while (iter != end) {
00293 topic = *iter;
00294 ++iter;
00295
00296 topic->try_associate(subscription);
00297 }
00298
00299
00300 OpenDDS::DCPS::IncompatibleQosStatus* qosStatus =
00301 subscription->get_incompatibleQosStatus();
00302
00303 if (0 < qosStatus->total_count) {
00304 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00305 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00306 ACE_DEBUG((LM_DEBUG,
00307 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate_subscription: ")
00308 ACE_TEXT("topic description %C has %d incompatible publications ")
00309 ACE_TEXT("with subscription %C.\n"),
00310 this->name_.c_str(),
00311 qosStatus->total_count,
00312 std::string(converter).c_str()));
00313 }
00314
00315 subscription->update_incompatible_qos();
00316 }
00317 }
00318
00319 bool
00320 DCPS_IR_Topic_Description::try_associate(DCPS_IR_Publication* publication,
00321 DCPS_IR_Subscription* subscription)
00322 {
00323 if (publication->is_subscription_ignored(subscription->get_participant_id(),
00324 subscription->get_topic_id(),
00325 subscription->get_id())) {
00326 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00327 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00328 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00329 ACE_DEBUG((LM_DEBUG,
00330 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00331 ACE_TEXT("topic description %C publication %C ignores subscription %C.\n"),
00332 this->name_.c_str(),
00333 std::string(pub_converter).c_str(),
00334 std::string(sub_converter).c_str()));
00335 }
00336 }
00337
00338 else if (subscription->is_publication_ignored(publication->get_participant_id(),
00339 publication->get_topic_id(),
00340 publication->get_id())) {
00341 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00342 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00343 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00344 ACE_DEBUG((LM_DEBUG,
00345 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00346 ACE_TEXT("topic description %C subscription %C ignores publication %C.\n"),
00347 this->name_.c_str(),
00348 std::string(pub_converter).c_str(),
00349 std::string(sub_converter).c_str()));
00350 }
00351
00352 } else {
00353 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00354 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00355 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00356 ACE_DEBUG((LM_DEBUG,
00357 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00358 ACE_TEXT("topic description %C checking compatibility of ")
00359 ACE_TEXT("publication %C with subscription %C.\n"),
00360 this->name_.c_str(),
00361 std::string(pub_converter).c_str(),
00362 std::string(sub_converter).c_str()));
00363 }
00364
00365 if (OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00366 subscription->get_incompatibleQosStatus(),
00367 publication->get_transportLocatorSeq(),
00368 subscription->get_transportLocatorSeq(),
00369 publication->get_datawriter_qos(),
00370 subscription->get_datareader_qos(),
00371 publication->get_publisher_qos(),
00372 subscription->get_subscriber_qos())) {
00373 associate(publication, subscription);
00374 return true;
00375 }
00376
00377
00378
00379
00380 }
00381
00382 return false;
00383 }
00384
00385 void DCPS_IR_Topic_Description::associate(DCPS_IR_Publication* publication,
00386 DCPS_IR_Subscription* subscription)
00387 {
00388 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00389 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00390 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00391 ACE_DEBUG((LM_DEBUG,
00392 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::associate: ")
00393 ACE_TEXT("topic description %C associating ")
00394 ACE_TEXT("publication %C with subscription %C.\n"),
00395 this->name_.c_str(),
00396 std::string(pub_converter).c_str(),
00397 std::string(sub_converter).c_str()));
00398 }
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408 int error = publication->add_associated_subscription(subscription, true);
00409
00410
00411
00412 if (error != -1) {
00413
00414 subscription->add_associated_publication(publication, false);
00415 } else {
00416 ACE_DEBUG((LM_INFO, ACE_TEXT("Invalid publication detected, NOT notifying subcription of association\n")));
00417 }
00418 }
00419
00420 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Subscription* subscription)
00421 {
00422 DCPS_IR_Topic* topic = 0;
00423
00424 DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00425 DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00426
00427 while (iter != end) {
00428 topic = *iter;
00429 ++iter;
00430
00431 topic->reevaluate_associations(subscription);
00432 }
00433 }
00434
00435 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Publication* publication)
00436 {
00437 DCPS_IR_Subscription * sub = 0;
00438 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00439 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00440
00441 while (iter != end) {
00442 sub = *iter;
00443 ++iter;
00444 publication->reevaluate_association(sub);
00445 sub->reevaluate_association(publication);
00446 }
00447 }
00448
00449 const char* DCPS_IR_Topic_Description::get_name() const
00450 {
00451 return name_.c_str();
00452 }
00453
00454 const char* DCPS_IR_Topic_Description::get_dataTypeName() const
00455 {
00456 return dataTypeName_.c_str();
00457 }
00458
00459 CORBA::ULong DCPS_IR_Topic_Description::get_number_topics() const
00460 {
00461 return static_cast<CORBA::ULong>(topics_.size());
00462 }
00463
00464 std::string
00465 DCPS_IR_Topic_Description::dump_to_string(const std::string& prefix,
00466 int depth) const
00467 {
00468 std::string str;
00469 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00470 for (int i=0; i < depth; i++)
00471 str += prefix;
00472 std::string indent = str + prefix;
00473 str += "DCPS_IR_Topic_Description [";
00474 str += name_.c_str();
00475 str += "][";
00476 str += dataTypeName_.c_str();
00477 str += "]\n";
00478
00479 str += indent + "Subscription References [ ";
00480 for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00481 sub != subscriptionRefs_.end();
00482 sub++)
00483 {
00484 OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00485 str += std::string(sub_converter);
00486 str += " ";
00487 }
00488 str += "]\n";
00489
00490 str += indent + "Topics [ ";
00491 for (DCPS_IR_Topic_Set::const_iterator top = topics_.begin();
00492 top != topics_.end();
00493 top++)
00494 {
00495 OpenDDS::DCPS::RepoIdConverter top_converter((*top)->get_id());
00496 str += std::string(top_converter);
00497 str += " ";
00498 }
00499 str += "]\n";
00500 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00501 return str;
00502 }
00503
// Explicit instantiation of the ACE container templates used for the
// subscription and topic sets, for toolchains that request it through the
// ACE configuration macros below.
#if defined(ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

// Set of DCPS_IR_Subscription pointers.
template class ACE_Node<DCPS_IR_Subscription*>;
template class ACE_Unbounded_Set<DCPS_IR_Subscription*>;
template class ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>;

// Set of DCPS_IR_Topic pointers.
template class ACE_Node<DCPS_IR_Topic*>;
template class ACE_Unbounded_Set<DCPS_IR_Topic*>;
template class ACE_Unbounded_Set_Iterator<DCPS_IR_Topic*>;

#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

// Same instantiations, requested through the pragma form.
#pragma instantiate ACE_Node<DCPS_IR_Subscription*>
#pragma instantiate ACE_Unbounded_Set<DCPS_IR_Subscription*>
#pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>

#pragma instantiate ACE_Node<DCPS_IR_Topic*>
#pragma instantiate ACE_Unbounded_Set<DCPS_IR_Topic*>
#pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Topic*>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */