DCPS_IR_Subscription.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include /**/ "DCPS_IR_Subscription.h"
00011 
00012 #include /**/ "DCPS_IR_Publication.h"
00013 #include /**/ "DCPS_IR_Participant.h"
00014 #include /**/ "DCPS_IR_Topic_Description.h"
00015 #include /**/ "DCPS_IR_Domain.h"
00016 #include /**/ "dds/DCPS/DCPS_Utils.h"
00017 #include /**/ "dds/DCPS/RepoIdConverter.h"
00018 #include /**/ "dds/DCPS/Qos_Helper.h"
00019 #include /**/ "tao/debug.h"
00020 
00021 #include /**/ "ace/OS_NS_unistd.h"
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 DCPS_IR_Subscription::DCPS_IR_Subscription(const OpenDDS::DCPS::RepoId& id,
00026                                            DCPS_IR_Participant* participant,
00027                                            DCPS_IR_Topic* topic,
00028                                            OpenDDS::DCPS::DataReaderRemote_ptr reader,
00029                                            const DDS::DataReaderQos& qos,
00030                                            const OpenDDS::DCPS::TransportLocatorSeq& info,
00031                                            const DDS::SubscriberQos& subscriberQos,
00032                                            const char* filterClassName,
00033                                            const char* filterExpression,
00034                                            const DDS::StringSeq& exprParams)
00035   : id_(id),
00036     participant_(participant),
00037     topic_(topic),
00038     handle_(0),
00039     isBIT_(0),
00040     qos_(qos),
00041     info_(info),
00042     subscriberQos_(subscriberQos),
00043     filterClassName_(filterClassName),
00044     filterExpression_(filterExpression),
00045     exprParams_(exprParams)
00046 {
00047   reader_ =  OpenDDS::DCPS::DataReaderRemote::_duplicate(reader);
00048 
00049   incompatibleQosStatus_.total_count = 0;
00050   incompatibleQosStatus_.count_since_last_send = 0;
00051 }
00052 
00053 DCPS_IR_Subscription::~DCPS_IR_Subscription()
00054 {
00055 }
00056 
00057 int DCPS_IR_Subscription::add_associated_publication(DCPS_IR_Publication* pub,
00058                                                      bool active)
00059 {
00060   // keep track of the association locally
00061   int status = associations_.insert(pub);
00062 
00063   switch (status) {
00064   case 0: {
00065     // inform the datareader about the association
00066     OpenDDS::DCPS::WriterAssociation association;
00067     association.writerTransInfo = pub->get_transportLocatorSeq();
00068     association.writerId = pub->get_id();
00069     association.pubQos = *(pub->get_publisher_qos());
00070     association.writerQos = *(pub->get_datawriter_qos());
00071 
00072     if (participant_->is_alive() && this->participant_->isOwner()) {
00073       try {
00074         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00075           OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00076           OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00077           ACE_DEBUG((LM_DEBUG,
00078                      ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication:")
00079                      ACE_TEXT(" subscription %C adding publication %C.\n"),
00080                      std::string(sub_converter).c_str(),
00081                      std::string(pub_converter).c_str()));
00082         }
00083 
00084         reader_->add_association(id_, association, active);
00085 
00086         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00087           ACE_DEBUG((LM_DEBUG,
00088                      ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication: ")
00089                      ACE_TEXT("successfully added publication %x\n"),
00090                      pub));
00091         }
00092       } catch (const CORBA::Exception& ex) {
00093         ex._tao_print_exception(
00094           "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:");
00095         participant_->mark_dead();
00096         status = -1;
00097       }
00098     }
00099 
00100   }
00101   break;
00102 
00103   case 1: {
00104     OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00105     OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00106     ACE_ERROR((LM_ERROR,
00107                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00108                ACE_TEXT("subscription %C attempted to re-add publication %C\n"),
00109                std::string(sub_converter).c_str(),
00110                std::string(pub_converter).c_str()));
00111   }
00112   break;
00113 
00114   case -1: {
00115     OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00116     OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00117     ACE_ERROR((LM_ERROR,
00118                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00119                ACE_TEXT("subscription %C failed to add publication %C\n"),
00120                std::string(sub_converter).c_str(),
00121                std::string(pub_converter).c_str()));
00122   }
00123   };
00124 
00125   return status;
00126 }
00127 
00128 void
00129 DCPS_IR_Subscription::association_complete(const OpenDDS::DCPS::RepoId& remote)
00130 {
00131   typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00132   for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00133     if ((*iter)->get_id() == remote) {
00134       (*iter)->call_association_complete(get_id());
00135     }
00136   }
00137 }
00138 
00139 void
00140 DCPS_IR_Subscription::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00141 {
00142   try {
00143     reader_->association_complete(remote);
00144   } catch (const CORBA::Exception& ex) {
00145     ex._tao_print_exception(
00146       "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::call_association_complete:");
00147     participant_->mark_dead();
00148   }
00149 }
00150 
00151 int DCPS_IR_Subscription::remove_associated_publication(DCPS_IR_Publication* pub,
00152                                                         CORBA::Boolean sendNotify,
00153                                                         CORBA::Boolean notify_lost,
00154                                                         bool notify_both_side)
00155 {
00156   bool marked_dead = false;
00157 
00158   if (sendNotify) {
00159 
00160     if (participant_->is_alive() && this->participant_->isOwner()) {
00161       try {
00162         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00163           OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00164           OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00165           ACE_DEBUG((LM_DEBUG,
00166                      ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication:")
00167                      ACE_TEXT(" calling sub %C with pub %C\n"),
00168                      std::string(sub_converter).c_str(),
00169                      std::string(pub_converter).c_str()
00170                      ));
00171         }
00172 
00173         OpenDDS::DCPS::WriterIdSeq idSeq(5);
00174         idSeq.length(1);
00175         idSeq[0] = pub->get_id();
00176 
00177         reader_->remove_associations(idSeq, notify_lost);
00178 
00179         if (notify_both_side) {
00180           pub->remove_associated_subscription(this, sendNotify, notify_lost);
00181         }
00182 
00183       } catch (const CORBA::Exception& ex) {
00184         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00185           ex._tao_print_exception(
00186             "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:");
00187         }
00188 
00189         participant_->mark_dead();
00190         marked_dead = true;
00191       }
00192     }
00193   }
00194 
00195   int status = associations_.remove(pub);
00196 
00197   if (0 == status) {
00198     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00199       OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00200       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00201       ACE_DEBUG((LM_DEBUG,
00202                  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ")
00203                  ACE_TEXT("subscription %C removed publication %C at %x.\n"),
00204                  std::string(sub_converter).c_str(),
00205                  std::string(pub_converter).c_str(),
00206                  pub));
00207     }
00208 
00209   } else {
00210     OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00211     OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00212     ACE_ERROR((LM_ERROR,
00213                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ")
00214                ACE_TEXT("subscription %C failed to remove publication %C at %x.\n"),
00215                std::string(sub_converter).c_str(),
00216                std::string(pub_converter).c_str(),
00217                pub));
00218   } // if (0 == status)
00219 
00220   if (marked_dead) {
00221     return -1;
00222 
00223   } else {
00224     return status;
00225   }
00226 }
00227 
00228 int DCPS_IR_Subscription::remove_associations(CORBA::Boolean notify_lost)
00229 {
00230   int status = 0;
00231   DCPS_IR_Publication* pub = 0;
00232   size_t numAssociations = associations_.size();
00233   CORBA::Boolean dontSend = 0;
00234   CORBA::Boolean send = 1;
00235 
00236   if (0 < numAssociations) {
00237     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00238     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00239 
00240     while (iter != end) {
00241       pub = *iter;
00242       ++iter;
00243 
00244       pub->remove_associated_subscription(this, send, notify_lost);
00245       CORBA::Boolean dont_notify_lost = 0;
00246       remove_associated_publication(pub, dontSend, dont_notify_lost);
00247     }
00248   }
00249   this->defunct_.reset();
00250 
00251   return status;
00252 }
00253 
00254 void DCPS_IR_Subscription::disassociate_participant(OpenDDS::DCPS::RepoId id,
00255                                                     bool reassociate)
00256 {
00257   DCPS_IR_Publication* pub = 0;
00258   size_t numAssociations = associations_.size();
00259   CORBA::Boolean dontSend = 0;
00260   CORBA::Boolean send = 1;
00261   long count = 0;
00262 
00263   if (0 < numAssociations) {
00264     OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00265     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00266 
00267     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00268     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00269 
00270     while (iter != end) {
00271       pub = *iter;
00272       ++iter;
00273 
00274       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00275         OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00276         OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00277         OpenDDS::DCPS::RepoIdConverter sub_part_converter(id);
00278         OpenDDS::DCPS::RepoIdConverter pub_part_converter(pub->get_participant_id());
00279         ACE_DEBUG((LM_DEBUG,
00280                    ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_participant: ")
00281                    ACE_TEXT("subscription %C testing if publication %C particpant %C == %C.\n"),
00282                    std::string(sub_converter).c_str(),
00283                    std::string(pub_converter).c_str(),
00284                    std::string(sub_part_converter).c_str(),
00285                    std::string(pub_part_converter).c_str()));
00286       }
00287 
00288       if (id == pub->get_participant_id()) {
00289         CORBA::Boolean dont_notify_lost = 0;
00290         pub->remove_associated_subscription(this, send, dont_notify_lost);
00291         remove_associated_publication(pub, dontSend, dont_notify_lost);
00292 
00293         idSeq[count] = pub->get_id();
00294         ++count;
00295 
00296         if (reassociate && this->defunct_.insert(pub) != 0) {
00297           OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00298           OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00299           ACE_ERROR((LM_ERROR,
00300                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ")
00301                      ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00302                      std::string(sub_converter).c_str(),
00303                      std::string(pub_converter).c_str(),
00304                      pub));
00305         }
00306       }
00307     }
00308 
00309     if (0 < count) {
00310       idSeq.length(count);
00311 
00312       if (participant_->is_alive() && this->participant_->isOwner()) {
00313         try {
00314           CORBA::Boolean dont_notify_lost = 0;
00315           reader_->remove_associations(idSeq, dont_notify_lost);
00316 
00317         } catch (const CORBA::Exception& ex) {
00318           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00319             ex._tao_print_exception(
00320               "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:");
00321           }
00322 
00323           participant_->mark_dead();
00324         }
00325       }
00326     }
00327   }
00328 }
00329 
00330 void DCPS_IR_Subscription::disassociate_topic(OpenDDS::DCPS::RepoId id)
00331 {
00332   DCPS_IR_Publication* pub = 0;
00333   size_t numAssociations = associations_.size();
00334   CORBA::Boolean dontSend = 0;
00335   CORBA::Boolean send = 1;
00336   long count = 0;
00337 
00338   if (0 < numAssociations) {
00339     OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00340     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00341 
00342     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00343     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00344 
00345     while (iter != end) {
00346       pub = *iter;
00347       ++iter;
00348 
00349       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00350         OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00351         OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00352         OpenDDS::DCPS::RepoIdConverter sub_topic_converter(id);
00353         OpenDDS::DCPS::RepoIdConverter pub_topic_converter(pub->get_topic_id());
00354         ACE_DEBUG((LM_DEBUG,
00355                    ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_topic: ")
00356                    ACE_TEXT("subscription %C testing if publication %C topic %C == %C.\n"),
00357                    std::string(sub_converter).c_str(),
00358                    std::string(pub_converter).c_str(),
00359                    std::string(sub_topic_converter).c_str(),
00360                    std::string(pub_topic_converter).c_str()));
00361       }
00362 
00363       if (id == pub->get_topic_id()) {
00364         CORBA::Boolean dont_notify_lost = 0;
00365         pub->remove_associated_subscription(this, send, dont_notify_lost);
00366         remove_associated_publication(pub, dontSend, dont_notify_lost);
00367 
00368         idSeq[count] = pub->get_id();
00369         ++count;
00370       }
00371     }
00372 
00373     if (0 < count) {
00374       idSeq.length(count);
00375 
00376       if (participant_->is_alive() && this->participant_->isOwner()) {
00377         try {
00378           CORBA::Boolean dont_notify_lost = false;
00379           reader_->remove_associations(idSeq, dont_notify_lost);
00380 
00381         } catch (const CORBA::Exception& ex) {
00382           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00383             ex._tao_print_exception(
00384               "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00385           }
00386 
00387           participant_->mark_dead();
00388         }
00389       }
00390     }
00391   }
00392 }
00393 
00394 void DCPS_IR_Subscription::disassociate_publication(OpenDDS::DCPS::RepoId id,
00395                                                     bool reassociate)
00396 {
00397   DCPS_IR_Publication* pub = 0;
00398   size_t numAssociations = associations_.size();
00399   CORBA::Boolean dontSend = 0;
00400   CORBA::Boolean send = 1;
00401   long count = 0;
00402 
00403   if (0 < numAssociations) {
00404     OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00405     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00406 
00407     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00408     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00409 
00410     while (iter != end) {
00411       pub = *iter;
00412       ++iter;
00413 
00414       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00415         OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00416         OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00417         OpenDDS::DCPS::RepoIdConverter sub_pub_converter(id);
00418         ACE_DEBUG((LM_DEBUG,
00419                    ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_publication: ")
00420                    ACE_TEXT("subscription %C testing if publication %C == %C.\n"),
00421                    std::string(sub_converter).c_str(),
00422                    std::string(pub_converter).c_str(),
00423                    std::string(sub_pub_converter).c_str()));
00424       }
00425 
00426       if (id == pub->get_id()) {
00427         CORBA::Boolean dont_notify_lost = 0;
00428         pub->remove_associated_subscription(this, send, dont_notify_lost);
00429         remove_associated_publication(pub, dontSend, dont_notify_lost);
00430 
00431         idSeq[count] = pub->get_id();
00432         ++count;
00433 
00434         if (reassociate && this->defunct_.insert(pub) != 0) {
00435           OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00436           OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00437           ACE_ERROR((LM_ERROR,
00438                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ")
00439                      ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00440                      std::string(sub_converter).c_str(),
00441                      std::string(pub_converter).c_str(),
00442                      pub));
00443         }
00444       }
00445     }
00446 
00447     if (0 < count) {
00448       idSeq.length(count);
00449 
00450       if (participant_->is_alive() && this->participant_->isOwner()) {
00451         try {
00452           CORBA::Boolean dont_notify_lost = 0;
00453           reader_->remove_associations(idSeq, dont_notify_lost);
00454 
00455         } catch (const CORBA::Exception& ex) {
00456           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00457             ex._tao_print_exception(
00458               "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00459           }
00460 
00461           participant_->mark_dead();
00462         }
00463       }
00464     }
00465   }
00466 }
00467 
00468 void DCPS_IR_Subscription::update_incompatible_qos()
00469 {
00470   if (participant_->is_alive() && this->participant_->isOwner()) {
00471     try {
00472       reader_->update_incompatible_qos(incompatibleQosStatus_);
00473       incompatibleQosStatus_.count_since_last_send = 0;
00474     } catch (const CORBA::Exception& ex) {
00475       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00476         ex._tao_print_exception(
00477           "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:");
00478       }
00479 
00480       participant_->mark_dead();
00481     }
00482   }
00483 }
00484 
00485 CORBA::Boolean DCPS_IR_Subscription::is_publication_ignored(OpenDDS::DCPS::RepoId partId,
00486                                                             OpenDDS::DCPS::RepoId topicId,
00487                                                             OpenDDS::DCPS::RepoId pubId)
00488 {
00489   CORBA::Boolean ignored = (participant_->is_participant_ignored(partId) ||
00490                             participant_->is_topic_ignored(topicId) ||
00491                             participant_->is_publication_ignored(pubId));
00492 
00493   return ignored;
00494 }
00495 
00496 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Subscription::get_transportLocatorSeq() const
00497 {
00498   return info_;
00499 }
00500 
00501 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Subscription::get_incompatibleQosStatus()
00502 {
00503   return &incompatibleQosStatus_;
00504 }
00505 
00506 const DDS::DataReaderQos* DCPS_IR_Subscription::get_datareader_qos()
00507 {
00508   return &qos_;
00509 }
00510 
00511 const DDS::SubscriberQos* DCPS_IR_Subscription::get_subscriber_qos()
00512 {
00513   return &subscriberQos_;
00514 }
00515 
00516 using OpenDDS::DCPS::operator==;
00517 
00518 void
00519 DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos& qos)
00520 {
00521   if (false == (qos == this->qos_)) {
00522     // Check if we should check while we have both values.
00523     bool check =
00524       OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00525 
00526     // Store the new, compatible, value.
00527     this->qos_ = qos;
00528 
00529     if (check) {
00530       // This will remove any newly stale associations.
00531       this->reevaluate_existing_associations();
00532 
00533       // Sleep a while to let remove_association handled by DataWriter
00534       // before add_association. Otherwise, new association will have
00535       // trouble to connect each other.
00536       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00537 
00538       // This will establish any newly made associations.
00539       DCPS_IR_Topic_Description* description
00540       = this->topic_->get_topic_description();
00541       description->reevaluate_associations(this);
00542     }
00543 
00544     this->participant_->get_domain_reference()->publish_subscription_bit(this);
00545   }
00546 }
00547 
00548 void
00549 DCPS_IR_Subscription::set_qos(const DDS::SubscriberQos& qos)
00550 {
00551   if (false == (qos == this->subscriberQos_)) {
00552     // Check if we should check while we have both values.
00553     bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->subscriberQos_);
00554 
00555     // Store the new, compatible, value.
00556     this->subscriberQos_ = qos;
00557 
00558     if (check) {
00559       // This will remove any newly stale associations.
00560       this->reevaluate_existing_associations();
00561 
00562       // Sleep a while to let remove_association handled by DataWriter
00563       // before add_association. Otherwise, new association will have
00564       // trouble to connect each other.
00565       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00566 
00567       // This will establish any newly made associations.
00568       DCPS_IR_Topic_Description* description
00569       = this->topic_->get_topic_description();
00570       description->reevaluate_associations(this);
00571     }
00572 
00573     this->participant_->get_domain_reference()->publish_subscription_bit(this);
00574   }
00575 }
00576 
00577 bool DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos & qos,
00578                                    const DDS::SubscriberQos & subscriberQos,
00579                                    Update::SpecificQos& specificQos)
00580 {
00581   bool need_evaluate = false;
00582   bool u_dr_qos = !(qos_ == qos);
00583 
00584   if (u_dr_qos) {
00585     if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00586       need_evaluate = true;
00587     }
00588 
00589     qos_ = qos;
00590   }
00591 
00592   bool u_sub_qos = !(subscriberQos_ == subscriberQos);
00593 
00594   if (u_sub_qos) {
00595     if (OpenDDS::DCPS::should_check_association_upon_change(subscriberQos_, subscriberQos)) {
00596       need_evaluate = true;
00597     }
00598 
00599     subscriberQos_ = subscriberQos;
00600   }
00601 
00602   if (need_evaluate) {
00603     // Check if any existing association need be removed first.
00604     this->reevaluate_existing_associations();
00605 
00606     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00607     description->reevaluate_associations(this);
00608   }
00609 
00610   participant_->get_domain_reference()->publish_subscription_bit(this);
00611   specificQos = u_dr_qos?  Update::DataReaderQos:
00612                 u_sub_qos? Update::SubscriberQos:
00613                 Update::NoQos;
00614 
00615   return true;
00616 }
00617 
00618 void
00619 DCPS_IR_Subscription::reevaluate_defunct_associations()
00620 {
00621   DCPS_IR_Publication_Set::iterator it(this->defunct_.begin());
00622   while (it != this->defunct_.end()) {
00623     DCPS_IR_Publication* publication = *it;
00624     ++it;
00625 
00626     if (reevaluate_association(publication)) {
00627       this->defunct_.remove(publication); // no longer defunct
00628 
00629     } else {
00630       OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00631       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00632       ACE_ERROR((LM_ERROR,
00633                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ")
00634                  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00635                  std::string(sub_converter).c_str(),
00636                  std::string(pub_converter).c_str(),
00637                  publication));
00638     }
00639   }
00640 }
00641 
00642 void DCPS_IR_Subscription::reevaluate_existing_associations()
00643 {
00644   DCPS_IR_Publication * pub = 0;
00645   DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00646   DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00647 
00648   while (iter != end) {
00649     pub = *iter;
00650     ++iter;
00651 
00652     this->reevaluate_association(pub);
00653   }
00654 }
00655 
00656 bool
00657 DCPS_IR_Subscription::reevaluate_association(DCPS_IR_Publication* publication)
00658 {
00659   int status = this->associations_.find(publication);
00660 
00661   if (status == 0) {
00662     // verify if they are still compatiable after change
00663 
00664     if (!OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00665                                       this->get_incompatibleQosStatus(),
00666                                       publication->get_transportLocatorSeq(),
00667                                       this->get_transportLocatorSeq(),
00668                                       publication->get_datawriter_qos(),
00669                                       this->get_datareader_qos(),
00670                                       publication->get_publisher_qos(),
00671                                       this->get_subscriber_qos())) {
00672       bool sendNotify = true; // inform datareader
00673       bool notify_lost = true; // invoke listerner callback
00674 
00675       this->remove_associated_publication(publication, sendNotify, notify_lost, true);
00676     }
00677 
00678   } else {
00679     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00680     return description->try_associate(publication, this);
00681   }
00682 
00683   return false;
00684 }
00685 
00686 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_id()
00687 {
00688   return id_;
00689 }
00690 
00691 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_topic_id()
00692 {
00693   return topic_->get_id();
00694 }
00695 
00696 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_participant_id()
00697 {
00698   return participant_->get_id();
00699 }
00700 
00701 DCPS_IR_Topic_Description* DCPS_IR_Subscription::get_topic_description()
00702 {
00703   return topic_->get_topic_description();
00704 }
00705 
00706 DCPS_IR_Topic* DCPS_IR_Subscription::get_topic()
00707 {
00708   return topic_;
00709 }
00710 
00711 DDS::InstanceHandle_t DCPS_IR_Subscription::get_handle()
00712 {
00713   return handle_;
00714 }
00715 
00716 void DCPS_IR_Subscription::set_handle(DDS::InstanceHandle_t handle)
00717 {
00718   handle_ = handle;
00719 }
00720 
00721 CORBA::Boolean DCPS_IR_Subscription::is_bit()
00722 {
00723   return isBIT_;
00724 }
00725 
00726 void DCPS_IR_Subscription::set_bit_status(CORBA::Boolean isBIT)
00727 {
00728   isBIT_ = isBIT;
00729 }
00730 
00731 OpenDDS::DCPS::DataReaderRemote_ptr
00732 DCPS_IR_Subscription::reader()
00733 {
00734   return OpenDDS::DCPS::DataReaderRemote::_duplicate(this->reader_.in());
00735 }
00736 
00737 std::string
00738 DCPS_IR_Subscription::get_filter_class_name() const
00739 {
00740   return filterClassName_;
00741 }
00742 
00743 std::string
00744 DCPS_IR_Subscription::get_filter_expression() const
00745 {
00746   return filterExpression_;
00747 }
00748 
00749 DDS::StringSeq
00750 DCPS_IR_Subscription::get_expr_params() const
00751 {
00752   return exprParams_;
00753 }
00754 
00755 void
00756 DCPS_IR_Subscription::update_expr_params(const DDS::StringSeq& params)
00757 {
00758   exprParams_ = params;
00759   typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00760   for (iter_t i(associations_.begin()), e(associations_.end()); i != e; ++i) {
00761     (*i)->update_expr_params(id_, params);
00762   }
00763 }
00764 
00765 std::string
00766 DCPS_IR_Subscription::dump_to_string(const std::string& prefix, int depth) const
00767 {
00768   std::string str;
00769 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00770   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00771 
00772   for (int i=0; i < depth; i++)
00773     str += prefix;
00774   std::string indent = str + prefix;
00775   str += "DCPS_IR_Subscription[";
00776   str += std::string(local_converter);
00777   str += "]";
00778   if (isBIT_)
00779     str += " (BIT)";
00780   str += "\n";
00781 
00782   str += indent + "Associations [ ";
00783   for (DCPS_IR_Publication_Set::const_iterator assoc = associations_.begin();
00784        assoc != associations_.end();
00785        assoc++)
00786   {
00787     OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00788     str += std::string(assoc_converter);
00789     str += " ";
00790   }
00791   str += "]\n";
00792 
00793   str += indent + "Defunct Associations [ ";
00794   for (DCPS_IR_Publication_Set::const_iterator def = defunct_.begin();
00795        def != defunct_.end();
00796        def++)
00797   {
00798     OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00799     str += std::string(def_converter);
00800     str += " ";
00801   }
00802   str += "]\n";
00803 
00804 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00805   return str;
00806 }
00807 
00808 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1