DCPS_IR_Publication.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include /**/ "DCPS_IR_Publication.h"
00010 
00011 #include /**/ "DCPS_IR_Participant.h"
00012 #include /**/ "DCPS_IR_Topic.h"
00013 #include /**/ "DCPS_IR_Subscription.h"
00014 #include /**/ "DCPS_IR_Domain.h"
00015 #include /**/ "DCPS_IR_Topic_Description.h"
00016 #include /**/ "dds/DCPS/DCPS_Utils.h"
00017 #include /**/ "dds/DdsDcpsInfoUtilsC.h"
00018 #include /**/ "dds/DCPS/RepoIdConverter.h"
00019 #include /**/ "dds/DCPS/Qos_Helper.h"
00020 #include /**/ "tao/debug.h"
00021 
00022 DCPS_IR_Publication::DCPS_IR_Publication(const OpenDDS::DCPS::RepoId& id,
00023                                          DCPS_IR_Participant* participant,
00024                                          DCPS_IR_Topic* topic,
00025                                          OpenDDS::DCPS::DataWriterRemote_ptr writer,
00026                                          const DDS::DataWriterQos& qos,
00027                                          const OpenDDS::DCPS::TransportLocatorSeq& info,
00028                                          const DDS::PublisherQos& publisherQos)
00029   : id_(id),
00030     participant_(participant),
00031     topic_(topic),
00032     handle_(0),
00033     isBIT_(0),
00034     qos_(qos),
00035     info_(info),
00036     publisherQos_(publisherQos)
00037 {
00038   writer_ =  OpenDDS::DCPS::DataWriterRemote::_duplicate(writer);
00039 
00040   incompatibleQosStatus_.total_count = 0;
00041   incompatibleQosStatus_.count_since_last_send = 0;
00042 }
00043 
00044 DCPS_IR_Publication::~DCPS_IR_Publication()
00045 {
00046   if (0 != associations_.size()) {
00047     CORBA::Boolean dont_notify_lost = 0;
00048     remove_associations(dont_notify_lost);
00049   }
00050 }
00051 
00052 int DCPS_IR_Publication::add_associated_subscription(DCPS_IR_Subscription* sub,
00053                                                      bool active)
00054 {
00055   // keep track of the association locally
00056   int status = associations_.insert(sub);
00057 
00058   switch (status) {
00059   case 0: {
00060     // inform the datawriter about the association
00061     OpenDDS::DCPS::ReaderAssociation association;
00062     association.readerTransInfo = sub->get_transportLocatorSeq();
00063     association.readerId = sub->get_id();
00064     association.subQos = *(sub->get_subscriber_qos());
00065     association.readerQos = *(sub->get_datareader_qos());
00066     association.filterClassName = sub->get_filter_class_name().c_str();
00067     association.filterExpression = sub->get_filter_expression().c_str();
00068     association.exprParams = sub->get_expr_params();
00069 
00070     if (participant_->is_alive() && this->participant_->isOwner()) {
00071       try {
00072         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00073           OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00074           OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00075           ACE_DEBUG((LM_DEBUG,
00076                      ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription:")
00077                      ACE_TEXT(" publication %C adding subscription %C.\n"),
00078                      std::string(pub_converter).c_str(),
00079                      std::string(sub_converter).c_str()));
00080         }
00081 
00082         writer_->add_association(id_, association, active);
00083 
00084         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00085           ACE_DEBUG((LM_DEBUG,
00086                      ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription: ")
00087                      ACE_TEXT("successfully added subscription %x.\n"),
00088                      sub));
00089         }
00090       } catch (const CORBA::Exception& ex) {
00091         ex._tao_print_exception(
00092           "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:");
00093         participant_->mark_dead();
00094         status = -1;
00095       }
00096     }
00097   }
00098   break;
00099 
00100   case 1: {
00101     OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00102     OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00103     ACE_ERROR((LM_ERROR,
00104                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00105                ACE_TEXT("publication %C attempted to re-add subscription %C.\n"),
00106                std::string(pub_converter).c_str(),
00107                std::string(sub_converter).c_str()));
00108   }
00109   break;
00110 
00111   case -1: {
00112     OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00113     OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00114     ACE_ERROR((LM_ERROR,
00115                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00116                ACE_TEXT("publication %C failed to add subscription %C.\n"),
00117                std::string(pub_converter).c_str(),
00118                std::string(sub_converter).c_str()));
00119   }
00120   };
00121 
00122   return status;
00123 }
00124 
00125 void
00126 DCPS_IR_Publication::association_complete(const OpenDDS::DCPS::RepoId& remote)
00127 {
00128   typedef DCPS_IR_Subscription_Set::ITERATOR iter_t;
00129   for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00130     if ((*iter)->get_id() == remote) {
00131       (*iter)->call_association_complete(get_id());
00132     }
00133   }
00134 }
00135 
00136 void
00137 DCPS_IR_Publication::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00138 {
00139   try {
00140     writer_->association_complete(remote);
00141   } catch (const CORBA::Exception& ex) {
00142     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00143       ex._tao_print_exception(
00144         "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::call_association_complete:");
00145     }
00146     participant_->mark_dead();
00147   }
00148 }
00149 
00150 int DCPS_IR_Publication::remove_associated_subscription(DCPS_IR_Subscription* sub,
00151                                                         CORBA::Boolean sendNotify,
00152                                                         CORBA::Boolean notify_lost,
00153                                                         bool notify_both_side)
00154 {
00155   bool marked_dead = false;
00156 
00157   if (sendNotify) {
00158     OpenDDS::DCPS::ReaderIdSeq idSeq(1);
00159     idSeq.length(1);
00160     idSeq[0]= sub->get_id();
00161 
00162     if (participant_->is_alive() && this->participant_->isOwner()) {
00163       try {
00164         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00165           ACE_DEBUG((LM_DEBUG,
00166                      ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription:")
00167                      ACE_TEXT(" calling pub %d with sub %d\n"),
00168                      id_, sub->get_id()));
00169         }
00170 
00171         writer_->remove_associations(idSeq, notify_lost);
00172 
00173         if (notify_both_side) {
00174           sub->remove_associated_publication(this, sendNotify, notify_lost);
00175         }
00176 
00177       } catch (const CORBA::Exception& ex) {
00178         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00179           ex._tao_print_exception(
00180             "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_publication:");
00181         }
00182 
00183         participant_->mark_dead();
00184         marked_dead = true;
00185       }
00186     }
00187   }
00188 
00189   int status = associations_.remove(sub);
00190 
00191   if (0 == status) {
00192     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00193       OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00194       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00195       ACE_DEBUG((LM_DEBUG,
00196                  ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ")
00197                  ACE_TEXT("publication %C removed subscription %C at %x.\n"),
00198                  std::string(pub_converter).c_str(),
00199                  std::string(sub_converter).c_str(),
00200                  sub));
00201     }
00202 
00203   } else {
00204     OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00205     OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00206     ACE_ERROR((LM_ERROR,
00207                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ")
00208                ACE_TEXT("publication %C failed to remove subscription %C at %x.\n"),
00209                std::string(pub_converter).c_str(),
00210                std::string(sub_converter).c_str(),
00211                sub));
00212   } // if (0 == status)
00213 
00214   if (marked_dead) {
00215     return -1;
00216 
00217   } else {
00218     return status;
00219   }
00220 }
00221 
00222 int DCPS_IR_Publication::remove_associations(CORBA::Boolean notify_lost)
00223 {
00224   int status = 0;
00225   DCPS_IR_Subscription* sub = 0;
00226   size_t numAssociations = associations_.size();
00227   CORBA::Boolean dontSend = 0;
00228   CORBA::Boolean send = 1;
00229 
00230   if (0 < numAssociations) {
00231     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00232     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00233 
00234     while (iter != end) {
00235       sub = *iter;
00236       ++iter;
00237       sub->remove_associated_publication(this, send, notify_lost);
00238       remove_associated_subscription(sub, dontSend, notify_lost);
00239     }
00240   }
00241   this->defunct_.reset();
00242 
00243   return status;
00244 }
00245 
00246 void DCPS_IR_Publication::disassociate_participant(OpenDDS::DCPS::RepoId id,
00247                                                    bool reassociate)
00248 {
00249   DCPS_IR_Subscription* sub = 0;
00250   size_t numAssociations = associations_.size();
00251   CORBA::Boolean send = 1;
00252   CORBA::Boolean dontSend = 0;
00253   long count = 0;
00254 
00255   if (0 < numAssociations) {
00256     OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00257     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00258 
00259     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00260     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00261 
00262     while (iter != end) {
00263       sub = *iter;
00264       ++iter;
00265 
00266       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00267         OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00268         OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00269         OpenDDS::DCPS::RepoIdConverter pub_part_converter(id);
00270         OpenDDS::DCPS::RepoIdConverter sub_part_converter(sub->get_participant_id());
00271         ACE_DEBUG((LM_DEBUG,
00272                    ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_participant: ")
00273                    ACE_TEXT("publication %C testing if subscription %C particpant %C == %C.\n"),
00274                    std::string(pub_converter).c_str(),
00275                    std::string(sub_converter).c_str(),
00276                    std::string(sub_part_converter).c_str(),
00277                    std::string(pub_part_converter).c_str()));
00278       }
00279 
00280       if (id == sub->get_participant_id()) {
00281         CORBA::Boolean dont_notify_lost = 0;
00282         sub->remove_associated_publication(this, send, dont_notify_lost);
00283         remove_associated_subscription(sub, dontSend, dont_notify_lost);
00284 
00285         idSeq[count] = sub->get_id();
00286         ++count;
00287 
00288         if (reassociate && this->defunct_.insert(sub) != 0) {
00289           OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00290           OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00291           ACE_ERROR((LM_ERROR,
00292                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ")
00293                      ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00294                      std::string(pub_converter).c_str(),
00295                      std::string(sub_converter).c_str(),
00296                      sub));
00297         }
00298       }
00299     }
00300 
00301     if (0 < count) {
00302       idSeq.length(count);
00303 
00304       if (participant_->is_alive() && this->participant_->isOwner()) {
00305         try {
00306           CORBA::Boolean dont_notify_lost = 0;
00307           writer_->remove_associations(idSeq, dont_notify_lost);
00308 
00309         } catch (const CORBA::Exception& ex) {
00310           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00311             ex._tao_print_exception(
00312               "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:");
00313           }
00314 
00315           participant_->mark_dead();
00316         }
00317       }
00318     }
00319   }
00320 }
00321 
00322 void DCPS_IR_Publication::disassociate_topic(OpenDDS::DCPS::RepoId id)
00323 {
00324   DCPS_IR_Subscription* sub = 0;
00325   size_t numAssociations = associations_.size();
00326   CORBA::Boolean send = 1;
00327   CORBA::Boolean dontSend = 0;
00328   long count = 0;
00329 
00330   if (0 < numAssociations) {
00331     OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00332     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00333 
00334     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00335     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00336 
00337     while (iter != end) {
00338       sub = *iter;
00339       ++iter;
00340 
00341       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00342         OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00343         OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00344         OpenDDS::DCPS::RepoIdConverter pub_topic_converter(id);
00345         OpenDDS::DCPS::RepoIdConverter sub_topic_converter(sub->get_topic_id());
00346         ACE_DEBUG((LM_DEBUG,
00347                    ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_topic: ")
00348                    ACE_TEXT("publication %C testing if subscription %C topic %C == %C.\n"),
00349                    std::string(pub_converter).c_str(),
00350                    std::string(sub_converter).c_str(),
00351                    std::string(sub_topic_converter).c_str(),
00352                    std::string(pub_topic_converter).c_str()));
00353       }
00354 
00355       if (id == sub->get_topic_id()) {
00356         CORBA::Boolean dont_notify_lost = 0;
00357         sub->remove_associated_publication(this, send, dont_notify_lost);
00358         remove_associated_subscription(sub, dontSend, dont_notify_lost);
00359 
00360         idSeq[count] = sub->get_id();
00361         ++count;
00362       }
00363     }
00364 
00365     if (0 < count) {
00366       idSeq.length(count);
00367 
00368       if (participant_->is_alive() && this->participant_->isOwner()) {
00369         try {
00370           CORBA::Boolean dont_notify_lost = 0;
00371           writer_->remove_associations(idSeq, dont_notify_lost);
00372 
00373         } catch (const CORBA::Exception& ex) {
00374           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00375             ex._tao_print_exception(
00376               "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00377           }
00378 
00379           participant_->mark_dead();
00380         }
00381       }
00382     }
00383   }
00384 }
00385 
00386 void DCPS_IR_Publication::disassociate_subscription(OpenDDS::DCPS::RepoId id,
00387                                                     bool reassociate)
00388 {
00389   DCPS_IR_Subscription* sub = 0;
00390   size_t numAssociations = associations_.size();
00391   CORBA::Boolean send = 1;
00392   CORBA::Boolean dontSend = 0;
00393   long count = 0;
00394 
00395   if (0 < numAssociations) {
00396     OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00397     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00398 
00399     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00400     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00401 
00402     while (iter != end) {
00403       sub = *iter;
00404       ++iter;
00405 
00406       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00407         OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00408         OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00409         OpenDDS::DCPS::RepoIdConverter pub_sub_converter(id);
00410         ACE_DEBUG((LM_DEBUG,
00411                    ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_subscription: ")
00412                    ACE_TEXT("publication %C testing if subscription %C == %C.\n"),
00413                    std::string(pub_converter).c_str(),
00414                    std::string(sub_converter).c_str(),
00415                    std::string(pub_sub_converter).c_str()));
00416       }
00417 
00418       if (id == sub->get_id()) {
00419         CORBA::Boolean dont_notify_lost = 0;
00420         sub->remove_associated_publication(this, send, dont_notify_lost);
00421         remove_associated_subscription(sub, dontSend, dont_notify_lost);
00422 
00423         idSeq[count] = sub->get_id();
00424         ++count;
00425 
00426         if (reassociate && this->defunct_.insert(sub) != 0) {
00427           OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00428           OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00429           ACE_ERROR((LM_ERROR,
00430                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ")
00431                      ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00432                      std::string(pub_converter).c_str(),
00433                      std::string(sub_converter).c_str(),
00434                      sub));
00435         }
00436       }
00437     }
00438 
00439     if (0 < count) {
00440       idSeq.length(count);
00441 
00442       if (participant_->is_alive() && this->participant_->isOwner()) {
00443         try {
00444           CORBA::Boolean dont_notify_lost = 0;
00445           writer_->remove_associations(idSeq, dont_notify_lost);
00446 
00447         } catch (const CORBA::Exception& ex) {
00448           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00449             ex._tao_print_exception(
00450               "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00451           }
00452 
00453           participant_->mark_dead();
00454         }
00455       }
00456     }
00457   }
00458 }
00459 
00460 void DCPS_IR_Publication::update_incompatible_qos()
00461 {
00462   if (participant_->is_alive() && this->participant_->isOwner()) {
00463     try {
00464       writer_->update_incompatible_qos(incompatibleQosStatus_);
00465       incompatibleQosStatus_.count_since_last_send = 0;
00466     } catch (const CORBA::Exception& ex) {
00467       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00468         ex._tao_print_exception(
00469           "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:");
00470       }
00471 
00472       participant_->mark_dead();
00473     }
00474   }
00475 }
00476 
00477 CORBA::Boolean DCPS_IR_Publication::is_subscription_ignored(OpenDDS::DCPS::RepoId partId,
00478                                                             OpenDDS::DCPS::RepoId topicId,
00479                                                             OpenDDS::DCPS::RepoId subId)
00480 {
00481   CORBA::Boolean ignored;
00482   ignored = (participant_->is_participant_ignored(partId) ||
00483              participant_->is_topic_ignored(topicId) ||
00484              participant_->is_subscription_ignored(subId));
00485 
00486   return ignored;
00487 }
00488 
00489 DDS::DataWriterQos* DCPS_IR_Publication::get_datawriter_qos()
00490 {
00491   return &qos_;
00492 }
00493 
00494 using OpenDDS::DCPS::operator==;
00495 
00496 void
00497 DCPS_IR_Publication::set_qos(const DDS::DataWriterQos& qos)
00498 {
00499   if (false == (qos == this->qos_)) {
00500     // Check if we should check while we have both values.
00501     bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00502 
00503     // Store the new, compatible, value.
00504     this->qos_ = qos;
00505 
00506     if (check) {
00507       // This will remove any newly stale associations.
00508       this->reevaluate_existing_associations();
00509 
00510       // Sleep a while to let remove_association handled by DataWriter
00511       // before add_association. Otherwise, new association will have
00512       // trouble to connect each other.
00513       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00514 
00515       // This will establish any newly made associations.
00516       DCPS_IR_Topic_Description* description
00517       = this->topic_->get_topic_description();
00518       description->reevaluate_associations(this);
00519     }
00520 
00521     this->participant_->get_domain_reference()->publish_publication_bit(this);
00522   }
00523 }
00524 
00525 void
00526 DCPS_IR_Publication::set_qos(const DDS::PublisherQos& qos)
00527 {
00528   if (false == (qos == this->publisherQos_)) {
00529     // Check if we should check while we have both values.
00530     bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->publisherQos_);
00531 
00532     // Store the new, compatible, value.
00533     this->publisherQos_ = qos;
00534 
00535     if (check) {
00536       // This will remove any newly stale associations.
00537       this->reevaluate_existing_associations();
00538 
00539       // Sleep a while to let remove_association handled by DataWriter
00540       // before add_association. Otherwise, new association will have
00541       // trouble to connect each other.
00542       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00543 
00544       // This will establish any newly made associations.
00545       DCPS_IR_Topic_Description* description
00546       = this->topic_->get_topic_description();
00547       description->reevaluate_associations(this);
00548     }
00549 
00550     this->participant_->get_domain_reference()->publish_publication_bit(this);
00551   }
00552 }
00553 
00554 bool DCPS_IR_Publication::set_qos(const DDS::DataWriterQos & qos,
00555                                   const DDS::PublisherQos & publisherQos,
00556                                   Update::SpecificQos& specificQos)
00557 {
00558   bool need_evaluate = false;
00559   bool u_dw_qos = !(qos_ == qos);
00560 
00561   if (u_dw_qos) {
00562     if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00563       need_evaluate = true;
00564     }
00565 
00566     qos_ = qos;
00567   }
00568 
00569   bool u_pub_qos = !(publisherQos_ == publisherQos);
00570 
00571   if (u_pub_qos) {
00572     if (OpenDDS::DCPS::should_check_association_upon_change(publisherQos_, publisherQos)) {
00573       need_evaluate = true;
00574     }
00575 
00576     publisherQos_ = publisherQos;
00577   }
00578 
00579   if (need_evaluate) {
00580     // Check if any existing association need be removed first.
00581     this->reevaluate_existing_associations();
00582 
00583     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00584     description->reevaluate_associations(this);
00585   }
00586 
00587   participant_->get_domain_reference()->publish_publication_bit(this);
00588   specificQos = u_dw_qos?  Update::DataWriterQos:
00589                 u_pub_qos? Update::PublisherQos:
00590                 Update::NoQos;
00591 
00592   return true;
00593 }
00594 
00595 DDS::PublisherQos* DCPS_IR_Publication::get_publisher_qos()
00596 {
00597   return &publisherQos_;
00598 }
00599 
00600 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Publication::get_transportLocatorSeq() const
00601 {
00602   return info_;
00603 }
00604 
00605 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Publication::get_incompatibleQosStatus()
00606 {
00607   return &incompatibleQosStatus_;
00608 }
00609 
00610 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_id()
00611 {
00612   return id_;
00613 }
00614 
00615 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_topic_id()
00616 {
00617   return topic_->get_id();
00618 }
00619 
00620 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_participant_id()
00621 {
00622   return participant_->get_id();
00623 }
00624 
00625 DCPS_IR_Topic* DCPS_IR_Publication::get_topic()
00626 {
00627   return topic_;
00628 }
00629 
00630 DCPS_IR_Topic_Description* DCPS_IR_Publication::get_topic_description()
00631 {
00632   return topic_->get_topic_description();
00633 }
00634 
00635 DDS::InstanceHandle_t DCPS_IR_Publication::get_handle()
00636 {
00637   return handle_;
00638 }
00639 
00640 void DCPS_IR_Publication::set_handle(DDS::InstanceHandle_t handle)
00641 {
00642   handle_ = handle;
00643 }
00644 
00645 CORBA::Boolean DCPS_IR_Publication::is_bit()
00646 {
00647   return isBIT_;
00648 }
00649 
00650 void DCPS_IR_Publication::set_bit_status(CORBA::Boolean isBIT)
00651 {
00652   isBIT_ = isBIT;
00653 }
00654 
00655 OpenDDS::DCPS::DataWriterRemote_ptr
00656 DCPS_IR_Publication::writer()
00657 {
00658   return OpenDDS::DCPS::DataWriterRemote::_duplicate(this->writer_.in());
00659 }
00660 
00661 void
00662 DCPS_IR_Publication::reevaluate_defunct_associations()
00663 {
00664   DCPS_IR_Subscription_Set::iterator it(this->defunct_.begin());
00665   while (it != this->defunct_.end()) {
00666     DCPS_IR_Subscription* subscription = *it;
00667     ++it;
00668 
00669     if (reevaluate_association(subscription)) {
00670       this->defunct_.remove(subscription); // no longer defunct
00671 
00672     } else {
00673       OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00674       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00675       ACE_ERROR((LM_ERROR,
00676                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ")
00677                  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00678                  std::string(pub_converter).c_str(),
00679                  std::string(sub_converter).c_str(),
00680                  subscription));
00681     }
00682   }
00683 }
00684 
00685 void DCPS_IR_Publication::reevaluate_existing_associations()
00686 {
00687   DCPS_IR_Subscription* sub = 0;
00688   DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00689   DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00690 
00691   while (iter != end) {
00692     sub = *iter;
00693     ++iter;
00694 
00695     this->reevaluate_association(sub);
00696   }
00697 }
00698 
00699 bool
00700 DCPS_IR_Publication::reevaluate_association(DCPS_IR_Subscription* subscription)
00701 {
00702   int status = this->associations_.find(subscription);
00703 
00704   if (status == 0) {
00705     // verify if they are still compatiable after change
00706 
00707 
00708     if (!OpenDDS::DCPS::compatibleQOS(this->get_incompatibleQosStatus(),
00709                                       subscription->get_incompatibleQosStatus(),
00710                                       this->get_transportLocatorSeq(),
00711                                       subscription->get_transportLocatorSeq(),
00712                                       this->get_datawriter_qos(),
00713                                       subscription->get_datareader_qos(),
00714                                       this->get_publisher_qos(),
00715                                       subscription->get_subscriber_qos())) {
00716       bool sendNotify = true; // inform datawriter
00717       bool notify_lost = true; // invoke listerner callback
00718 
00719       this->remove_associated_subscription(subscription, sendNotify, notify_lost, true);
00720     }
00721 
00722   } else {
00723     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00724     return description->try_associate(this, subscription);
00725   }
00726 
00727   return false;
00728 }
00729 
00730 void
00731 DCPS_IR_Publication::update_expr_params(OpenDDS::DCPS::RepoId readerId,
00732                                         const DDS::StringSeq& params)
00733 {
00734   try {
00735     writer_->update_subscription_params(readerId, params);
00736   } catch (const CORBA::SystemException& ex) {
00737     if (OpenDDS::DCPS::DCPS_debug_level) {
00738       ex._tao_print_exception("(%P|%t) ERROR: Exception caught in "
00739         "DCPS_IR_Publication::update_expr_params:");
00740     }
00741     participant_->mark_dead();
00742   }
00743 }
00744 
00745 std::string
00746 DCPS_IR_Publication::dump_to_string(const std::string& prefix, int depth) const
00747 {
00748   std::string str;
00749 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00750   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00751 
00752   for (int i=0; i < depth; i++)
00753     str += prefix;
00754   std::string indent = str + prefix;
00755   str += "DCPS_IR_Publication[";
00756   str += std::string(local_converter);
00757   str += "]";
00758   if (isBIT_)
00759     str += " (BIT)";
00760   str += "\n";
00761 
00762   str += indent + "Associations [ ";
00763   for (DCPS_IR_Subscription_Set::const_iterator assoc = associations_.begin();
00764        assoc != associations_.end();
00765        assoc++)
00766   {
00767     OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00768     str += std::string(assoc_converter);
00769     str += " ";
00770   }
00771   str += "]\n";
00772 
00773   str += indent + "Defunct Associations [ ";
00774   for (DCPS_IR_Subscription_Set::const_iterator def = defunct_.begin();
00775        def != defunct_.end();
00776        def++)
00777   {
00778     OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00779     str += std::string(def_converter);
00780     str += " ";
00781   }
00782   str += "]\n";
00783 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00784   return str;
00785 }
00786 
00787 
00788 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
00789 
00790 template class ACE_Node<DCPS_IR_Subscription*>;
00791 template class ACE_Unbounded_Set<DCPS_IR_Subscription*>;
00792 template class ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>;
00793 
00794 #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
00795 
00796 #pragma instantiate ACE_Node<DCPS_IR_Subscription*>
00797 #pragma instantiate ACE_Unbounded_Set<DCPS_IR_Subscription*>
00798 #pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>
00799 
00800 #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7