DCPS_IR_Subscription.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include /**/ "DCPS_IR_Subscription.h"
00011 
00012 #include /**/ "DCPS_IR_Publication.h"
00013 #include /**/ "DCPS_IR_Participant.h"
00014 #include /**/ "DCPS_IR_Topic_Description.h"
00015 #include /**/ "DCPS_IR_Domain.h"
00016 #include /**/ "dds/DCPS/DCPS_Utils.h"
00017 #include /**/ "dds/DCPS/RepoIdConverter.h"
00018 #include /**/ "dds/DCPS/Qos_Helper.h"
00019 #include /**/ "tao/debug.h"
00020 
00021 DCPS_IR_Subscription::DCPS_IR_Subscription(const OpenDDS::DCPS::RepoId& id,
00022                                            DCPS_IR_Participant* participant,
00023                                            DCPS_IR_Topic* topic,
00024                                            OpenDDS::DCPS::DataReaderRemote_ptr reader,
00025                                            const DDS::DataReaderQos& qos,
00026                                            const OpenDDS::DCPS::TransportLocatorSeq& info,
00027                                            const DDS::SubscriberQos& subscriberQos,
00028                                            const char* filterClassName,
00029                                            const char* filterExpression,
00030                                            const DDS::StringSeq& exprParams)
00031   : id_(id),
00032     participant_(participant),
00033     topic_(topic),
00034     handle_(0),
00035     isBIT_(0),
00036     qos_(qos),
00037     info_(info),
00038     subscriberQos_(subscriberQos),
00039     filterClassName_(filterClassName),
00040     filterExpression_(filterExpression),
00041     exprParams_(exprParams)
00042 {
00043   reader_ =  OpenDDS::DCPS::DataReaderRemote::_duplicate(reader);
00044 
00045   incompatibleQosStatus_.total_count = 0;
00046   incompatibleQosStatus_.count_since_last_send = 0;
00047 }
00048 
00049 DCPS_IR_Subscription::~DCPS_IR_Subscription()
00050 {
00051   if (0 != associations_.size()) {
00052     CORBA::Boolean dont_notify_lost = 0;
00053     remove_associations(dont_notify_lost);
00054   }
00055 }
00056 
00057 int DCPS_IR_Subscription::add_associated_publication(DCPS_IR_Publication* pub,
00058                                                      bool active)
00059 {
00060   // keep track of the association locally
00061   int status = associations_.insert(pub);
00062 
00063   switch (status) {
00064   case 0: {
00065     // inform the datareader about the association
00066     OpenDDS::DCPS::WriterAssociation association;
00067     association.writerTransInfo = pub->get_transportLocatorSeq();
00068     association.writerId = pub->get_id();
00069     association.pubQos = *(pub->get_publisher_qos());
00070     association.writerQos = *(pub->get_datawriter_qos());
00071 
00072     if (participant_->is_alive() && this->participant_->isOwner()) {
00073       try {
00074         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00075           OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00076           OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00077           ACE_DEBUG((LM_DEBUG,
00078                      ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication:")
00079                      ACE_TEXT(" subscription %C adding publication %C.\n"),
00080                      std::string(sub_converter).c_str(),
00081                      std::string(pub_converter).c_str()));
00082         }
00083 
00084         reader_->add_association(id_, association, active);
00085 
00086         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00087           ACE_DEBUG((LM_DEBUG,
00088                      ACE_TEXT("(%P|%t) DCPS_IR_Subscription::add_associated_publication: ")
00089                      ACE_TEXT("successfully added publication %x\n"),
00090                      pub));
00091         }
00092       } catch (const CORBA::Exception& ex) {
00093         ex._tao_print_exception(
00094           "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::add_associated_publication:");
00095         participant_->mark_dead();
00096         status = -1;
00097       }
00098     }
00099 
00100   }
00101   break;
00102 
00103   case 1: {
00104     OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00105     OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00106     ACE_ERROR((LM_ERROR,
00107                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00108                ACE_TEXT("subscription %C attempted to re-add publication %C\n"),
00109                std::string(sub_converter).c_str(),
00110                std::string(pub_converter).c_str()));
00111   }
00112   break;
00113 
00114   case -1: {
00115     OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00116     OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00117     ACE_ERROR((LM_ERROR,
00118                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::add_associated_publication: ")
00119                ACE_TEXT("subscription %C failed to add publication %C\n"),
00120                std::string(sub_converter).c_str(),
00121                std::string(pub_converter).c_str()));
00122   }
00123   };
00124 
00125   return status;
00126 }
00127 
00128 void
00129 DCPS_IR_Subscription::association_complete(const OpenDDS::DCPS::RepoId& remote)
00130 {
00131   typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00132   for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00133     if ((*iter)->get_id() == remote) {
00134       (*iter)->call_association_complete(get_id());
00135     }
00136   }
00137 }
00138 
00139 void
00140 DCPS_IR_Subscription::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00141 {
00142   try {
00143     reader_->association_complete(remote);
00144   } catch (const CORBA::Exception& ex) {
00145     ex._tao_print_exception(
00146       "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::call_association_complete:");
00147     participant_->mark_dead();
00148   }
00149 }
00150 
00151 int DCPS_IR_Subscription::remove_associated_publication(DCPS_IR_Publication* pub,
00152                                                         CORBA::Boolean sendNotify,
00153                                                         CORBA::Boolean notify_lost,
00154                                                         bool notify_both_side)
00155 {
00156   bool marked_dead = false;
00157 
00158   if (sendNotify) {
00159     OpenDDS::DCPS::WriterIdSeq idSeq(5);
00160     idSeq.length(1);
00161     idSeq[0] = pub->get_id();
00162 
00163     if (participant_->is_alive() && this->participant_->isOwner()) {
00164       try {
00165         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00166           ACE_DEBUG((LM_DEBUG,
00167                      ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication:")
00168                      ACE_TEXT(" calling sub %d with pub %d\n"),
00169                      id_, pub->get_id()));
00170         }
00171 
00172         reader_->remove_associations(idSeq, notify_lost);
00173 
00174         if (notify_both_side) {
00175           pub->remove_associated_subscription(this, sendNotify, notify_lost);
00176         }
00177 
00178       } catch (const CORBA::Exception& ex) {
00179         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00180           ex._tao_print_exception(
00181             "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associated_publication:");
00182         }
00183 
00184         participant_->mark_dead();
00185         marked_dead = true;
00186       }
00187     }
00188   }
00189 
00190   int status = associations_.remove(pub);
00191 
00192   if (0 == status) {
00193     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00194       OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00195       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00196       ACE_DEBUG((LM_DEBUG,
00197                  ACE_TEXT("(%P|%t) DCPS_IR_Subscription::remove_associated_publication: ")
00198                  ACE_TEXT("subscription %C removed publication %C at %x.\n"),
00199                  std::string(sub_converter).c_str(),
00200                  std::string(pub_converter).c_str(),
00201                  pub));
00202     }
00203 
00204   } else {
00205     OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00206     OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00207     ACE_ERROR((LM_ERROR,
00208                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::remove_associated_publication: ")
00209                ACE_TEXT("subscription %C failed to remove publication %C at %x.\n"),
00210                std::string(sub_converter).c_str(),
00211                std::string(pub_converter).c_str(),
00212                pub));
00213   } // if (0 == status)
00214 
00215   if (marked_dead) {
00216     return -1;
00217 
00218   } else {
00219     return status;
00220   }
00221 }
00222 
00223 int DCPS_IR_Subscription::remove_associations(CORBA::Boolean notify_lost)
00224 {
00225   int status = 0;
00226   DCPS_IR_Publication* pub = 0;
00227   size_t numAssociations = associations_.size();
00228   CORBA::Boolean dontSend = 0;
00229   CORBA::Boolean send = 1;
00230 
00231   if (0 < numAssociations) {
00232     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00233     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00234 
00235     while (iter != end) {
00236       pub = *iter;
00237       ++iter;
00238 
00239       pub->remove_associated_subscription(this, send, notify_lost);
00240       CORBA::Boolean dont_notify_lost = 0;
00241       remove_associated_publication(pub, dontSend, dont_notify_lost);
00242     }
00243   }
00244   this->defunct_.reset();
00245 
00246   return status;
00247 }
00248 
00249 void DCPS_IR_Subscription::disassociate_participant(OpenDDS::DCPS::RepoId id,
00250                                                     bool reassociate)
00251 {
00252   DCPS_IR_Publication* pub = 0;
00253   size_t numAssociations = associations_.size();
00254   CORBA::Boolean dontSend = 0;
00255   CORBA::Boolean send = 1;
00256   long count = 0;
00257 
00258   if (0 < numAssociations) {
00259     OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00260     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00261 
00262     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00263     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00264 
00265     while (iter != end) {
00266       pub = *iter;
00267       ++iter;
00268 
00269       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00270         OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00271         OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00272         OpenDDS::DCPS::RepoIdConverter sub_part_converter(id);
00273         OpenDDS::DCPS::RepoIdConverter pub_part_converter(pub->get_participant_id());
00274         ACE_DEBUG((LM_DEBUG,
00275                    ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_participant: ")
00276                    ACE_TEXT("subscription %C testing if publication %C particpant %C == %C.\n"),
00277                    std::string(sub_converter).c_str(),
00278                    std::string(pub_converter).c_str(),
00279                    std::string(sub_part_converter).c_str(),
00280                    std::string(pub_part_converter).c_str()));
00281       }
00282 
00283       if (id == pub->get_participant_id()) {
00284         CORBA::Boolean dont_notify_lost = 0;
00285         pub->remove_associated_subscription(this, send, dont_notify_lost);
00286         remove_associated_publication(pub, dontSend, dont_notify_lost);
00287 
00288         idSeq[count] = pub->get_id();
00289         ++count;
00290 
00291         if (reassociate && this->defunct_.insert(pub) != 0) {
00292           OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00293           OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00294           ACE_ERROR((LM_ERROR,
00295                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_participant: ")
00296                      ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00297                      std::string(sub_converter).c_str(),
00298                      std::string(pub_converter).c_str(),
00299                      pub));
00300         }
00301       }
00302     }
00303 
00304     if (0 < count) {
00305       idSeq.length(count);
00306 
00307       if (participant_->is_alive() && this->participant_->isOwner()) {
00308         try {
00309           CORBA::Boolean dont_notify_lost = 0;
00310           reader_->remove_associations(idSeq, dont_notify_lost);
00311 
00312         } catch (const CORBA::Exception& ex) {
00313           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00314             ex._tao_print_exception(
00315               "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::disassociate_participant:");
00316           }
00317 
00318           participant_->mark_dead();
00319         }
00320       }
00321     }
00322   }
00323 }
00324 
00325 void DCPS_IR_Subscription::disassociate_topic(OpenDDS::DCPS::RepoId id)
00326 {
00327   DCPS_IR_Publication* pub = 0;
00328   size_t numAssociations = associations_.size();
00329   CORBA::Boolean dontSend = 0;
00330   CORBA::Boolean send = 1;
00331   long count = 0;
00332 
00333   if (0 < numAssociations) {
00334     OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00335     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00336 
00337     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00338     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00339 
00340     while (iter != end) {
00341       pub = *iter;
00342       ++iter;
00343 
00344       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00345         OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00346         OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00347         OpenDDS::DCPS::RepoIdConverter sub_topic_converter(id);
00348         OpenDDS::DCPS::RepoIdConverter pub_topic_converter(pub->get_topic_id());
00349         ACE_DEBUG((LM_DEBUG,
00350                    ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_topic: ")
00351                    ACE_TEXT("subscription %C testing if publication %C topic %C == %C.\n"),
00352                    std::string(sub_converter).c_str(),
00353                    std::string(pub_converter).c_str(),
00354                    std::string(sub_topic_converter).c_str(),
00355                    std::string(pub_topic_converter).c_str()));
00356       }
00357 
00358       if (id == pub->get_topic_id()) {
00359         CORBA::Boolean dont_notify_lost = 0;
00360         pub->remove_associated_subscription(this, send, dont_notify_lost);
00361         remove_associated_publication(pub, dontSend, dont_notify_lost);
00362 
00363         idSeq[count] = pub->get_id();
00364         ++count;
00365       }
00366     }
00367 
00368     if (0 < count) {
00369       idSeq.length(count);
00370 
00371       if (participant_->is_alive() && this->participant_->isOwner()) {
00372         try {
00373           CORBA::Boolean dont_notify_lost = 0;
00374           reader_->remove_associations(idSeq, dont_notify_lost);
00375 
00376         } catch (const CORBA::Exception& ex) {
00377           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00378             ex._tao_print_exception(
00379               "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00380           }
00381 
00382           participant_->mark_dead();
00383         }
00384       }
00385     }
00386   }
00387 }
00388 
00389 void DCPS_IR_Subscription::disassociate_publication(OpenDDS::DCPS::RepoId id,
00390                                                     bool reassociate)
00391 {
00392   DCPS_IR_Publication* pub = 0;
00393   size_t numAssociations = associations_.size();
00394   CORBA::Boolean dontSend = 0;
00395   CORBA::Boolean send = 1;
00396   long count = 0;
00397 
00398   if (0 < numAssociations) {
00399     OpenDDS::DCPS::WriterIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00400     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00401 
00402     DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00403     DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00404 
00405     while (iter != end) {
00406       pub = *iter;
00407       ++iter;
00408 
00409       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00410         OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00411         OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00412         OpenDDS::DCPS::RepoIdConverter sub_pub_converter(id);
00413         ACE_DEBUG((LM_DEBUG,
00414                    ACE_TEXT("(%P|%t) DCPS_IR_Subscription::disassociate_publication: ")
00415                    ACE_TEXT("subscription %C testing if publication %C == %C.\n"),
00416                    std::string(sub_converter).c_str(),
00417                    std::string(pub_converter).c_str(),
00418                    std::string(sub_pub_converter).c_str()));
00419       }
00420 
00421       if (id == pub->get_id()) {
00422         CORBA::Boolean dont_notify_lost = 0;
00423         pub->remove_associated_subscription(this, send, dont_notify_lost);
00424         remove_associated_publication(pub, dontSend, dont_notify_lost);
00425 
00426         idSeq[count] = pub->get_id();
00427         ++count;
00428 
00429         if (reassociate && this->defunct_.insert(pub) != 0) {
00430           OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00431           OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00432           ACE_ERROR((LM_ERROR,
00433                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::disassociate_publication: ")
00434                      ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00435                      std::string(sub_converter).c_str(),
00436                      std::string(pub_converter).c_str(),
00437                      pub));
00438         }
00439       }
00440     }
00441 
00442     if (0 < count) {
00443       idSeq.length(count);
00444 
00445       if (participant_->is_alive() && this->participant_->isOwner()) {
00446         try {
00447           CORBA::Boolean dont_notify_lost = 0;
00448           reader_->remove_associations(idSeq, dont_notify_lost);
00449 
00450         } catch (const CORBA::Exception& ex) {
00451           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00452             ex._tao_print_exception(
00453               "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::remove_associations:");
00454           }
00455 
00456           participant_->mark_dead();
00457         }
00458       }
00459     }
00460   }
00461 }
00462 
00463 void DCPS_IR_Subscription::update_incompatible_qos()
00464 {
00465   if (participant_->is_alive() && this->participant_->isOwner()) {
00466     try {
00467       reader_->update_incompatible_qos(incompatibleQosStatus_);
00468       incompatibleQosStatus_.count_since_last_send = 0;
00469     } catch (const CORBA::Exception& ex) {
00470       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00471         ex._tao_print_exception(
00472           "(%P|%t) ERROR: Exception caught in DCPS_IR_Subscription::update_incompatible_qos:");
00473       }
00474 
00475       participant_->mark_dead();
00476     }
00477   }
00478 }
00479 
00480 CORBA::Boolean DCPS_IR_Subscription::is_publication_ignored(OpenDDS::DCPS::RepoId partId,
00481                                                             OpenDDS::DCPS::RepoId topicId,
00482                                                             OpenDDS::DCPS::RepoId pubId)
00483 {
00484   CORBA::Boolean ignored;
00485   ignored = (participant_->is_participant_ignored(partId) ||
00486              participant_->is_topic_ignored(topicId) ||
00487              participant_->is_publication_ignored(pubId));
00488 
00489   return ignored;
00490 }
00491 
00492 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Subscription::get_transportLocatorSeq() const
00493 {
00494   return info_;
00495 }
00496 
00497 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Subscription::get_incompatibleQosStatus()
00498 {
00499   return &incompatibleQosStatus_;
00500 }
00501 
00502 const DDS::DataReaderQos* DCPS_IR_Subscription::get_datareader_qos()
00503 {
00504   return &qos_;
00505 }
00506 
00507 const DDS::SubscriberQos* DCPS_IR_Subscription::get_subscriber_qos()
00508 {
00509   return &subscriberQos_;
00510 }
00511 
00512 using OpenDDS::DCPS::operator==;
00513 
00514 void
00515 DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos& qos)
00516 {
00517   if (false == (qos == this->qos_)) {
00518     // Check if we should check while we have both values.
00519     bool check =
00520       OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00521 
00522     // Store the new, compatible, value.
00523     this->qos_ = qos;
00524 
00525     if (check) {
00526       // This will remove any newly stale associations.
00527       this->reevaluate_existing_associations();
00528 
00529       // Sleep a while to let remove_association handled by DataWriter
00530       // before add_association. Otherwise, new association will have
00531       // trouble to connect each other.
00532       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00533 
00534       // This will establish any newly made associations.
00535       DCPS_IR_Topic_Description* description
00536       = this->topic_->get_topic_description();
00537       description->reevaluate_associations(this);
00538     }
00539 
00540     this->participant_->get_domain_reference()->publish_subscription_bit(this);
00541   }
00542 }
00543 
00544 void
00545 DCPS_IR_Subscription::set_qos(const DDS::SubscriberQos& qos)
00546 {
00547   if (false == (qos == this->subscriberQos_)) {
00548     // Check if we should check while we have both values.
00549     bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->subscriberQos_);
00550 
00551     // Store the new, compatible, value.
00552     this->subscriberQos_ = qos;
00553 
00554     if (check) {
00555       // This will remove any newly stale associations.
00556       this->reevaluate_existing_associations();
00557 
00558       // Sleep a while to let remove_association handled by DataWriter
00559       // before add_association. Otherwise, new association will have
00560       // trouble to connect each other.
00561       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00562 
00563       // This will establish any newly made associations.
00564       DCPS_IR_Topic_Description* description
00565       = this->topic_->get_topic_description();
00566       description->reevaluate_associations(this);
00567     }
00568 
00569     this->participant_->get_domain_reference()->publish_subscription_bit(this);
00570   }
00571 }
00572 
00573 bool DCPS_IR_Subscription::set_qos(const DDS::DataReaderQos & qos,
00574                                    const DDS::SubscriberQos & subscriberQos,
00575                                    Update::SpecificQos& specificQos)
00576 {
00577   bool need_evaluate = false;
00578   bool u_dr_qos = !(qos_ == qos);
00579 
00580   if (u_dr_qos) {
00581     if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00582       need_evaluate = true;
00583     }
00584 
00585     qos_ = qos;
00586   }
00587 
00588   bool u_sub_qos = !(subscriberQos_ == subscriberQos);
00589 
00590   if (u_sub_qos) {
00591     if (OpenDDS::DCPS::should_check_association_upon_change(subscriberQos_, subscriberQos)) {
00592       need_evaluate = true;
00593     }
00594 
00595     subscriberQos_ = subscriberQos;
00596   }
00597 
00598   if (need_evaluate) {
00599     // Check if any existing association need be removed first.
00600     this->reevaluate_existing_associations();
00601 
00602     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00603     description->reevaluate_associations(this);
00604   }
00605 
00606   participant_->get_domain_reference()->publish_subscription_bit(this);
00607   specificQos = u_dr_qos?  Update::DataReaderQos:
00608                 u_sub_qos? Update::SubscriberQos:
00609                 Update::NoQos;
00610 
00611   return true;
00612 }
00613 
00614 void
00615 DCPS_IR_Subscription::reevaluate_defunct_associations()
00616 {
00617   DCPS_IR_Publication_Set::iterator it(this->defunct_.begin());
00618   while (it != this->defunct_.end()) {
00619     DCPS_IR_Publication* publication = *it;
00620     ++it;
00621 
00622     if (reevaluate_association(publication)) {
00623       this->defunct_.remove(publication); // no longer defunct
00624 
00625     } else {
00626       OpenDDS::DCPS::RepoIdConverter sub_converter(id_);
00627       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00628       ACE_ERROR((LM_ERROR,
00629                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Subscription::reevaluate_defunct_associations: ")
00630                  ACE_TEXT("subscription %C failed to reassociate publication %C at %x.\n"),
00631                  std::string(sub_converter).c_str(),
00632                  std::string(pub_converter).c_str(),
00633                  publication));
00634     }
00635   }
00636 }
00637 
00638 void DCPS_IR_Subscription::reevaluate_existing_associations()
00639 {
00640   DCPS_IR_Publication * pub = 0;
00641   DCPS_IR_Publication_Set::ITERATOR iter = associations_.begin();
00642   DCPS_IR_Publication_Set::ITERATOR end = associations_.end();
00643 
00644   while (iter != end) {
00645     pub = *iter;
00646     ++iter;
00647 
00648     this->reevaluate_association(pub);
00649   }
00650 }
00651 
00652 bool
00653 DCPS_IR_Subscription::reevaluate_association(DCPS_IR_Publication* publication)
00654 {
00655   int status = this->associations_.find(publication);
00656 
00657   if (status == 0) {
00658     // verify if they are still compatiable after change
00659 
00660     if (!OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00661                                       this->get_incompatibleQosStatus(),
00662                                       publication->get_transportLocatorSeq(),
00663                                       this->get_transportLocatorSeq(),
00664                                       publication->get_datawriter_qos(),
00665                                       this->get_datareader_qos(),
00666                                       publication->get_publisher_qos(),
00667                                       this->get_subscriber_qos())) {
00668       bool sendNotify = true; // inform datareader
00669       bool notify_lost = true; // invoke listerner callback
00670 
00671       this->remove_associated_publication(publication, sendNotify, notify_lost, true);
00672     }
00673 
00674   } else {
00675     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00676     return description->try_associate(publication, this);
00677   }
00678 
00679   return false;
00680 }
00681 
00682 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_id()
00683 {
00684   return id_;
00685 }
00686 
00687 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_topic_id()
00688 {
00689   return topic_->get_id();
00690 }
00691 
00692 OpenDDS::DCPS::RepoId DCPS_IR_Subscription::get_participant_id()
00693 {
00694   return participant_->get_id();
00695 }
00696 
00697 DCPS_IR_Topic_Description* DCPS_IR_Subscription::get_topic_description()
00698 {
00699   return topic_->get_topic_description();
00700 }
00701 
00702 DCPS_IR_Topic* DCPS_IR_Subscription::get_topic()
00703 {
00704   return topic_;
00705 }
00706 
00707 DDS::InstanceHandle_t DCPS_IR_Subscription::get_handle()
00708 {
00709   return handle_;
00710 }
00711 
00712 void DCPS_IR_Subscription::set_handle(DDS::InstanceHandle_t handle)
00713 {
00714   handle_ = handle;
00715 }
00716 
00717 CORBA::Boolean DCPS_IR_Subscription::is_bit()
00718 {
00719   return isBIT_;
00720 }
00721 
00722 void DCPS_IR_Subscription::set_bit_status(CORBA::Boolean isBIT)
00723 {
00724   isBIT_ = isBIT;
00725 }
00726 
00727 OpenDDS::DCPS::DataReaderRemote_ptr
00728 DCPS_IR_Subscription::reader()
00729 {
00730   return OpenDDS::DCPS::DataReaderRemote::_duplicate(this->reader_.in());
00731 }
00732 
00733 std::string
00734 DCPS_IR_Subscription::get_filter_class_name() const
00735 {
00736   return filterClassName_;
00737 }
00738 
00739 std::string
00740 DCPS_IR_Subscription::get_filter_expression() const
00741 {
00742   return filterExpression_;
00743 }
00744 
00745 DDS::StringSeq
00746 DCPS_IR_Subscription::get_expr_params() const
00747 {
00748   return exprParams_;
00749 }
00750 
00751 void
00752 DCPS_IR_Subscription::update_expr_params(const DDS::StringSeq& params)
00753 {
00754   exprParams_ = params;
00755   typedef DCPS_IR_Publication_Set::ITERATOR iter_t;
00756   for (iter_t i(associations_.begin()), e(associations_.end()); i != e; ++i) {
00757     (*i)->update_expr_params(id_, params);
00758   }
00759 }
00760 
00761 std::string
00762 DCPS_IR_Subscription::dump_to_string(const std::string& prefix, int depth) const
00763 {
00764   std::string str;
00765 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00766   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00767 
00768   for (int i=0; i < depth; i++)
00769     str += prefix;
00770   std::string indent = str + prefix;
00771   str += "DCPS_IR_Subscription[";
00772   str += std::string(local_converter);
00773   str += "]";
00774   if (isBIT_)
00775     str += " (BIT)";
00776   str += "\n";
00777 
00778   str += indent + "Associations [ ";
00779   for (DCPS_IR_Publication_Set::const_iterator assoc = associations_.begin();
00780        assoc != associations_.end();
00781        assoc++)
00782   {
00783     OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00784     str += std::string(assoc_converter);
00785     str += " ";
00786   }
00787   str += "]\n";
00788 
00789   str += indent + "Defunct Associations [ ";
00790   for (DCPS_IR_Publication_Set::const_iterator def = defunct_.begin();
00791        def != defunct_.end();
00792        def++)
00793   {
00794     OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00795     str += std::string(def_converter);
00796     str += " ";
00797   }
00798   str += "]\n";
00799 
00800 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00801   return str;
00802 }
00803 
00804 
// Explicit instantiation of the ACE container templates used with
// DCPS_IR_Publication*, for build configurations that request it through
// one of the two ACE macros below.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

// Standard C++ explicit-instantiation syntax.
template class ACE_Node<DCPS_IR_Publication*>;
template class ACE_Unbounded_Set<DCPS_IR_Publication*>;
template class ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

// Pragma-based instantiation.
#pragma instantiate ACE_Node<DCPS_IR_Publication*>
#pragma instantiate ACE_Unbounded_Set<DCPS_IR_Publication*>
#pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7