#include <DCPS_IR_Subscription.h>
Collaboration diagram for DCPS_IR_Subscription:
Definition at line 38 of file DCPS_IR_Subscription.h.
DCPS_IR_Subscription::DCPS_IR_Subscription | ( | const OpenDDS::DCPS::RepoId & | id, | |
DCPS_IR_Participant * | participant, | |||
DCPS_IR_Topic * | topic, | |||
OpenDDS::DCPS::DataReaderRemote_ptr | reader, | |||
const DDS::DataReaderQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | info, | |||
const DDS::SubscriberQos & | subscriberQos, | |||
const char * | filterClassName, | |||
const char * | filterExpression, | |||
const DDS::StringSeq & | exprParams | |||
) |
Definition at line 21 of file DCPS_IR_Subscription.cpp.
References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, incompatibleQosStatus_, reader_, and OpenDDS::DCPS::IncompatibleQosStatus::total_count.
00031 : id_(id), 00032 participant_(participant), 00033 topic_(topic), 00034 handle_(0), 00035 isBIT_(0), 00036 qos_(qos), 00037 info_(info), 00038 subscriberQos_(subscriberQos), 00039 filterClassName_(filterClassName), 00040 filterExpression_(filterExpression), 00041 exprParams_(exprParams) 00042 { 00043 reader_ = OpenDDS::DCPS::DataReaderRemote::_duplicate(reader); 00044 00045 incompatibleQosStatus_.total_count = 0; 00046 incompatibleQosStatus_.count_since_last_send = 0; 00047 }
DCPS_IR_Subscription::~DCPS_IR_Subscription | ( | ) |
Definition at line 49 of file DCPS_IR_Subscription.cpp.
References associations_, and remove_associations().
00050 { 00051 if (0 != associations_.size()) { 00052 CORBA::Boolean dont_notify_lost = 0; 00053 remove_associations(dont_notify_lost); 00054 } 00055 }
int DCPS_IR_Subscription::add_associated_publication | ( | DCPS_IR_Publication * | pub, | |
bool | active | |||
) |
Associate with the publication. Adds the publication to the list of associated publications and notifies the datareader if it was successfully added. This method can mark the participant dead. Returns 0 if added, 1 if it already exists, -1 on any other failure.
Definition at line 57 of file DCPS_IR_Subscription.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::get_datawriter_qos(), DCPS_IR_Publication::get_id(), DCPS_IR_Publication::get_publisher_qos(), DCPS_IR_Publication::get_transportLocatorSeq(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, OpenDDS::DCPS::WriterAssociation::pubQos, reader_, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.
Referenced by DCPS_IR_Topic_Description::associate().
00059 { 00060 // keep track of the association locally 00061 int status = associations_.insert(pub); 00062 00063 switch (status) { 00064 case 0: { 00065 // inform the datareader about the association 00066 OpenDDS::DCPS::WriterAssociation association; 00067 association.writerTransInfo = pub->get_transportLocatorSeq(); 00068 association.writerId = pub->get_id(); 00069 association.pubQos = *(pub->get_publisher_qos()); 00070 association.writerQos = *(pub->get_datawriter_qos()); 00071 00072 if (participant_->is_alive() && this->participant_->isOwner()) { 00073 try { 00074 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00075 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00076 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00077 ACE_DEBUG((LM_DEBUG, 00078 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication:") 00079 ACE_TEXT(" subscription %C adding publication %C.\n"), 00080 std::string(sub_converter).c_str(), 00081 std::string(pub_converter).c_str())); 00082 } 00083 00084 reader_->add_association(id_, association, active); 00085 00086 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00087 ACE_DEBUG((LM_DEBUG, 00088 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication: ") 00089 ACE_TEXT("successfully added publication %x\n"), 00090 pub)); 00091 } 00092 } catch (const CORBA::Exception& ex) { 00093 ex._tao_print_exception( 00094 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:"); 00095 participant_->mark_dead(); 00096 status = -1; 00097 } 00098 } 00099 00100 } 00101 break; 00102 00103 case 1: { 00104 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00105 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00106 ACE_ERROR((LM_ERROR, 00107 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ") 00108 ACE_TEXT("subscription %C attempted to re-add publication %C\n"), 00109 std::string(sub_converter).c_str(), 00110 std::string(pub_converter).c_str())); 00111 } 00112 
break; 00113 00114 case -1: { 00115 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00116 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00117 ACE_ERROR((LM_ERROR, 00118 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ") 00119 ACE_TEXT("subscription %C failed to add publication %C\n"), 00120 std::string(sub_converter).c_str(), 00121 std::string(pub_converter).c_str())); 00122 } 00123 }; 00124 00125 return status; 00126 }
void DCPS_IR_Subscription::association_complete | ( | const OpenDDS::DCPS::RepoId & | remote | ) |
The service participant that contains this Subscription has indicated that the association to peer "remote" is complete. This method will locate the Publication object for "remote" in order to inform it of the completed association.
Definition at line 129 of file DCPS_IR_Subscription.cpp.
References associations_, and get_id().
Referenced by TAO_DDS_DCPSInfo_i::association_complete().
00130 { 00131 typedef DCPS_IR_Publication_Set::ITERATOR iter_t; 00132 for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) { 00133 if ((*iter)->get_id() == remote) { 00134 (*iter)->call_association_complete(get_id()); 00135 } 00136 } 00137 }
void DCPS_IR_Subscription::call_association_complete | ( | const OpenDDS::DCPS::RepoId & | remote | ) |
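Invoke the DataReaderRemote::association_complete() callback on this subscription's reader, passing the "remote" parameter (the peer's id) to the service participant. If the call raises a CORBA exception, the participant is marked dead.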
Definition at line 140 of file DCPS_IR_Subscription.cpp.
References DCPS_IR_Participant::mark_dead(), participant_, and reader_.
00141 { 00142 try { 00143 reader_->association_complete(remote); 00144 } catch (const CORBA::Exception& ex) { 00145 ex._tao_print_exception( 00146 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::call_association_complete:"); 00147 participant_->mark_dead(); 00148 } 00149 }
void DCPS_IR_Subscription::disassociate_participant | ( | OpenDDS::DCPS::RepoId | id, | |
bool | reassociate = false | |||
) |
Remove any publications whose participant has the id.
Definition at line 249 of file DCPS_IR_Subscription.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::get_id(), DCPS_IR_Publication::get_participant_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, reader_, remove_associated_publication(), and DCPS_IR_Publication::remove_associated_subscription().
00251 { 00252 DCPS_IR_Publication* pub = 0; 00253 size_t numAssociations = associations_.size(); 00254 CORBA::Boolean dontSend = 0; 00255 CORBA::Boolean send = 1; 00256 long count = 0; 00257 00258 if (0 < numAssociations) { 00259 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations)); 00260 idSeq.length(static_cast<CORBA::ULong>(numAssociations)); 00261 00262 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin(); 00263 DCPS_IR_Publication_Set::ITERATOR end = associations_.end(); 00264 00265 while (iter != end) { 00266 pub = *iter; 00267 ++iter; 00268 00269 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00270 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00271 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00272 OpenDDS::DCPS::RepoIdConverter sub_part_converter(id); 00273 OpenDDS::DCPS::RepoIdConverter pub_part_converter(pub->get_participant_id()); 00274 ACE_DEBUG((LM_DEBUG, 00275 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_participant: ") 00276 ACE_TEXT("subscription %C testing if publication %C particpant %C == %C.\n"), 00277 std::string(sub_converter).c_str(), 00278 std::string(pub_converter).c_str(), 00279 std::string(sub_part_converter).c_str(), 00280 std::string(pub_part_converter).c_str())); 00281 } 00282 00283 if (id == pub->get_participant_id()) { 00284 CORBA::Boolean dont_notify_lost = 0; 00285 pub->remove_associated_subscription(this, send, dont_notify_lost); 00286 remove_associated_publication(pub, dontSend, dont_notify_lost); 00287 00288 idSeq[count] = pub->get_id(); 00289 ++count; 00290 00291 if (reassociate && this->defunct_.insert(pub) != 0) { 00292 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00293 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00294 ACE_ERROR((LM_ERROR, 00295 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ") 00296 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"), 00297 std::string(sub_converter).c_str(), 
00298 std::string(pub_converter).c_str(), 00299 pub)); 00300 } 00301 } 00302 } 00303 00304 if (0 < count) { 00305 idSeq.length(count); 00306 00307 if (participant_->is_alive() && this->participant_->isOwner()) { 00308 try { 00309 CORBA::Boolean dont_notify_lost = 0; 00310 reader_->remove_associations(idSeq, dont_notify_lost); 00311 00312 } catch (const CORBA::Exception& ex) { 00313 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00314 ex._tao_print_exception( 00315 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:"); 00316 } 00317 00318 participant_->mark_dead(); 00319 } 00320 } 00321 } 00322 } 00323 }
void DCPS_IR_Subscription::disassociate_publication | ( | OpenDDS::DCPS::RepoId | id, | |
bool | reassociate = false | |||
) |
Remove any publications with id.
Definition at line 389 of file DCPS_IR_Subscription.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::get_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, reader_, remove_associated_publication(), and DCPS_IR_Publication::remove_associated_subscription().
Referenced by TAO_DDS_DCPSInfo_i::disassociate_subscription().
00391 { 00392 DCPS_IR_Publication* pub = 0; 00393 size_t numAssociations = associations_.size(); 00394 CORBA::Boolean dontSend = 0; 00395 CORBA::Boolean send = 1; 00396 long count = 0; 00397 00398 if (0 < numAssociations) { 00399 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations)); 00400 idSeq.length(static_cast<CORBA::ULong>(numAssociations)); 00401 00402 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin(); 00403 DCPS_IR_Publication_Set::ITERATOR end = associations_.end(); 00404 00405 while (iter != end) { 00406 pub = *iter; 00407 ++iter; 00408 00409 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00410 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00411 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00412 OpenDDS::DCPS::RepoIdConverter sub_pub_converter(id); 00413 ACE_DEBUG((LM_DEBUG, 00414 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_publication: ") 00415 ACE_TEXT("subscription %C testing if publication %C == %C.\n"), 00416 std::string(sub_converter).c_str(), 00417 std::string(pub_converter).c_str(), 00418 std::string(sub_pub_converter).c_str())); 00419 } 00420 00421 if (id == pub->get_id()) { 00422 CORBA::Boolean dont_notify_lost = 0; 00423 pub->remove_associated_subscription(this, send, dont_notify_lost); 00424 remove_associated_publication(pub, dontSend, dont_notify_lost); 00425 00426 idSeq[count] = pub->get_id(); 00427 ++count; 00428 00429 if (reassociate && this->defunct_.insert(pub) != 0) { 00430 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00431 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00432 ACE_ERROR((LM_ERROR, 00433 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ") 00434 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"), 00435 std::string(sub_converter).c_str(), 00436 std::string(pub_converter).c_str(), 00437 pub)); 00438 } 00439 } 00440 } 00441 00442 if (0 < count) { 00443 idSeq.length(count); 00444 00445 if 
(participant_->is_alive() && this->participant_->isOwner()) { 00446 try { 00447 CORBA::Boolean dont_notify_lost = 0; 00448 reader_->remove_associations(idSeq, dont_notify_lost); 00449 00450 } catch (const CORBA::Exception& ex) { 00451 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00452 ex._tao_print_exception( 00453 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:"); 00454 } 00455 00456 participant_->mark_dead(); 00457 } 00458 } 00459 } 00460 } 00461 }
void DCPS_IR_Subscription::disassociate_topic | ( | OpenDDS::DCPS::RepoId | id | ) |
Remove any publications whose topic has the id.
Definition at line 325 of file DCPS_IR_Subscription.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::get_id(), DCPS_IR_Publication::get_topic_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, reader_, remove_associated_publication(), and DCPS_IR_Publication::remove_associated_subscription().
00326 { 00327 DCPS_IR_Publication* pub = 0; 00328 size_t numAssociations = associations_.size(); 00329 CORBA::Boolean dontSend = 0; 00330 CORBA::Boolean send = 1; 00331 long count = 0; 00332 00333 if (0 < numAssociations) { 00334 OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations)); 00335 idSeq.length(static_cast<CORBA::ULong>(numAssociations)); 00336 00337 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin(); 00338 DCPS_IR_Publication_Set::ITERATOR end = associations_.end(); 00339 00340 while (iter != end) { 00341 pub = *iter; 00342 ++iter; 00343 00344 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00345 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00346 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00347 OpenDDS::DCPS::RepoIdConverter sub_topic_converter(id); 00348 OpenDDS::DCPS::RepoIdConverter pub_topic_converter(pub->get_topic_id()); 00349 ACE_DEBUG((LM_DEBUG, 00350 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_topic: ") 00351 ACE_TEXT("subscription %C testing if publication %C topic %C == %C.\n"), 00352 std::string(sub_converter).c_str(), 00353 std::string(pub_converter).c_str(), 00354 std::string(sub_topic_converter).c_str(), 00355 std::string(pub_topic_converter).c_str())); 00356 } 00357 00358 if (id == pub->get_topic_id()) { 00359 CORBA::Boolean dont_notify_lost = 0; 00360 pub->remove_associated_subscription(this, send, dont_notify_lost); 00361 remove_associated_publication(pub, dontSend, dont_notify_lost); 00362 00363 idSeq[count] = pub->get_id(); 00364 ++count; 00365 } 00366 } 00367 00368 if (0 < count) { 00369 idSeq.length(count); 00370 00371 if (participant_->is_alive() && this->participant_->isOwner()) { 00372 try { 00373 CORBA::Boolean dont_notify_lost = 0; 00374 reader_->remove_associations(idSeq, dont_notify_lost); 00375 00376 } catch (const CORBA::Exception& ex) { 00377 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00378 ex._tao_print_exception( 00379 "(%P|%t) ERROR: Exception caught in 
DCPS_IR_Subscription::remove_associations:"); 00380 } 00381 00382 participant_->mark_dead(); 00383 } 00384 } 00385 } 00386 } 00387 }
std::string DCPS_IR_Subscription::dump_to_string | ( | const std::string & | prefix, | |
int | depth | |||
) | const |
Definition at line 762 of file DCPS_IR_Subscription.cpp.
References associations_, defunct_, id_, and isBIT_.
00763 { 00764 std::string str; 00765 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 00766 OpenDDS::DCPS::RepoIdConverter local_converter(id_); 00767 00768 for (int i=0; i < depth; i++) 00769 str += prefix; 00770 std::string indent = str + prefix; 00771 str += "DCPS_IR_Subscription["; 00772 str += std::string(local_converter); 00773 str += "]"; 00774 if (isBIT_) 00775 str += " (BIT)"; 00776 str += "\n"; 00777 00778 str += indent + "Associations [ "; 00779 for (DCPS_IR_Publication_Set::const_iterator assoc = associations_.begin(); 00780 assoc != associations_.end(); 00781 assoc++) 00782 { 00783 OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id()); 00784 str += std::string(assoc_converter); 00785 str += " "; 00786 } 00787 str += "]\n"; 00788 00789 str += indent + "Defunct Associations [ "; 00790 for (DCPS_IR_Publication_Set::const_iterator def = defunct_.begin(); 00791 def != defunct_.end(); 00792 def++) 00793 { 00794 OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id()); 00795 str += std::string(def_converter); 00796 str += " "; 00797 } 00798 str += "]\n"; 00799 00800 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 00801 return str; 00802 }
const DDS::DataReaderQos * DCPS_IR_Subscription::get_datareader_qos | ( | ) |
Return a pointer to the DataReader QoS. The Subscription retains ownership.
Definition at line 502 of file DCPS_IR_Subscription.cpp.
References qos_.
Referenced by DCPS_IR_Publication::add_associated_subscription(), DCPS_IR_Domain::publish_subscription_bit(), OpenDDS::Federator::ManagerImpl::pushState(), reevaluate_association(), DCPS_IR_Publication::reevaluate_association(), and DCPS_IR_Topic_Description::try_associate().
00503 { 00504 return &qos_; 00505 }
DDS::StringSeq DCPS_IR_Subscription::get_expr_params | ( | ) | const |
Definition at line 746 of file DCPS_IR_Subscription.cpp.
References exprParams_.
Referenced by DCPS_IR_Publication::add_associated_subscription(), and OpenDDS::Federator::ManagerImpl::pushState().
00747 { 00748 return exprParams_; 00749 }
std::string DCPS_IR_Subscription::get_filter_class_name | ( | ) | const |
Definition at line 734 of file DCPS_IR_Subscription.cpp.
References filterClassName_.
Referenced by DCPS_IR_Publication::add_associated_subscription().
00735 { 00736 return filterClassName_; 00737 }
std::string DCPS_IR_Subscription::get_filter_expression | ( | ) | const |
Definition at line 740 of file DCPS_IR_Subscription.cpp.
References filterExpression_.
Referenced by DCPS_IR_Publication::add_associated_subscription(), and OpenDDS::Federator::ManagerImpl::pushState().
00741 { 00742 return filterExpression_; 00743 }
DDS::InstanceHandle_t DCPS_IR_Subscription::get_handle | ( | ) |
Definition at line 707 of file DCPS_IR_Subscription.cpp.
References handle_.
Referenced by DCPS_IR_Domain::dispose_subscription_bit().
00708 { 00709 return handle_; 00710 }
OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_id | ( | ) |
Definition at line 682 of file DCPS_IR_Subscription.cpp.
References id_.
Referenced by DCPS_IR_Publication::add_associated_subscription(), DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), DCPS_IR_Topic_Description::associate(), association_complete(), DCPS_IR_Publication::disassociate_participant(), DCPS_IR_Publication::disassociate_subscription(), DCPS_IR_Publication::disassociate_topic(), DCPS_IR_Domain::publish_subscription_bit(), OpenDDS::Federator::ManagerImpl::pushState(), DCPS_IR_Publication::remove_associated_subscription(), DCPS_IR_Topic_Description::remove_subscription_reference(), DCPS_IR_Topic::remove_subscription_reference(), DCPS_IR_Topic_Description::try_associate(), DCPS_IR_Topic::try_associate(), and DCPS_IR_Topic_Description::try_associate_subscription().
00683 { 00684 return id_; 00685 }
OpenDDS::DCPS::IncompatibleQosStatus * DCPS_IR_Subscription::get_incompatibleQosStatus | ( | ) |
Return a pointer to the incompatible QoS status. The Subscription retains ownership.
Definition at line 497 of file DCPS_IR_Subscription.cpp.
References incompatibleQosStatus_.
Referenced by reevaluate_association(), DCPS_IR_Publication::reevaluate_association(), DCPS_IR_Topic_Description::try_associate(), DCPS_IR_Topic_Description::try_associate_publication(), and DCPS_IR_Topic_Description::try_associate_subscription().
00498 { 00499 return &incompatibleQosStatus_; 00500 }
OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_participant_id | ( | ) |
Definition at line 692 of file DCPS_IR_Subscription.cpp.
References DCPS_IR_Participant::get_id(), and participant_.
Referenced by DCPS_IR_Publication::disassociate_participant(), DCPS_IR_Domain::publish_subscription_bit(), OpenDDS::Federator::ManagerImpl::pushState(), DCPS_IR_Topic_Description::try_associate(), and DCPS_IR_Topic::try_associate().
00693 { 00694 return participant_->get_id(); 00695 }
const DDS::SubscriberQos * DCPS_IR_Subscription::get_subscriber_qos | ( | ) |
Return a pointer to the Subscriber QoS. The Subscription retains ownership.
Definition at line 507 of file DCPS_IR_Subscription.cpp.
References subscriberQos_.
Referenced by DCPS_IR_Publication::add_associated_subscription(), DCPS_IR_Domain::publish_subscription_bit(), OpenDDS::Federator::ManagerImpl::pushState(), reevaluate_association(), DCPS_IR_Publication::reevaluate_association(), and DCPS_IR_Topic_Description::try_associate().
00508 { 00509 return &subscriberQos_; 00510 }
DCPS_IR_Topic * DCPS_IR_Subscription::get_topic | ( | ) |
Definition at line 702 of file DCPS_IR_Subscription.cpp.
References topic_.
Referenced by DCPS_IR_Domain::publish_subscription_bit().
00703 { 00704 return topic_; 00705 }
DCPS_IR_Topic_Description * DCPS_IR_Subscription::get_topic_description | ( | ) |
Definition at line 697 of file DCPS_IR_Subscription.cpp.
References DCPS_IR_Topic::get_topic_description(), and topic_.
Referenced by DCPS_IR_Domain::publish_subscription_bit().
00698 { 00699 return topic_->get_topic_description(); 00700 }
OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_topic_id | ( | ) |
Definition at line 687 of file DCPS_IR_Subscription.cpp.
References DCPS_IR_Topic::get_id(), and topic_.
Referenced by DCPS_IR_Publication::disassociate_topic(), OpenDDS::Federator::ManagerImpl::pushState(), DCPS_IR_Topic_Description::try_associate(), and DCPS_IR_Topic::try_associate().
OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Subscription::get_transportLocatorSeq | ( | ) | const |
Definition at line 492 of file DCPS_IR_Subscription.cpp.
References info_.
Referenced by DCPS_IR_Publication::add_associated_subscription(), OpenDDS::Federator::ManagerImpl::pushState(), reevaluate_association(), DCPS_IR_Publication::reevaluate_association(), and DCPS_IR_Topic_Description::try_associate().
00493 { 00494 return info_; 00495 }
CORBA::Boolean DCPS_IR_Subscription::is_bit | ( | ) |
Definition at line 717 of file DCPS_IR_Subscription.cpp.
References isBIT_.
Referenced by DCPS_IR_Domain::dispose_subscription_bit().
00718 { 00719 return isBIT_; 00720 }
CORBA::Boolean DCPS_IR_Subscription::is_publication_ignored | ( | OpenDDS::DCPS::RepoId | partId, | |
OpenDDS::DCPS::RepoId | topicId, | |||
OpenDDS::DCPS::RepoId | pubId | |||
) |
Check that none of the given ids are ones that this subscription should ignore. Returns 1 if any of these ids is an ignored id.
Definition at line 480 of file DCPS_IR_Subscription.cpp.
References DCPS_IR_Participant::is_participant_ignored(), DCPS_IR_Participant::is_publication_ignored(), DCPS_IR_Participant::is_topic_ignored(), and participant_.
Referenced by DCPS_IR_Topic_Description::try_associate().
00483 { 00484 CORBA::Boolean ignored; 00485 ignored = (participant_->is_participant_ignored(partId) || 00486 participant_->is_topic_ignored(topicId) || 00487 participant_->is_publication_ignored(pubId)); 00488 00489 return ignored; 00490 }
OpenDDS::DCPS::DataReaderRemote_ptr DCPS_IR_Subscription::reader | ( | ) |
Definition at line 728 of file DCPS_IR_Subscription.cpp.
Referenced by OpenDDS::Federator::ManagerImpl::pushState().
bool DCPS_IR_Subscription::reevaluate_association | ( | DCPS_IR_Publication * | publication | ) |
Definition at line 653 of file DCPS_IR_Subscription.cpp.
References associations_, OpenDDS::DCPS::compatibleQOS(), get_datareader_qos(), DCPS_IR_Publication::get_datawriter_qos(), get_incompatibleQosStatus(), DCPS_IR_Publication::get_incompatibleQosStatus(), DCPS_IR_Publication::get_publisher_qos(), get_subscriber_qos(), DCPS_IR_Topic::get_topic_description(), get_transportLocatorSeq(), DCPS_IR_Publication::get_transportLocatorSeq(), remove_associated_publication(), topic_, and DCPS_IR_Topic_Description::try_associate().
Referenced by DCPS_IR_Topic_Description::reevaluate_associations(), DCPS_IR_Topic::reevaluate_associations(), reevaluate_defunct_associations(), and reevaluate_existing_associations().
00654 { 00655 int status = this->associations_.find(publication); 00656 00657 if (status == 0) { 00658 // verify if they are still compatiable after change 00659 00660 if (!OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(), 00661 this->get_incompatibleQosStatus(), 00662 publication->get_transportLocatorSeq(), 00663 this->get_transportLocatorSeq(), 00664 publication->get_datawriter_qos(), 00665 this->get_datareader_qos(), 00666 publication->get_publisher_qos(), 00667 this->get_subscriber_qos())) { 00668 bool sendNotify = true; // inform datareader 00669 bool notify_lost = true; // invoke listerner callback 00670 00671 this->remove_associated_publication(publication, sendNotify, notify_lost, true); 00672 } 00673 00674 } else { 00675 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description(); 00676 return description->try_associate(publication, this); 00677 } 00678 00679 return false; 00680 }
void DCPS_IR_Subscription::reevaluate_defunct_associations | ( | ) |
Definition at line 615 of file DCPS_IR_Subscription.cpp.
References defunct_, id_, and reevaluate_association().
00616 { 00617 DCPS_IR_Publication_Set::iterator it(this->defunct_.begin()); 00618 while (it != this->defunct_.end()) { 00619 DCPS_IR_Publication* publication = *it; 00620 ++it; 00621 00622 if (reevaluate_association(publication)) { 00623 this->defunct_.remove(publication); // no longer defunct 00624 00625 } else { 00626 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00627 OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id()); 00628 ACE_ERROR((LM_ERROR, 00629 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ") 00630 ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"), 00631 std::string(sub_converter).c_str(), 00632 std::string(pub_converter).c_str(), 00633 publication)); 00634 } 00635 } 00636 }
void DCPS_IR_Subscription::reevaluate_existing_associations | ( | ) |
Definition at line 638 of file DCPS_IR_Subscription.cpp.
References associations_, and reevaluate_association().
Referenced by set_qos().
00639 { 00640 DCPS_IR_Publication * pub = 0; 00641 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin(); 00642 DCPS_IR_Publication_Set::ITERATOR end = associations_.end(); 00643 00644 while (iter != end) { 00645 pub = *iter; 00646 ++iter; 00647 00648 this->reevaluate_association(pub); 00649 } 00650 }
int DCPS_IR_Subscription::remove_associated_publication | ( | DCPS_IR_Publication * | pub, | |
CORBA::Boolean | sendNotify, | |||
CORBA::Boolean | notify_lost, | |||
bool | notify_both_side = false | |||
) |
Remove the associated publication. Removes the publication from the list of associated publications; the return value reports whether this succeeded. sendNotify indicates whether to tell the datareader about removing the publication. A true notify_lost flag indicates that this removal is taking place because the InfoRepo detected that this subscription is lost, due to the failure of an invocation on this subscription. The notify_both_side parameter indicates whether the publication must also be asked to remove the association. This method can mark the participant dead. Returns 0 if successful.
Definition at line 151 of file DCPS_IR_Subscription.cpp.
References associations_, OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::get_id(), id_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, reader_, and DCPS_IR_Publication::remove_associated_subscription().
Referenced by disassociate_participant(), DCPS_IR_Publication::disassociate_participant(), disassociate_publication(), DCPS_IR_Publication::disassociate_subscription(), disassociate_topic(), DCPS_IR_Publication::disassociate_topic(), reevaluate_association(), DCPS_IR_Publication::remove_associated_subscription(), remove_associations(), and DCPS_IR_Publication::remove_associations().
00155 { 00156 bool marked_dead = false; 00157 00158 if (sendNotify) { 00159 OpenDDS::DCPS::WriterIdSeq idSeq(5); 00160 idSeq.length(1); 00161 idSeq[0] = pub->get_id(); 00162 00163 if (participant_->is_alive() && this->participant_->isOwner()) { 00164 try { 00165 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00166 ACE_DEBUG((LM_DEBUG, 00167 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication:") 00168 ACE_TEXT(" calling sub %d with pub %d\n"), 00169 id_, pub->get_id())); 00170 } 00171 00172 reader_->remove_associations(idSeq, notify_lost); 00173 00174 if (notify_both_side) { 00175 pub->remove_associated_subscription(this, sendNotify, notify_lost); 00176 } 00177 00178 } catch (const CORBA::Exception& ex) { 00179 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00180 ex._tao_print_exception( 00181 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:"); 00182 } 00183 00184 participant_->mark_dead(); 00185 marked_dead = true; 00186 } 00187 } 00188 } 00189 00190 int status = associations_.remove(pub); 00191 00192 if (0 == status) { 00193 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00194 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00195 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00196 ACE_DEBUG((LM_DEBUG, 00197 ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ") 00198 ACE_TEXT("subscription %C removed publication %C at %x.\n"), 00199 std::string(sub_converter).c_str(), 00200 std::string(pub_converter).c_str(), 00201 pub)); 00202 } 00203 00204 } else { 00205 OpenDDS::DCPS::RepoIdConverter sub_converter(id_); 00206 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id()); 00207 ACE_ERROR((LM_ERROR, 00208 ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ") 00209 ACE_TEXT("subscription %C failed to remove publication %C at %x.\n"), 00210 std::string(sub_converter).c_str(), 00211 std::string(pub_converter).c_str(), 00212 pub)); 00213 } // if (0 == status) 
00214 00215 if (marked_dead) { 00216 return -1; 00217 00218 } else { 00219 return status; 00220 } 00221 }
int DCPS_IR_Subscription::remove_associations | ( | CORBA::Boolean | notify_lost | ) |
Removes all the associated publications. This method can mark the participant dead. A true notify_lost flag indicates that remove_associations is being called because the InfoRepo detected that this subscription is lost, due to the failure of an invocation on this subscription. Returns 0 if successful.
Definition at line 223 of file DCPS_IR_Subscription.cpp.
References associations_, defunct_, remove_associated_publication(), and DCPS_IR_Publication::remove_associated_subscription().
Referenced by ~DCPS_IR_Subscription().
00224 { 00225 int status = 0; 00226 DCPS_IR_Publication* pub = 0; 00227 size_t numAssociations = associations_.size(); 00228 CORBA::Boolean dontSend = 0; 00229 CORBA::Boolean send = 1; 00230 00231 if (0 < numAssociations) { 00232 DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin(); 00233 DCPS_IR_Publication_Set::ITERATOR end = associations_.end(); 00234 00235 while (iter != end) { 00236 pub = *iter; 00237 ++iter; 00238 00239 pub->remove_associated_subscription(this, send, notify_lost); 00240 CORBA::Boolean dont_notify_lost = 0; 00241 remove_associated_publication(pub, dontSend, dont_notify_lost); 00242 } 00243 } 00244 this->defunct_.reset(); 00245 00246 return status; 00247 }
void DCPS_IR_Subscription::set_bit_status | ( | CORBA::Boolean | isBIT | ) |
Definition at line 722 of file DCPS_IR_Subscription.cpp.
References isBIT_.
Referenced by DCPS_IR_Participant::add_subscription(), and DCPS_IR_Domain::publish_subscription_bit().
00723 { 00724 isBIT_ = isBIT; 00725 }
void DCPS_IR_Subscription::set_handle | ( | DDS::InstanceHandle_t | handle | ) |
Definition at line 712 of file DCPS_IR_Subscription.cpp.
References handle_.
Referenced by DCPS_IR_Domain::publish_subscription_bit().
// Body of DCPS_IR_Subscription::set_handle(DDS::InstanceHandle_t handle).
// Stores the given instance handle in the handle_ member; nothing else is done.
00713 { 00714 handle_ = handle; 00715 }
void DCPS_IR_Subscription::set_qos | ( | const DDS::SubscriberQos & | qos | ) |
Update SubscriberQos only.
Definition at line 545 of file DCPS_IR_Subscription.cpp.
References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_subscription_bit(), DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), subscriberQos_, and topic_.
// Body of DCPS_IR_Subscription::set_qos(const DDS::SubscriberQos& qos).
// Does nothing when qos compares equal to the stored subscriberQos_.
// Otherwise:
//   1. Asks should_check_association_upon_change(qos, subscriberQos_) while both the new
//      and the old value are still available.
//   2. Overwrites subscriberQos_ with qos.
//   3. If the check returned true: calls reevaluate_existing_associations(), then blocks
//      the calling thread in ACE_OS::sleep(ACE_Time_Value(0, 250000)) (presumably
//      0 s + 250000 us, i.e. 250 ms -- confirm against ACE_Time_Value), then calls
//      reevaluate_associations(this) on the topic's DCPS_IR_Topic_Description.
//   4. Publishes this subscription through the participant's domain
//      (publish_subscription_bit) whenever the QoS differed, whether or not associations
//      were re-evaluated.
// NOTE(review): the pointers returned by topic_->get_topic_description() and
// participant_->get_domain_reference() are dereferenced without a null check.
00546 { 00547 if (false == (qos == this->subscriberQos_)) { 00548 // Check if we should check while we have both values. 00549 bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->subscriberQos_); 00550 00551 // Store the new, compatible, value. 00552 this->subscriberQos_ = qos; 00553 00554 if (check) { 00555 // This will remove any newly stale associations. 00556 this->reevaluate_existing_associations(); 00557 00558 // Sleep a while to let remove_association handled by DataWriter 00559 // before add_association. Otherwise, new association will have 00560 // trouble to connect each other. 00561 ACE_OS::sleep(ACE_Time_Value(0, 250000)); 00562 00563 // This will establish any newly made associations. 00564 DCPS_IR_Topic_Description* description 00565 = this->topic_->get_topic_description(); 00566 description->reevaluate_associations(this); 00567 } 00568 00569 this->participant_->get_domain_reference()->publish_subscription_bit(this); 00570 } 00571 }
void DCPS_IR_Subscription::set_qos | ( | const DDS::DataReaderQos & | qos | ) |
Update DataReaderQos only.
Definition at line 515 of file DCPS_IR_Subscription.cpp.
References DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), participant_, DCPS_IR_Domain::publish_subscription_bit(), qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), and topic_.
// Body of DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos& qos).
// Does nothing when qos compares equal to the stored qos_.
// Otherwise:
//   1. Asks should_check_association_upon_change(qos, qos_) while both the new and the
//      old value are still available.
//   2. Overwrites qos_ with qos.
//   3. If the check returned true: calls reevaluate_existing_associations(), then blocks
//      the calling thread in ACE_OS::sleep(ACE_Time_Value(0, 250000)) (presumably
//      0 s + 250000 us, i.e. 250 ms -- confirm against ACE_Time_Value), then calls
//      reevaluate_associations(this) on the topic's DCPS_IR_Topic_Description.
//   4. Publishes this subscription through the participant's domain
//      (publish_subscription_bit) whenever the QoS differed, whether or not associations
//      were re-evaluated.
// NOTE(review): the pointers returned by topic_->get_topic_description() and
// participant_->get_domain_reference() are dereferenced without a null check.
00516 { 00517 if (false == (qos == this->qos_)) { 00518 // Check if we should check while we have both values. 00519 bool check = 00520 OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_); 00521 00522 // Store the new, compatible, value. 00523 this->qos_ = qos; 00524 00525 if (check) { 00526 // This will remove any newly stale associations. 00527 this->reevaluate_existing_associations(); 00528 00529 // Sleep a while to let remove_association handled by DataWriter 00530 // before add_association. Otherwise, new association will have 00531 // trouble to connect each other. 00532 ACE_OS::sleep(ACE_Time_Value(0, 250000)); 00533 00534 // This will establish any newly made associations. 00535 DCPS_IR_Topic_Description* description 00536 = this->topic_->get_topic_description(); 00537 description->reevaluate_associations(this); 00538 } 00539 00540 this->participant_->get_domain_reference()->publish_subscription_bit(this); 00541 } 00542 }
bool DCPS_IR_Subscription::set_qos | ( | const DDS::DataReaderQos & | qos, | |
const DDS::SubscriberQos & | subscriberQos, | |||
Update::SpecificQos & | specificQos | |||
) |
Update the DataReader or Subscriber qos and also publish the qos changes to the DataReader BIT.
Definition at line 573 of file DCPS_IR_Subscription.cpp.
References Update::DataReaderQos, DCPS_IR_Participant::get_domain_reference(), DCPS_IR_Topic::get_topic_description(), Update::NoQos, participant_, DCPS_IR_Domain::publish_subscription_bit(), qos_, DCPS_IR_Topic_Description::reevaluate_associations(), reevaluate_existing_associations(), OpenDDS::DCPS::should_check_association_upon_change(), Update::SubscriberQos, subscriberQos_, and topic_.
Referenced by TAO_DDS_DCPSInfo_i::update_subscription_qos().
// Body of DCPS_IR_Subscription::set_qos(qos, subscriberQos, specificQos).
// Compares each incoming QoS with the stored one (qos_ / subscriberQos_). For each one
// that differs, it asks should_check_association_upon_change(stored, incoming) and then
// overwrites the stored value. If either check returned true, it calls
// reevaluate_existing_associations() followed immediately by reevaluate_associations(this)
// on the topic's DCPS_IR_Topic_Description.
// It then publishes this subscription via publish_subscription_bit and sets specificQos:
//   Update::DataReaderQos  if the DataReader QoS changed (even if the Subscriber QoS
//                          changed as well),
//   Update::SubscriberQos  if only the Subscriber QoS changed,
//   Update::NoQos          if neither changed.
// NOTE(review): the function returns true on every path, and publish_subscription_bit is
// called even when neither QoS changed.
// NOTE(review): unlike the single-argument set_qos overloads, there is no ACE_OS::sleep
// between removing stale associations and establishing new ones, and the arguments to
// should_check_association_upon_change are passed as (stored, incoming) rather than
// (incoming, stored) -- confirm that both differences are intentional.
00576 { 00577 bool need_evaluate = false; 00578 bool u_dr_qos = !(qos_ == qos); 00579 00580 if (u_dr_qos) { 00581 if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) { 00582 need_evaluate = true; 00583 } 00584 00585 qos_ = qos; 00586 } 00587 00588 bool u_sub_qos = !(subscriberQos_ == subscriberQos); 00589 00590 if (u_sub_qos) { 00591 if (OpenDDS::DCPS::should_check_association_upon_change(subscriberQos_, subscriberQos)) { 00592 need_evaluate = true; 00593 } 00594 00595 subscriberQos_ = subscriberQos; 00596 } 00597 00598 if (need_evaluate) { 00599 // Check if any existing association need be removed first. 00600 this->reevaluate_existing_associations(); 00601 00602 DCPS_IR_Topic_Description* description = this->topic_->get_topic_description(); 00603 description->reevaluate_associations(this); 00604 } 00605 00606 participant_->get_domain_reference()->publish_subscription_bit(this); 00607 specificQos = u_dr_qos? Update::DataReaderQos: 00608 u_sub_qos? Update::SubscriberQos: 00609 Update::NoQos; 00610 00611 return true; 00612 }
void DCPS_IR_Subscription::update_expr_params | ( | const DDS::StringSeq & | params | ) |
Stores the new expression parameters and calls update_expr_params() on each associated Publication.
Definition at line 752 of file DCPS_IR_Subscription.cpp.
References associations_, exprParams_, and id_.
Referenced by TAO_DDS_DCPSInfo_i::update_subscription_params().
// Body of DCPS_IR_Subscription::update_expr_params(const DDS::StringSeq& params).
// Copies params into exprParams_, then iterates over associations_ and calls
// update_expr_params(id_, params) on every publication in the set, passing this
// subscription's id_ along with the new parameters.
00753 { 00754 exprParams_ = params; 00755 typedef DCPS_IR_Publication_Set::ITERATOR iter_t; 00756 for (iter_t i(associations_.begin()), e(associations_.end()); i != e; ++i) { 00757 (*i)->update_expr_params(id_, params); 00758 } 00759 }
void DCPS_IR_Subscription::update_incompatible_qos | ( | ) |
Notify the reader of the incompatible qos status and reset the status's count_since_last_send to 0.
Definition at line 463 of file DCPS_IR_Subscription.cpp.
References OpenDDS::DCPS::DCPS_debug_level, incompatibleQosStatus_, DCPS_IR_Participant::is_alive(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::mark_dead(), participant_, and reader_.
Referenced by DCPS_IR_Topic_Description::try_associate_publication(), and DCPS_IR_Topic_Description::try_associate_subscription().
// Body of DCPS_IR_Subscription::update_incompatible_qos().
// Acts only when participant_->is_alive() and participant_->isOwner() are both true.
// In that case it calls reader_->update_incompatible_qos(incompatibleQosStatus_) and then
// sets incompatibleQosStatus_.count_since_last_send to 0. The reset sits after the remote
// call inside the try block, so it is skipped if the call throws.
// On a CORBA::Exception the exception is printed only when DCPS_debug_level > 0, and the
// participant is marked dead via participant_->mark_dead() in either case. The exception
// is not rethrown.
00464 { 00465 if (participant_->is_alive() && this->participant_->isOwner()) { 00466 try { 00467 reader_->update_incompatible_qos(incompatibleQosStatus_); 00468 incompatibleQosStatus_.count_since_last_send = 0; 00469 } catch (const CORBA::Exception& ex) { 00470 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00471 ex._tao_print_exception( 00472 "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:"); 00473 } 00474 00475 participant_->mark_dead(); 00476 } 00477 } 00478 }
Definition at line 201 of file DCPS_IR_Subscription.h.
Referenced by add_associated_publication(), association_complete(), disassociate_participant(), disassociate_publication(), disassociate_topic(), dump_to_string(), reevaluate_association(), reevaluate_existing_associations(), remove_associated_publication(), remove_associations(), update_expr_params(), and ~DCPS_IR_Subscription().
Definition at line 202 of file DCPS_IR_Subscription.h.
Referenced by dump_to_string(), reevaluate_defunct_associations(), and remove_associations().
Definition at line 199 of file DCPS_IR_Subscription.h.
Referenced by get_expr_params(), and update_expr_params().
std::string DCPS_IR_Subscription::filterClassName_ [private] |
std::string DCPS_IR_Subscription::filterExpression_ [private] |
Definition at line 189 of file DCPS_IR_Subscription.h.
Referenced by get_handle(), and set_handle().
Definition at line 186 of file DCPS_IR_Subscription.h.
Referenced by add_associated_publication(), disassociate_participant(), disassociate_publication(), disassociate_topic(), dump_to_string(), get_id(), reevaluate_defunct_associations(), remove_associated_publication(), and update_expr_params().
Definition at line 204 of file DCPS_IR_Subscription.h.
Referenced by DCPS_IR_Subscription(), get_incompatibleQosStatus(), and update_incompatible_qos().
CORBA::Boolean DCPS_IR_Subscription::isBIT_ [private] |
Definition at line 190 of file DCPS_IR_Subscription.h.
Referenced by dump_to_string(), is_bit(), and set_bit_status().
Definition at line 187 of file DCPS_IR_Subscription.h.
Referenced by add_associated_publication(), call_association_complete(), disassociate_participant(), disassociate_publication(), disassociate_topic(), get_participant_id(), is_publication_ignored(), remove_associated_publication(), set_qos(), and update_incompatible_qos().
DDS::DataReaderQos DCPS_IR_Subscription::qos_ [private] |
Definition at line 194 of file DCPS_IR_Subscription.h.
Referenced by get_datareader_qos(), and set_qos().
OpenDDS::DCPS::DataReaderRemote_var DCPS_IR_Subscription::reader_ [private] |
the corresponding DataReaderRemote object
Definition at line 193 of file DCPS_IR_Subscription.h.
Referenced by add_associated_publication(), call_association_complete(), DCPS_IR_Subscription(), disassociate_participant(), disassociate_publication(), disassociate_topic(), remove_associated_publication(), and update_incompatible_qos().
Definition at line 196 of file DCPS_IR_Subscription.h.
Referenced by get_subscriber_qos(), and set_qos().
DCPS_IR_Topic* DCPS_IR_Subscription::topic_ [private] |
Definition at line 188 of file DCPS_IR_Subscription.h.
Referenced by get_topic(), get_topic_description(), get_topic_id(), reevaluate_association(), and set_qos().