00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "DCPS_IR_Publication.h"
00010
00011 #include "DCPS_IR_Participant.h"
00012 #include "DCPS_IR_Topic.h"
00013 #include "DCPS_IR_Subscription.h"
00014 #include "DCPS_IR_Domain.h"
00015 #include "DCPS_IR_Topic_Description.h"
00016 #include "dds/DCPS/DCPS_Utils.h"
00017 #include "dds/DdsDcpsInfoUtilsC.h"
00018 #include "dds/DCPS/RepoIdConverter.h"
00019 #include "dds/DCPS/Qos_Helper.h"
00020 #include "tao/debug.h"
00021
00022 DCPS_IR_Publication::DCPS_IR_Publication(const OpenDDS::DCPS::RepoId& id,
00023 DCPS_IR_Participant* participant,
00024 DCPS_IR_Topic* topic,
00025 OpenDDS::DCPS::DataWriterRemote_ptr writer,
00026 const DDS::DataWriterQos& qos,
00027 const OpenDDS::DCPS::TransportLocatorSeq& info,
00028 const DDS::PublisherQos& publisherQos)
00029 : id_(id),
00030 participant_(participant),
00031 topic_(topic),
00032 handle_(0),
00033 isBIT_(0),
00034 qos_(qos),
00035 info_(info),
00036 publisherQos_(publisherQos)
00037 {
00038 writer_ = OpenDDS::DCPS::DataWriterRemote::_duplicate(writer);
00039
00040 incompatibleQosStatus_.total_count = 0;
00041 incompatibleQosStatus_.count_since_last_send = 0;
00042 }
00043
00044 DCPS_IR_Publication::~DCPS_IR_Publication()
00045 {
00046 if (0 != associations_.size()) {
00047 CORBA::Boolean dont_notify_lost = 0;
00048 remove_associations(dont_notify_lost);
00049 }
00050 }
00051
00052 int DCPS_IR_Publication::add_associated_subscription(DCPS_IR_Subscription* sub,
00053 bool active)
00054 {
00055
00056 int status = associations_.insert(sub);
00057
00058 switch (status) {
00059 case 0: {
00060
00061 OpenDDS::DCPS::ReaderAssociation association;
00062 association.readerTransInfo = sub->get_transportLocatorSeq();
00063 association.readerId = sub->get_id();
00064 association.subQos = *(sub->get_subscriber_qos());
00065 association.readerQos = *(sub->get_datareader_qos());
00066 association.filterClassName = sub->get_filter_class_name().c_str();
00067 association.filterExpression = sub->get_filter_expression().c_str();
00068 association.exprParams = sub->get_expr_params();
00069
00070 if (participant_->is_alive() && this->participant_->isOwner()) {
00071 try {
00072 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00073 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00074 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00075 ACE_DEBUG((LM_DEBUG,
00076 ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription:")
00077 ACE_TEXT(" publication %C adding subscription %C.\n"),
00078 std::string(pub_converter).c_str(),
00079 std::string(sub_converter).c_str()));
00080 }
00081
00082 writer_->add_association(id_, association, active);
00083
00084 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00085 ACE_DEBUG((LM_DEBUG,
00086 ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription: ")
00087 ACE_TEXT("successfully added subscription %x.\n"),
00088 sub));
00089 }
00090 } catch (const CORBA::Exception& ex) {
00091 ex._tao_print_exception(
00092 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:");
00093 participant_->mark_dead();
00094 status = -1;
00095 }
00096 }
00097 }
00098 break;
00099
00100 case 1: {
00101 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00102 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00103 ACE_ERROR((LM_ERROR,
00104 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00105 ACE_TEXT("publication %C attempted to re-add subscription %C.\n"),
00106 std::string(pub_converter).c_str(),
00107 std::string(sub_converter).c_str()));
00108 }
00109 break;
00110
00111 case -1: {
00112 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00113 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00114 ACE_ERROR((LM_ERROR,
00115 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00116 ACE_TEXT("publication %C failed to add subscription %C.\n"),
00117 std::string(pub_converter).c_str(),
00118 std::string(sub_converter).c_str()));
00119 }
00120 };
00121
00122 return status;
00123 }
00124
00125 void
00126 DCPS_IR_Publication::association_complete(const OpenDDS::DCPS::RepoId& remote)
00127 {
00128 typedef DCPS_IR_Subscription_Set::ITERATOR iter_t;
00129 for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00130 if ((*iter)->get_id() == remote) {
00131 (*iter)->call_association_complete(get_id());
00132 }
00133 }
00134 }
00135
00136 void
00137 DCPS_IR_Publication::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00138 {
00139 try {
00140 writer_->association_complete(remote);
00141 } catch (const CORBA::Exception& ex) {
00142 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00143 ex._tao_print_exception(
00144 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::call_association_complete:");
00145 }
00146 participant_->mark_dead();
00147 }
00148 }
00149
00150 int DCPS_IR_Publication::remove_associated_subscription(DCPS_IR_Subscription* sub,
00151 CORBA::Boolean sendNotify,
00152 CORBA::Boolean notify_lost,
00153 bool notify_both_side)
00154 {
00155 bool marked_dead = false;
00156
00157 if (sendNotify) {
00158 OpenDDS::DCPS::ReaderIdSeq idSeq(1);
00159 idSeq.length(1);
00160 idSeq[0]= sub->get_id();
00161
00162 if (participant_->is_alive() && this->participant_->isOwner()) {
00163 try {
00164 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00165 ACE_DEBUG((LM_DEBUG,
00166 ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription:")
00167 ACE_TEXT(" calling pub %d with sub %d\n"),
00168 id_, sub->get_id()));
00169 }
00170
00171 writer_->remove_associations(idSeq, notify_lost);
00172
00173 if (notify_both_side) {
00174 sub->remove_associated_publication(this, sendNotify, notify_lost);
00175 }
00176
00177 } catch (const CORBA::Exception& ex) {
00178 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00179 ex._tao_print_exception(
00180 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_publication:");
00181 }
00182
00183 participant_->mark_dead();
00184 marked_dead = true;
00185 }
00186 }
00187 }
00188
00189 int status = associations_.remove(sub);
00190
00191 if (0 == status) {
00192 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00193 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00194 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00195 ACE_DEBUG((LM_DEBUG,
00196 ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ")
00197 ACE_TEXT("publication %C removed subscription %C at %x.\n"),
00198 std::string(pub_converter).c_str(),
00199 std::string(sub_converter).c_str(),
00200 sub));
00201 }
00202
00203 } else {
00204 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00205 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00206 ACE_ERROR((LM_ERROR,
00207 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ")
00208 ACE_TEXT("publication %C failed to remove subscription %C at %x.\n"),
00209 std::string(pub_converter).c_str(),
00210 std::string(sub_converter).c_str(),
00211 sub));
00212 }
00213
00214 if (marked_dead) {
00215 return -1;
00216
00217 } else {
00218 return status;
00219 }
00220 }
00221
00222 int DCPS_IR_Publication::remove_associations(CORBA::Boolean notify_lost)
00223 {
00224 int status = 0;
00225 DCPS_IR_Subscription* sub = 0;
00226 size_t numAssociations = associations_.size();
00227 CORBA::Boolean dontSend = 0;
00228 CORBA::Boolean send = 1;
00229
00230 if (0 < numAssociations) {
00231 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00232 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00233
00234 while (iter != end) {
00235 sub = *iter;
00236 ++iter;
00237 sub->remove_associated_publication(this, send, notify_lost);
00238 remove_associated_subscription(sub, dontSend, notify_lost);
00239 }
00240 }
00241 this->defunct_.reset();
00242
00243 return status;
00244 }
00245
00246 void DCPS_IR_Publication::disassociate_participant(OpenDDS::DCPS::RepoId id,
00247 bool reassociate)
00248 {
00249 DCPS_IR_Subscription* sub = 0;
00250 size_t numAssociations = associations_.size();
00251 CORBA::Boolean send = 1;
00252 CORBA::Boolean dontSend = 0;
00253 long count = 0;
00254
00255 if (0 < numAssociations) {
00256 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00257 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00258
00259 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00260 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00261
00262 while (iter != end) {
00263 sub = *iter;
00264 ++iter;
00265
00266 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00267 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00268 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00269 OpenDDS::DCPS::RepoIdConverter pub_part_converter(id);
00270 OpenDDS::DCPS::RepoIdConverter sub_part_converter(sub->get_participant_id());
00271 ACE_DEBUG((LM_DEBUG,
00272 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_participant: ")
00273 ACE_TEXT("publication %C testing if subscription %C particpant %C == %C.\n"),
00274 std::string(pub_converter).c_str(),
00275 std::string(sub_converter).c_str(),
00276 std::string(sub_part_converter).c_str(),
00277 std::string(pub_part_converter).c_str()));
00278 }
00279
00280 if (id == sub->get_participant_id()) {
00281 CORBA::Boolean dont_notify_lost = 0;
00282 sub->remove_associated_publication(this, send, dont_notify_lost);
00283 remove_associated_subscription(sub, dontSend, dont_notify_lost);
00284
00285 idSeq[count] = sub->get_id();
00286 ++count;
00287
00288 if (reassociate && this->defunct_.insert(sub) != 0) {
00289 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00290 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00291 ACE_ERROR((LM_ERROR,
00292 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ")
00293 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00294 std::string(pub_converter).c_str(),
00295 std::string(sub_converter).c_str(),
00296 sub));
00297 }
00298 }
00299 }
00300
00301 if (0 < count) {
00302 idSeq.length(count);
00303
00304 if (participant_->is_alive() && this->participant_->isOwner()) {
00305 try {
00306 CORBA::Boolean dont_notify_lost = 0;
00307 writer_->remove_associations(idSeq, dont_notify_lost);
00308
00309 } catch (const CORBA::Exception& ex) {
00310 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00311 ex._tao_print_exception(
00312 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:");
00313 }
00314
00315 participant_->mark_dead();
00316 }
00317 }
00318 }
00319 }
00320 }
00321
00322 void DCPS_IR_Publication::disassociate_topic(OpenDDS::DCPS::RepoId id)
00323 {
00324 DCPS_IR_Subscription* sub = 0;
00325 size_t numAssociations = associations_.size();
00326 CORBA::Boolean send = 1;
00327 CORBA::Boolean dontSend = 0;
00328 long count = 0;
00329
00330 if (0 < numAssociations) {
00331 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00332 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00333
00334 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00335 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00336
00337 while (iter != end) {
00338 sub = *iter;
00339 ++iter;
00340
00341 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00342 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00343 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00344 OpenDDS::DCPS::RepoIdConverter pub_topic_converter(id);
00345 OpenDDS::DCPS::RepoIdConverter sub_topic_converter(sub->get_topic_id());
00346 ACE_DEBUG((LM_DEBUG,
00347 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_topic: ")
00348 ACE_TEXT("publication %C testing if subscription %C topic %C == %C.\n"),
00349 std::string(pub_converter).c_str(),
00350 std::string(sub_converter).c_str(),
00351 std::string(sub_topic_converter).c_str(),
00352 std::string(pub_topic_converter).c_str()));
00353 }
00354
00355 if (id == sub->get_topic_id()) {
00356 CORBA::Boolean dont_notify_lost = 0;
00357 sub->remove_associated_publication(this, send, dont_notify_lost);
00358 remove_associated_subscription(sub, dontSend, dont_notify_lost);
00359
00360 idSeq[count] = sub->get_id();
00361 ++count;
00362 }
00363 }
00364
00365 if (0 < count) {
00366 idSeq.length(count);
00367
00368 if (participant_->is_alive() && this->participant_->isOwner()) {
00369 try {
00370 CORBA::Boolean dont_notify_lost = 0;
00371 writer_->remove_associations(idSeq, dont_notify_lost);
00372
00373 } catch (const CORBA::Exception& ex) {
00374 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00375 ex._tao_print_exception(
00376 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00377 }
00378
00379 participant_->mark_dead();
00380 }
00381 }
00382 }
00383 }
00384 }
00385
00386 void DCPS_IR_Publication::disassociate_subscription(OpenDDS::DCPS::RepoId id,
00387 bool reassociate)
00388 {
00389 DCPS_IR_Subscription* sub = 0;
00390 size_t numAssociations = associations_.size();
00391 CORBA::Boolean send = 1;
00392 CORBA::Boolean dontSend = 0;
00393 long count = 0;
00394
00395 if (0 < numAssociations) {
00396 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00397 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00398
00399 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00400 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00401
00402 while (iter != end) {
00403 sub = *iter;
00404 ++iter;
00405
00406 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00407 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00408 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00409 OpenDDS::DCPS::RepoIdConverter pub_sub_converter(id);
00410 ACE_DEBUG((LM_DEBUG,
00411 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_subscription: ")
00412 ACE_TEXT("publication %C testing if subscription %C == %C.\n"),
00413 std::string(pub_converter).c_str(),
00414 std::string(sub_converter).c_str(),
00415 std::string(pub_sub_converter).c_str()));
00416 }
00417
00418 if (id == sub->get_id()) {
00419 CORBA::Boolean dont_notify_lost = 0;
00420 sub->remove_associated_publication(this, send, dont_notify_lost);
00421 remove_associated_subscription(sub, dontSend, dont_notify_lost);
00422
00423 idSeq[count] = sub->get_id();
00424 ++count;
00425
00426 if (reassociate && this->defunct_.insert(sub) != 0) {
00427 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00428 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00429 ACE_ERROR((LM_ERROR,
00430 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ")
00431 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00432 std::string(pub_converter).c_str(),
00433 std::string(sub_converter).c_str(),
00434 sub));
00435 }
00436 }
00437 }
00438
00439 if (0 < count) {
00440 idSeq.length(count);
00441
00442 if (participant_->is_alive() && this->participant_->isOwner()) {
00443 try {
00444 CORBA::Boolean dont_notify_lost = 0;
00445 writer_->remove_associations(idSeq, dont_notify_lost);
00446
00447 } catch (const CORBA::Exception& ex) {
00448 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00449 ex._tao_print_exception(
00450 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00451 }
00452
00453 participant_->mark_dead();
00454 }
00455 }
00456 }
00457 }
00458 }
00459
00460 void DCPS_IR_Publication::update_incompatible_qos()
00461 {
00462 if (participant_->is_alive() && this->participant_->isOwner()) {
00463 try {
00464 writer_->update_incompatible_qos(incompatibleQosStatus_);
00465 incompatibleQosStatus_.count_since_last_send = 0;
00466 } catch (const CORBA::Exception& ex) {
00467 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00468 ex._tao_print_exception(
00469 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:");
00470 }
00471
00472 participant_->mark_dead();
00473 }
00474 }
00475 }
00476
00477 CORBA::Boolean DCPS_IR_Publication::is_subscription_ignored(OpenDDS::DCPS::RepoId partId,
00478 OpenDDS::DCPS::RepoId topicId,
00479 OpenDDS::DCPS::RepoId subId)
00480 {
00481 CORBA::Boolean ignored;
00482 ignored = (participant_->is_participant_ignored(partId) ||
00483 participant_->is_topic_ignored(topicId) ||
00484 participant_->is_subscription_ignored(subId));
00485
00486 return ignored;
00487 }
00488
00489 DDS::DataWriterQos* DCPS_IR_Publication::get_datawriter_qos()
00490 {
00491 return &qos_;
00492 }
00493
00494 using OpenDDS::DCPS::operator==;
00495
00496 void
00497 DCPS_IR_Publication::set_qos(const DDS::DataWriterQos& qos)
00498 {
00499 if (false == (qos == this->qos_)) {
00500
00501 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00502
00503
00504 this->qos_ = qos;
00505
00506 if (check) {
00507
00508 this->reevaluate_existing_associations();
00509
00510
00511
00512
00513 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00514
00515
00516 DCPS_IR_Topic_Description* description
00517 = this->topic_->get_topic_description();
00518 description->reevaluate_associations(this);
00519 }
00520
00521 this->participant_->get_domain_reference()->publish_publication_bit(this);
00522 }
00523 }
00524
00525 void
00526 DCPS_IR_Publication::set_qos(const DDS::PublisherQos& qos)
00527 {
00528 if (false == (qos == this->publisherQos_)) {
00529
00530 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->publisherQos_);
00531
00532
00533 this->publisherQos_ = qos;
00534
00535 if (check) {
00536
00537 this->reevaluate_existing_associations();
00538
00539
00540
00541
00542 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00543
00544
00545 DCPS_IR_Topic_Description* description
00546 = this->topic_->get_topic_description();
00547 description->reevaluate_associations(this);
00548 }
00549
00550 this->participant_->get_domain_reference()->publish_publication_bit(this);
00551 }
00552 }
00553
00554 bool DCPS_IR_Publication::set_qos(const DDS::DataWriterQos & qos,
00555 const DDS::PublisherQos & publisherQos,
00556 Update::SpecificQos& specificQos)
00557 {
00558 bool need_evaluate = false;
00559 bool u_dw_qos = !(qos_ == qos);
00560
00561 if (u_dw_qos) {
00562 if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00563 need_evaluate = true;
00564 }
00565
00566 qos_ = qos;
00567 }
00568
00569 bool u_pub_qos = !(publisherQos_ == publisherQos);
00570
00571 if (u_pub_qos) {
00572 if (OpenDDS::DCPS::should_check_association_upon_change(publisherQos_, publisherQos)) {
00573 need_evaluate = true;
00574 }
00575
00576 publisherQos_ = publisherQos;
00577 }
00578
00579 if (need_evaluate) {
00580
00581 this->reevaluate_existing_associations();
00582
00583 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00584 description->reevaluate_associations(this);
00585 }
00586
00587 participant_->get_domain_reference()->publish_publication_bit(this);
00588 specificQos = u_dw_qos? Update::DataWriterQos:
00589 u_pub_qos? Update::PublisherQos:
00590 Update::NoQos;
00591
00592 return true;
00593 }
00594
00595 DDS::PublisherQos* DCPS_IR_Publication::get_publisher_qos()
00596 {
00597 return &publisherQos_;
00598 }
00599
00600 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Publication::get_transportLocatorSeq() const
00601 {
00602 return info_;
00603 }
00604
00605 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Publication::get_incompatibleQosStatus()
00606 {
00607 return &incompatibleQosStatus_;
00608 }
00609
00610 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_id()
00611 {
00612 return id_;
00613 }
00614
00615 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_topic_id()
00616 {
00617 return topic_->get_id();
00618 }
00619
00620 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_participant_id()
00621 {
00622 return participant_->get_id();
00623 }
00624
00625 DCPS_IR_Topic* DCPS_IR_Publication::get_topic()
00626 {
00627 return topic_;
00628 }
00629
00630 DCPS_IR_Topic_Description* DCPS_IR_Publication::get_topic_description()
00631 {
00632 return topic_->get_topic_description();
00633 }
00634
00635 DDS::InstanceHandle_t DCPS_IR_Publication::get_handle()
00636 {
00637 return handle_;
00638 }
00639
00640 void DCPS_IR_Publication::set_handle(DDS::InstanceHandle_t handle)
00641 {
00642 handle_ = handle;
00643 }
00644
00645 CORBA::Boolean DCPS_IR_Publication::is_bit()
00646 {
00647 return isBIT_;
00648 }
00649
00650 void DCPS_IR_Publication::set_bit_status(CORBA::Boolean isBIT)
00651 {
00652 isBIT_ = isBIT;
00653 }
00654
00655 OpenDDS::DCPS::DataWriterRemote_ptr
00656 DCPS_IR_Publication::writer()
00657 {
00658 return OpenDDS::DCPS::DataWriterRemote::_duplicate(this->writer_.in());
00659 }
00660
00661 void
00662 DCPS_IR_Publication::reevaluate_defunct_associations()
00663 {
00664 DCPS_IR_Subscription_Set::iterator it(this->defunct_.begin());
00665 while (it != this->defunct_.end()) {
00666 DCPS_IR_Subscription* subscription = *it;
00667 ++it;
00668
00669 if (reevaluate_association(subscription)) {
00670 this->defunct_.remove(subscription);
00671
00672 } else {
00673 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00674 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00675 ACE_ERROR((LM_ERROR,
00676 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ")
00677 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00678 std::string(pub_converter).c_str(),
00679 std::string(sub_converter).c_str(),
00680 subscription));
00681 }
00682 }
00683 }
00684
00685 void DCPS_IR_Publication::reevaluate_existing_associations()
00686 {
00687 DCPS_IR_Subscription* sub = 0;
00688 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00689 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00690
00691 while (iter != end) {
00692 sub = *iter;
00693 ++iter;
00694
00695 this->reevaluate_association(sub);
00696 }
00697 }
00698
00699 bool
00700 DCPS_IR_Publication::reevaluate_association(DCPS_IR_Subscription* subscription)
00701 {
00702 int status = this->associations_.find(subscription);
00703
00704 if (status == 0) {
00705
00706
00707
00708 if (!OpenDDS::DCPS::compatibleQOS(this->get_incompatibleQosStatus(),
00709 subscription->get_incompatibleQosStatus(),
00710 this->get_transportLocatorSeq(),
00711 subscription->get_transportLocatorSeq(),
00712 this->get_datawriter_qos(),
00713 subscription->get_datareader_qos(),
00714 this->get_publisher_qos(),
00715 subscription->get_subscriber_qos())) {
00716 bool sendNotify = true;
00717 bool notify_lost = true;
00718
00719 this->remove_associated_subscription(subscription, sendNotify, notify_lost, true);
00720 }
00721
00722 } else {
00723 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00724 return description->try_associate(this, subscription);
00725 }
00726
00727 return false;
00728 }
00729
00730 void
00731 DCPS_IR_Publication::update_expr_params(OpenDDS::DCPS::RepoId readerId,
00732 const DDS::StringSeq& params)
00733 {
00734 try {
00735 writer_->update_subscription_params(readerId, params);
00736 } catch (const CORBA::SystemException& ex) {
00737 if (OpenDDS::DCPS::DCPS_debug_level) {
00738 ex._tao_print_exception("(%P|%t) ERROR: Exception caught in "
00739 "DCPS_IR_Publication::update_expr_params:");
00740 }
00741 participant_->mark_dead();
00742 }
00743 }
00744
00745 std::string
00746 DCPS_IR_Publication::dump_to_string(const std::string& prefix, int depth) const
00747 {
00748 std::string str;
00749 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00750 OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00751
00752 for (int i=0; i < depth; i++)
00753 str += prefix;
00754 std::string indent = str + prefix;
00755 str += "DCPS_IR_Publication[";
00756 str += std::string(local_converter);
00757 str += "]";
00758 if (isBIT_)
00759 str += " (BIT)";
00760 str += "\n";
00761
00762 str += indent + "Associations [ ";
00763 for (DCPS_IR_Subscription_Set::const_iterator assoc = associations_.begin();
00764 assoc != associations_.end();
00765 assoc++)
00766 {
00767 OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00768 str += std::string(assoc_converter);
00769 str += " ";
00770 }
00771 str += "]\n";
00772
00773 str += indent + "Defunct Associations [ ";
00774 for (DCPS_IR_Subscription_Set::const_iterator def = defunct_.begin();
00775 def != defunct_.end();
00776 def++)
00777 {
00778 OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00779 str += std::string(def_converter);
00780 str += " ";
00781 }
00782 str += "]\n";
00783 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00784 return str;
00785 }
00786
00787
00788 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
00789
00790 template class ACE_Node<DCPS_IR_Subscription*>;
00791 template class ACE_Unbounded_Set<DCPS_IR_Subscription*>;
00792 template class ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>;
00793
00794 #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
00795
00796 #pragma instantiate ACE_Node<DCPS_IR_Subscription*>
00797 #pragma instantiate ACE_Unbounded_Set<DCPS_IR_Subscription*>
00798 #pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>
00799
00800 #endif