00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "DCPS_IR_Subscription.h"
00011
00012 #include "DCPS_IR_Publication.h"
00013 #include "DCPS_IR_Participant.h"
00014 #include "DCPS_IR_Topic_Description.h"
00015 #include "DCPS_IR_Domain.h"
00016 #include "dds/DCPS/DCPS_Utils.h"
00017 #include "dds/DCPS/RepoIdConverter.h"
00018 #include "dds/DCPS/Qos_Helper.h"
00019 #include "tao/debug.h"
00020
00021 DCPS_IR_Subscription::DCPS_IR_Subscription(const OpenDDS::DCPS::RepoId& id,
00022 DCPS_IR_Participant* participant,
00023 DCPS_IR_Topic* topic,
00024 OpenDDS::DCPS::DataReaderRemote_ptr reader,
00025 const DDS::DataReaderQos& qos,
00026 const OpenDDS::DCPS::TransportLocatorSeq& info,
00027 const DDS::SubscriberQos& subscriberQos,
00028 const char* filterClassName,
00029 const char* filterExpression,
00030 const DDS::StringSeq& exprParams)
00031 : id_(id),
00032 participant_(participant),
00033 topic_(topic),
00034 handle_(0),
00035 isBIT_(0),
00036 qos_(qos),
00037 info_(info),
00038 subscriberQos_(subscriberQos),
00039 filterClassName_(filterClassName),
00040 filterExpression_(filterExpression),
00041 exprParams_(exprParams)
00042 {
00043 reader_ = OpenDDS::DCPS::DataReaderRemote::_duplicate(reader);
00044
00045 incompatibleQosStatus_.total_count = 0;
00046 incompatibleQosStatus_.count_since_last_send = 0;
00047 }
00048
00049 DCPS_IR_Subscription::~DCPS_IR_Subscription()
00050 {
00051 if (0 != associations_.size()) {
00052 CORBA::Boolean dont_notify_lost = 0;
00053 remove_associations(dont_notify_lost);
00054 }
00055 }
00056
00057 int DCPS_IR_Subscription::add_associated_publication(DCPS_IR_Publication* pub,
00058 bool active)
00059 {
00060
00061 int status = associations_.insert(pub);
00062
00063 switch (status) {
00064 case 0: {
00065
00066 OpenDDS::DCPS::WriterAssociation association;
00067 association.writerTransInfo = pub->get_transportLocatorSeq();
00068 association.writerId = pub->get_id();
00069 association.pubQos = *(pub->get_publisher_qos());
00070 association.writerQos = *(pub->get_datawriter_qos());
00071
00072 if (participant_->is_alive() && this->participant_->isOwner()) {
00073 try {
00074 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00075 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00076 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00077 ACE_DEBUG((LM_DEBUG,
00078 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication:")
00079 ACE_TEXT(" subscription %C adding publication %C.\n"),
00080 std::string(sub_converter).c_str(),
00081 std::string(pub_converter).c_str()));
00082 }
00083
00084 reader_->add_association(id_, association, active);
00085
00086 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00087 ACE_DEBUG((LM_DEBUG,
00088 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication: ")
00089 ACE_TEXT("successfully added publication %x\n"),
00090 pub));
00091 }
00092 } catch (const CORBA::Exception& ex) {
00093 ex._tao_print_exception(
00094 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:");
00095 participant_->mark_dead();
00096 status = -1;
00097 }
00098 }
00099
00100 }
00101 break;
00102
00103 case 1: {
00104 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00105 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00106 ACE_ERROR((LM_ERROR,
00107 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00108 ACE_TEXT("subscription %C attempted to re-add publication %C\n"),
00109 std::string(sub_converter).c_str(),
00110 std::string(pub_converter).c_str()));
00111 }
00112 break;
00113
00114 case -1: {
00115 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00116 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00117 ACE_ERROR((LM_ERROR,
00118 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00119 ACE_TEXT("subscription %C failed to add publication %C\n"),
00120 std::string(sub_converter).c_str(),
00121 std::string(pub_converter).c_str()));
00122 }
00123 };
00124
00125 return status;
00126 }
00127
00128 void
00129 DCPS_IR_Subscription::association_complete(const OpenDDS::DCPS::RepoId& remote)
00130 {
00131 typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00132 for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00133 if ((*iter)->get_id() == remote) {
00134 (*iter)->call_association_complete(get_id());
00135 }
00136 }
00137 }
00138
00139 void
00140 DCPS_IR_Subscription::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00141 {
00142 try {
00143 reader_->association_complete(remote);
00144 } catch (const CORBA::Exception& ex) {
00145 ex._tao_print_exception(
00146 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::call_association_complete:");
00147 participant_->mark_dead();
00148 }
00149 }
00150
00151 int DCPS_IR_Subscription::remove_associated_publication(DCPS_IR_Publication* pub,
00152 CORBA::Boolean sendNotify,
00153 CORBA::Boolean notify_lost,
00154 bool notify_both_side)
00155 {
00156 bool marked_dead = false;
00157
00158 if (sendNotify) {
00159 OpenDDS::DCPS::WriterIdSeq idSeq(5);
00160 idSeq.length(1);
00161 idSeq[0] = pub->get_id();
00162
00163 if (participant_->is_alive() && this->participant_->isOwner()) {
00164 try {
00165 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00166 ACE_DEBUG((LM_DEBUG,
00167 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication:")
00168 ACE_TEXT(" calling sub %d with pub %d\n"),
00169 id_, pub->get_id()));
00170 }
00171
00172 reader_->remove_associations(idSeq, notify_lost);
00173
00174 if (notify_both_side) {
00175 pub->remove_associated_subscription(this, sendNotify, notify_lost);
00176 }
00177
00178 } catch (const CORBA::Exception& ex) {
00179 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00180 ex._tao_print_exception(
00181 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:");
00182 }
00183
00184 participant_->mark_dead();
00185 marked_dead = true;
00186 }
00187 }
00188 }
00189
00190 int status = associations_.remove(pub);
00191
00192 if (0 == status) {
00193 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00194 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00195 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00196 ACE_DEBUG((LM_DEBUG,
00197 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ")
00198 ACE_TEXT("subscription %C removed publication %C at %x.\n"),
00199 std::string(sub_converter).c_str(),
00200 std::string(pub_converter).c_str(),
00201 pub));
00202 }
00203
00204 } else {
00205 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00206 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00207 ACE_ERROR((LM_ERROR,
00208 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ")
00209 ACE_TEXT("subscription %C failed to remove publication %C at %x.\n"),
00210 std::string(sub_converter).c_str(),
00211 std::string(pub_converter).c_str(),
00212 pub));
00213 }
00214
00215 if (marked_dead) {
00216 return -1;
00217
00218 } else {
00219 return status;
00220 }
00221 }
00222
00223 int DCPS_IR_Subscription::remove_associations(CORBA::Boolean notify_lost)
00224 {
00225 int status = 0;
00226 DCPS_IR_Publication* pub = 0;
00227 size_t numAssociations = associations_.size();
00228 CORBA::Boolean dontSend = 0;
00229 CORBA::Boolean send = 1;
00230
00231 if (0 < numAssociations) {
00232 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00233 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00234
00235 while (iter != end) {
00236 pub = *iter;
00237 ++iter;
00238
00239 pub->remove_associated_subscription(this, send, notify_lost);
00240 CORBA::Boolean dont_notify_lost = 0;
00241 remove_associated_publication(pub, dontSend, dont_notify_lost);
00242 }
00243 }
00244 this->defunct_.reset();
00245
00246 return status;
00247 }
00248
00249 void DCPS_IR_Subscription::disassociate_participant(OpenDDS::DCPS::RepoId id,
00250 bool reassociate)
00251 {
00252 DCPS_IR_Publication* pub = 0;
00253 size_t numAssociations = associations_.size();
00254 CORBA::Boolean dontSend = 0;
00255 CORBA::Boolean send = 1;
00256 long count = 0;
00257
00258 if (0 < numAssociations) {
00259 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00260 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00261
00262 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00263 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00264
00265 while (iter != end) {
00266 pub = *iter;
00267 ++iter;
00268
00269 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00270 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00271 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00272 OpenDDS::DCPS::RepoIdConverter sub_part_converter(id);
00273 OpenDDS::DCPS::RepoIdConverter pub_part_converter(pub->get_participant_id());
00274 ACE_DEBUG((LM_DEBUG,
00275 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_participant: ")
00276 ACE_TEXT("subscription %C testing if publication %C particpant %C == %C.\n"),
00277 std::string(sub_converter).c_str(),
00278 std::string(pub_converter).c_str(),
00279 std::string(sub_part_converter).c_str(),
00280 std::string(pub_part_converter).c_str()));
00281 }
00282
00283 if (id == pub->get_participant_id()) {
00284 CORBA::Boolean dont_notify_lost = 0;
00285 pub->remove_associated_subscription(this, send, dont_notify_lost);
00286 remove_associated_publication(pub, dontSend, dont_notify_lost);
00287
00288 idSeq[count] = pub->get_id();
00289 ++count;
00290
00291 if (reassociate && this->defunct_.insert(pub) != 0) {
00292 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00293 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00294 ACE_ERROR((LM_ERROR,
00295 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ")
00296 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00297 std::string(sub_converter).c_str(),
00298 std::string(pub_converter).c_str(),
00299 pub));
00300 }
00301 }
00302 }
00303
00304 if (0 < count) {
00305 idSeq.length(count);
00306
00307 if (participant_->is_alive() && this->participant_->isOwner()) {
00308 try {
00309 CORBA::Boolean dont_notify_lost = 0;
00310 reader_->remove_associations(idSeq, dont_notify_lost);
00311
00312 } catch (const CORBA::Exception& ex) {
00313 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00314 ex._tao_print_exception(
00315 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:");
00316 }
00317
00318 participant_->mark_dead();
00319 }
00320 }
00321 }
00322 }
00323 }
00324
00325 void DCPS_IR_Subscription::disassociate_topic(OpenDDS::DCPS::RepoId id)
00326 {
00327 DCPS_IR_Publication* pub = 0;
00328 size_t numAssociations = associations_.size();
00329 CORBA::Boolean dontSend = 0;
00330 CORBA::Boolean send = 1;
00331 long count = 0;
00332
00333 if (0 < numAssociations) {
00334 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00335 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00336
00337 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00338 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00339
00340 while (iter != end) {
00341 pub = *iter;
00342 ++iter;
00343
00344 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00345 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00346 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00347 OpenDDS::DCPS::RepoIdConverter sub_topic_converter(id);
00348 OpenDDS::DCPS::RepoIdConverter pub_topic_converter(pub->get_topic_id());
00349 ACE_DEBUG((LM_DEBUG,
00350 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_topic: ")
00351 ACE_TEXT("subscription %C testing if publication %C topic %C == %C.\n"),
00352 std::string(sub_converter).c_str(),
00353 std::string(pub_converter).c_str(),
00354 std::string(sub_topic_converter).c_str(),
00355 std::string(pub_topic_converter).c_str()));
00356 }
00357
00358 if (id == pub->get_topic_id()) {
00359 CORBA::Boolean dont_notify_lost = 0;
00360 pub->remove_associated_subscription(this, send, dont_notify_lost);
00361 remove_associated_publication(pub, dontSend, dont_notify_lost);
00362
00363 idSeq[count] = pub->get_id();
00364 ++count;
00365 }
00366 }
00367
00368 if (0 < count) {
00369 idSeq.length(count);
00370
00371 if (participant_->is_alive() && this->participant_->isOwner()) {
00372 try {
00373 CORBA::Boolean dont_notify_lost = 0;
00374 reader_->remove_associations(idSeq, dont_notify_lost);
00375
00376 } catch (const CORBA::Exception& ex) {
00377 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00378 ex._tao_print_exception(
00379 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00380 }
00381
00382 participant_->mark_dead();
00383 }
00384 }
00385 }
00386 }
00387 }
00388
00389 void DCPS_IR_Subscription::disassociate_publication(OpenDDS::DCPS::RepoId id,
00390 bool reassociate)
00391 {
00392 DCPS_IR_Publication* pub = 0;
00393 size_t numAssociations = associations_.size();
00394 CORBA::Boolean dontSend = 0;
00395 CORBA::Boolean send = 1;
00396 long count = 0;
00397
00398 if (0 < numAssociations) {
00399 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00400 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00401
00402 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00403 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00404
00405 while (iter != end) {
00406 pub = *iter;
00407 ++iter;
00408
00409 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00410 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00411 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00412 OpenDDS::DCPS::RepoIdConverter sub_pub_converter(id);
00413 ACE_DEBUG((LM_DEBUG,
00414 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_publication: ")
00415 ACE_TEXT("subscription %C testing if publication %C == %C.\n"),
00416 std::string(sub_converter).c_str(),
00417 std::string(pub_converter).c_str(),
00418 std::string(sub_pub_converter).c_str()));
00419 }
00420
00421 if (id == pub->get_id()) {
00422 CORBA::Boolean dont_notify_lost = 0;
00423 pub->remove_associated_subscription(this, send, dont_notify_lost);
00424 remove_associated_publication(pub, dontSend, dont_notify_lost);
00425
00426 idSeq[count] = pub->get_id();
00427 ++count;
00428
00429 if (reassociate && this->defunct_.insert(pub) != 0) {
00430 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00431 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00432 ACE_ERROR((LM_ERROR,
00433 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ")
00434 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00435 std::string(sub_converter).c_str(),
00436 std::string(pub_converter).c_str(),
00437 pub));
00438 }
00439 }
00440 }
00441
00442 if (0 < count) {
00443 idSeq.length(count);
00444
00445 if (participant_->is_alive() && this->participant_->isOwner()) {
00446 try {
00447 CORBA::Boolean dont_notify_lost = 0;
00448 reader_->remove_associations(idSeq, dont_notify_lost);
00449
00450 } catch (const CORBA::Exception& ex) {
00451 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00452 ex._tao_print_exception(
00453 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00454 }
00455
00456 participant_->mark_dead();
00457 }
00458 }
00459 }
00460 }
00461 }
00462
00463 void DCPS_IR_Subscription::update_incompatible_qos()
00464 {
00465 if (participant_->is_alive() && this->participant_->isOwner()) {
00466 try {
00467 reader_->update_incompatible_qos(incompatibleQosStatus_);
00468 incompatibleQosStatus_.count_since_last_send = 0;
00469 } catch (const CORBA::Exception& ex) {
00470 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00471 ex._tao_print_exception(
00472 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:");
00473 }
00474
00475 participant_->mark_dead();
00476 }
00477 }
00478 }
00479
00480 CORBA::Boolean DCPS_IR_Subscription::is_publication_ignored(OpenDDS::DCPS::RepoId partId,
00481 OpenDDS::DCPS::RepoId topicId,
00482 OpenDDS::DCPS::RepoId pubId)
00483 {
00484 CORBA::Boolean ignored;
00485 ignored = (participant_->is_participant_ignored(partId) ||
00486 participant_->is_topic_ignored(topicId) ||
00487 participant_->is_publication_ignored(pubId));
00488
00489 return ignored;
00490 }
00491
00492 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Subscription::get_transportLocatorSeq() const
00493 {
00494 return info_;
00495 }
00496
00497 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Subscription::get_incompatibleQosStatus()
00498 {
00499 return &incompatibleQosStatus_;
00500 }
00501
00502 const DDS::DataReaderQos* DCPS_IR_Subscription::get_datareader_qos()
00503 {
00504 return &qos_;
00505 }
00506
00507 const DDS::SubscriberQos* DCPS_IR_Subscription::get_subscriber_qos()
00508 {
00509 return &subscriberQos_;
00510 }
00511
00512 using OpenDDS::DCPS::operator==;
00513
00514 void
00515 DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos& qos)
00516 {
00517 if (false == (qos == this->qos_)) {
00518
00519 bool check =
00520 OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00521
00522
00523 this->qos_ = qos;
00524
00525 if (check) {
00526
00527 this->reevaluate_existing_associations();
00528
00529
00530
00531
00532 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00533
00534
00535 DCPS_IR_Topic_Description* description
00536 = this->topic_->get_topic_description();
00537 description->reevaluate_associations(this);
00538 }
00539
00540 this->participant_->get_domain_reference()->publish_subscription_bit(this);
00541 }
00542 }
00543
00544 void
00545 DCPS_IR_Subscription::set_qos(const DDS::SubscriberQos& qos)
00546 {
00547 if (false == (qos == this->subscriberQos_)) {
00548
00549 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->subscriberQos_);
00550
00551
00552 this->subscriberQos_ = qos;
00553
00554 if (check) {
00555
00556 this->reevaluate_existing_associations();
00557
00558
00559
00560
00561 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00562
00563
00564 DCPS_IR_Topic_Description* description
00565 = this->topic_->get_topic_description();
00566 description->reevaluate_associations(this);
00567 }
00568
00569 this->participant_->get_domain_reference()->publish_subscription_bit(this);
00570 }
00571 }
00572
00573 bool DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos & qos,
00574 const DDS::SubscriberQos & subscriberQos,
00575 Update::SpecificQos& specificQos)
00576 {
00577 bool need_evaluate = false;
00578 bool u_dr_qos = !(qos_ == qos);
00579
00580 if (u_dr_qos) {
00581 if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00582 need_evaluate = true;
00583 }
00584
00585 qos_ = qos;
00586 }
00587
00588 bool u_sub_qos = !(subscriberQos_ == subscriberQos);
00589
00590 if (u_sub_qos) {
00591 if (OpenDDS::DCPS::should_check_association_upon_change(subscriberQos_, subscriberQos)) {
00592 need_evaluate = true;
00593 }
00594
00595 subscriberQos_ = subscriberQos;
00596 }
00597
00598 if (need_evaluate) {
00599
00600 this->reevaluate_existing_associations();
00601
00602 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00603 description->reevaluate_associations(this);
00604 }
00605
00606 participant_->get_domain_reference()->publish_subscription_bit(this);
00607 specificQos = u_dr_qos? Update::DataReaderQos:
00608 u_sub_qos? Update::SubscriberQos:
00609 Update::NoQos;
00610
00611 return true;
00612 }
00613
00614 void
00615 DCPS_IR_Subscription::reevaluate_defunct_associations()
00616 {
00617 DCPS_IR_Publication_Set::iterator it(this->defunct_.begin());
00618 while (it != this->defunct_.end()) {
00619 DCPS_IR_Publication* publication = *it;
00620 ++it;
00621
00622 if (reevaluate_association(publication)) {
00623 this->defunct_.remove(publication);
00624
00625 } else {
00626 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00627 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00628 ACE_ERROR((LM_ERROR,
00629 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ")
00630 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00631 std::string(sub_converter).c_str(),
00632 std::string(pub_converter).c_str(),
00633 publication));
00634 }
00635 }
00636 }
00637
00638 void DCPS_IR_Subscription::reevaluate_existing_associations()
00639 {
00640 DCPS_IR_Publication * pub = 0;
00641 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00642 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00643
00644 while (iter != end) {
00645 pub = *iter;
00646 ++iter;
00647
00648 this->reevaluate_association(pub);
00649 }
00650 }
00651
00652 bool
00653 DCPS_IR_Subscription::reevaluate_association(DCPS_IR_Publication* publication)
00654 {
00655 int status = this->associations_.find(publication);
00656
00657 if (status == 0) {
00658
00659
00660 if (!OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00661 this->get_incompatibleQosStatus(),
00662 publication->get_transportLocatorSeq(),
00663 this->get_transportLocatorSeq(),
00664 publication->get_datawriter_qos(),
00665 this->get_datareader_qos(),
00666 publication->get_publisher_qos(),
00667 this->get_subscriber_qos())) {
00668 bool sendNotify = true;
00669 bool notify_lost = true;
00670
00671 this->remove_associated_publication(publication, sendNotify, notify_lost, true);
00672 }
00673
00674 } else {
00675 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00676 return description->try_associate(publication, this);
00677 }
00678
00679 return false;
00680 }
00681
00682 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_id()
00683 {
00684 return id_;
00685 }
00686
00687 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_topic_id()
00688 {
00689 return topic_->get_id();
00690 }
00691
00692 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_participant_id()
00693 {
00694 return participant_->get_id();
00695 }
00696
00697 DCPS_IR_Topic_Description* DCPS_IR_Subscription::get_topic_description()
00698 {
00699 return topic_->get_topic_description();
00700 }
00701
00702 DCPS_IR_Topic* DCPS_IR_Subscription::get_topic()
00703 {
00704 return topic_;
00705 }
00706
00707 DDS::InstanceHandle_t DCPS_IR_Subscription::get_handle()
00708 {
00709 return handle_;
00710 }
00711
00712 void DCPS_IR_Subscription::set_handle(DDS::InstanceHandle_t handle)
00713 {
00714 handle_ = handle;
00715 }
00716
00717 CORBA::Boolean DCPS_IR_Subscription::is_bit()
00718 {
00719 return isBIT_;
00720 }
00721
00722 void DCPS_IR_Subscription::set_bit_status(CORBA::Boolean isBIT)
00723 {
00724 isBIT_ = isBIT;
00725 }
00726
00727 OpenDDS::DCPS::DataReaderRemote_ptr
00728 DCPS_IR_Subscription::reader()
00729 {
00730 return OpenDDS::DCPS::DataReaderRemote::_duplicate(this->reader_.in());
00731 }
00732
00733 std::string
00734 DCPS_IR_Subscription::get_filter_class_name() const
00735 {
00736 return filterClassName_;
00737 }
00738
00739 std::string
00740 DCPS_IR_Subscription::get_filter_expression() const
00741 {
00742 return filterExpression_;
00743 }
00744
00745 DDS::StringSeq
00746 DCPS_IR_Subscription::get_expr_params() const
00747 {
00748 return exprParams_;
00749 }
00750
00751 void
00752 DCPS_IR_Subscription::update_expr_params(const DDS::StringSeq& params)
00753 {
00754 exprParams_ = params;
00755 typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00756 for (iter_t i(associations_.begin()), e(associations_.end()); i != e; ++i) {
00757 (*i)->update_expr_params(id_, params);
00758 }
00759 }
00760
00761 std::string
00762 DCPS_IR_Subscription::dump_to_string(const std::string& prefix, int depth) const
00763 {
00764 std::string str;
00765 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00766 OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00767
00768 for (int i=0; i < depth; i++)
00769 str += prefix;
00770 std::string indent = str + prefix;
00771 str += "DCPS_IR_Subscription[";
00772 str += std::string(local_converter);
00773 str += "]";
00774 if (isBIT_)
00775 str += " (BIT)";
00776 str += "\n";
00777
00778 str += indent + "Associations [ ";
00779 for (DCPS_IR_Publication_Set::const_iterator assoc = associations_.begin();
00780 assoc != associations_.end();
00781 assoc++)
00782 {
00783 OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00784 str += std::string(assoc_converter);
00785 str += " ";
00786 }
00787 str += "]\n";
00788
00789 str += indent + "Defunct Associations [ ";
00790 for (DCPS_IR_Publication_Set::const_iterator def = defunct_.begin();
00791 def != defunct_.end();
00792 def++)
00793 {
00794 OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00795 str += std::string(def_converter);
00796 str += " ";
00797 }
00798 str += "]\n";
00799
00800 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00801 return str;
00802 }
00803
00804
// Instantiations of the ACE container templates over DCPS_IR_Publication*,
// emitted only for toolchains that ask for them via the ACE configuration
// macros below.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

// Standard C++ explicit instantiation.
template class ACE_Node<DCPS_IR_Publication*>;
template class ACE_Unbounded_Set<DCPS_IR_Publication*>;
template class ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

// Pragma-based instantiation.
#pragma instantiate ACE_Node<DCPS_IR_Publication*>
#pragma instantiate ACE_Unbounded_Set<DCPS_IR_Publication*>
#pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>

#endif