DCPS_IR_Topic_Description.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include /**/ "DCPS_IR_Topic_Description.h"
00010 
00011 #include /**/ "DCPS_IR_Subscription.h"
00012 #include /**/ "DCPS_IR_Publication.h"
00013 
00014 #include /**/ "DCPS_IR_Topic.h"
00015 #include /**/ "DCPS_IR_Domain.h"
00016 
00017 #include /**/ "dds/DCPS/DCPS_Utils.h"
00018 
00019 #include /**/ "tao/debug.h"
00020 
00021 #include /**/ "dds/DCPS/RepoIdConverter.h"
00022 
00023 DCPS_IR_Topic_Description::DCPS_IR_Topic_Description(DCPS_IR_Domain* domain,
00024                                                      const char* name,
00025                                                      const char* dataTypeName)
00026   : name_(name),
00027     dataTypeName_(dataTypeName),
00028     domain_(domain)
00029 {
00030 }
00031 
00032 DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description()
00033 {
00034   // check for any subscriptions still referencing this description.
00035   if (0 != subscriptionRefs_.size()) {
00036     DCPS_IR_Subscription* subscription = 0;
00037     DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00038     DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00039 
00040     ACE_ERROR((LM_ERROR,
00041                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00042                ACE_TEXT("topic description name %C data type name %C has retained subscriptions.\n"),
00043                name_.c_str(),
00044                dataTypeName_.c_str()));
00045 
00046     while (iter != end) {
00047       subscription = *iter;
00048       ++iter;
00049 
00050       OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00051       ACE_ERROR((LM_ERROR,
00052                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00053                  ACE_TEXT("topic description %C retains subscription %C.\n"),
00054                  this->name_.c_str(),
00055                  std::string(converter).c_str()));
00056     }
00057   }
00058 
00059   if (0 != topics_.size()) {
00060     DCPS_IR_Topic* topic = 0;
00061     DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00062     DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00063 
00064     ACE_ERROR((LM_ERROR,
00065                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00066                ACE_TEXT("topic description name %C data type name %C has retained topics.\n"),
00067                name_.c_str(),
00068                dataTypeName_.c_str()));
00069 
00070     while (iter != end) {
00071       topic = *iter;
00072       ++iter;
00073 
00074       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00075       ACE_ERROR((LM_ERROR,
00076                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description: ")
00077                  ACE_TEXT("topic description %C retains topic %C.\n"),
00078                  this->name_.c_str(),
00079                  std::string(converter).c_str()));
00080     }
00081   }
00082 }
00083 
00084 int DCPS_IR_Topic_Description::add_subscription_reference(DCPS_IR_Subscription* subscription
00085                                                           , bool associate)
00086 {
00087   int status = subscriptionRefs_.insert(subscription);
00088 
00089   switch (status) {
00090   case 0:
00091 
00092     // Publish the BIT information
00093     domain_->publish_subscription_bit(subscription);
00094 
00095     if (associate) {
00096       try_associate_subscription(subscription);
00097       // Do not check incompatible qos here.  The check is done
00098       // in the DCPS_IR_Topic_Description::try_associate_subscription method
00099     }
00100 
00101     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00102       OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00103       ACE_DEBUG((LM_DEBUG,
00104                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_subscription_reference: ")
00105                  ACE_TEXT("topic description %C added subscription %C at %x\n"),
00106                  this->name_.c_str(),
00107                  std::string(converter).c_str(),
00108                  subscription));
00109     }
00110 
00111     break;
00112 
00113   case 1: {
00114     OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00115     ACE_ERROR((LM_ERROR,
00116                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00117                ACE_TEXT("topic description %C attempt to re-add subscription %C.\n"),
00118                this->name_.c_str(),
00119                std::string(converter).c_str()));
00120   }
00121   break;
00122 
00123   case -1: {
00124     OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00125     ACE_ERROR((LM_ERROR,
00126                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00127                ACE_TEXT("topic description %C failed to add subscription %C.\n"),
00128                this->name_.c_str(),
00129                std::string(converter).c_str()));
00130   }
00131   };
00132 
00133   return status;
00134 }
00135 
00136 int DCPS_IR_Topic_Description::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00137 {
00138   int status = subscriptionRefs_.remove(subscription);
00139 
00140   if (0 == status) {
00141     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00142       OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00143       ACE_DEBUG((LM_DEBUG,
00144                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_subscription_reference: ")
00145                  ACE_TEXT("topic description %C removed subscription %C.\n"),
00146                  this->name_.c_str(),
00147                  std::string(converter).c_str()));
00148     }
00149 
00150   } else {
00151     OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00152     ACE_ERROR((LM_ERROR,
00153                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_subscription_reference: ")
00154                ACE_TEXT("topic description %C failed to remove subscription %C.\n"),
00155                this->name_.c_str(),
00156                std::string(converter).c_str()));
00157   } // if (0 == status)
00158 
00159   return status;
00160 }
00161 
00162 int DCPS_IR_Topic_Description::add_topic(DCPS_IR_Topic* topic)
00163 {
00164   int status = topics_.insert(topic);
00165 
00166   switch (status) {
00167   case 0:
00168 
00169     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00170       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00171       ACE_DEBUG((LM_DEBUG,
00172                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_topic: ")
00173                  ACE_TEXT("topic description %C added topic %C at %x.\n"),
00174                  this->name_.c_str(),
00175                  std::string(converter).c_str(),
00176                  topic));
00177     }
00178 
00179     break;
00180   case 1:
00181 
00182     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00183       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00184       ACE_DEBUG((LM_WARNING,
00185                  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic_Description::add_topic: ")
00186                  ACE_TEXT("topic description %C attempt to re-add topic %C.\n"),
00187                  this->name_.c_str(),
00188                  std::string(converter).c_str()));
00189     }
00190 
00191     break;
00192   case -1: {
00193     OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00194     ACE_ERROR((LM_ERROR,
00195                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_topic: ")
00196                ACE_TEXT("topic description %C failed to add topic %C.\n"),
00197                this->name_.c_str(),
00198                std::string(converter).c_str()));
00199   }
00200   break;
00201   };
00202 
00203   return status;
00204 }
00205 
00206 int DCPS_IR_Topic_Description::remove_topic(DCPS_IR_Topic* topic)
00207 {
00208   int status = topics_.remove(topic);
00209 
00210   if (0 == status) {
00211     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00212       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00213       ACE_DEBUG((LM_DEBUG,
00214                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_topic: ")
00215                  ACE_TEXT("topic description %C removed topic %C.\n"),
00216                  this->name_.c_str(),
00217                  std::string(converter).c_str()));
00218     }
00219 
00220   } else {
00221     OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00222     ACE_ERROR((LM_ERROR,
00223                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_topic: ")
00224                ACE_TEXT("topic description failed to remove topic %C.\n"),
00225                this->name_.c_str(),
00226                std::string(converter).c_str()));
00227   }
00228 
00229   return status;
00230 }
00231 
00232 DCPS_IR_Topic* DCPS_IR_Topic_Description::get_first_topic()
00233 {
00234   DCPS_IR_Topic* topic = 0;
00235 
00236   if (0 < topics_.size()) {
00237     DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00238     topic = *iter;
00239 
00240     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00241       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00242       ACE_DEBUG((LM_DEBUG,
00243                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::get_first_topic: ")
00244                  ACE_TEXT("topic description %C first topic %C.\n"),
00245                  this->name_.c_str(),
00246                  std::string(converter).c_str()));
00247     }
00248   }
00249 
00250   return topic;
00251 }
00252 
00253 void DCPS_IR_Topic_Description::try_associate_publication(DCPS_IR_Publication* publication)
00254 {
00255   // for each subscription check for compatibility
00256   DCPS_IR_Subscription* subscription = 0;
00257   OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00258 
00259   DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00260   DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00261 
00262   while (iter != end) {
00263     subscription = *iter;
00264     ++iter;
00265     try_associate(publication, subscription);
00266 
00267     // Check the subscriptions QOS status
00268     qosStatus = subscription->get_incompatibleQosStatus();
00269 
00270     if (0 < qosStatus->count_since_last_send) {
00271       subscription->update_incompatible_qos();
00272     }
00273   }
00274 
00275   // Check the publications QOS status
00276   qosStatus = publication->get_incompatibleQosStatus();
00277 
00278   if (0 < qosStatus->count_since_last_send) {
00279     publication->update_incompatible_qos();
00280   }
00281 }
00282 
00283 void DCPS_IR_Topic_Description::try_associate_subscription(DCPS_IR_Subscription* subscription)
00284 {
00285   // check all topics for compatible publications
00286 
00287   DCPS_IR_Topic* topic = 0;
00288 
00289   DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00290   DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00291 
00292   while (iter != end) {
00293     topic = *iter;
00294     ++iter;
00295 
00296     topic->try_associate(subscription);
00297   }
00298 
00299   // Check the subscriptions QOS status
00300   OpenDDS::DCPS::IncompatibleQosStatus* qosStatus =
00301     subscription->get_incompatibleQosStatus();
00302 
00303   if (0 < qosStatus->total_count) {
00304     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00305       OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00306       ACE_DEBUG((LM_DEBUG,
00307                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate_subscription: ")
00308                  ACE_TEXT("topic description %C has %d incompatible publications ")
00309                  ACE_TEXT("with subscription %C.\n"),
00310                  this->name_.c_str(),
00311                  qosStatus->total_count,
00312                  std::string(converter).c_str()));
00313     }
00314 
00315     subscription->update_incompatible_qos();
00316   }
00317 }
00318 
00319 bool
00320 DCPS_IR_Topic_Description::try_associate(DCPS_IR_Publication* publication,
00321                                          DCPS_IR_Subscription* subscription)
00322 {
00323   if (publication->is_subscription_ignored(subscription->get_participant_id(),
00324                                            subscription->get_topic_id(),
00325                                            subscription->get_id())) {
00326     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00327       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00328       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00329       ACE_DEBUG((LM_DEBUG,
00330                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00331                  ACE_TEXT("topic description %C publication %C ignores subscription %C.\n"),
00332                  this->name_.c_str(),
00333                  std::string(pub_converter).c_str(),
00334                  std::string(sub_converter).c_str()));
00335     }
00336   }
00337 
00338   else if (subscription->is_publication_ignored(publication->get_participant_id(),
00339                                                 publication->get_topic_id(),
00340                                                 publication->get_id())) {
00341     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00342       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00343       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00344       ACE_DEBUG((LM_DEBUG,
00345                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00346                  ACE_TEXT("topic description %C subscription %C ignores publication %C.\n"),
00347                  this->name_.c_str(),
00348                  std::string(pub_converter).c_str(),
00349                  std::string(sub_converter).c_str()));
00350     }
00351 
00352   } else {
00353     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00354       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00355       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00356       ACE_DEBUG((LM_DEBUG,
00357                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00358                  ACE_TEXT("topic description %C checking compatibility of ")
00359                  ACE_TEXT("publication %C with subscription %C.\n"),
00360                  this->name_.c_str(),
00361                  std::string(pub_converter).c_str(),
00362                  std::string(sub_converter).c_str()));
00363     }
00364 
00365     if (OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00366                                      subscription->get_incompatibleQosStatus(),
00367                                      publication->get_transportLocatorSeq(),
00368                                      subscription->get_transportLocatorSeq(),
00369                                      publication->get_datawriter_qos(),
00370                                      subscription->get_datareader_qos(),
00371                                      publication->get_publisher_qos(),
00372                                      subscription->get_subscriber_qos())) {
00373       associate(publication, subscription);
00374       return true;
00375     }
00376 
00377     // Dont notify that there is an incompatible qos here
00378     // notify where we can distinguish which one is being added
00379     // so we only send one response(with all incompatible qos) to it
00380   }
00381 
00382   return false;
00383 }
00384 
00385 void DCPS_IR_Topic_Description::associate(DCPS_IR_Publication* publication,
00386                                           DCPS_IR_Subscription* subscription)
00387 {
00388   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00389     OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00390     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00391     ACE_DEBUG((LM_DEBUG,
00392                ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::associate: ")
00393                ACE_TEXT("topic description %C associating ")
00394                ACE_TEXT("publication %C with subscription %C.\n"),
00395                this->name_.c_str(),
00396                std::string(pub_converter).c_str(),
00397                std::string(sub_converter).c_str()));
00398   }
00399 
00400   // The publication must be told first because it will be the connector
00401   // if a data link needs to be created.
00402   // This is only required if the publication and subscription are being
00403   // handed by the same process and thread.  Order when there is
00404   // another thread or process is not important.
00405   // Note: the client thread may process the add_associations() oneway
00406   //       call instead of the ORB thread because it is currently
00407   //       in a two-way call to the Repo.
00408   int error = publication->add_associated_subscription(subscription, true);
00409 
00410   // If there was no TAO error contacting the publication (This can happen if
00411   // an old publisher has exited non-gracefully)
00412   if (error != -1) {
00413     // Associate the subscription with the publication
00414     subscription->add_associated_publication(publication, false);
00415   } else {
00416     ACE_DEBUG((LM_INFO, ACE_TEXT("Invalid publication detected, NOT notifying subcription of association\n")));
00417   }
00418 }
00419 
00420 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Subscription* subscription)
00421 {
00422   DCPS_IR_Topic* topic = 0;
00423 
00424   DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00425   DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00426 
00427   while (iter != end) {
00428     topic = *iter;
00429     ++iter;
00430 
00431     topic->reevaluate_associations(subscription);
00432   }
00433 }
00434 
00435 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Publication* publication)
00436 {
00437   DCPS_IR_Subscription * sub = 0;
00438   DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00439   DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00440 
00441   while (iter != end) {
00442     sub = *iter;
00443     ++iter;
00444     publication->reevaluate_association(sub);
00445     sub->reevaluate_association(publication);
00446   }
00447 }
00448 
00449 const char* DCPS_IR_Topic_Description::get_name() const
00450 {
00451   return name_.c_str();
00452 }
00453 
00454 const char* DCPS_IR_Topic_Description::get_dataTypeName() const
00455 {
00456   return dataTypeName_.c_str();
00457 }
00458 
00459 CORBA::ULong DCPS_IR_Topic_Description::get_number_topics() const
00460 {
00461   return static_cast<CORBA::ULong>(topics_.size());
00462 }
00463 
00464 std::string
00465 DCPS_IR_Topic_Description::dump_to_string(const std::string& prefix,
00466                                           int depth) const
00467 {
00468   std::string str;
00469 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00470   for (int i=0; i < depth; i++)
00471     str += prefix;
00472   std::string indent = str + prefix;
00473   str += "DCPS_IR_Topic_Description [";
00474   str += name_.c_str();
00475   str += "][";
00476   str += dataTypeName_.c_str();
00477   str += "]\n";
00478 
00479   str += indent + "Subscription References [ ";
00480   for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00481        sub != subscriptionRefs_.end();
00482        sub++)
00483   {
00484     OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00485     str += std::string(sub_converter);
00486     str += " ";
00487   }
00488   str += "]\n";
00489 
00490   str += indent + "Topics [ ";
00491   for (DCPS_IR_Topic_Set::const_iterator top = topics_.begin();
00492        top != topics_.end();
00493        top++)
00494   {
00495     OpenDDS::DCPS::RepoIdConverter top_converter((*top)->get_id());
00496     str += std::string(top_converter);
00497     str += " ";
00498   }
00499   str += "]\n";
00500 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00501   return str;
00502 }
00503 
// Explicit instantiations of the ACE node / unbounded-set / set-iterator
// templates for DCPS_IR_Subscription* and DCPS_IR_Topic* elements. They are
// only emitted for toolchains that request them through one of the two ACE
// configuration macros tested below.
// NOTE(review): presumably these back DCPS_IR_Subscription_Set and
// DCPS_IR_Topic_Set; the typedefs are not visible in this file - confirm.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

// Standard C++ explicit instantiation form.
template class ACE_Node<DCPS_IR_Subscription*>;
template class ACE_Unbounded_Set<DCPS_IR_Subscription*>;
template class ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>;

template class ACE_Node<DCPS_IR_Topic*>;
template class ACE_Unbounded_Set<DCPS_IR_Topic*>;
template class ACE_Unbounded_Set_Iterator<DCPS_IR_Topic*>;

#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

// Pragma-based form for compilers that use "#pragma instantiate".
#pragma instantiate ACE_Node<DCPS_IR_Subscription*>
#pragma instantiate ACE_Unbounded_Set<DCPS_IR_Subscription*>
#pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Subscription*>

#pragma instantiate ACE_Node<DCPS_IR_Topic*>
#pragma instantiate ACE_Unbounded_Set<DCPS_IR_Topic*>
#pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Topic*>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7