DCPS_IR_Publication.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include /**/ "DCPS_IR_Publication.h"
00010 
00011 #include /**/ "DCPS_IR_Participant.h"
00012 #include /**/ "DCPS_IR_Topic.h"
00013 #include /**/ "DCPS_IR_Subscription.h"
00014 #include /**/ "DCPS_IR_Domain.h"
00015 #include /**/ "DCPS_IR_Topic_Description.h"
00016 #include /**/ "dds/DCPS/DCPS_Utils.h"
00017 #include /**/ "dds/DdsDcpsInfoUtilsC.h"
00018 #include /**/ "dds/DCPS/RepoIdConverter.h"
00019 #include /**/ "dds/DCPS/Qos_Helper.h"
00020 #include /**/ "tao/debug.h"
00021 
00022 #include /**/ "ace/OS_NS_unistd.h"
00023 
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 DCPS_IR_Publication::DCPS_IR_Publication(const OpenDDS::DCPS::RepoId& id,
00027                                          DCPS_IR_Participant* participant,
00028                                          DCPS_IR_Topic* topic,
00029                                          OpenDDS::DCPS::DataWriterRemote_ptr writer,
00030                                          const DDS::DataWriterQos& qos,
00031                                          const OpenDDS::DCPS::TransportLocatorSeq& info,
00032                                          const DDS::PublisherQos& publisherQos)
00033   : id_(id),
00034     participant_(participant),
00035     topic_(topic),
00036     handle_(0),
00037     isBIT_(0),
00038     qos_(qos),
00039     info_(info),
00040     publisherQos_(publisherQos)
00041 {
00042   writer_ =  OpenDDS::DCPS::DataWriterRemote::_duplicate(writer);
00043 
00044   incompatibleQosStatus_.total_count = 0;
00045   incompatibleQosStatus_.count_since_last_send = 0;
00046 }
00047 
00048 DCPS_IR_Publication::~DCPS_IR_Publication()
00049 {
00050 }
00051 
00052 int DCPS_IR_Publication::add_associated_subscription(DCPS_IR_Subscription* sub,
00053                                                      bool active)
00054 {
00055   // keep track of the association locally
00056   int status = associations_.insert(sub);
00057 
00058   switch (status) {
00059   case 0: {
00060     // inform the datawriter about the association
00061     OpenDDS::DCPS::ReaderAssociation association;
00062     association.readerTransInfo = sub->get_transportLocatorSeq();
00063     association.readerId = sub->get_id();
00064     association.subQos = *(sub->get_subscriber_qos());
00065     association.readerQos = *(sub->get_datareader_qos());
00066     association.filterClassName = sub->get_filter_class_name().c_str();
00067     association.filterExpression = sub->get_filter_expression().c_str();
00068     association.exprParams = sub->get_expr_params();
00069 
00070     if (participant_->is_alive() && this->participant_->isOwner()) {
00071       try {
00072         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00073           OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00074           OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00075           ACE_DEBUG((LM_DEBUG,
00076                      ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription:")
00077                      ACE_TEXT(" publication %C adding subscription %C.\n"),
00078                      std::string(pub_converter).c_str(),
00079                      std::string(sub_converter).c_str()));
00080         }
00081 
00082         writer_->add_association(id_, association, active);
00083 
00084         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00085           ACE_DEBUG((LM_DEBUG,
00086                      ACE_TEXT("(%P|%t) DCPS_IR_Publication::add_associated_subscription: ")
00087                      ACE_TEXT("successfully added subscription %x.\n"),
00088                      sub));
00089         }
00090       } catch (const CORBA::Exception& ex) {
00091         ex._tao_print_exception(
00092           "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::add_associated_subscription:");
00093         participant_->mark_dead();
00094         status = -1;
00095       }
00096     }
00097   }
00098   break;
00099 
00100   case 1: {
00101     OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00102     OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00103     ACE_ERROR((LM_ERROR,
00104                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00105                ACE_TEXT("publication %C attempted to re-add subscription %C.\n"),
00106                std::string(pub_converter).c_str(),
00107                std::string(sub_converter).c_str()));
00108   }
00109   break;
00110 
00111   case -1: {
00112     OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00113     OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00114     ACE_ERROR((LM_ERROR,
00115                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::add_associated_subscription: ")
00116                ACE_TEXT("publication %C failed to add subscription %C.\n"),
00117                std::string(pub_converter).c_str(),
00118                std::string(sub_converter).c_str()));
00119   }
00120   };
00121 
00122   return status;
00123 }
00124 
00125 void
00126 DCPS_IR_Publication::association_complete(const OpenDDS::DCPS::RepoId& remote)
00127 {
00128   typedef DCPS_IR_Subscription_Set::ITERATOR iter_t;
00129   for (iter_t iter = associations_.begin(); iter != associations_.end(); ++iter) {
00130     if ((*iter)->get_id() == remote) {
00131       (*iter)->call_association_complete(get_id());
00132     }
00133   }
00134 }
00135 
00136 void
00137 DCPS_IR_Publication::call_association_complete(const OpenDDS::DCPS::RepoId& remote)
00138 {
00139   try {
00140     writer_->association_complete(remote);
00141   } catch (const CORBA::Exception& ex) {
00142     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00143       ex._tao_print_exception(
00144         "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::call_association_complete:");
00145     }
00146     participant_->mark_dead();
00147   }
00148 }
00149 
00150 int DCPS_IR_Publication::remove_associated_subscription(DCPS_IR_Subscription* sub,
00151                                                         CORBA::Boolean sendNotify,
00152                                                         CORBA::Boolean notify_lost,
00153                                                         bool notify_both_side)
00154 {
00155   bool marked_dead = false;
00156 
00157   if (sendNotify) {
00158     if (participant_->is_alive() && this->participant_->isOwner()) {
00159       try {
00160         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00161           OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00162           OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00163           ACE_DEBUG((LM_DEBUG,
00164                      ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription:")
00165                      ACE_TEXT(" calling pub %C with sub %C\n"),
00166                      OPENDDS_STRING(pub_converter).c_str(),
00167                      OPENDDS_STRING(sub_converter).c_str()));
00168         }
00169 
00170         OpenDDS::DCPS::ReaderIdSeq idSeq(1);
00171         idSeq.length(1);
00172         idSeq[0] = sub->get_id();
00173 
00174         writer_->remove_associations(idSeq, notify_lost);
00175 
00176         if (notify_both_side) {
00177           sub->remove_associated_publication(this, sendNotify, notify_lost);
00178         }
00179 
00180       } catch (const CORBA::Exception& ex) {
00181         if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00182           ex._tao_print_exception(
00183             "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associated_subscription:");
00184         }
00185 
00186         participant_->mark_dead();
00187         marked_dead = true;
00188       }
00189     }
00190   }
00191 
00192   int status = associations_.remove(sub);
00193 
00194   if (0 == status) {
00195     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00196       OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00197       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00198       ACE_DEBUG((LM_DEBUG,
00199                  ACE_TEXT("(%P|%t) DCPS_IR_Publication::remove_associated_subscription: ")
00200                  ACE_TEXT("publication %C removed subscription %C at %x.\n"),
00201                  std::string(pub_converter).c_str(),
00202                  std::string(sub_converter).c_str(),
00203                  sub));
00204     }
00205 
00206   } else {
00207     OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00208     OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00209     ACE_ERROR((LM_ERROR,
00210                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::remove_associated_subscription: ")
00211                ACE_TEXT("publication %C failed to remove subscription %C at %x.\n"),
00212                std::string(pub_converter).c_str(),
00213                std::string(sub_converter).c_str(),
00214                sub));
00215   } // if (0 == status)
00216 
00217   if (marked_dead) {
00218     return -1;
00219 
00220   } else {
00221     return status;
00222   }
00223 }
00224 
00225 int DCPS_IR_Publication::remove_associations(CORBA::Boolean notify_lost)
00226 {
00227   int status = 0;
00228   DCPS_IR_Subscription* sub = 0;
00229   size_t numAssociations = associations_.size();
00230   CORBA::Boolean dontSend = 0;
00231   CORBA::Boolean send = 1;
00232 
00233   if (0 < numAssociations) {
00234     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00235     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00236 
00237     while (iter != end) {
00238       sub = *iter;
00239       ++iter;
00240       sub->remove_associated_publication(this, send, notify_lost);
00241       remove_associated_subscription(sub, dontSend, notify_lost);
00242     }
00243   }
00244   this->defunct_.reset();
00245 
00246   return status;
00247 }
00248 
00249 void DCPS_IR_Publication::disassociate_participant(OpenDDS::DCPS::RepoId id,
00250                                                    bool reassociate)
00251 {
00252   DCPS_IR_Subscription* sub = 0;
00253   size_t numAssociations = associations_.size();
00254   CORBA::Boolean send = 1;
00255   CORBA::Boolean dontSend = 0;
00256   long count = 0;
00257 
00258   if (0 < numAssociations) {
00259     OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00260     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00261 
00262     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00263     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00264 
00265     while (iter != end) {
00266       sub = *iter;
00267       ++iter;
00268 
00269       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00270         OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00271         OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00272         OpenDDS::DCPS::RepoIdConverter pub_part_converter(id);
00273         OpenDDS::DCPS::RepoIdConverter sub_part_converter(sub->get_participant_id());
00274         ACE_DEBUG((LM_DEBUG,
00275                    ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_participant: ")
00276                    ACE_TEXT("publication %C testing if subscription %C particpant %C == %C.\n"),
00277                    std::string(pub_converter).c_str(),
00278                    std::string(sub_converter).c_str(),
00279                    std::string(sub_part_converter).c_str(),
00280                    std::string(pub_part_converter).c_str()));
00281       }
00282 
00283       if (id == sub->get_participant_id()) {
00284         CORBA::Boolean dont_notify_lost = 0;
00285         sub->remove_associated_publication(this, send, dont_notify_lost);
00286         remove_associated_subscription(sub, dontSend, dont_notify_lost);
00287 
00288         idSeq[count] = sub->get_id();
00289         ++count;
00290 
00291         if (reassociate && this->defunct_.insert(sub) != 0) {
00292           OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00293           OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00294           ACE_ERROR((LM_ERROR,
00295                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_participant: ")
00296                      ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00297                      std::string(pub_converter).c_str(),
00298                      std::string(sub_converter).c_str(),
00299                      sub));
00300         }
00301       }
00302     }
00303 
00304     if (0 < count) {
00305       idSeq.length(count);
00306 
00307       if (participant_->is_alive() && this->participant_->isOwner()) {
00308         try {
00309           CORBA::Boolean dont_notify_lost = 0;
00310           writer_->remove_associations(idSeq, dont_notify_lost);
00311 
00312         } catch (const CORBA::Exception& ex) {
00313           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00314             ex._tao_print_exception(
00315               "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::disassociate_participant:");
00316           }
00317 
00318           participant_->mark_dead();
00319         }
00320       }
00321     }
00322   }
00323 }
00324 
00325 void DCPS_IR_Publication::disassociate_topic(OpenDDS::DCPS::RepoId id)
00326 {
00327   DCPS_IR_Subscription* sub = 0;
00328   size_t numAssociations = associations_.size();
00329   CORBA::Boolean send = 1;
00330   CORBA::Boolean dontSend = 0;
00331   long count = 0;
00332 
00333   if (0 < numAssociations) {
00334     OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00335     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00336 
00337     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00338     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00339 
00340     while (iter != end) {
00341       sub = *iter;
00342       ++iter;
00343 
00344       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00345         OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00346         OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00347         OpenDDS::DCPS::RepoIdConverter pub_topic_converter(id);
00348         OpenDDS::DCPS::RepoIdConverter sub_topic_converter(sub->get_topic_id());
00349         ACE_DEBUG((LM_DEBUG,
00350                    ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_topic: ")
00351                    ACE_TEXT("publication %C testing if subscription %C topic %C == %C.\n"),
00352                    std::string(pub_converter).c_str(),
00353                    std::string(sub_converter).c_str(),
00354                    std::string(sub_topic_converter).c_str(),
00355                    std::string(pub_topic_converter).c_str()));
00356       }
00357 
00358       if (id == sub->get_topic_id()) {
00359         CORBA::Boolean dont_notify_lost = 0;
00360         sub->remove_associated_publication(this, send, dont_notify_lost);
00361         remove_associated_subscription(sub, dontSend, dont_notify_lost);
00362 
00363         idSeq[count] = sub->get_id();
00364         ++count;
00365       }
00366     }
00367 
00368     if (0 < count) {
00369       idSeq.length(count);
00370 
00371       if (participant_->is_alive() && this->participant_->isOwner()) {
00372         try {
00373           CORBA::Boolean dont_notify_lost = 0;
00374           writer_->remove_associations(idSeq, dont_notify_lost);
00375 
00376         } catch (const CORBA::Exception& ex) {
00377           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00378             ex._tao_print_exception(
00379               "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00380           }
00381 
00382           participant_->mark_dead();
00383         }
00384       }
00385     }
00386   }
00387 }
00388 
00389 void DCPS_IR_Publication::disassociate_subscription(OpenDDS::DCPS::RepoId id,
00390                                                     bool reassociate)
00391 {
00392   DCPS_IR_Subscription* sub = 0;
00393   size_t numAssociations = associations_.size();
00394   CORBA::Boolean send = 1;
00395   CORBA::Boolean dontSend = 0;
00396   long count = 0;
00397 
00398   if (0 < numAssociations) {
00399     OpenDDS::DCPS::ReaderIdSeq idSeq(static_cast<CORBA::ULong>(numAssociations));
00400     idSeq.length(static_cast<CORBA::ULong>(numAssociations));
00401 
00402     DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00403     DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00404 
00405     while (iter != end) {
00406       sub = *iter;
00407       ++iter;
00408 
00409       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00410         OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00411         OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00412         OpenDDS::DCPS::RepoIdConverter pub_sub_converter(id);
00413         ACE_DEBUG((LM_DEBUG,
00414                    ACE_TEXT("(%P|%t) DCPS_IR_Publication::disassociate_subscription: ")
00415                    ACE_TEXT("publication %C testing if subscription %C == %C.\n"),
00416                    std::string(pub_converter).c_str(),
00417                    std::string(sub_converter).c_str(),
00418                    std::string(pub_sub_converter).c_str()));
00419       }
00420 
00421       if (id == sub->get_id()) {
00422         CORBA::Boolean dont_notify_lost = 0;
00423         sub->remove_associated_publication(this, send, dont_notify_lost);
00424         remove_associated_subscription(sub, dontSend, dont_notify_lost);
00425 
00426         idSeq[count] = sub->get_id();
00427         ++count;
00428 
00429         if (reassociate && this->defunct_.insert(sub) != 0) {
00430           OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00431           OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00432           ACE_ERROR((LM_ERROR,
00433                      ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::disassociate_subscription: ")
00434                      ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00435                      std::string(pub_converter).c_str(),
00436                      std::string(sub_converter).c_str(),
00437                      sub));
00438         }
00439       }
00440     }
00441 
00442     if (0 < count) {
00443       idSeq.length(count);
00444 
00445       if (participant_->is_alive() && this->participant_->isOwner()) {
00446         try {
00447           CORBA::Boolean dont_notify_lost = 0;
00448           writer_->remove_associations(idSeq, dont_notify_lost);
00449 
00450         } catch (const CORBA::Exception& ex) {
00451           if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00452             ex._tao_print_exception(
00453               "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::remove_associations:");
00454           }
00455 
00456           participant_->mark_dead();
00457         }
00458       }
00459     }
00460   }
00461 }
00462 
00463 void DCPS_IR_Publication::update_incompatible_qos()
00464 {
00465   if (participant_->is_alive() && this->participant_->isOwner()) {
00466     try {
00467       writer_->update_incompatible_qos(incompatibleQosStatus_);
00468       incompatibleQosStatus_.count_since_last_send = 0;
00469     } catch (const CORBA::Exception& ex) {
00470       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00471         ex._tao_print_exception(
00472           "(%P|%t) ERROR: Exception caught in DCPS_IR_Publication::update_incompatible_qos:");
00473       }
00474 
00475       participant_->mark_dead();
00476     }
00477   }
00478 }
00479 
00480 CORBA::Boolean DCPS_IR_Publication::is_subscription_ignored(OpenDDS::DCPS::RepoId partId,
00481                                                             OpenDDS::DCPS::RepoId topicId,
00482                                                             OpenDDS::DCPS::RepoId subId)
00483 {
00484   CORBA::Boolean ignored;
00485   ignored = (participant_->is_participant_ignored(partId) ||
00486              participant_->is_topic_ignored(topicId) ||
00487              participant_->is_subscription_ignored(subId));
00488 
00489   return ignored;
00490 }
00491 
00492 DDS::DataWriterQos* DCPS_IR_Publication::get_datawriter_qos()
00493 {
00494   return &qos_;
00495 }
00496 
00497 using OpenDDS::DCPS::operator==;
00498 
00499 void
00500 DCPS_IR_Publication::set_qos(const DDS::DataWriterQos& qos)
00501 {
00502   if (false == (qos == this->qos_)) {
00503     // Check if we should check while we have both values.
00504     bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->qos_);
00505 
00506     // Store the new, compatible, value.
00507     this->qos_ = qos;
00508 
00509     if (check) {
00510       // This will remove any newly stale associations.
00511       this->reevaluate_existing_associations();
00512 
00513       // Sleep a while to let remove_association handled by DataWriter
00514       // before add_association. Otherwise, new association will have
00515       // trouble to connect each other.
00516       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00517 
00518       // This will establish any newly made associations.
00519       DCPS_IR_Topic_Description* description
00520       = this->topic_->get_topic_description();
00521       description->reevaluate_associations(this);
00522     }
00523 
00524     this->participant_->get_domain_reference()->publish_publication_bit(this);
00525   }
00526 }
00527 
00528 void
00529 DCPS_IR_Publication::set_qos(const DDS::PublisherQos& qos)
00530 {
00531   if (false == (qos == this->publisherQos_)) {
00532     // Check if we should check while we have both values.
00533     bool check = OpenDDS::DCPS::should_check_association_upon_change(qos, this->publisherQos_);
00534 
00535     // Store the new, compatible, value.
00536     this->publisherQos_ = qos;
00537 
00538     if (check) {
00539       // This will remove any newly stale associations.
00540       this->reevaluate_existing_associations();
00541 
00542       // Sleep a while to let remove_association handled by DataWriter
00543       // before add_association. Otherwise, new association will have
00544       // trouble to connect each other.
00545       ACE_OS::sleep(ACE_Time_Value(0, 250000));
00546 
00547       // This will establish any newly made associations.
00548       DCPS_IR_Topic_Description* description
00549       = this->topic_->get_topic_description();
00550       description->reevaluate_associations(this);
00551     }
00552 
00553     this->participant_->get_domain_reference()->publish_publication_bit(this);
00554   }
00555 }
00556 
00557 bool DCPS_IR_Publication::set_qos(const DDS::DataWriterQos & qos,
00558                                   const DDS::PublisherQos & publisherQos,
00559                                   Update::SpecificQos& specificQos)
00560 {
00561   bool need_evaluate = false;
00562   bool u_dw_qos = !(qos_ == qos);
00563 
00564   if (u_dw_qos) {
00565     if (OpenDDS::DCPS::should_check_association_upon_change(qos_, qos)) {
00566       need_evaluate = true;
00567     }
00568 
00569     qos_ = qos;
00570   }
00571 
00572   bool u_pub_qos = !(publisherQos_ == publisherQos);
00573 
00574   if (u_pub_qos) {
00575     if (OpenDDS::DCPS::should_check_association_upon_change(publisherQos_, publisherQos)) {
00576       need_evaluate = true;
00577     }
00578 
00579     publisherQos_ = publisherQos;
00580   }
00581 
00582   if (need_evaluate) {
00583     // Check if any existing association need be removed first.
00584     this->reevaluate_existing_associations();
00585 
00586     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00587     description->reevaluate_associations(this);
00588   }
00589 
00590   participant_->get_domain_reference()->publish_publication_bit(this);
00591   specificQos = u_dw_qos?  Update::DataWriterQos:
00592                 u_pub_qos? Update::PublisherQos:
00593                 Update::NoQos;
00594 
00595   return true;
00596 }
00597 
00598 DDS::PublisherQos* DCPS_IR_Publication::get_publisher_qos()
00599 {
00600   return &publisherQos_;
00601 }
00602 
00603 OpenDDS::DCPS::TransportLocatorSeq DCPS_IR_Publication::get_transportLocatorSeq() const
00604 {
00605   return info_;
00606 }
00607 
00608 OpenDDS::DCPS::IncompatibleQosStatus* DCPS_IR_Publication::get_incompatibleQosStatus()
00609 {
00610   return &incompatibleQosStatus_;
00611 }
00612 
00613 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_id()
00614 {
00615   return id_;
00616 }
00617 
00618 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_topic_id()
00619 {
00620   return topic_->get_id();
00621 }
00622 
00623 OpenDDS::DCPS::RepoId DCPS_IR_Publication::get_participant_id()
00624 {
00625   return participant_->get_id();
00626 }
00627 
00628 DCPS_IR_Topic* DCPS_IR_Publication::get_topic()
00629 {
00630   return topic_;
00631 }
00632 
00633 DCPS_IR_Topic_Description* DCPS_IR_Publication::get_topic_description()
00634 {
00635   return topic_->get_topic_description();
00636 }
00637 
00638 DDS::InstanceHandle_t DCPS_IR_Publication::get_handle()
00639 {
00640   return handle_;
00641 }
00642 
00643 void DCPS_IR_Publication::set_handle(DDS::InstanceHandle_t handle)
00644 {
00645   handle_ = handle;
00646 }
00647 
00648 CORBA::Boolean DCPS_IR_Publication::is_bit()
00649 {
00650   return isBIT_;
00651 }
00652 
00653 void DCPS_IR_Publication::set_bit_status(CORBA::Boolean isBIT)
00654 {
00655   isBIT_ = isBIT;
00656 }
00657 
00658 OpenDDS::DCPS::DataWriterRemote_ptr
00659 DCPS_IR_Publication::writer()
00660 {
00661   return OpenDDS::DCPS::DataWriterRemote::_duplicate(this->writer_.in());
00662 }
00663 
00664 void
00665 DCPS_IR_Publication::reevaluate_defunct_associations()
00666 {
00667   DCPS_IR_Subscription_Set::iterator it(this->defunct_.begin());
00668   while (it != this->defunct_.end()) {
00669     DCPS_IR_Subscription* subscription = *it;
00670     ++it;
00671 
00672     if (reevaluate_association(subscription)) {
00673       this->defunct_.remove(subscription); // no longer defunct
00674 
00675     } else {
00676       OpenDDS::DCPS::RepoIdConverter pub_converter(id_);
00677       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00678       ACE_ERROR((LM_ERROR,
00679                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Publication::reevaluate_defunct_associations: ")
00680                  ACE_TEXT("publication %C failed to reassociate subscription %C at %x.\n"),
00681                  std::string(pub_converter).c_str(),
00682                  std::string(sub_converter).c_str(),
00683                  subscription));
00684     }
00685   }
00686 }
00687 
00688 void DCPS_IR_Publication::reevaluate_existing_associations()
00689 {
00690   DCPS_IR_Subscription* sub = 0;
00691   DCPS_IR_Subscription_Set::ITERATOR iter = associations_.begin();
00692   DCPS_IR_Subscription_Set::ITERATOR end = associations_.end();
00693 
00694   while (iter != end) {
00695     sub = *iter;
00696     ++iter;
00697 
00698     this->reevaluate_association(sub);
00699   }
00700 }
00701 
00702 bool
00703 DCPS_IR_Publication::reevaluate_association(DCPS_IR_Subscription* subscription)
00704 {
00705   int status = this->associations_.find(subscription);
00706 
00707   if (status == 0) {
00708     // verify if they are still compatiable after change
00709 
00710 
00711     if (!OpenDDS::DCPS::compatibleQOS(this->get_incompatibleQosStatus(),
00712                                       subscription->get_incompatibleQosStatus(),
00713                                       this->get_transportLocatorSeq(),
00714                                       subscription->get_transportLocatorSeq(),
00715                                       this->get_datawriter_qos(),
00716                                       subscription->get_datareader_qos(),
00717                                       this->get_publisher_qos(),
00718                                       subscription->get_subscriber_qos())) {
00719       bool sendNotify = true; // inform datawriter
00720       bool notify_lost = true; // invoke listerner callback
00721 
00722       this->remove_associated_subscription(subscription, sendNotify, notify_lost, true);
00723     }
00724 
00725   } else {
00726     DCPS_IR_Topic_Description* description = this->topic_->get_topic_description();
00727     return description->try_associate(this, subscription);
00728   }
00729 
00730   return false;
00731 }
00732 
00733 void
00734 DCPS_IR_Publication::update_expr_params(OpenDDS::DCPS::RepoId readerId,
00735                                         const DDS::StringSeq& params)
00736 {
00737   try {
00738     writer_->update_subscription_params(readerId, params);
00739   } catch (const CORBA::SystemException& ex) {
00740     if (OpenDDS::DCPS::DCPS_debug_level) {
00741       ex._tao_print_exception("(%P|%t) ERROR: Exception caught in "
00742         "DCPS_IR_Publication::update_expr_params:");
00743     }
00744     participant_->mark_dead();
00745   }
00746 }
00747 
00748 std::string
00749 DCPS_IR_Publication::dump_to_string(const std::string& prefix, int depth) const
00750 {
00751   std::string str;
00752 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00753   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00754 
00755   for (int i=0; i < depth; i++)
00756     str += prefix;
00757   std::string indent = str + prefix;
00758   str += "DCPS_IR_Publication[";
00759   str += std::string(local_converter);
00760   str += "]";
00761   if (isBIT_)
00762     str += " (BIT)";
00763   str += "\n";
00764 
00765   str += indent + "Associations [ ";
00766   for (DCPS_IR_Subscription_Set::const_iterator assoc = associations_.begin();
00767        assoc != associations_.end();
00768        assoc++)
00769   {
00770     OpenDDS::DCPS::RepoIdConverter assoc_converter((*assoc)->get_id());
00771     str += std::string(assoc_converter);
00772     str += " ";
00773   }
00774   str += "]\n";
00775 
00776   str += indent + "Defunct Associations [ ";
00777   for (DCPS_IR_Subscription_Set::const_iterator def = defunct_.begin();
00778        def != defunct_.end();
00779        def++)
00780   {
00781     OpenDDS::DCPS::RepoIdConverter def_converter((*def)->get_id());
00782     str += std::string(def_converter);
00783     str += " ";
00784   }
00785   str += "]\n";
00786 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00787   return str;
00788 }
00789 
00790 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1