00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009 #include "DCPS_IR_Publication.h"
00010
00011 #include "DCPS_IR_Participant.h"
00012 #include "DCPS_IR_Topic.h"
00013 #include "DCPS_IR_Subscription.h"
00014 #include "DCPS_IR_Domain.h"
00015 #include "DCPS_IR_Topic_Description.h"
00016 #include "dds/DCPS/DCPS_Utils.h"
00017 #include "dds/DdsDcpsInfoUtilsC.h"
00018 #include "dds/DCPS/RepoIdConverter.h"
00019 #include "dds/DCPS/Qos_Helper.h"
00020 #include "tao/debug.h"
00021
00022 #include "ace/OS_NS_unistd.h"
00023
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025
00026 DCPS_IR_Publication::DCPS_IR_Publication(const OpenDDS::DCPS::RepoId& id,
00027 DCPS_IR_Participant* participant,
00028 DCPS_IR_Topic* topic,
00029 OpenDDS::DCPS::DataWriterRemote_ptr writer,
00030 const DDS::DataWriterQos& qos,
00031 const OpenDDS::DCPS::TransportLocatorSeq& info,
00032 const DDS::PublisherQos& publisherQos)
00033 : id_(id),
00034 participant_(participant),
00035 topic_(topic),
00036 handle_(0),
00037 isBIT_(0),
00038 qos_(qos),
00039 info_(info),
00040 publisherQos_(publisherQos)
00041 {
00042 writer_ = OpenDDS::DCPS::DataWriterRemote::_duplicate(writer);
00043
00044 incompatibleQosStatus_.total_count = 0;
00045 incompatibleQosStatus_.count_since_last_send = 0;
00046 }
00047
00048 DCPS_IR_Publication::~DCPS_IR_Publication()
00049 {
00050 }
00051
00052 int DCPS_IR_Publication::add_associated_subscription(DCPS_IR_Subscription* sub,
00053 bool active)
00054 {
00055
00056 int status = associations_.insert(sub);
00057
00058 switch (status) {
00059 case 0: {
00060
00061 OpenDDS::DCPS::ReaderAssociation association;
00062 association.readerTransInfo = sub->get_transportLocatorSeq();
00063 association.readerId = sub->get_id();
00064 association.subQos = *(sub->get_subscriber_qos());
00065 association.readerQos = *(sub->get_datareader_qos());
00066 association.filterClassName = sub->get_filter_class_name().c_str();
00067 association.filterExpression = sub->get_filter_expression().c_str();
00068 association.exprParams = sub->get_expr_params();
00069
00070 if (participant_->is_alive() && this->participant_->isOwner()) {
00071 try {
00072 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00073 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00074 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00075 ACE_DEBUG((LM_DEBUG,
00076 ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription:")
00077 ACE_TEXT(" publication %C adding subscription %C.\n"),
00078 std::string(pub_converter).c_str(),
00079 std::string(sub_converter).c_str()));
00080 }
00081
00082 writer_->add_association(id_, association, active);
00083
00084 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00085 ACE_DEBUG((LM_DEBUG,
00086 ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription: ")
00087 ACE_TEXT("successfully added subscription %x.\n"),
00088 sub));
00089 }
00090 } catch (const CORBA::Exception& ex) {
00091 ex._tao_print_exception(
00092 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:");
00093 participant_->mark_dead();
00094 status = -1;
00095 }
00096 }
00097 }
00098 break;
00099
00100 case 1: {
00101 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00102 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00103 ACE_ERROR((LM_ERROR,
00104 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00105 ACE_TEXT("publication %C attempted to re-add subscription %C.\n"),
00106 std::string(pub_converter).c_str(),
00107 std::string(sub_converter).c_str()));
00108 }
00109 break;
00110
00111 case -1: {
00112 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00113 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00114 ACE_ERROR((LM_ERROR,
00115 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00116 ACE_TEXT("publication %C failed to add subscription %C.\n"),
00117 std::string(pub_converter).c_str(),
00118 std::string(sub_converter).c_str()));
00119 }
00120 };
00121
00122 return status;
00123 }
00124
00125 void
00126 DCPS_IR_Publication::association_complete(const OpenDDS::DCPS::RepoId& remote)
00127 {
00128 typedef DCPS_IR_Subscription_Set::ITERATOR iter_t;
00129 for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00130 if ((*iter)->get_id() == remote) {
00131 (*iter)->call_association_complete(get_id());
00132 }
00133 }
00134 }
00135
00136 void
00137 DCPS_IR_Publication::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00138 {
00139 try {
00140 writer_->association_complete(remote);
00141 } catch (const CORBA::Exception& ex) {
00142 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00143 ex._tao_print_exception(
00144 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::call_association_complete:");
00145 }
00146 participant_->mark_dead();
00147 }
00148 }
00149
00150 int DCPS_IR_Publication::remove_associated_subscription(DCPS_IR_Subscription* sub,
00151 CORBA::Boolean sendNotify,
00152 CORBA::Boolean notify_lost,
00153 bool notify_both_side)
00154 {
00155 bool marked_dead = false;
00156
00157 if (sendNotify) {
00158 if (participant_->is_alive() && this->participant_->isOwner()) {
00159 try {
00160 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00161 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00162 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00163 ACE_DEBUG((LM_DEBUG,
00164 ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription:")
00165 ACE_TEXT(" calling pub %C with sub %C\n"),
00166 OPENDDS_STRING(pub_converter).c_str(),
00167 OPENDDS_STRING(sub_converter).c_str()));
00168 }
00169
00170 OpenDDS::DCPS::ReaderIdSeq idSeq(1);
00171 idSeq.length(1);
00172 idSeq[0] = sub->get_id();
00173
00174 writer_->remove_associations(idSeq, notify_lost);
00175
00176 if (notify_both_side) {
00177 sub->remove_associated_publication(this, sendNotify, notify_lost);
00178 }
00179
00180 } catch (const CORBA::Exception& ex) {
00181 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00182 ex._tao_print_exception(
00183 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_subscription:");
00184 }
00185
00186 participant_->mark_dead();
00187 marked_dead = true;
00188 }
00189 }
00190 }
00191
00192 int status = associations_.remove(sub);
00193
00194 if (0 == status) {
00195 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00196 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00197 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00198 ACE_DEBUG((LM_DEBUG,
00199 ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ")
00200 ACE_TEXT("publication %C removed subscription %C at %x.\n"),
00201 std::string(pub_converter).c_str(),
00202 std::string(sub_converter).c_str(),
00203 sub));
00204 }
00205
00206 } else {
00207 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00208 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00209 ACE_ERROR((LM_ERROR,
00210 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ")
00211 ACE_TEXT("publication %C failed to remove subscription %C at %x.\n"),
00212 std::string(pub_converter).c_str(),
00213 std::string(sub_converter).c_str(),
00214 sub));
00215 }
00216
00217 if (marked_dead) {
00218 return -1;
00219
00220 } else {
00221 return status;
00222 }
00223 }
00224
00225 int DCPS_IR_Publication::remove_associations(CORBA::Boolean notify_lost)
00226 {
00227 int status = 0;
00228 DCPS_IR_Subscription* sub = 0;
00229 size_t numAssociations = associations_.size();
00230 CORBA::Boolean dontSend = 0;
00231 CORBA::Boolean send = 1;
00232
00233 if (0 < numAssociations) {
00234 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00235 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00236
00237 while (iter != end) {
00238 sub = *iter;
00239 ++iter;
00240 sub->remove_associated_publication(this, send, notify_lost);
00241 remove_associated_subscription(sub, dontSend, notify_lost);
00242 }
00243 }
00244 this->defunct_.reset();
00245
00246 return status;
00247 }
00248
00249 void DCPS_IR_Publication::disassociate_participant(OpenDDS::DCPS::RepoId id,
00250 bool reassociate)
00251 {
00252 DCPS_IR_Subscription* sub = 0;
00253 size_t numAssociations = associations_.size();
00254 CORBA::Boolean send = 1;
00255 CORBA::Boolean dontSend = 0;
00256 long count = 0;
00257
00258 if (0 < numAssociations) {
00259 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00260 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00261
00262 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00263 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00264
00265 while (iter != end) {
00266 sub = *iter;
00267 ++iter;
00268
00269 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00270 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00271 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00272 OpenDDS::DCPS::RepoIdConverter pub_part_converter(id);
00273 OpenDDS::DCPS::RepoIdConverter sub_part_converter(sub->get_participant_id());
00274 ACE_DEBUG((LM_DEBUG,
00275 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_participant: ")
00276 ACE_TEXT("publication %C testing if subscription %C particpant %C == %C.\n"),
00277 std::string(pub_converter).c_str(),
00278 std::string(sub_converter).c_str(),
00279 std::string(sub_part_converter).c_str(),
00280 std::string(pub_part_converter).c_str()));
00281 }
00282
00283 if (id == sub->get_participant_id()) {
00284 CORBA::Boolean dont_notify_lost = 0;
00285 sub->remove_associated_publication(this, send, dont_notify_lost);
00286 remove_associated_subscription(sub, dontSend, dont_notify_lost);
00287
00288 idSeq[count] = sub->get_id();
00289 ++count;
00290
00291 if (reassociate && this->defunct_.insert(sub) != 0) {
00292 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00293 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00294 ACE_ERROR((LM_ERROR,
00295 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ")
00296 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00297 std::string(pub_converter).c_str(),
00298 std::string(sub_converter).c_str(),
00299 sub));
00300 }
00301 }
00302 }
00303
00304 if (0 < count) {
00305 idSeq.length(count);
00306
00307 if (participant_->is_alive() && this->participant_->isOwner()) {
00308 try {
00309 CORBA::Boolean dont_notify_lost = 0;
00310 writer_->remove_associations(idSeq, dont_notify_lost);
00311
00312 } catch (const CORBA::Exception& ex) {
00313 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00314 ex._tao_print_exception(
00315 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:");
00316 }
00317
00318 participant_->mark_dead();
00319 }
00320 }
00321 }
00322 }
00323 }
00324
00325 void DCPS_IR_Publication::disassociate_topic(OpenDDS::DCPS::RepoId id)
00326 {
00327 DCPS_IR_Subscription* sub = 0;
00328 size_t numAssociations = associations_.size();
00329 CORBA::Boolean send = 1;
00330 CORBA::Boolean dontSend = 0;
00331 long count = 0;
00332
00333 if (0 < numAssociations) {
00334 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00335 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00336
00337 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00338 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00339
00340 while (iter != end) {
00341 sub = *iter;
00342 ++iter;
00343
00344 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00345 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00346 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00347 OpenDDS::DCPS::RepoIdConverter pub_topic_converter(id);
00348 OpenDDS::DCPS::RepoIdConverter sub_topic_converter(sub->get_topic_id());
00349 ACE_DEBUG((LM_DEBUG,
00350 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_topic: ")
00351 ACE_TEXT("publication %C testing if subscription %C topic %C == %C.\n"),
00352 std::string(pub_converter).c_str(),
00353 std::string(sub_converter).c_str(),
00354 std::string(sub_topic_converter).c_str(),
00355 std::string(pub_topic_converter).c_str()));
00356 }
00357
00358 if (id == sub->get_topic_id()) {
00359 CORBA::Boolean dont_notify_lost = 0;
00360 sub->remove_associated_publication(this, send, dont_notify_lost);
00361 remove_associated_subscription(sub, dontSend, dont_notify_lost);
00362
00363 idSeq[count] = sub->get_id();
00364 ++count;
00365 }
00366 }
00367
00368 if (0 < count) {
00369 idSeq.length(count);
00370
00371 if (participant_->is_alive() && this->participant_->isOwner()) {
00372 try {
00373 CORBA::Boolean dont_notify_lost = 0;
00374 writer_->remove_associations(idSeq, dont_notify_lost);
00375
00376 } catch (const CORBA::Exception& ex) {
00377 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00378 ex._tao_print_exception(
00379 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00380 }
00381
00382 participant_->mark_dead();
00383 }
00384 }
00385 }
00386 }
00387 }
00388
00389 void DCPS_IR_Publication::disassociate_subscription(OpenDDS::DCPS::RepoId id,
00390 bool reassociate)
00391 {
00392 DCPS_IR_Subscription* sub = 0;
00393 size_t numAssociations = associations_.size();
00394 CORBA::Boolean send = 1;
00395 CORBA::Boolean dontSend = 0;
00396 long count = 0;
00397
00398 if (0 < numAssociations) {
00399 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00400 idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00401
00402 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00403 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00404
00405 while (iter != end) {
00406 sub = *iter;
00407 ++iter;
00408
00409 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00410 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00411 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00412 OpenDDS::DCPS::RepoIdConverter pub_sub_converter(id);
00413 ACE_DEBUG((LM_DEBUG,
00414 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_subscription: ")
00415 ACE_TEXT("publication %C testing if subscription %C == %C.\n"),
00416 std::string(pub_converter).c_str(),
00417 std::string(sub_converter).c_str(),
00418 std::string(pub_sub_converter).c_str()));
00419 }
00420
00421 if (id == sub->get_id()) {
00422 CORBA::Boolean dont_notify_lost = 0;
00423 sub->remove_associated_publication(this, send, dont_notify_lost);
00424 remove_associated_subscription(sub, dontSend, dont_notify_lost);
00425
00426 idSeq[count] = sub->get_id();
00427 ++count;
00428
00429 if (reassociate && this->defunct_.insert(sub) != 0) {
00430 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00431 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00432 ACE_ERROR((LM_ERROR,
00433 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ")
00434 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00435 std::string(pub_converter).c_str(),
00436 std::string(sub_converter).c_str(),
00437 sub));
00438 }
00439 }
00440 }
00441
00442 if (0 < count) {
00443 idSeq.length(count);
00444
00445 if (participant_->is_alive() && this->participant_->isOwner()) {
00446 try {
00447 CORBA::Boolean dont_notify_lost = 0;
00448 writer_->remove_associations(idSeq, dont_notify_lost);
00449
00450 } catch (const CORBA::Exception& ex) {
00451 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00452 ex._tao_print_exception(
00453 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00454 }
00455
00456 participant_->mark_dead();
00457 }
00458 }
00459 }
00460 }
00461 }
00462
00463 void DCPS_IR_Publication::update_incompatible_qos()
00464 {
00465 if (participant_->is_alive() && this->participant_->isOwner()) {
00466 try {
00467 writer_->update_incompatible_qos(incompatibleQosStatus_);
00468 incompatibleQosStatus_.count_since_last_send = 0;
00469 } catch (const CORBA::Exception& ex) {
00470 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00471 ex._tao_print_exception(
00472 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:");
00473 }
00474
00475 participant_->mark_dead();
00476 }
00477 }
00478 }
00479
00480 CORBA::Boolean DCPS_IR_Publication::is_subscription_ignored(OpenDDS::DCPS::RepoId partId,
00481 OpenDDS::DCPS::RepoId topicId,
00482 OpenDDS::DCPS::RepoId subId)
00483 {
00484 CORBA::Boolean ignored;
00485 ignored = (participant_->is_participant_ignored(partId) ||
00486 participant_->is_topic_ignored(topicId) ||
00487 participant_->is_subscription_ignored(subId));
00488
00489 return ignored;
00490 }
00491
00492 DDS::DataWriterQos* DCPS_IR_Publication::get_datawriter_qos()
00493 {
00494 return &qos_;
00495 }
00496
00497 using OpenDDS::DCPS::operator==;
00498
00499 void
00500 DCPS_IR_Publication::set_qos(const DDS::DataWriterQos& qos)
00501 {
00502 if (false == (qos == this->qos_)) {
00503
00504 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00505
00506
00507 this->qos_ = qos;
00508
00509 if (check) {
00510
00511 this->reevaluate_existing_associations();
00512
00513
00514
00515
00516 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00517
00518
00519 DCPS_IR_Topic_Description* description
00520 = this->topic_->get_topic_description();
00521 description->reevaluate_associations(this);
00522 }
00523
00524 this->participant_->get_domain_reference()->publish_publication_bit(this);
00525 }
00526 }
00527
00528 void
00529 DCPS_IR_Publication::set_qos(const DDS::PublisherQos& qos)
00530 {
00531 if (false == (qos == this->publisherQos_)) {
00532
00533 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->publisherQos_);
00534
00535
00536 this->publisherQos_ = qos;
00537
00538 if (check) {
00539
00540 this->reevaluate_existing_associations();
00541
00542
00543
00544
00545 ACE_OS::sleep(ACE_Time_Value(0, 250000));
00546
00547
00548 DCPS_IR_Topic_Description* description
00549 = this->topic_->get_topic_description();
00550 description->reevaluate_associations(this);
00551 }
00552
00553 this->participant_->get_domain_reference()->publish_publication_bit(this);
00554 }
00555 }
00556
00557 bool DCPS_IR_Publication::set_qos(const DDS::DataWriterQos & qos,
00558 const DDS::PublisherQos & publisherQos,
00559 Update::SpecificQos& specificQos)
00560 {
00561 bool need_evaluate = false;
00562 bool u_dw_qos = !(qos_ == qos);
00563
00564 if (u_dw_qos) {
00565 if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00566 need_evaluate = true;
00567 }
00568
00569 qos_ = qos;
00570 }
00571
00572 bool u_pub_qos = !(publisherQos_ == publisherQos);
00573
00574 if (u_pub_qos) {
00575 if (OpenDDS::DCPS::should_check_association_upon_change(publisherQos_, publisherQos)) {
00576 need_evaluate = true;
00577 }
00578
00579 publisherQos_ = publisherQos;
00580 }
00581
00582 if (need_evaluate) {
00583
00584 this->reevaluate_existing_associations();
00585
00586 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00587 description->reevaluate_associations(this);
00588 }
00589
00590 participant_->get_domain_reference()->publish_publication_bit(this);
00591 specificQos = u_dw_qos? Update::DataWriterQos:
00592 u_pub_qos? Update::PublisherQos:
00593 Update::NoQos;
00594
00595 return true;
00596 }
00597
00598 DDS::PublisherQos* DCPS_IR_Publication::get_publisher_qos()
00599 {
00600 return &publisherQos_;
00601 }
00602
00603 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Publication::get_transportLocatorSeq() const
00604 {
00605 return info_;
00606 }
00607
00608 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Publication::get_incompatibleQosStatus()
00609 {
00610 return &incompatibleQosStatus_;
00611 }
00612
00613 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_id()
00614 {
00615 return id_;
00616 }
00617
00618 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_topic_id()
00619 {
00620 return topic_->get_id();
00621 }
00622
00623 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_participant_id()
00624 {
00625 return participant_->get_id();
00626 }
00627
00628 DCPS_IR_Topic* DCPS_IR_Publication::get_topic()
00629 {
00630 return topic_;
00631 }
00632
00633 DCPS_IR_Topic_Description* DCPS_IR_Publication::get_topic_description()
00634 {
00635 return topic_->get_topic_description();
00636 }
00637
00638 DDS::InstanceHandle_t DCPS_IR_Publication::get_handle()
00639 {
00640 return handle_;
00641 }
00642
00643 void DCPS_IR_Publication::set_handle(DDS::InstanceHandle_t handle)
00644 {
00645 handle_ = handle;
00646 }
00647
00648 CORBA::Boolean DCPS_IR_Publication::is_bit()
00649 {
00650 return isBIT_;
00651 }
00652
00653 void DCPS_IR_Publication::set_bit_status(CORBA::Boolean isBIT)
00654 {
00655 isBIT_ = isBIT;
00656 }
00657
00658 OpenDDS::DCPS::DataWriterRemote_ptr
00659 DCPS_IR_Publication::writer()
00660 {
00661 return OpenDDS::DCPS::DataWriterRemote::_duplicate(this->writer_.in());
00662 }
00663
00664 void
00665 DCPS_IR_Publication::reevaluate_defunct_associations()
00666 {
00667 DCPS_IR_Subscription_Set::iterator it(this->defunct_.begin());
00668 while (it != this->defunct_.end()) {
00669 DCPS_IR_Subscription* subscription = *it;
00670 ++it;
00671
00672 if (reevaluate_association(subscription)) {
00673 this->defunct_.remove(subscription);
00674
00675 } else {
00676 OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00677 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00678 ACE_ERROR((LM_ERROR,
00679 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ")
00680 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00681 std::string(pub_converter).c_str(),
00682 std::string(sub_converter).c_str(),
00683 subscription));
00684 }
00685 }
00686 }
00687
00688 void DCPS_IR_Publication::reevaluate_existing_associations()
00689 {
00690 DCPS_IR_Subscription* sub = 0;
00691 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00692 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00693
00694 while (iter != end) {
00695 sub = *iter;
00696 ++iter;
00697
00698 this->reevaluate_association(sub);
00699 }
00700 }
00701
00702 bool
00703 DCPS_IR_Publication::reevaluate_association(DCPS_IR_Subscription* subscription)
00704 {
00705 int status = this->associations_.find(subscription);
00706
00707 if (status == 0) {
00708
00709
00710
00711 if (!OpenDDS::DCPS::compatibleQOS(this->get_incompatibleQosStatus(),
00712 subscription->get_incompatibleQosStatus(),
00713 this->get_transportLocatorSeq(),
00714 subscription->get_transportLocatorSeq(),
00715 this->get_datawriter_qos(),
00716 subscription->get_datareader_qos(),
00717 this->get_publisher_qos(),
00718 subscription->get_subscriber_qos())) {
00719 bool sendNotify = true;
00720 bool notify_lost = true;
00721
00722 this->remove_associated_subscription(subscription, sendNotify, notify_lost, true);
00723 }
00724
00725 } else {
00726 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00727 return description->try_associate(this, subscription);
00728 }
00729
00730 return false;
00731 }
00732
00733 void
00734 DCPS_IR_Publication::update_expr_params(OpenDDS::DCPS::RepoId readerId,
00735 const DDS::StringSeq& params)
00736 {
00737 try {
00738 writer_->update_subscription_params(readerId, params);
00739 } catch (const CORBA::SystemException& ex) {
00740 if (OpenDDS::DCPS::DCPS_debug_level) {
00741 ex._tao_print_exception("(%P|%t) ERROR: Exception caught in "
00742 "DCPS_IR_Publication::update_expr_params:");
00743 }
00744 participant_->mark_dead();
00745 }
00746 }
00747
00748 std::string
00749 DCPS_IR_Publication::dump_to_string(const std::string& prefix, int depth) const
00750 {
00751 std::string str;
00752 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00753 OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00754
00755 for (int i=0; i < depth; i++)
00756 str += prefix;
00757 std::string indent = str + prefix;
00758 str += "DCPS_IR_Publication[";
00759 str += std::string(local_converter);
00760 str += "]";
00761 if (isBIT_)
00762 str += " (BIT)";
00763 str += "\n";
00764
00765 str += indent + "Associations [ ";
00766 for (DCPS_IR_Subscription_Set::const_iterator assoc = associations_.begin();
00767 assoc != associations_.end();
00768 assoc++)
00769 {
00770 OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00771 str += std::string(assoc_converter);
00772 str += " ";
00773 }
00774 str += "]\n";
00775
00776 str += indent + "Defunct Associations [ ";
00777 for (DCPS_IR_Subscription_Set::const_iterator def = defunct_.begin();
00778 def != defunct_.end();
00779 def++)
00780 {
00781 OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00782 str += std::string(def_converter);
00783 str += " ";
00784 }
00785 str += "]\n";
00786 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00787 return str;
00788 }
00789
00790 OPENDDS_END_VERSIONED_NAMESPACE_DECL