#include <DCPS_IR_Publication.h>
Collaboration diagram for DCPS_IR_Publication:
Definition at line 38 of file DCPS_IR_Publication.h.
DCPS_IR_Publication::DCPS_IR_Publication | ( | const OpenDDS::DCPS::RepoId & | id, | |
DCPS_IR_Participant * | participant, | |||
DCPS_IR_Topic * | topic, | |||
OpenDDS::DCPS::DataWriterRemote_ptr | writer, | |||
const DDS::DataWriterQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | info, | |||
const DDS::PublisherQos & | publisherQos | |||
) |
Definition at line 22 of file DCPS_IR_Publication.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, incompatibleQosStatus_, OpenDDS::DCPS::IncompatibleQosStatus::total_count, and writer_.
00029 : id_(id), 00030 participant_(participant), 00031 topic_(topic), 00032 handle_(0), 00033 isBIT_(0), 00034 qos_(qos), 00035 info_(info), 00036 publisherQos_(publisherQos) 00037 { 00038 writer_ = OpenDDS::DCPS::DataWriterRemote::_duplicate(writer); 00039 00040 incompatibleQosStatus_.total_count = 0; 00041 incompatibleQosStatus_.count_since_last_send = 0; 00042 }
DCPS_IR_Publication::~DCPS_IR_Publication | ( | ) |
Definition at line 44 of file DCPS_IR_Publication.cpp.
References associations_, and remove_associations().
00045 { 00046 if (0 != associations_.size()) { 00047 CORBA::Boolean dont_notify_lost = 0; 00048 remove_associations(dont_notify_lost); 00049 } 00050 }
int DCPS_IR_Publication::add_associated_subscription | ( | DCPS_IR_Subscription * | sub, | |
bool | active | |||
) |
Associate with the subscription. Adds the subscription to the list of associated subscriptions and notifies the datawriter if it was successfully added. This method can mark the participant dead. Returns 0 if added, 1 if the subscription already exists, -1 on any other failure.
Definition at line 52 of file DCPS_IR_Publication.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, DCPS_IR_Subscription::get_datareader_qos(), DCPS_IR_Subscription::get_expr_params(), DCPS_IR_Subscription::get_filter_class_name(), DCPS_IR_Subscription::get_filter_expression(), DCPS_IR_Subscription::get_id(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Subscription::get_transportLocatorSeq(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, OpenDDS::DCPS::ReaderAssociation::subQos, and writer_.
Referenced by DCPS_IR_Topic_Description::associate().
00054 { 00055 // keep track of the association locally 00056 int status = associations_.insert(sub); 00057 00058 switch (status) { 00059 case 0: { 00060 // inform the datawriter about the association 00061 OpenDDS::DCPS::ReaderAssociation association; 00062 association.readerTransInfo = sub->get_transportLocatorSeq(); 00063 association.readerId = sub->get_id(); 00064 association.subQos = *(sub->get_subscriber_qos()); 00065 association.readerQos = *(sub->get_datareader_qos()); 00066 association.filterClassName = sub->get_filter_class_name().c_str(); 00067 association.filterExpression = sub->get_filter_expression().c_str(); 00068 association.exprParams = sub->get_expr_params(); 00069 00070 if (participant_->is_alive() && this->participant_->isOwner()) { 00071 try { 00072 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00073 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00074 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00075 ACE_DEBUG((LM_DEBUG, 00076 ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription:") 00077 ACE_TEXT(" publication %C adding subscription %C.\n"), 00078 std::string(pub_converter).c_str(), 00079 std::string(sub_converter).c_str())); 00080 } 00081 00082 writer_->add_association(id_, association, active); 00083 00084 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00085 ACE_DEBUG((LM_DEBUG, 00086 ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription: ") 00087 ACE_TEXT("successfully added subscription %x.\n"), 00088 sub)); 00089 } 00090 } catch (const CORBA::Exception& ex) { 00091 ex._tao_print_exception( 00092 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:"); 00093 participant_->mark_dead(); 00094 status = -1; 00095 } 00096 } 00097 } 00098 break; 00099 00100 case 1: { 00101 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00102 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00103 ACE_ERROR((LM_ERROR, 00104 ACE_TEXT("(%P|%t) ERROR: 
DCPS_IR_Publication::add_associated_subscription: ") 00105 ACE_TEXT("publication %C attempted to re-add subscription %C.\n"), 00106 std::string(pub_converter).c_str(), 00107 std::string(sub_converter).c_str())); 00108 } 00109 break; 00110 00111 case -1: { 00112 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00113 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00114 ACE_ERROR((LM_ERROR, 00115 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ") 00116 ACE_TEXT("publication %C failed to add subscription %C.\n"), 00117 std::string(pub_converter).c_str(), 00118 std::string(sub_converter).c_str())); 00119 } 00120 }; 00121 00122 return status; 00123 }
void DCPS_IR_Publication::association_complete | ( | const OpenDDS::DCPS::RepoId & | remote | ) |
The service participant that contains this Publication has indicated that the association to peer "remote" is complete. This method will locate the Subscription object for "remote" in order to inform it of the completed association.
Definition at line 126 of file DCPS_IR_Publication.cpp.
References associations_, and get_id().
Referenced by TAO_DDS_DCPSInfo_i::association_complete().
00127 { 00128 typedef DCPS_IR_Subscription_Set::ITERATOR iter_t; 00129 for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) { 00130 if ((*iter)->get_id() == remote) { 00131 (*iter)->call_association_complete(get_id()); 00132 } 00133 } 00134 }
void DCPS_IR_Publication::call_association_complete | ( | const OpenDDS::DCPS::RepoId & | remote | ) |
Invoke the DataWriterRemote::association_complete() callback, passing the "remote" parameter (Subscription) to the service participant.
Definition at line 137 of file DCPS_IR_Publication.cpp.
References OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Participant::mark_dead(), participant_, and writer_.
00138 { 00139 try { 00140 writer_->association_complete(remote); 00141 } catch (const CORBA::Exception& ex) { 00142 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00143 ex._tao_print_exception( 00144 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::call_association_complete:"); 00145 } 00146 participant_->mark_dead(); 00147 } 00148 }
void DCPS_IR_Publication::disassociate_participant | ( | OpenDDS::DCPS::RepoId | id, | |
bool | reassociate = false | |||
) |
Remove any subscriptions whose participant has the id.
Definition at line 246 of file DCPS_IR_Publication.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::get_id(), DCPS_IR_Subscription::get_participant_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, DCPS_IR_Subscription::remove_associated_publication(), remove_associated_subscription(), and writer_.
00248 { 00249 DCPS_IR_Subscription* sub = 0; 00250 size_t numAssociations = associations_.size(); 00251 CORBA::Boolean send = 1; 00252 CORBA::Boolean dontSend = 0; 00253 long count = 0; 00254 00255 if (0 < numAssociations) { 00256 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations)); 00257 idSeq.length(static_cast<CORBA::ULong>(numAssociations)); 00258 00259 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin(); 00260 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end(); 00261 00262 while (iter != end) { 00263 sub = *iter; 00264 ++iter; 00265 00266 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00267 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00268 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00269 OpenDDS::DCPS::RepoIdConverter pub_part_converter(id); 00270 OpenDDS::DCPS::RepoIdConverter sub_part_converter(sub->get_participant_id()); 00271 ACE_DEBUG((LM_DEBUG, 00272 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_participant: ") 00273 ACE_TEXT("publication %C testing if subscription %C particpant %C == %C.\n"), 00274 std::string(pub_converter).c_str(), 00275 std::string(sub_converter).c_str(), 00276 std::string(sub_part_converter).c_str(), 00277 std::string(pub_part_converter).c_str())); 00278 } 00279 00280 if (id == sub->get_participant_id()) { 00281 CORBA::Boolean dont_notify_lost = 0; 00282 sub->remove_associated_publication(this, send, dont_notify_lost); 00283 remove_associated_subscription(sub, dontSend, dont_notify_lost); 00284 00285 idSeq[count] = sub->get_id(); 00286 ++count; 00287 00288 if (reassociate && this->defunct_.insert(sub) != 0) { 00289 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00290 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00291 ACE_ERROR((LM_ERROR, 00292 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ") 00293 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"), 00294 std::string(pub_converter).c_str(), 
00295 std::string(sub_converter).c_str(), 00296 sub)); 00297 } 00298 } 00299 } 00300 00301 if (0 < count) { 00302 idSeq.length(count); 00303 00304 if (participant_->is_alive() && this->participant_->isOwner()) { 00305 try { 00306 CORBA::Boolean dont_notify_lost = 0; 00307 writer_->remove_associations(idSeq, dont_notify_lost); 00308 00309 } catch (const CORBA::Exception& ex) { 00310 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00311 ex._tao_print_exception( 00312 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:"); 00313 } 00314 00315 participant_->mark_dead(); 00316 } 00317 } 00318 } 00319 } 00320 }
void DCPS_IR_Publication::disassociate_subscription | ( | OpenDDS::DCPS::RepoId | id, | |
bool | reassociate = false | |||
) |
Remove any subscriptions with the id.
Definition at line 386 of file DCPS_IR_Publication.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::get_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, DCPS_IR_Subscription::remove_associated_publication(), remove_associated_subscription(), and writer_.
Referenced by TAO_DDS_DCPSInfo_i::disassociate_publication().
00388 { 00389 DCPS_IR_Subscription* sub = 0; 00390 size_t numAssociations = associations_.size(); 00391 CORBA::Boolean send = 1; 00392 CORBA::Boolean dontSend = 0; 00393 long count = 0; 00394 00395 if (0 < numAssociations) { 00396 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations)); 00397 idSeq.length(static_cast<CORBA::ULong>(numAssociations)); 00398 00399 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin(); 00400 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end(); 00401 00402 while (iter != end) { 00403 sub = *iter; 00404 ++iter; 00405 00406 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00407 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00408 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00409 OpenDDS::DCPS::RepoIdConverter pub_sub_converter(id); 00410 ACE_DEBUG((LM_DEBUG, 00411 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_subscription: ") 00412 ACE_TEXT("publication %C testing if subscription %C == %C.\n"), 00413 std::string(pub_converter).c_str(), 00414 std::string(sub_converter).c_str(), 00415 std::string(pub_sub_converter).c_str())); 00416 } 00417 00418 if (id == sub->get_id()) { 00419 CORBA::Boolean dont_notify_lost = 0; 00420 sub->remove_associated_publication(this, send, dont_notify_lost); 00421 remove_associated_subscription(sub, dontSend, dont_notify_lost); 00422 00423 idSeq[count] = sub->get_id(); 00424 ++count; 00425 00426 if (reassociate && this->defunct_.insert(sub) != 0) { 00427 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00428 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00429 ACE_ERROR((LM_ERROR, 00430 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ") 00431 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"), 00432 std::string(pub_converter).c_str(), 00433 std::string(sub_converter).c_str(), 00434 sub)); 00435 } 00436 } 00437 } 00438 00439 if (0 < count) { 00440 idSeq.length(count); 00441 00442 if 
(participant_->is_alive() && this->participant_->isOwner()) { 00443 try { 00444 CORBA::Boolean dont_notify_lost = 0; 00445 writer_->remove_associations(idSeq, dont_notify_lost); 00446 00447 } catch (const CORBA::Exception& ex) { 00448 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00449 ex._tao_print_exception( 00450 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:"); 00451 } 00452 00453 participant_->mark_dead(); 00454 } 00455 } 00456 } 00457 } 00458 }
void DCPS_IR_Publication::disassociate_topic | ( | OpenDDS::DCPS::RepoId | id | ) |
Remove any subscriptions whose topic has the id.
Definition at line 322 of file DCPS_IR_Publication.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::get_id(), DCPS_IR_Subscription::get_topic_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, DCPS_IR_Subscription::remove_associated_publication(), remove_associated_subscription(), and writer_.
00323 { 00324 DCPS_IR_Subscription* sub = 0; 00325 size_t numAssociations = associations_.size(); 00326 CORBA::Boolean send = 1; 00327 CORBA::Boolean dontSend = 0; 00328 long count = 0; 00329 00330 if (0 < numAssociations) { 00331 OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations)); 00332 idSeq.length(static_cast<CORBA::ULong>(numAssociations)); 00333 00334 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin(); 00335 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end(); 00336 00337 while (iter != end) { 00338 sub = *iter; 00339 ++iter; 00340 00341 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00342 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00343 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00344 OpenDDS::DCPS::RepoIdConverter pub_topic_converter(id); 00345 OpenDDS::DCPS::RepoIdConverter sub_topic_converter(sub->get_topic_id()); 00346 ACE_DEBUG((LM_DEBUG, 00347 ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_topic: ") 00348 ACE_TEXT("publication %C testing if subscription %C topic %C == %C.\n"), 00349 std::string(pub_converter).c_str(), 00350 std::string(sub_converter).c_str(), 00351 std::string(sub_topic_converter).c_str(), 00352 std::string(pub_topic_converter).c_str())); 00353 } 00354 00355 if (id == sub->get_topic_id()) { 00356 CORBA::Boolean dont_notify_lost = 0; 00357 sub->remove_associated_publication(this, send, dont_notify_lost); 00358 remove_associated_subscription(sub, dontSend, dont_notify_lost); 00359 00360 idSeq[count] = sub->get_id(); 00361 ++count; 00362 } 00363 } 00364 00365 if (0 < count) { 00366 idSeq.length(count); 00367 00368 if (participant_->is_alive() && this->participant_->isOwner()) { 00369 try { 00370 CORBA::Boolean dont_notify_lost = 0; 00371 writer_->remove_associations(idSeq, dont_notify_lost); 00372 00373 } catch (const CORBA::Exception& ex) { 00374 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00375 ex._tao_print_exception( 00376 "(%P|%t) ERROR: Exception caught in 
DCPS_IR_Publication::remove_associations:"); 00377 } 00378 00379 participant_->mark_dead(); 00380 } 00381 } 00382 } 00383 } 00384 }
std::string DCPS_IR_Publication::dump_to_string | ( | const std::string & | prefix, | |
int | depth | |||
) | const |
Definition at line 746 of file DCPS_IR_Publication.cpp.
References associations_, defunct_, id_, and isBIT_.
00747 { 00748 std::string str; 00749 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 00750 OpenDDS::DCPS::RepoIdConverter local_converter(id_); 00751 00752 for (int i=0; i < depth; i++) 00753 str += prefix; 00754 std::string indent = str + prefix; 00755 str += "DCPS_IR_Publication["; 00756 str += std::string(local_converter); 00757 str += "]"; 00758 if (isBIT_) 00759 str += " (BIT)"; 00760 str += "\n"; 00761 00762 str += indent + "Associations [ "; 00763 for (DCPS_IR_Subscription_Set::const_iterator assoc = associations_.begin(); 00764 assoc != associations_.end(); 00765 assoc++) 00766 { 00767 OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id()); 00768 str += std::string(assoc_converter); 00769 str += " "; 00770 } 00771 str += "]\n"; 00772 00773 str += indent + "Defunct Associations [ "; 00774 for (DCPS_IR_Subscription_Set::const_iterator def = defunct_.begin(); 00775 def != defunct_.end(); 00776 def++) 00777 { 00778 OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id()); 00779 str += std::string(def_converter); 00780 str += " "; 00781 } 00782 str += "]\n"; 00783 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 00784 return str; 00785 }
DDS::DataWriterQos * DCPS_IR_Publication::get_datawriter_qos | ( | ) |
Return a pointer to the DataWriter qos. The Publication retains ownership.
Definition at line 489 of file DCPS_IR_Publication.cpp.
References qos_.
Referenced by DCPS_IR_Subscription::add_associated_publication(), DCPS_IR_Domain::publish_publication_bit(), OpenDDS::Federator::ManagerImpl::pushState(), DCPS_IR_Subscription::reevaluate_association(), reevaluate_association(), and DCPS_IR_Topic_Description::try_associate().
00490 { 00491 return &qos_; 00492 }
DDS::InstanceHandle_t DCPS_IR_Publication::get_handle | ( | ) |
Definition at line 635 of file DCPS_IR_Publication.cpp.
References handle_.
Referenced by DCPS_IR_Domain::dispose_publication_bit().
00636 { 00637 return handle_; 00638 }
OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_id | ( | ) |
Definition at line 610 of file DCPS_IR_Publication.cpp.
References id_.
Referenced by DCPS_IR_Subscription::add_associated_publication(), DCPS_IR_Participant::add_publication(), DCPS_IR_Topic_Description::associate(), association_complete(), DCPS_IR_Subscription::disassociate_participant(), DCPS_IR_Subscription::disassociate_publication(), DCPS_IR_Subscription::disassociate_topic(), DCPS_IR_Domain::publish_publication_bit(), OpenDDS::Federator::ManagerImpl::pushState(), DCPS_IR_Subscription::remove_associated_publication(), DCPS_IR_Topic::remove_publication_reference(), and DCPS_IR_Topic_Description::try_associate().
00611 { 00612 return id_; 00613 }
OpenDDS::DCPS::IncompatibleQosStatus * DCPS_IR_Publication::get_incompatibleQosStatus | ( | ) |
Return a pointer to the incompatible qos status. The Publication retains ownership.
Definition at line 605 of file DCPS_IR_Publication.cpp.
References incompatibleQosStatus_.
Referenced by DCPS_IR_Subscription::reevaluate_association(), DCPS_IR_Topic_Description::try_associate(), DCPS_IR_Topic::try_associate(), and DCPS_IR_Topic_Description::try_associate_publication().
00606 { 00607 return &incompatibleQosStatus_; 00608 }
OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_participant_id | ( | ) |
Definition at line 620 of file DCPS_IR_Publication.cpp.
References DCPS_IR_Participant::get_id(), and participant_.
Referenced by DCPS_IR_Subscription::disassociate_participant(), DCPS_IR_Domain::publish_publication_bit(), OpenDDS::Federator::ManagerImpl::pushState(), and DCPS_IR_Topic_Description::try_associate().
00621 { 00622 return participant_->get_id(); 00623 }
DDS::PublisherQos * DCPS_IR_Publication::get_publisher_qos | ( | ) |
Return a pointer to the Publisher qos. The Publication retains ownership.
Definition at line 595 of file DCPS_IR_Publication.cpp.
References publisherQos_.
Referenced by DCPS_IR_Subscription::add_associated_publication(), DCPS_IR_Domain::publish_publication_bit(), OpenDDS::Federator::ManagerImpl::pushState(), DCPS_IR_Subscription::reevaluate_association(), reevaluate_association(), and DCPS_IR_Topic_Description::try_associate().
00596 { 00597 return &publisherQos_; 00598 }
DCPS_IR_Topic * DCPS_IR_Publication::get_topic | ( | ) |
Definition at line 625 of file DCPS_IR_Publication.cpp.
References topic_.
Referenced by DCPS_IR_Domain::publish_publication_bit().
00626 { 00627 return topic_; 00628 }
DCPS_IR_Topic_Description * DCPS_IR_Publication::get_topic_description | ( | ) |
Definition at line 630 of file DCPS_IR_Publication.cpp.
References DCPS_IR_Topic::get_topic_description(), and topic_.
Referenced by DCPS_IR_Domain::publish_publication_bit().
00631 { 00632 return topic_->get_topic_description(); 00633 }
OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_topic_id | ( | ) |
Definition at line 615 of file DCPS_IR_Publication.cpp.
References DCPS_IR_Topic::get_id(), and topic_.
Referenced by DCPS_IR_Subscription::disassociate_topic(), OpenDDS::Federator::ManagerImpl::pushState(), and DCPS_IR_Topic_Description::try_associate().
OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Publication::get_transportLocatorSeq | ( | ) | const |
Definition at line 600 of file DCPS_IR_Publication.cpp.
References info_.
Referenced by DCPS_IR_Subscription::add_associated_publication(), OpenDDS::Federator::ManagerImpl::pushState(), DCPS_IR_Subscription::reevaluate_association(), reevaluate_association(), and DCPS_IR_Topic_Description::try_associate().
00601 { 00602 return info_; 00603 }
CORBA::Boolean DCPS_IR_Publication::is_bit | ( | ) |
Definition at line 645 of file DCPS_IR_Publication.cpp.
References isBIT_.
Referenced by DCPS_IR_Domain::dispose_publication_bit().
00646 { 00647 return isBIT_; 00648 }
CORBA::Boolean DCPS_IR_Publication::is_subscription_ignored | ( | OpenDDS::DCPS::RepoId | partId, | |
OpenDDS::DCPS::RepoId | topicId, | |||
OpenDDS::DCPS::RepoId | subId | |||
) |
Check that none of the ids given are ones that this publication should ignore. Returns 1 if one of these ids is an ignored id.
Definition at line 477 of file DCPS_IR_Publication.cpp.
References DCPS_IR_Participant::is_participant_ignored(), DCPS_IR_Participant::is_subscription_ignored(), DCPS_IR_Participant::is_topic_ignored(), and participant_.
Referenced by DCPS_IR_Topic_Description::try_associate().
00480 { 00481 CORBA::Boolean ignored; 00482 ignored = (participant_->is_participant_ignored(partId) || 00483 participant_->is_topic_ignored(topicId) || 00484 participant_->is_subscription_ignored(subId)); 00485 00486 return ignored; 00487 }
bool DCPS_IR_Publication::reevaluate_association | ( | DCPS_IR_Subscription * | subscription | ) |
Definition at line 700 of file DCPS_IR_Publication.cpp.
References associations_, OpenDDS::DCPS::compatibleQOS(), DCPS_IR_Subscription::get_datareader_qos(), get_datawriter_qos(), DCPS_IR_Subscription::get_incompatibleQosStatus(), get_publisher_qos(), DCPS_IR_Subscription::get_subscriber_qos(), DCPS_IR_Topic::get_topic_description(), DCPS_IR_Subscription::get_transportLocatorSeq(), get_transportLocatorSeq(), remove_associated_subscription(), topic_, and DCPS_IR_Topic_Description::try_associate().
Referenced by DCPS_IR_Topic_Description::reevaluate_associations(), DCPS_IR_Topic::reevaluate_associations(), reevaluate_defunct_associations(), and reevaluate_existing_associations().
00701 { 00702 int status = this->associations_.find(subscription); 00703 00704 if (status == 0) { 00705 // verify if they are still compatiable after change 00706 00707 00708 if (!OpenDDS::DCPS::compatibleQOS(this->get_incompatibleQosStatus(), 00709 subscription->get_incompatibleQosStatus(), 00710 this->get_transportLocatorSeq(), 00711 subscription->get_transportLocatorSeq(), 00712 this->get_datawriter_qos(), 00713 subscription->get_datareader_qos(), 00714 this->get_publisher_qos(), 00715 subscription->get_subscriber_qos())) { 00716 bool sendNotify = true; // inform datawriter 00717 bool notify_lost = true; // invoke listerner callback 00718 00719 this->remove_associated_subscription(subscription, sendNotify, notify_lost, true); 00720 } 00721 00722 } else { 00723 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description(); 00724 return description->try_associate(this, subscription); 00725 } 00726 00727 return false; 00728 }
void DCPS_IR_Publication::reevaluate_defunct_associations | ( | ) |
Definition at line 662 of file DCPS_IR_Publication.cpp.
References defunct_, id_, and reevaluate_association().
00663 { 00664 DCPS_IR_Subscription_Set::iterator it(this->defunct_.begin()); 00665 while (it != this->defunct_.end()) { 00666 DCPS_IR_Subscription* subscription = *it; 00667 ++it; 00668 00669 if (reevaluate_association(subscription)) { 00670 this->defunct_.remove(subscription); // no longer defunct 00671 00672 } else { 00673 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00674 OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id()); 00675 ACE_ERROR((LM_ERROR, 00676 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ") 00677 ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"), 00678 std::string(pub_converter).c_str(), 00679 std::string(sub_converter).c_str(), 00680 subscription)); 00681 } 00682 } 00683 }
void DCPS_IR_Publication::reevaluate_existing_associations | ( | ) |
Definition at line 685 of file DCPS_IR_Publication.cpp.
References associations_, and reevaluate_association().
Referenced by set_qos().
00686 { 00687 DCPS_IR_Subscription* sub = 0; 00688 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin(); 00689 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end(); 00690 00691 while (iter != end) { 00692 sub = *iter; 00693 ++iter; 00694 00695 this->reevaluate_association(sub); 00696 } 00697 }
int DCPS_IR_Publication::remove_associated_subscription | ( | DCPS_IR_Subscription * | sub, | |
CORBA::Boolean | sendNotify, | |||
CORBA::Boolean | notify_lost, | |||
bool | notify_both_side = false | |||
) |
Remove the associated subscription. Removes the subscription from the list of associated subscriptions. The sendNotify parameter indicates whether to tell the datawriter about removing the subscription. The notify_lost parameter is passed to remove_associations(). The notify_both_side parameter indicates whether the subscription must also be called to remove the association on its side. See the comments of remove_associations() in DataWriterRemote.idl or DataReaderRemote.idl. This method can mark the participant dead. Returns 0 if successful.
Definition at line 150 of file DCPS_IR_Publication.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::get_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, DCPS_IR_Subscription::remove_associated_publication(), and writer_.
Referenced by DCPS_IR_Subscription::disassociate_participant(), disassociate_participant(), DCPS_IR_Subscription::disassociate_publication(), disassociate_subscription(), DCPS_IR_Subscription::disassociate_topic(), disassociate_topic(), reevaluate_association(), DCPS_IR_Subscription::remove_associated_publication(), DCPS_IR_Subscription::remove_associations(), and remove_associations().
00154 { 00155 bool marked_dead = false; 00156 00157 if (sendNotify) { 00158 OpenDDS::DCPS::ReaderIdSeq idSeq(1); 00159 idSeq.length(1); 00160 idSeq[0]= sub->get_id(); 00161 00162 if (participant_->is_alive() && this->participant_->isOwner()) { 00163 try { 00164 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00165 ACE_DEBUG((LM_DEBUG, 00166 ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription:") 00167 ACE_TEXT(" calling pub %d with sub %d\n"), 00168 id_, sub->get_id())); 00169 } 00170 00171 writer_->remove_associations(idSeq, notify_lost); 00172 00173 if (notify_both_side) { 00174 sub->remove_associated_publication(this, sendNotify, notify_lost); 00175 } 00176 00177 } catch (const CORBA::Exception& ex) { 00178 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00179 ex._tao_print_exception( 00180 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_publication:"); 00181 } 00182 00183 participant_->mark_dead(); 00184 marked_dead = true; 00185 } 00186 } 00187 } 00188 00189 int status = associations_.remove(sub); 00190 00191 if (0 == status) { 00192 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00193 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00194 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00195 ACE_DEBUG((LM_DEBUG, 00196 ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ") 00197 ACE_TEXT("publication %C removed subscription %C at %x.\n"), 00198 std::string(pub_converter).c_str(), 00199 std::string(sub_converter).c_str(), 00200 sub)); 00201 } 00202 00203 } else { 00204 OpenDDS::DCPS::RepoIdConverter pub_converter(id_); 00205 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id()); 00206 ACE_ERROR((LM_ERROR, 00207 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ") 00208 ACE_TEXT("publication %C failed to remove subscription %C at %x.\n"), 00209 std::string(pub_converter).c_str(), 00210 std::string(sub_converter).c_str(), 00211 sub)); 00212 } // if (0 == status) 
00213 00214 if (marked_dead) { 00215 return -1; 00216 00217 } else { 00218 return status; 00219 } 00220 }
int DCPS_IR_Publication::remove_associations | ( | CORBA::Boolean | notify_lost | ) |
Removes all the associated subscriptions. This method can mark the participant dead. A notify_lost flag of true indicates that remove_associations is being called because the InfoRepo detected that this publication is lost, due to the failure of an invocation on this publication. Returns 0 if successful.
Definition at line 222 of file DCPS_IR_Publication.cpp.
References associations_, defunct_, DCPS_IR_Subscription::remove_associated_publication(), and remove_associated_subscription().
Referenced by ~DCPS_IR_Publication().
00223 { 00224 int status = 0; 00225 DCPS_IR_Subscription* sub = 0; 00226 size_t numAssociations = associations_.size(); 00227 CORBA::Boolean dontSend = 0; 00228 CORBA::Boolean send = 1; 00229 00230 if (0 < numAssociations) { 00231 DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin(); 00232 DCPS_IR_Subscription_Set::ITERATOR end = associations_.end(); 00233 00234 while (iter != end) { 00235 sub = *iter; 00236 ++iter; 00237 sub->remove_associated_publication(this, send, notify_lost); 00238 remove_associated_subscription(sub, dontSend, notify_lost); 00239 } 00240 } 00241 this->defunct_.reset(); 00242 00243 return status; 00244 }
void DCPS_IR_Publication::set_bit_status | ( | CORBA::Boolean | isBIT | ) |
Definition at line 650 of file DCPS_IR_Publication.cpp.
References isBIT_.
Referenced by DCPS_IR_Participant::add_publication(), and DCPS_IR_Domain::publish_publication_bit().
00651 { 00652 isBIT_ = isBIT; 00653 }
void DCPS_IR_Publication::set_handle | ( | DDS::InstanceHandle_t | handle | ) |
Definition at line 640 of file DCPS_IR_Publication.cpp.
References handle_.
Referenced by DCPS_IR_Domain::publish_publication_bit().
00641 { 00642 handle_ = handle; 00643 }
void DCPS_IR_Publication::set_qos | ( | const DDS::PublisherQos & | qos | ) |
Update PublisherQos only.
Definition at line 526 of file DCPS_IR_Publication.cpp.
References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_publication_bit(), publisherQos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), and topic_.
00527 { 00528 if (false == (qos == this->publisherQos_)) { 00529 // Check if we should check while we have both values. 00530 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->publisherQos_); 00531 00532 // Store the new, compatible, value. 00533 this->publisherQos_ = qos; 00534 00535 if (check) { 00536 // This will remove any newly stale associations. 00537 this->reevaluate_existing_associations(); 00538 00539 // Sleep a while to let remove_association handled by DataWriter 00540 // before add_association. Otherwise, new association will have 00541 // trouble to connect each other. 00542 ACE_OS::sleep(ACE_Time_Value(0, 250000)); 00543 00544 // This will establish any newly made associations. 00545 DCPS_IR_Topic_Description* description 00546 = this->topic_->get_topic_description(); 00547 description->reevaluate_associations(this); 00548 } 00549 00550 this->participant_->get_domain_reference()->publish_publication_bit(this); 00551 } 00552 }
void DCPS_IR_Publication::set_qos | ( | const DDS::DataWriterQos & | qos | ) |
Update DataWriterQos only.
Definition at line 497 of file DCPS_IR_Publication.cpp.
References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_publication_bit(), qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), and topic_.
00498 { 00499 if (false == (qos == this->qos_)) { 00500 // Check if we should check while we have both values. 00501 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_); 00502 00503 // Store the new, compatible, value. 00504 this->qos_ = qos; 00505 00506 if (check) { 00507 // This will remove any newly stale associations. 00508 this->reevaluate_existing_associations(); 00509 00510 // Sleep a while to let remove_association handled by DataWriter 00511 // before add_association. Otherwise, new association will have 00512 // trouble to connect each other. 00513 ACE_OS::sleep(ACE_Time_Value(0, 250000)); 00514 00515 // This will establish any newly made associations. 00516 DCPS_IR_Topic_Description* description 00517 = this->topic_->get_topic_description(); 00518 description->reevaluate_associations(this); 00519 } 00520 00521 this->participant_->get_domain_reference()->publish_publication_bit(this); 00522 } 00523 }
bool DCPS_IR_Publication::set_qos | ( | const DDS::DataWriterQos & | qos, | |
const DDS::PublisherQos & | publisherQos, | |||
Update::SpecificQos & | specificQos | |||
) |
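Update the DataWriter and/or Publisher QoS and publish the change to the DataWriter BIT (built-in topic). On return, specificQos indicates which QoS was updated: Update::DataWriterQos if the DataWriter QoS changed, otherwise Update::PublisherQos if the Publisher QoS changed, otherwise Update::NoQos. Always returns true.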
Definition at line 554 of file DCPS_IR_Publication.cpp.
References Update::DataWriterQos, DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), Update::NoQos, participant_, DCPS_IR_Domain::publish_publication_bit(), Update::PublisherQos, publisherQos_, qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), and topic_.
Referenced by TAO_DDS_DCPSInfo_i::update_publication_qos().
00557 { 00558 bool need_evaluate = false; 00559 bool u_dw_qos = !(qos_ == qos); 00560 00561 if (u_dw_qos) { 00562 if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) { 00563 need_evaluate = true; 00564 } 00565 00566 qos_ = qos; 00567 } 00568 00569 bool u_pub_qos = !(publisherQos_ == publisherQos); 00570 00571 if (u_pub_qos) { 00572 if (OpenDDS::DCPS::should_check_association_upon_change(publisherQos_, publisherQos)) { 00573 need_evaluate = true; 00574 } 00575 00576 publisherQos_ = publisherQos; 00577 } 00578 00579 if (need_evaluate) { 00580 // Check if any existing association need be removed first. 00581 this->reevaluate_existing_associations(); 00582 00583 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description(); 00584 description->reevaluate_associations(this); 00585 } 00586 00587 participant_->get_domain_reference()->publish_publication_bit(this); 00588 specificQos = u_dw_qos? Update::DataWriterQos: 00589 u_pub_qos? Update::PublisherQos: 00590 Update::NoQos; 00591 00592 return true; 00593 }
void DCPS_IR_Publication::update_expr_params | ( | OpenDDS::DCPS::RepoId | readerId, | |
const DDS::StringSeq & | params | |||
) |
Definition at line 731 of file DCPS_IR_Publication.cpp.
References OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Participant::mark_dead(), participant_, and writer_.
00733 { 00734 try { 00735 writer_->update_subscription_params(readerId, params); 00736 } catch (const CORBA::SystemException& ex) { 00737 if (OpenDDS::DCPS::DCPS_debug_level) { 00738 ex._tao_print_exception("(%P|%t) ERROR: Exception caught in " 00739 "DCPS_IR_Publication::update_expr_params:"); 00740 } 00741 participant_->mark_dead(); 00742 } 00743 }
void DCPS_IR_Publication::update_incompatible_qos | ( | ) |
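Notify the writer of the incompatible QoS status and reset the status's count_since_last_send to 0. The notification is sent only if the participant is alive and isOwner() is true; if the call raises a CORBA exception, the participant is marked dead.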
Definition at line 460 of file DCPS_IR_Publication.cpp.
References OpenDDS::DCPS::DCPS_debug_level, incompatibleQosStatus_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, and writer_.
Referenced by DCPS_IR_Topic::try_associate(), and DCPS_IR_Topic_Description::try_associate_publication().
00461 { 00462 if (participant_->is_alive() && this->participant_->isOwner()) { 00463 try { 00464 writer_->update_incompatible_qos(incompatibleQosStatus_); 00465 incompatibleQosStatus_.count_since_last_send = 0; 00466 } catch (const CORBA::Exception& ex) { 00467 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00468 ex._tao_print_exception( 00469 "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:"); 00470 } 00471 00472 participant_->mark_dead(); 00473 } 00474 } 00475 }
OpenDDS::DCPS::DataWriterRemote_ptr DCPS_IR_Publication::writer | ( | ) |
Definition at line 656 of file DCPS_IR_Publication.cpp.
Referenced by OpenDDS::Federator::ManagerImpl::pushState().
DCPS_IR_Subscription_Set DCPS_IR_Publication::associations_ [private] |
Definition at line 191 of file DCPS_IR_Publication.h.
Referenced by add_associated_subscription(), association_complete(), disassociate_participant(), disassociate_subscription(), disassociate_topic(), dump_to_string(), reevaluate_association(), reevaluate_existing_associations(), remove_associated_subscription(), remove_associations(), and ~DCPS_IR_Publication().
DCPS_IR_Subscription_Set DCPS_IR_Publication::defunct_ [private] |
Definition at line 192 of file DCPS_IR_Publication.h.
Referenced by dump_to_string(), reevaluate_defunct_associations(), and remove_associations().
OpenDDS::DCPS::RepoId DCPS_IR_Publication::id_ [private] |
Definition at line 179 of file DCPS_IR_Publication.h.
Referenced by add_associated_subscription(), disassociate_participant(), disassociate_subscription(), disassociate_topic(), dump_to_string(), get_id(), reevaluate_defunct_associations(), and remove_associated_subscription().
OpenDDS::DCPS::IncompatibleQosStatus DCPS_IR_Publication::incompatibleQosStatus_ [private] |
Definition at line 194 of file DCPS_IR_Publication.h.
Referenced by DCPS_IR_Publication(), get_incompatibleQosStatus(), and update_incompatible_qos().
CORBA::Boolean DCPS_IR_Publication::isBIT_ [private] |
Definition at line 183 of file DCPS_IR_Publication.h.
Referenced by dump_to_string(), is_bit(), and set_bit_status().
DCPS_IR_Participant* DCPS_IR_Publication::participant_ [private] |
Definition at line 180 of file DCPS_IR_Publication.h.
Referenced by add_associated_subscription(), call_association_complete(), disassociate_participant(), disassociate_subscription(), disassociate_topic(), get_participant_id(), is_subscription_ignored(), remove_associated_subscription(), set_qos(), update_expr_params(), and update_incompatible_qos().
DDS::PublisherQos DCPS_IR_Publication::publisherQos_ [private] |
Definition at line 189 of file DCPS_IR_Publication.h.
Referenced by get_publisher_qos(), and set_qos().
DDS::DataWriterQos DCPS_IR_Publication::qos_ [private] |
Definition at line 187 of file DCPS_IR_Publication.h.
Referenced by get_datawriter_qos(), and set_qos().
DCPS_IR_Topic* DCPS_IR_Publication::topic_ [private] |
Definition at line 181 of file DCPS_IR_Publication.h.
Referenced by get_topic(), get_topic_description(), get_topic_id(), reevaluate_association(), and set_qos().
OpenDDS::DCPS::DataWriterRemote_var DCPS_IR_Publication::writer_ [private] |
The corresponding DataWriterRemote object.
Definition at line 186 of file DCPS_IR_Publication.h.
Referenced by add_associated_subscription(), call_association_complete(), DCPS_IR_Publication(), disassociate_participant(), disassociate_subscription(), disassociate_topic(), remove_associated_subscription(), update_expr_params(), and update_incompatible_qos().