00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "DCPS_IR_Subscription.h"
00011
00012 #include "DCPS_IR_Publication.h"
00013 #include "DCPS_IR_Participant.h"
00014 #include "DCPS_IR_Topic_Description.h"
00015 #include "DCPS_IR_Domain.h"
00016 #include "dds/DCPS/DCPS_Utils.h"
00017 #include "dds/DCPS/RepoIdConverter.h"
00018 #include "dds/DCPS/Qos_Helper.h"
00019 #include "tao/debug.h"
00020
00021 #include "ace/OS_NS_unistd.h"
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 DCPS_IR_Subscription::DCPS_IR_Subscription(const OpenDDS::DCPS::RepoId& id,
00026 DCPS_IR_Participant* participant,
00027 DCPS_IR_Topic* topic,
00028 OpenDDS::DCPS::DataReaderRemote_ptr reader,
00029 const DDS::DataReaderQos& qos,
00030 const OpenDDS::DCPS::TransportLocatorSeq& info,
00031 const DDS::SubscriberQos& subscriberQos,
00032 const char* filterClassName,
00033 const char* filterExpression,
00034 const DDS::StringSeq& exprParams)
00035 : id_(id),
00036 participant_(participant),
00037 topic_(topic),
00038 handle_(0),
00039 isBIT_(0),
00040 qos_(qos),
00041 info_(info),
00042 subscriberQos_(subscriberQos),
00043 filterClassName_(filterClassName),
00044 filterExpression_(filterExpression),
00045 exprParams_(exprParams)
00046 {
00047 reader_ = OpenDDS::DCPS::DataReaderRemote::_duplicate(reader);
00048
00049 incompatibleQosStatus_.total_count = 0;
00050 incompatibleQosStatus_.count_since_last_send = 0;
00051 }
00052
00053 DCPS_IR_Subscription::~DCPS_IR_Subscription()
00054 {
00055 }
00056
00057 int DCPS_IR_Subscription::add_associated_publication(DCPS_IR_Publication* pub,
00058 bool active)
00059 {
00060
00061 int status = associations_.insert(pub);
00062
00063 switch (status) {
00064 case 0: {
00065
00066 OpenDDS::DCPS::WriterAssociation association;
00067 association.writerTransInfo = pub->get_transportLocatorSeq();
00068 association.writerId = pub->get_id();
00069 association.pubQos = *(pub->get_publisher_qos());
00070 association.writerQos = *(pub->get_datawriter_qos());
00071
00072 if (participant_->is_alive() && this->participant_->isOwner()) {
00073 try {
00074 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00075 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00076 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00077 ACE_DEBUG((LM_DEBUG,
00078 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication:")
00079 ACE_TEXT(" subscription %C adding publication %C.\n"),
00080 std::string(sub_converter).c_str(),
00081 std::string(pub_converter).c_str()));
00082 }
00083
00084 reader_->add_association(id_, association, active);
00085
00086 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00087 ACE_DEBUG((LM_DEBUG,
00088 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication: ")
00089 ACE_TEXT("successfully added publication %x\n"),
00090 pub));
00091 }
00092 } catch (const CORBA::Exception& ex) {
00093 ex._tao_print_exception(
00094 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:");
00095 participant_->mark_dead();
00096 status = -1;
00097 }
00098 }
00099
00100 }
00101 break;
00102
00103 case 1: {
00104 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00105 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00106 ACE_ERROR((LM_ERROR,
00107 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00108 ACE_TEXT("subscription %C attempted to re-add publication %C\n"),
00109 std::string(sub_converter).c_str(),
00110 std::string(pub_converter).c_str()));
00111 }
00112 break;
00113
00114 case -1: {
00115 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00116 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00117 ACE_ERROR((LM_ERROR,
00118 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00119 ACE_TEXT("subscription %C failed to add publication %C\n"),
00120 std::string(sub_converter).c_str(),
00121 std::string(pub_converter).c_str()));
00122 }
00123 };
00124
00125 return status;
00126 }
00127
00128 void
00129 DCPS_IR_Subscription::association_complete(const OpenDDS::DCPS::RepoId& remote)
00130 {
00131 typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00132 for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00133 if ((*iter)->get_id() == remote) {
00134 (*iter)->call_association_complete(get_id());
00135 }
00136 }
00137 }
00138
00139 void
00140 DCPS_IR_Subscription::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00141 {
00142 try {
00143 reader_->association_complete(remote);
00144 } catch (const CORBA::Exception& ex) {
00145 ex._tao_print_exception(
00146 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::call_association_complete:");
00147 participant_->mark_dead();
00148 }
00149 }
00150
00151 int DCPS_IR_Subscription::remove_associated_publication(DCPS_IR_Publication* pub,
00152 CORBA::Boolean sendNotify,
00153 CORBA::Boolean notify_lost,
00154 bool notify_both_side)
00155 {
00156 bool marked_dead = false;
00157
00158 if (sendNotify) {
00159
00160 if (participant_->is_alive() && this->participant_->isOwner()) {
00161 try {
00162 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00163 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00164 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00165 ACE_DEBUG((LM_DEBUG,
00166 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication:")
00167 ACE_TEXT(" calling sub %C with pub %C\n"),
00168 std::string(sub_converter).c_str(),
00169 std::string(pub_converter).c_str()
00170 ));
00171 }
00172
00173 OpenDDS::DCPS::WriterIdSeq idSeq(5);
00174 idSeq.length(1);
00175 idSeq[0] = pub->get_id();
00176
00177 reader_->remove_associations(idSeq, notify_lost);
00178
00179 if (notify_both_side) {
00180 pub->remove_associated_subscription(this, sendNotify, notify_lost);
00181 }
00182
00183 } catch (const CORBA::Exception& ex) {
00184 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00185 ex._tao_print_exception(
00186 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:");
00187 }
00188
00189 participant_->mark_dead();
00190 marked_dead = true;
00191 }
00192 }
00193 }
00194
00195 int status = associations_.remove(pub);
00196
00197 if (0 == status) {
00198 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00199 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00200 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00201 ACE_DEBUG((LM_DEBUG,
00202 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ")
00203 ACE_TEXT("subscription %C removed publication %C at %x.\n"),
00204 std::string(sub_converter).c_str(),
00205 std::string(pub_converter).c_str(),
00206 pub));
00207 }
00208
00209 } else {
00210 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00211 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00212 ACE_ERROR((LM_ERROR,
00213 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ")
00214 ACE_TEXT("subscription %C failed to remove publication %C at %x.\n"),
00215 std::string(sub_converter).c_str(),
00216 std::string(pub_converter).c_str(),
00217 pub));
00218 }
00219
00220 if (marked_dead) {
00221 return -1;
00222
00223 } else {
00224 return status;
00225 }
00226 }
00227
00228 int DCPS_IR_Subscription::remove_associations(CORBA::Boolean notify_lost)
00229 {
00230 int status = 0;
00231 DCPS_IR_Publication* pub = 0;
00232 size_t numAssociations = associations_.size();
00233 CORBA::Boolean dontSend = 0;
00234 CORBA::Boolean send = 1;
00235
00236 if (0 < numAssociations) {
00237 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00238 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00239
00240 while (iter != end) {
00241 pub = *iter;
00242 ++iter;
00243
00244 pub->remove_associated_subscription(this, send, notify_lost);
00245 CORBA::Boolean dont_notify_lost = 0;
00246 remove_associated_publication(pub, dontSend, dont_notify_lost);
00247 }
00248 }
00249 this->defunct_.reset();
00250
00251 return status;
00252 }
00253
00254 void DCPS_IR_Subscription::disassociate_participant(OpenDDS::DCPS::RepoId id,
00255 bool reassociate)
00256 {
00257 DCPS_IR_Publication* pub = 0;
00258 size_t numAssociations = associations_.size();
00259 CORBA::Boolean dontSend = 0;
00260 CORBA::Boolean send = 1;
00261 long count = 0;
00262
00263 if (0 < numAssociations) {
00264 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00265 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00266
00267 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00268 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00269
00270 while (iter != end) {
00271 pub = *iter;
00272 ++iter;
00273
00274 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00275 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00276 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00277 OpenDDS::DCPS::RepoIdConverter sub_part_converter(id);
00278 OpenDDS::DCPS::RepoIdConverter pub_part_converter(pub->get_participant_id());
00279 ACE_DEBUG((LM_DEBUG,
00280 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_participant: ")
00281 ACE_TEXT("subscription %C testing if publication %C particpant %C == %C.\n"),
00282 std::string(sub_converter).c_str(),
00283 std::string(pub_converter).c_str(),
00284 std::string(sub_part_converter).c_str(),
00285 std::string(pub_part_converter).c_str()));
00286 }
00287
00288 if (id == pub->get_participant_id()) {
00289 CORBA::Boolean dont_notify_lost = 0;
00290 pub->remove_associated_subscription(this, send, dont_notify_lost);
00291 remove_associated_publication(pub, dontSend, dont_notify_lost);
00292
00293 idSeq[count] = pub->get_id();
00294 ++count;
00295
00296 if (reassociate && this->defunct_.insert(pub) != 0) {
00297 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00298 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00299 ACE_ERROR((LM_ERROR,
00300 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ")
00301 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00302 std::string(sub_converter).c_str(),
00303 std::string(pub_converter).c_str(),
00304 pub));
00305 }
00306 }
00307 }
00308
00309 if (0 < count) {
00310 idSeq.length(count);
00311
00312 if (participant_->is_alive() && this->participant_->isOwner()) {
00313 try {
00314 CORBA::Boolean dont_notify_lost = 0;
00315 reader_->remove_associations(idSeq, dont_notify_lost);
00316
00317 } catch (const CORBA::Exception& ex) {
00318 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00319 ex._tao_print_exception(
00320 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:");
00321 }
00322
00323 participant_->mark_dead();
00324 }
00325 }
00326 }
00327 }
00328 }
00329
00330 void DCPS_IR_Subscription::disassociate_topic(OpenDDS::DCPS::RepoId id)
00331 {
00332 DCPS_IR_Publication* pub = 0;
00333 size_t numAssociations = associations_.size();
00334 CORBA::Boolean dontSend = 0;
00335 CORBA::Boolean send = 1;
00336 long count = 0;
00337
00338 if (0 < numAssociations) {
00339 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00340 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00341
00342 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00343 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00344
00345 while (iter != end) {
00346 pub = *iter;
00347 ++iter;
00348
00349 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00350 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00351 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00352 OpenDDS::DCPS::RepoIdConverter sub_topic_converter(id);
00353 OpenDDS::DCPS::RepoIdConverter pub_topic_converter(pub->get_topic_id());
00354 ACE_DEBUG((LM_DEBUG,
00355 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_topic: ")
00356 ACE_TEXT("subscription %C testing if publication %C topic %C == %C.\n"),
00357 std::string(sub_converter).c_str(),
00358 std::string(pub_converter).c_str(),
00359 std::string(sub_topic_converter).c_str(),
00360 std::string(pub_topic_converter).c_str()));
00361 }
00362
00363 if (id == pub->get_topic_id()) {
00364 CORBA::Boolean dont_notify_lost = 0;
00365 pub->remove_associated_subscription(this, send, dont_notify_lost);
00366 remove_associated_publication(pub, dontSend, dont_notify_lost);
00367
00368 idSeq[count] = pub->get_id();
00369 ++count;
00370 }
00371 }
00372
00373 if (0 < count) {
00374 idSeq.length(count);
00375
00376 if (participant_->is_alive() && this->participant_->isOwner()) {
00377 try {
00378 CORBA::Boolean dont_notify_lost = false;
00379 reader_->remove_associations(idSeq, dont_notify_lost);
00380
00381 } catch (const CORBA::Exception& ex) {
00382 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00383 ex._tao_print_exception(
00384 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00385 }
00386
00387 participant_->mark_dead();
00388 }
00389 }
00390 }
00391 }
00392 }
00393
00394 void DCPS_IR_Subscription::disassociate_publication(OpenDDS::DCPS::RepoId id,
00395 bool reassociate)
00396 {
00397 DCPS_IR_Publication* pub = 0;
00398 size_t numAssociations = associations_.size();
00399 CORBA::Boolean dontSend = 0;
00400 CORBA::Boolean send = 1;
00401 long count = 0;
00402
00403 if (0 < numAssociations) {
00404 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00405 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00406
00407 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00408 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00409
00410 while (iter != end) {
00411 pub = *iter;
00412 ++iter;
00413
00414 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00415 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00416 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00417 OpenDDS::DCPS::RepoIdConverter sub_pub_converter(id);
00418 ACE_DEBUG((LM_DEBUG,
00419 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_publication: ")
00420 ACE_TEXT("subscription %C testing if publication %C == %C.\n"),
00421 std::string(sub_converter).c_str(),
00422 std::string(pub_converter).c_str(),
00423 std::string(sub_pub_converter).c_str()));
00424 }
00425
00426 if (id == pub->get_id()) {
00427 CORBA::Boolean dont_notify_lost = 0;
00428 pub->remove_associated_subscription(this, send, dont_notify_lost);
00429 remove_associated_publication(pub, dontSend, dont_notify_lost);
00430
00431 idSeq[count] = pub->get_id();
00432 ++count;
00433
00434 if (reassociate && this->defunct_.insert(pub) != 0) {
00435 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00436 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00437 ACE_ERROR((LM_ERROR,
00438 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ")
00439 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00440 std::string(sub_converter).c_str(),
00441 std::string(pub_converter).c_str(),
00442 pub));
00443 }
00444 }
00445 }
00446
00447 if (0 < count) {
00448 idSeq.length(count);
00449
00450 if (participant_->is_alive() && this->participant_->isOwner()) {
00451 try {
00452 CORBA::Boolean dont_notify_lost = 0;
00453 reader_->remove_associations(idSeq, dont_notify_lost);
00454
00455 } catch (const CORBA::Exception& ex) {
00456 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00457 ex._tao_print_exception(
00458 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00459 }
00460
00461 participant_->mark_dead();
00462 }
00463 }
00464 }
00465 }
00466 }
00467
00468 void DCPS_IR_Subscription::update_incompatible_qos()
00469 {
00470 if (participant_->is_alive() && this->participant_->isOwner()) {
00471 try {
00472 reader_->update_incompatible_qos(incompatibleQosStatus_);
00473 incompatibleQosStatus_.count_since_last_send = 0;
00474 } catch (const CORBA::Exception& ex) {
00475 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00476 ex._tao_print_exception(
00477 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:");
00478 }
00479
00480 participant_->mark_dead();
00481 }
00482 }
00483 }
00484
00485 CORBA::Boolean DCPS_IR_Subscription::is_publication_ignored(OpenDDS::DCPS::RepoId partId,
00486 OpenDDS::DCPS::RepoId topicId,
00487 OpenDDS::DCPS::RepoId pubId)
00488 {
00489 CORBA::Boolean ignored = (participant_->is_participant_ignored(partId) ||
00490 participant_->is_topic_ignored(topicId) ||
00491 participant_->is_publication_ignored(pubId));
00492
00493 return ignored;
00494 }
00495
00496 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Subscription::get_transportLocatorSeq() const
00497 {
00498 return info_;
00499 }
00500
00501 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Subscription::get_incompatibleQosStatus()
00502 {
00503 return &incompatibleQosStatus_;
00504 }
00505
00506 const DDS::DataReaderQos* DCPS_IR_Subscription::get_datareader_qos()
00507 {
00508 return &qos_;
00509 }
00510
00511 const DDS::SubscriberQos* DCPS_IR_Subscription::get_subscriber_qos()
00512 {
00513 return &subscriberQos_;
00514 }
00515
00516 using OpenDDS::DCPS::operator==;
00517
00518 void
00519 DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos& qos)
00520 {
00521 if (false == (qos == this->qos_)) {
00522
00523 bool check =
00524 OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00525
00526
00527 this->qos_ = qos;
00528
00529 if (check) {
00530
00531 this->reevaluate_existing_associations();
00532
00533
00534
00535
00536 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00537
00538
00539 DCPS_IR_Topic_Description* description
00540 = this->topic_->get_topic_description();
00541 description->reevaluate_associations(this);
00542 }
00543
00544 this->participant_->get_domain_reference()->publish_subscription_bit(this);
00545 }
00546 }
00547
00548 void
00549 DCPS_IR_Subscription::set_qos(const DDS::SubscriberQos& qos)
00550 {
00551 if (false == (qos == this->subscriberQos_)) {
00552
00553 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->subscriberQos_);
00554
00555
00556 this->subscriberQos_ = qos;
00557
00558 if (check) {
00559
00560 this->reevaluate_existing_associations();
00561
00562
00563
00564
00565 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00566
00567
00568 DCPS_IR_Topic_Description* description
00569 = this->topic_->get_topic_description();
00570 description->reevaluate_associations(this);
00571 }
00572
00573 this->participant_->get_domain_reference()->publish_subscription_bit(this);
00574 }
00575 }
00576
00577 bool DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos & qos,
00578 const DDS::SubscriberQos & subscriberQos,
00579 Update::SpecificQos& specificQos)
00580 {
00581 bool need_evaluate = false;
00582 bool u_dr_qos = !(qos_ == qos);
00583
00584 if (u_dr_qos) {
00585 if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00586 need_evaluate = true;
00587 }
00588
00589 qos_ = qos;
00590 }
00591
00592 bool u_sub_qos = !(subscriberQos_ == subscriberQos);
00593
00594 if (u_sub_qos) {
00595 if (OpenDDS::DCPS::should_check_association_upon_change(subscriberQos_, subscriberQos)) {
00596 need_evaluate = true;
00597 }
00598
00599 subscriberQos_ = subscriberQos;
00600 }
00601
00602 if (need_evaluate) {
00603
00604 this->reevaluate_existing_associations();
00605
00606 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00607 description->reevaluate_associations(this);
00608 }
00609
00610 participant_->get_domain_reference()->publish_subscription_bit(this);
00611 specificQos = u_dr_qos? Update::DataReaderQos:
00612 u_sub_qos? Update::SubscriberQos:
00613 Update::NoQos;
00614
00615 return true;
00616 }
00617
00618 void
00619 DCPS_IR_Subscription::reevaluate_defunct_associations()
00620 {
00621 DCPS_IR_Publication_Set::iterator it(this->defunct_.begin());
00622 while (it != this->defunct_.end()) {
00623 DCPS_IR_Publication* publication = *it;
00624 ++it;
00625
00626 if (reevaluate_association(publication)) {
00627 this->defunct_.remove(publication);
00628
00629 } else {
00630 OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00631 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00632 ACE_ERROR((LM_ERROR,
00633 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ")
00634 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00635 std::string(sub_converter).c_str(),
00636 std::string(pub_converter).c_str(),
00637 publication));
00638 }
00639 }
00640 }
00641
00642 void DCPS_IR_Subscription::reevaluate_existing_associations()
00643 {
00644 DCPS_IR_Publication * pub = 0;
00645 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00646 DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00647
00648 while (iter != end) {
00649 pub = *iter;
00650 ++iter;
00651
00652 this->reevaluate_association(pub);
00653 }
00654 }
00655
00656 bool
00657 DCPS_IR_Subscription::reevaluate_association(DCPS_IR_Publication* publication)
00658 {
00659 int status = this->associations_.find(publication);
00660
00661 if (status == 0) {
00662
00663
00664 if (!OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00665 this->get_incompatibleQosStatus(),
00666 publication->get_transportLocatorSeq(),
00667 this->get_transportLocatorSeq(),
00668 publication->get_datawriter_qos(),
00669 this->get_datareader_qos(),
00670 publication->get_publisher_qos(),
00671 this->get_subscriber_qos())) {
00672 bool sendNotify = true;
00673 bool notify_lost = true;
00674
00675 this->remove_associated_publication(publication, sendNotify, notify_lost, true);
00676 }
00677
00678 } else {
00679 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00680 return description->try_associate(publication, this);
00681 }
00682
00683 return false;
00684 }
00685
00686 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_id()
00687 {
00688 return id_;
00689 }
00690
00691 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_topic_id()
00692 {
00693 return topic_->get_id();
00694 }
00695
00696 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_participant_id()
00697 {
00698 return participant_->get_id();
00699 }
00700
00701 DCPS_IR_Topic_Description* DCPS_IR_Subscription::get_topic_description()
00702 {
00703 return topic_->get_topic_description();
00704 }
00705
00706 DCPS_IR_Topic* DCPS_IR_Subscription::get_topic()
00707 {
00708 return topic_;
00709 }
00710
00711 DDS::InstanceHandle_t DCPS_IR_Subscription::get_handle()
00712 {
00713 return handle_;
00714 }
00715
00716 void DCPS_IR_Subscription::set_handle(DDS::InstanceHandle_t handle)
00717 {
00718 handle_ = handle;
00719 }
00720
00721 CORBA::Boolean DCPS_IR_Subscription::is_bit()
00722 {
00723 return isBIT_;
00724 }
00725
00726 void DCPS_IR_Subscription::set_bit_status(CORBA::Boolean isBIT)
00727 {
00728 isBIT_ = isBIT;
00729 }
00730
00731 OpenDDS::DCPS::DataReaderRemote_ptr
00732 DCPS_IR_Subscription::reader()
00733 {
00734 return OpenDDS::DCPS::DataReaderRemote::_duplicate(this->reader_.in());
00735 }
00736
00737 std::string
00738 DCPS_IR_Subscription::get_filter_class_name() const
00739 {
00740 return filterClassName_;
00741 }
00742
00743 std::string
00744 DCPS_IR_Subscription::get_filter_expression() const
00745 {
00746 return filterExpression_;
00747 }
00748
00749 DDS::StringSeq
00750 DCPS_IR_Subscription::get_expr_params() const
00751 {
00752 return exprParams_;
00753 }
00754
00755 void
00756 DCPS_IR_Subscription::update_expr_params(const DDS::StringSeq& params)
00757 {
00758 exprParams_ = params;
00759 typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00760 for (iter_t i(associations_.begin()), e(associations_.end()); i != e; ++i) {
00761 (*i)->update_expr_params(id_, params);
00762 }
00763 }
00764
00765 std::string
00766 DCPS_IR_Subscription::dump_to_string(const std::string& prefix, int depth) const
00767 {
00768 std::string str;
00769 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00770 OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00771
00772 for (int i=0; i < depth; i++)
00773 str += prefix;
00774 std::string indent = str + prefix;
00775 str += "DCPS_IR_Subscription[";
00776 str += std::string(local_converter);
00777 str += "]";
00778 if (isBIT_)
00779 str += " (BIT)";
00780 str += "\n";
00781
00782 str += indent + "Associations [ ";
00783 for (DCPS_IR_Publication_Set::const_iterator assoc = associations_.begin();
00784 assoc != associations_.end();
00785 assoc++)
00786 {
00787 OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00788 str += std::string(assoc_converter);
00789 str += " ";
00790 }
00791 str += "]\n";
00792
00793 str += indent + "Defunct Associations [ ";
00794 for (DCPS_IR_Publication_Set::const_iterator def = defunct_.begin();
00795 def != defunct_.end();
00796 def++)
00797 {
00798 OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00799 str += std::string(def_converter);
00800 str += " ";
00801 }
00802 str += "]\n";
00803
00804 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00805 return str;
00806 }
00807
00808 OPENDDS_END_VERSIONED_NAMESPACE_DECL