00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "DCPS_IR_Topic_Description.h"
00010
00011 #include "DCPS_IR_Subscription.h"
00012 #include "DCPS_IR_Publication.h"
00013
00014 #include "DCPS_IR_Topic.h"
00015 #include "DCPS_IR_Domain.h"
00016
00017 #include "dds/DCPS/DCPS_Utils.h"
00018
00019 #include "tao/debug.h"
00020
00021 #include "dds/DCPS/RepoIdConverter.h"
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 DCPS_IR_Topic_Description::DCPS_IR_Topic_Description(DCPS_IR_Domain* domain,
00026 const char* name,
00027 const char* dataTypeName)
00028 : name_(name),
00029 dataTypeName_(dataTypeName),
00030 domain_(domain)
00031 {
00032 }
00033
00034 DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description()
00035 {
00036 }
00037
00038 int DCPS_IR_Topic_Description::add_subscription_reference(DCPS_IR_Subscription* subscription
00039 , bool associate)
00040 {
00041 int status = subscriptionRefs_.insert(subscription);
00042
00043 switch (status) {
00044 case 0:
00045
00046
00047 domain_->publish_subscription_bit(subscription);
00048
00049 if (associate) {
00050 try_associate_subscription(subscription);
00051
00052
00053 }
00054
00055 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00056 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00057 ACE_DEBUG((LM_DEBUG,
00058 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_subscription_reference: ")
00059 ACE_TEXT("topic description %C added subscription %C at %x\n"),
00060 this->name_.c_str(),
00061 std::string(converter).c_str(),
00062 subscription));
00063 }
00064
00065 break;
00066
00067 case 1: {
00068 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00069 ACE_ERROR((LM_ERROR,
00070 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00071 ACE_TEXT("topic description %C attempt to re-add subscription %C.\n"),
00072 this->name_.c_str(),
00073 std::string(converter).c_str()));
00074 }
00075 break;
00076
00077 case -1: {
00078 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00079 ACE_ERROR((LM_ERROR,
00080 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00081 ACE_TEXT("topic description %C failed to add subscription %C.\n"),
00082 this->name_.c_str(),
00083 std::string(converter).c_str()));
00084 }
00085 };
00086
00087 return status;
00088 }
00089
00090 int DCPS_IR_Topic_Description::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00091 {
00092 int status = subscriptionRefs_.remove(subscription);
00093
00094 if (0 == status) {
00095 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00096 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00097 ACE_DEBUG((LM_DEBUG,
00098 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_subscription_reference: ")
00099 ACE_TEXT("topic description %C removed subscription %C.\n"),
00100 this->name_.c_str(),
00101 std::string(converter).c_str()));
00102 }
00103
00104 } else {
00105 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00106 ACE_ERROR((LM_ERROR,
00107 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_subscription_reference: ")
00108 ACE_TEXT("topic description %C failed to remove subscription %C.\n"),
00109 this->name_.c_str(),
00110 std::string(converter).c_str()));
00111 }
00112
00113 return status;
00114 }
00115
00116 int DCPS_IR_Topic_Description::add_topic(DCPS_IR_Topic* topic)
00117 {
00118 int status = topics_.insert(topic);
00119
00120 switch (status) {
00121 case 0:
00122
00123 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00124 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00125 ACE_DEBUG((LM_DEBUG,
00126 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_topic: ")
00127 ACE_TEXT("topic description %C added topic %C at %x.\n"),
00128 this->name_.c_str(),
00129 std::string(converter).c_str(),
00130 topic));
00131 }
00132
00133 break;
00134 case 1:
00135
00136 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00137 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00138 ACE_DEBUG((LM_WARNING,
00139 ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic_Description::add_topic: ")
00140 ACE_TEXT("topic description %C attempt to re-add topic %C.\n"),
00141 this->name_.c_str(),
00142 std::string(converter).c_str()));
00143 }
00144
00145 break;
00146 case -1: {
00147 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00148 ACE_ERROR((LM_ERROR,
00149 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_topic: ")
00150 ACE_TEXT("topic description %C failed to add topic %C.\n"),
00151 this->name_.c_str(),
00152 std::string(converter).c_str()));
00153 }
00154 break;
00155 };
00156
00157 return status;
00158 }
00159
00160 int DCPS_IR_Topic_Description::remove_topic(DCPS_IR_Topic* topic)
00161 {
00162 int status = topics_.remove(topic);
00163
00164 if (0 == status) {
00165 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00166 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00167 ACE_DEBUG((LM_DEBUG,
00168 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_topic: ")
00169 ACE_TEXT("topic description %C removed topic %C.\n"),
00170 this->name_.c_str(),
00171 std::string(converter).c_str()));
00172 }
00173
00174 } else {
00175 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00176 ACE_ERROR((LM_ERROR,
00177 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_topic: ")
00178 ACE_TEXT("topic description failed to remove topic %C.\n"),
00179 this->name_.c_str(),
00180 std::string(converter).c_str()));
00181 }
00182
00183 return status;
00184 }
00185
00186 DCPS_IR_Topic* DCPS_IR_Topic_Description::get_first_topic()
00187 {
00188 DCPS_IR_Topic* topic = 0;
00189
00190 if (0 < topics_.size()) {
00191 DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00192 topic = *iter;
00193
00194 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00195 OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00196 ACE_DEBUG((LM_DEBUG,
00197 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::get_first_topic: ")
00198 ACE_TEXT("topic description %C first topic %C.\n"),
00199 this->name_.c_str(),
00200 std::string(converter).c_str()));
00201 }
00202 }
00203
00204 return topic;
00205 }
00206
00207 void DCPS_IR_Topic_Description::try_associate_publication(DCPS_IR_Publication* publication)
00208 {
00209
00210 DCPS_IR_Subscription* subscription = 0;
00211 OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00212
00213 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00214 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00215
00216 while (iter != end) {
00217 subscription = *iter;
00218 ++iter;
00219 try_associate(publication, subscription);
00220
00221
00222 qosStatus = subscription->get_incompatibleQosStatus();
00223
00224 if (0 < qosStatus->count_since_last_send) {
00225 subscription->update_incompatible_qos();
00226 }
00227 }
00228
00229
00230 qosStatus = publication->get_incompatibleQosStatus();
00231
00232 if (0 < qosStatus->count_since_last_send) {
00233 publication->update_incompatible_qos();
00234 }
00235 }
00236
00237 void DCPS_IR_Topic_Description::try_associate_subscription(DCPS_IR_Subscription* subscription)
00238 {
00239
00240
00241 DCPS_IR_Topic* topic = 0;
00242
00243 DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00244 DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00245
00246 while (iter != end) {
00247 topic = *iter;
00248 ++iter;
00249
00250 topic->try_associate(subscription);
00251 }
00252
00253
00254 OpenDDS::DCPS::IncompatibleQosStatus* qosStatus =
00255 subscription->get_incompatibleQosStatus();
00256
00257 if (0 < qosStatus->total_count) {
00258 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00259 OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00260 ACE_DEBUG((LM_DEBUG,
00261 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate_subscription: ")
00262 ACE_TEXT("topic description %C has %d incompatible publications ")
00263 ACE_TEXT("with subscription %C.\n"),
00264 this->name_.c_str(),
00265 qosStatus->total_count,
00266 std::string(converter).c_str()));
00267 }
00268
00269 subscription->update_incompatible_qos();
00270 }
00271 }
00272
00273 bool
00274 DCPS_IR_Topic_Description::try_associate(DCPS_IR_Publication* publication,
00275 DCPS_IR_Subscription* subscription)
00276 {
00277 if (publication->is_subscription_ignored(subscription->get_participant_id(),
00278 subscription->get_topic_id(),
00279 subscription->get_id())) {
00280 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00281 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00282 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00283 ACE_DEBUG((LM_DEBUG,
00284 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00285 ACE_TEXT("topic description %C publication %C ignores subscription %C.\n"),
00286 this->name_.c_str(),
00287 std::string(pub_converter).c_str(),
00288 std::string(sub_converter).c_str()));
00289 }
00290 }
00291
00292 else if (subscription->is_publication_ignored(publication->get_participant_id(),
00293 publication->get_topic_id(),
00294 publication->get_id())) {
00295 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00296 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00297 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00298 ACE_DEBUG((LM_DEBUG,
00299 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00300 ACE_TEXT("topic description %C subscription %C ignores publication %C.\n"),
00301 this->name_.c_str(),
00302 std::string(pub_converter).c_str(),
00303 std::string(sub_converter).c_str()));
00304 }
00305
00306 } else {
00307 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00308 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00309 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00310 ACE_DEBUG((LM_DEBUG,
00311 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00312 ACE_TEXT("topic description %C checking compatibility of ")
00313 ACE_TEXT("publication %C with subscription %C.\n"),
00314 this->name_.c_str(),
00315 std::string(pub_converter).c_str(),
00316 std::string(sub_converter).c_str()));
00317 }
00318
00319 if (OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00320 subscription->get_incompatibleQosStatus(),
00321 publication->get_transportLocatorSeq(),
00322 subscription->get_transportLocatorSeq(),
00323 publication->get_datawriter_qos(),
00324 subscription->get_datareader_qos(),
00325 publication->get_publisher_qos(),
00326 subscription->get_subscriber_qos())) {
00327 associate(publication, subscription);
00328 return true;
00329 }
00330
00331
00332
00333
00334 }
00335
00336 return false;
00337 }
00338
00339 void DCPS_IR_Topic_Description::associate(DCPS_IR_Publication* publication,
00340 DCPS_IR_Subscription* subscription)
00341 {
00342 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00343 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00344 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00345 ACE_DEBUG((LM_DEBUG,
00346 ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::associate: ")
00347 ACE_TEXT("topic description %C associating ")
00348 ACE_TEXT("publication %C with subscription %C.\n"),
00349 this->name_.c_str(),
00350 std::string(pub_converter).c_str(),
00351 std::string(sub_converter).c_str()));
00352 }
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362 int error = publication->add_associated_subscription(subscription, true);
00363
00364
00365
00366 if (error != -1) {
00367
00368 subscription->add_associated_publication(publication, false);
00369 } else {
00370 ACE_DEBUG((LM_INFO, ACE_TEXT("Invalid publication detected, NOT notifying subcription of association\n")));
00371 }
00372 }
00373
00374 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Subscription* subscription)
00375 {
00376 DCPS_IR_Topic* topic = 0;
00377
00378 DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00379 DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00380
00381 while (iter != end) {
00382 topic = *iter;
00383 ++iter;
00384
00385 topic->reevaluate_associations(subscription);
00386 }
00387 }
00388
00389 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Publication* publication)
00390 {
00391 DCPS_IR_Subscription * sub = 0;
00392 DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00393 DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00394
00395 while (iter != end) {
00396 sub = *iter;
00397 ++iter;
00398 publication->reevaluate_association(sub);
00399 sub->reevaluate_association(publication);
00400 }
00401 }
00402
00403 const char* DCPS_IR_Topic_Description::get_name() const
00404 {
00405 return name_.c_str();
00406 }
00407
00408 const char* DCPS_IR_Topic_Description::get_dataTypeName() const
00409 {
00410 return dataTypeName_.c_str();
00411 }
00412
00413 CORBA::ULong DCPS_IR_Topic_Description::get_number_topics() const
00414 {
00415 return static_cast<CORBA::ULong>(topics_.size());
00416 }
00417
00418 std::string
00419 DCPS_IR_Topic_Description::dump_to_string(const std::string& prefix,
00420 int depth) const
00421 {
00422 std::string str;
00423 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00424 for (int i=0; i < depth; i++)
00425 str += prefix;
00426 std::string indent = str + prefix;
00427 str += "DCPS_IR_Topic_Description [";
00428 str += name_.c_str();
00429 str += "][";
00430 str += dataTypeName_.c_str();
00431 str += "]\n";
00432
00433 str += indent + "Subscription References [ ";
00434 for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00435 sub != subscriptionRefs_.end();
00436 sub++)
00437 {
00438 OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00439 str += std::string(sub_converter);
00440 str += " ";
00441 }
00442 str += "]\n";
00443
00444 str += indent + "Topics [ ";
00445 for (DCPS_IR_Topic_Set::const_iterator top = topics_.begin();
00446 top != topics_.end();
00447 top++)
00448 {
00449 OpenDDS::DCPS::RepoIdConverter top_converter((*top)->get_id());
00450 str += std::string(top_converter);
00451 str += " ";
00452 }
00453 str += "]\n";
00454 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00455 return str;
00456 }
00457
00458 OPENDDS_END_VERSIONED_NAMESPACE_DECL