DCPSInfo_i.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include "tao/ORB_Core.h"
00011 
00012 #include /**/ "DCPSInfo_i.h"
00013 
00014 #include "dds/DCPS/InfoRepoDiscovery/InfoC.h"
00015 #include "dds/DCPS/transport/tcp/TcpInst.h"
00016 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00017 #include "dds/DCPS/transport/framework/TransportInst.h"
00018 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00019 #include "dds/DCPS/transport/tcp/TcpInst.h"
00020 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00021 #include "UpdateManager.h"
00022 #include "ShutdownInterface.h"
00023 
00024 #include "dds/DCPS/BuiltInTopicUtils.h"
00025 #include "dds/DCPS/GuidUtils.h"
00026 #include "dds/DCPS/RepoIdConverter.h"
00027 
00028 #include /**/ "tao/debug.h"
00029 
00030 #include /**/ "ace/Read_Buffer.h"
00031 #include /**/ "ace/OS_NS_stdio.h"
00032 #include "ace/Dynamic_Service.h"
00033 #include "ace/Reactor.h"
00034 
00035 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00036 
00037 // constructor
00038 TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i(CORBA::ORB_ptr orb
00039                                        , bool reincarnate
00040                                        , ShutdownInterface* shutdown
00041                                        , const TAO_DDS_DCPSFederationId& federation)
00042   : orb_(CORBA::ORB::_duplicate(orb))
00043   , federation_(federation)
00044   , participantIdGenerator_(federation.id())
00045   , um_(0)
00046   , reincarnate_(reincarnate)
00047   , shutdown_(shutdown)
00048   , reassociate_timer_id_(-1)
00049   , dispatch_check_timer_id_(-1)
00050 {
00051   if (!TheServiceParticipant->use_bidir_giop()) {
00052     int argc = 0;
00053     char** no_argv = 0;
00054     dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly");
00055   }
00056 }
00057 
00058 TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i()
00059 {
00060 }
00061 
00062 int
00063 TAO_DDS_DCPSInfo_i::handle_timeout(const ACE_Time_Value& /*now*/,
00064                                    const void* arg)
00065 {
00066   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00067 
00068   if (arg == this) {
00069     if ( !CORBA::is_nil(this->dispatchingOrb_.in())){
00070       if (this->dispatchingOrb_->work_pending())
00071       {
00072         // Ten microseconds
00073         ACE_Time_Value small(0,10);
00074         this->dispatchingOrb_->perform_work(small);
00075       }
00076     }
00077   }
00078   else {
00079   // NOTE: This is a purposefully naive approach to addressing defunct
00080   // associations.  In the future, it may be worthwhile to introduce a
00081   // callback model to fix the heinous runtime cost below:
00082   for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
00083        dom != this->domains_.end(); ++dom) {
00084 
00085     const DCPS_IR_Participant_Map& participants(dom->second->participants());
00086     for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
00087          part != participants.end(); ++part) {
00088 
00089       const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
00090       for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
00091            sub != subscriptions.end(); ++sub) {
00092         sub->second->reevaluate_defunct_associations();
00093       }
00094 
00095       const DCPS_IR_Publication_Map& publications(part->second->publications());
00096       for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
00097            pub != publications.end(); ++pub) {
00098         pub->second->reevaluate_defunct_associations();
00099       }
00100     }
00101   }
00102   }
00103 
00104   return 0;
00105 }
00106 
00107 void
00108 TAO_DDS_DCPSInfo_i::shutdown()
00109 {
00110   this->shutdown_->shutdown();
00111 }
00112 
00113 CORBA::ORB_ptr
00114 TAO_DDS_DCPSInfo_i::orb()
00115 {
00116   return CORBA::ORB::_duplicate(this->orb_.in());
00117 }
00118 
00119 CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant(
00120   DDS::DomainId_t            domainId,
00121   const OpenDDS::DCPS::RepoId& participantId)
00122 {
00123   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00124 
00125   // Grab the domain.
00126   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00127 
00128   if (where == this->domains_.end()) {
00129     throw OpenDDS::DCPS::Invalid_Domain();
00130   }
00131 
00132   // Grab the participant.
00133   DCPS_IR_Participant* participant
00134   = where->second->participant(participantId);
00135 
00136   if (0 == participant) {
00137     throw OpenDDS::DCPS::Invalid_Participant();
00138   }
00139 
00140   // Establish ownership within the local repository.
00141   participant->takeOwnership();
00142 
00143   return false;
00144 }
00145 
00146 bool
00147 TAO_DDS_DCPSInfo_i::changeOwnership(
00148   DDS::DomainId_t              domainId,
00149   const OpenDDS::DCPS::RepoId& participantId,
00150   long                           sender,
00151   long                           owner)
00152 {
00153   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00154 
00155   // Grab the domain.
00156   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00157 
00158   if (where == this->domains_.end()) {
00159     return false;
00160   }
00161 
00162   // Grab the participant.
00163   DCPS_IR_Participant* participant
00164   = where->second->participant(participantId);
00165 
00166   if (0 == participant) {
00167     return false;
00168   }
00169 
00170   // Establish the ownership.
00171   participant->changeOwner(sender, owner);
00172   return true;
00173 }
00174 
00175 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic(
00176   OpenDDS::DCPS::RepoId_out topicId,
00177   DDS::DomainId_t domainId,
00178   const OpenDDS::DCPS::RepoId& participantId,
00179   const char * topicName,
00180   const char * dataTypeName,
00181   const DDS::TopicQos & qos,
00182   bool /*hasDcpsKey -- only used for RTPS Discovery*/)
00183 {
00184   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00185   // Grab the domain.
00186   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00187 
00188   if (where == this->domains_.end()) {
00189     throw OpenDDS::DCPS::Invalid_Domain();
00190   }
00191 
00192   // Grab the participant.
00193   DCPS_IR_Participant* participantPtr
00194   = where->second->participant(participantId);
00195 
00196   if (0 == participantPtr) {
00197     throw OpenDDS::DCPS::Invalid_Participant();
00198   }
00199 
00200   OpenDDS::DCPS::TopicStatus topicStatus
00201   = where->second->add_topic(
00202       topicId,
00203       topicName,
00204       dataTypeName,
00205       qos,
00206       participantPtr);
00207 
00208   if (this->um_ && (participantPtr->isBitPublisher() == false)) {
00209     Update::UTopic topic(domainId, topicId, participantId
00210                          , topicName, dataTypeName
00211                          , const_cast<DDS::TopicQos &>(qos));
00212     this->um_->create(topic);
00213 
00214     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00215       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00216       ACE_DEBUG((LM_DEBUG,
00217                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
00218                  ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
00219                  std::string(converter).c_str(),
00220                  domainId));
00221     }
00222   }
00223   return topicStatus;
00224 }
00225 
00226 bool
00227 TAO_DDS_DCPSInfo_i::add_topic(const OpenDDS::DCPS::RepoId& topicId,
00228                               DDS::DomainId_t domainId,
00229                               const OpenDDS::DCPS::RepoId& participantId,
00230                               const char* topicName,
00231                               const char* dataTypeName,
00232                               const DDS::TopicQos& qos)
00233 {
00234   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00235 
00236   // Grab the domain.
00237   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00238 
00239   if (where == this->domains_.end()) {
00240     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00241       ACE_DEBUG((LM_WARNING,
00242                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00243                  ACE_TEXT("invalid domain %d.\n"),
00244                  domainId));
00245     }
00246 
00247     return false;
00248   }
00249 
00250   // Grab the participant.
00251   DCPS_IR_Participant* participantPtr
00252   = where->second->participant(participantId);
00253 
00254   if (0 == participantPtr) {
00255     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00256       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00257       ACE_DEBUG((LM_WARNING,
00258                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00259                  ACE_TEXT("invalid participant %C.\n"),
00260                  std::string(converter).c_str()));
00261     }
00262 
00263     return false;
00264   }
00265 
00266   OpenDDS::DCPS::TopicStatus topicStatus
00267   = where->second->force_add_topic(topicId, topicName, dataTypeName,
00268                                    qos, participantPtr);
00269 
00270   if (topicStatus != OpenDDS::DCPS::CREATED) {
00271     return false;
00272   }
00273 
00274   OpenDDS::DCPS::RepoIdConverter converter(topicId);
00275 
00276   // See if we are adding a topic that was created within this
00277   // repository or a different repository.
00278   if (converter.federationId() == federation_.id()) {
00279     // Ensure the topic RepoId values do not conflict.
00280     participantPtr->last_topic_key(converter.entityKey());
00281   }
00282 
00283   return true;
00284 }
00285 
00286 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic(
00287   DDS::DomainId_t domainId,
00288   const char * topicName,
00289   CORBA::String_out dataTypeName,
00290   DDS::TopicQos_out qos,
00291   OpenDDS::DCPS::RepoId_out topicId)
00292 {
00293   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00294 
00295   // Grab the domain.
00296   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00297 
00298   if (where == this->domains_.end()) {
00299     throw OpenDDS::DCPS::Invalid_Domain();
00300   }
00301 
00302   OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND;
00303 
00304   DCPS_IR_Topic* topic = 0;
00305   qos = new DDS::TopicQos;
00306 
00307   status = where->second->find_topic(topicName, topic);
00308 
00309   if (0 != topic) {
00310     status = OpenDDS::DCPS::FOUND;
00311     const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
00312     dataTypeName = desc->get_dataTypeName();
00313     *qos = *(topic->get_topic_qos());
00314     topicId = topic->get_id();
00315   }
00316 
00317   return status;
00318 }
00319 
00320 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic(
00321   DDS::DomainId_t domainId,
00322   const OpenDDS::DCPS::RepoId& participantId,
00323   const OpenDDS::DCPS::RepoId& topicId)
00324 {
00325   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00326 
00327   // Grab the domain.
00328   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00329 
00330   if (where == this->domains_.end()) {
00331     throw OpenDDS::DCPS::Invalid_Domain();
00332   }
00333 
00334   // Grab the participant.
00335   DCPS_IR_Participant* partPtr
00336   = where->second->participant(participantId);
00337 
00338   if (0 == partPtr) {
00339     throw OpenDDS::DCPS::Invalid_Participant();
00340   }
00341 
00342   DCPS_IR_Topic* topic;
00343 
00344   if (partPtr->find_topic_reference(topicId, topic) != 0) {
00345     throw OpenDDS::DCPS::Invalid_Topic();
00346   }
00347 
00348   OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
00349 
00350   if (this->um_
00351       && (partPtr->isOwner() == true)
00352       && (partPtr->isBitPublisher() == false)) {
00353     Update::IdPath path(domainId, participantId, topicId);
00354     this->um_->destroy(path, Update::Topic);
00355 
00356     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00357       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00358       ACE_DEBUG((LM_DEBUG,
00359                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
00360                  ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00361                  std::string(converter).c_str(),
00362                  domainId));
00363     }
00364   }
00365 
00366   return removedStatus;
00367 }
00368 
00369 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication(
00370   DDS::DomainId_t domainId,
00371   const OpenDDS::DCPS::RepoId& participantId,
00372   const OpenDDS::DCPS::RepoId& topicId,
00373   OpenDDS::DCPS::DataWriterRemote_ptr publication,
00374   const DDS::DataWriterQos & qos,
00375   const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00376   const DDS::PublisherQos & publisherQos)
00377 {
00378   if (CORBA::is_nil(publication)) {
00379     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00380       ACE_DEBUG((LM_WARNING,
00381         ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00382         ACE_TEXT("invalid publication reference.\n")));
00383     }
00384     return OpenDDS::DCPS::GUID_UNKNOWN;
00385   }
00386 
00387   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00388 
00389   // Grab the domain.
00390   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00391 
00392   if (where == this->domains_.end()) {
00393     throw OpenDDS::DCPS::Invalid_Domain();
00394   }
00395 
00396   // Grab the participant.
00397   DCPS_IR_Participant* partPtr
00398   = where->second->participant(participantId);
00399 
00400   if (0 == partPtr) {
00401     throw OpenDDS::DCPS::Invalid_Participant();
00402   }
00403 
00404   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00405 
00406   if (topic == 0) {
00407     throw OpenDDS::DCPS::Invalid_Topic();
00408   }
00409 
00410   // Get a Id for the Writer, make it a builtin kind if this is for a BIT
00411   OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id(
00412     OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
00413 
00414   OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication =
00415     OpenDDS::DCPS::DataWriterRemote::_duplicate(publication);
00416 
00417   if (dispatchingOrb_) {
00418     // Remarshall the remote reference onto the dispatching orb.
00419     CORBA::String_var pubStr = orb_->object_to_string(dispatchingPublication);
00420     CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr);
00421     if (CORBA::is_nil(pubObj))  {
00422       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00423         ACE_DEBUG((LM_WARNING,
00424                    ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00425                    ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
00426       }
00427       return OpenDDS::DCPS::GUID_UNKNOWN;
00428     }
00429 
00430     dispatchingPublication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj);
00431   }
00432 
00433   OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr(
00434     new DCPS_IR_Publication(
00435                    pubId,
00436                    partPtr,
00437                    topic,
00438                    dispatchingPublication.in(),
00439                    qos,
00440                    transInfo,
00441                    publisherQos));
00442 
00443   DCPS_IR_Publication* pub = pubPtr.get();
00444   if (partPtr->add_publication(OpenDDS::DCPS::move(pubPtr)) != 0) {
00445     // failed to add.  we are responsible for the memory.
00446     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00447   } else if (topic->add_publication_reference(pub) != 0) {
00448     // Failed to add to the topic
00449     // so remove from participant and fail.
00450     partPtr->remove_publication(pubId);
00451     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00452   }
00453 
00454   if (this->um_ && (partPtr->isBitPublisher() == false)) {
00455     CORBA::String_var callback = orb_->object_to_string(publication);
00456     Update::ContentSubscriptionInfo csi;
00457 
00458     Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
00459                           , callback.in()
00460                           , const_cast<DDS::PublisherQos &>(publisherQos)
00461                           , const_cast<DDS::DataWriterQos &>(qos)
00462                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00463                           (transInfo), csi);
00464     this->um_->create(actor);
00465 
00466     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00467       OpenDDS::DCPS::RepoIdConverter converter(pubId);
00468       ACE_DEBUG((LM_DEBUG,
00469                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ")
00470                  ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
00471                  std::string(converter).c_str(),
00472                  domainId));
00473     }
00474   }
00475 
00476   where->second->remove_dead_participants();
00477   return pubId;
00478 }
00479 
00480 bool
00481 TAO_DDS_DCPSInfo_i::add_publication(DDS::DomainId_t domainId,
00482                                     const OpenDDS::DCPS::RepoId& participantId,
00483                                     const OpenDDS::DCPS::RepoId& topicId,
00484                                     const OpenDDS::DCPS::RepoId& pubId,
00485                                     const char* pub_str,
00486                                     const DDS::DataWriterQos & qos,
00487                                     const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00488                                     const DDS::PublisherQos & publisherQos,
00489                                     bool associate)
00490 {
00491   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00492 
00493   // Grab the domain.
00494   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00495 
00496   if (where == this->domains_.end()) {
00497     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00498       ACE_DEBUG((LM_WARNING,
00499                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00500                  ACE_TEXT("invalid domain %d.\n"),
00501                  domainId));
00502     }
00503 
00504     return false;
00505   }
00506 
00507   // Grab the participant.
00508   DCPS_IR_Participant* partPtr
00509   = where->second->participant(participantId);
00510 
00511   if (0 == partPtr) {
00512     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00513       OpenDDS::DCPS::RepoIdConverter converter(pubId);
00514       ACE_DEBUG((LM_WARNING,
00515                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00516                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00517                  std::string(converter).c_str(),
00518                  domainId));
00519     }
00520 
00521     return false;
00522   }
00523 
00524   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00525 
00526   if (topic == 0) {
00527     OpenDDS::DCPS::RepoIdConverter converter(topicId);
00528     ACE_DEBUG((LM_WARNING,
00529                ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00530                ACE_TEXT("invalid topic %C in domain %d.\n"),
00531                std::string(converter).c_str(),
00532                domainId));
00533     return false;
00534   }
00535 
00536   /// @TODO: Check if this is already stored.  If so, just clear the callback IOR.
00537 
00538   CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_)->string_to_object(pub_str);
00539   if (CORBA::is_nil(obj.in())) {
00540     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00541       ACE_DEBUG((LM_WARNING,
00542                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00543                  ACE_TEXT("failure converting string %C to objref\n"),
00544                  pub_str));
00545     }
00546     return false;
00547   }
00548 
00549   OpenDDS::DCPS::DataWriterRemote_var publication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
00550 
00551   OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr(
00552     new DCPS_IR_Publication(
00553                    pubId,
00554                    partPtr,
00555                    topic,
00556                    publication.in(),
00557                    qos,
00558                    transInfo,
00559                    publisherQos));
00560 
00561   DCPS_IR_Publication* pub = pubPtr.get();
00562   switch (partPtr->add_publication(move(pubPtr))) {
00563   case -1: {
00564     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00565     ACE_ERROR((LM_ERROR,
00566                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00567                ACE_TEXT("failed to add publication to participant %C.\n"),
00568                std::string(converter).c_str()));
00569     return false;
00570   }
00571 
00572   case 1:
00573     return false;
00574   case 0:
00575   default:
00576     break;
00577   }
00578 
00579   switch (topic->add_publication_reference(pub, associate)) {
00580   case -1: {
00581     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00582     ACE_ERROR((LM_ERROR,
00583                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00584                ACE_TEXT("failed to add publication to participant %C topic list.\n"),
00585                std::string(converter).c_str()));
00586 
00587     // Remove the publication.
00588     partPtr->remove_publication(pubId);
00589 
00590   }
00591   return false;
00592 
00593   case 1: // This is actually a really really bad place to jump to.
00594     // This means that we successfully added the new publication
00595     // to the participant (it had not been inserted before) but
00596     // that we are adding a duplicate publication to the topic
00597     // list - which should never ever be able to happen.
00598     return false;
00599 
00600   case 0:
00601   default:
00602     break;
00603   }
00604 
00605   OpenDDS::DCPS::RepoIdConverter converter(pubId);
00606 
00607   // See if we are adding a publication that was created within this
00608   // repository or a different repository.
00609   if (converter.federationId() == federation_.id()) {
00610     // Ensure the publication RepoId values do not conflict.
00611     partPtr->last_publication_key(converter.entityKey());
00612   }
00613 
00614   return true;
00615 }
00616 
00617 void TAO_DDS_DCPSInfo_i::remove_publication(
00618   DDS::DomainId_t domainId,
00619   const OpenDDS::DCPS::RepoId& participantId,
00620   const OpenDDS::DCPS::RepoId& publicationId)
00621 {
00622   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00623 
00624   // Grab the domain.
00625   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00626 
00627   if (where == this->domains_.end()) {
00628     throw OpenDDS::DCPS::Invalid_Domain();
00629   }
00630 
00631   // Grab the participant.
00632   DCPS_IR_Participant* partPtr
00633   = where->second->participant(participantId);
00634 
00635   if (0 == partPtr) {
00636     throw OpenDDS::DCPS::Invalid_Participant();
00637   }
00638 
00639   if (partPtr->remove_publication(publicationId) != 0) {
00640     where->second->remove_dead_participants();
00641 
00642     // throw exception because the publication was not removed!
00643     throw OpenDDS::DCPS::Invalid_Publication();
00644   }
00645 
00646   where->second->remove_dead_participants();
00647 
00648   if (this->um_
00649       && (partPtr->isOwner() == true)
00650       && (partPtr->isBitPublisher() == false)) {
00651     Update::IdPath path(domainId, participantId, publicationId);
00652     this->um_->destroy(path, Update::Actor, Update::DataWriter);
00653 
00654     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00655       OpenDDS::DCPS::RepoIdConverter converter(publicationId);
00656       ACE_DEBUG((LM_DEBUG,
00657                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
00658                  ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00659                  std::string(converter).c_str(),
00660                  domainId));
00661     }
00662   }
00663 }
00664 
00665 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription(
00666   DDS::DomainId_t domainId,
00667   const OpenDDS::DCPS::RepoId& participantId,
00668   const OpenDDS::DCPS::RepoId& topicId,
00669   OpenDDS::DCPS::DataReaderRemote_ptr subscription,
00670   const DDS::DataReaderQos & qos,
00671   const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00672   const DDS::SubscriberQos & subscriberQos,
00673   const char* filterClassName,
00674   const char* filterExpression,
00675   const DDS::StringSeq& exprParams)
00676 {
00677   if (CORBA::is_nil(subscription)) {
00678     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00679       ACE_DEBUG((LM_WARNING,
00680         ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00681         ACE_TEXT("invalid subscription reference.\n")));
00682     }
00683     return OpenDDS::DCPS::GUID_UNKNOWN;
00684   }
00685 
00686   DCPS_IR_Domain* domainPtr;
00687   DCPS_IR_Participant* partPtr;
00688   DCPS_IR_Topic* topic;
00689   OpenDDS::DCPS::RepoId subId;
00690   OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr;
00691   {
00692     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00693 
00694     // Grab the domain.
00695     DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00696 
00697     if (where == this->domains_.end()) {
00698       throw OpenDDS::DCPS::Invalid_Domain();
00699     }
00700 
00701     // Grab the domain and participant.
00702     domainPtr = where->second.get();
00703     partPtr = domainPtr->participant(participantId);
00704 
00705     if (0 == partPtr) {
00706       throw OpenDDS::DCPS::Invalid_Participant();
00707     }
00708 
00709     topic = where->second->find_topic(topicId);
00710 
00711     if (topic == 0) {
00712       throw OpenDDS::DCPS::Invalid_Topic();
00713     }
00714 
00715     // Get a Id for the Reader, make it a builtin kind if this is for a BIT
00716     subId = partPtr->get_next_subscription_id(
00717       OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
00718 
00719     OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription (
00720       OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
00721 
00722     if (dispatchingOrb_) {
00723       // Remarshall the remote reference onto the dispatching orb.
00724       CORBA::String_var subStr = orb_->object_to_string(dispatchingSubscription);
00725       CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr);
00726       if (CORBA::is_nil(subObj.in())) {
00727         if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00728           ACE_DEBUG((LM_WARNING,
00729                      ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00730                      ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
00731         }
00732         return OpenDDS::DCPS::GUID_UNKNOWN;
00733       }
00734       dispatchingSubscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj);
00735     }
00736 
00737     subPtr.reset(
00738       new DCPS_IR_Subscription(
00739                      subId,
00740                      partPtr,
00741                      topic,
00742                      dispatchingSubscription.in(),
00743                      qos,
00744                      transInfo,
00745                      subscriberQos,
00746                      filterClassName,
00747                      filterExpression,
00748                      exprParams));
00749 
00750     // Release lock
00751   }
00752 
00753   DCPS_IR_Subscription* sub = subPtr.get();
00754   if (partPtr->add_subscription(move(subPtr)) != 0) {
00755     // failed to add.  we are responsible for the memory.
00756     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00757   } else if (topic->add_subscription_reference(sub) != 0) {
00758     ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
00759     // No associations were made so remove and fail.
00760     partPtr->remove_subscription(subId);
00761     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00762   }
00763 
00764   if (this->um_ && (partPtr->isBitPublisher() == false)) {
00765     CORBA::String_var callback = orb_->object_to_string(subscription);
00766     Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
00767 
00768     Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
00769                           , callback.in()
00770                           , const_cast<DDS::SubscriberQos &>(subscriberQos)
00771                           , const_cast<DDS::DataReaderQos &>(qos)
00772                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00773                           (transInfo), csi);
00774 
00775     this->um_->create(actor);
00776 
00777     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00778       OpenDDS::DCPS::RepoIdConverter converter(subId);
00779       ACE_DEBUG((LM_DEBUG,
00780                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ")
00781                  ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
00782                  std::string(converter).c_str(),
00783                  domainId));
00784     }
00785   }
00786 
00787   domainPtr->remove_dead_participants();
00788 
00789   return subId;
00790 }
00791 
00792 bool
00793 TAO_DDS_DCPSInfo_i::add_subscription(
00794   DDS::DomainId_t domainId,
00795   const OpenDDS::DCPS::RepoId& participantId,
00796   const OpenDDS::DCPS::RepoId& topicId,
00797   const OpenDDS::DCPS::RepoId& subId,
00798   const char* sub_str,
00799   const DDS::DataReaderQos & qos,
00800   const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00801   const DDS::SubscriberQos & subscriberQos,
00802   const char* filterClassName,
00803   const char* filterExpression,
00804   const DDS::StringSeq& exprParams,
00805   bool associate)
00806 {
00807   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00808 
00809   // Grab the domain.
00810   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00811 
00812   if (where == this->domains_.end()) {
00813     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00814       ACE_DEBUG((LM_WARNING,
00815                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00816                  ACE_TEXT("invalid domain %d.\n"),
00817                  domainId));
00818     }
00819 
00820     return false;
00821   }
00822 
00823   // Grab the participant.
00824   DCPS_IR_Participant* partPtr
00825   = where->second->participant(participantId);
00826 
00827   if (0 == partPtr) {
00828     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00829       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00830       ACE_DEBUG((LM_WARNING,
00831                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00832                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00833                  std::string(converter).c_str(),
00834                  domainId));
00835     }
00836 
00837     return false;
00838   }
00839 
00840   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00841 
00842   if (topic == 0) {
00843     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00844       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00845       ACE_DEBUG((LM_WARNING,
00846                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00847                  ACE_TEXT("invalid topic %C in domain %d.\n"),
00848                  std::string(converter).c_str(),
00849                  domainId));
00850     }
00851 
00852     return false;
00853   }
00854 
00855   CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_) ->string_to_object(sub_str);
00856   if (CORBA::is_nil(obj.in())) {
00857     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00858       ACE_DEBUG((LM_WARNING,
00859                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00860                  ACE_TEXT("failure converting string %C to objref\n"),
00861                  sub_str));
00862     }
00863     return false;
00864   }
00865 
00866   OpenDDS::DCPS::DataReaderRemote_var subscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
00867 
00868   OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr(
00869     new DCPS_IR_Subscription(
00870                    subId,
00871                    partPtr,
00872                    topic,
00873                    subscription.in(),
00874                    qos,
00875                    transInfo,
00876                    subscriberQos,
00877                    filterClassName,
00878                    filterExpression,
00879                    exprParams));
00880 
00881   DCPS_IR_Subscription* sub = subPtr.get();
00882   switch (partPtr->add_subscription(OpenDDS::DCPS::move(subPtr))) {
00883   case -1: {
00884     OpenDDS::DCPS::RepoIdConverter converter(subId);
00885     ACE_ERROR((LM_ERROR,
00886                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00887                ACE_TEXT("failed to add subscription to participant %C.\n"),
00888                std::string(converter).c_str()));
00889     return false;
00890   }
00891 
00892   case 1:
00893     return false;
00894 
00895   case 0:
00896   default:
00897     break;
00898   }
00899 
00900   switch (topic->add_subscription_reference(sub, associate)) {
00901   case -1: {
00902     OpenDDS::DCPS::RepoIdConverter converter(subId);
00903     ACE_ERROR((LM_ERROR,
00904                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00905                ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
00906                std::string(converter).c_str()));
00907 
00908     // Remove the subscription.
00909     partPtr->remove_subscription(subId);
00910 
00911   }
00912   return false;
00913 
00914   case 1: // This is actually a really really bad place to jump to.
00915     // This means that we successfully added the new subscription
00916     // to the participant (it had not been inserted before) but
00917     // that we are adding a duplicate subscription to the topic
00918     // list - which should never ever be able to happen.
00919     return false;
00920 
00921   case 0:
00922   default:
00923     break;
00924   }
00925 
00926   OpenDDS::DCPS::RepoIdConverter converter(subId);
00927 
00928   // See if we are adding a subscription that was created within this
00929   // repository or a different repository.
00930   if (converter.federationId() == federation_.id()) {
00931     // Ensure the subscription RepoId values do not conflict.
00932     partPtr->last_subscription_key(converter.entityKey());
00933   }
00934 
00935   return true;
00936 }
00937 
00938 void TAO_DDS_DCPSInfo_i::remove_subscription(
00939   DDS::DomainId_t domainId,
00940   const OpenDDS::DCPS::RepoId& participantId,
00941   const OpenDDS::DCPS::RepoId& subscriptionId)
00942 {
00943   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00944 
00945   // Grab the domain.
00946   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00947 
00948   if (where == this->domains_.end()) {
00949     throw OpenDDS::DCPS::Invalid_Domain();
00950   }
00951 
00952   // Grab the participant.
00953   DCPS_IR_Participant* partPtr
00954   = where->second->participant(participantId);
00955 
00956   if (0 == partPtr) {
00957     throw OpenDDS::DCPS::Invalid_Participant();
00958   }
00959 
00960   if (partPtr->remove_subscription(subscriptionId) != 0) {
00961     // throw exception because the subscription was not removed!
00962     throw OpenDDS::DCPS::Invalid_Subscription();
00963   }
00964 
00965   where->second->remove_dead_participants();
00966 
00967   if (this->um_
00968       && (partPtr->isOwner() == true)
00969       && (partPtr->isBitPublisher() == false)) {
00970     Update::IdPath path(domainId, participantId, subscriptionId);
00971     this->um_->destroy(path, Update::Actor, Update::DataReader);
00972 
00973     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00974       OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
00975       ACE_DEBUG((LM_DEBUG,
00976                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
00977                  ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00978                  std::string(converter).c_str(),
00979                  domainId));
00980     }
00981   }
00982 }
00983 
00984 OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant(
00985   DDS::DomainId_t domain,
00986   const DDS::DomainParticipantQos & qos)
00987 {
00988   // A value to return.
00989   OpenDDS::DCPS::AddDomainStatus value;
00990   value.id        = OpenDDS::DCPS::GUID_UNKNOWN;
00991   value.federated = this->federation_.overridden();
00992 
00993   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
00994 
00995   // Grab the domain.
00996   DCPS_IR_Domain* domainPtr = this->domain(domain);
00997 
00998   if (0 == domainPtr) {
00999     throw OpenDDS::DCPS::Invalid_Domain();
01000   }
01001 
01002   // Obtain a shiny new GUID value.
01003   OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id();
01004 
01005   // Determine if this is the 'special' repository internal participant
01006   // that publishes the built-in topics for a domain.
01007   bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
01008 
01009   DCPS_IR_Participant_rch participant =
01010     OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(
01011                    this->federation_,
01012                    participantId,
01013                    domainPtr,
01014                    qos, um_, isBitPart);
01015 
01016   // We created the participant, now we can return the Id value (eventually).
01017   value.id = participantId;
01018 
01019   if (isBitPart) {
01020     participant->isBitPublisher() = true;
01021 
01022     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01023       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01024       ACE_DEBUG((LM_DEBUG,
01025                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01026                  ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
01027                  std::string(converter).c_str(),
01028                  domain));
01029     }
01030   }
01031 
01032   // Assume responsibility for writing back to the participant.
01033   participant->takeOwnership();
01034 
01035   int status = domainPtr->add_participant(participant);
01036 
01037   if (0 != status) {
01038     // Adding the participant failed return the invalid
01039     // participant Id number.
01040     participantId = OpenDDS::DCPS::GUID_UNKNOWN;
01041 
01042   } else if (this->um_) {
01043     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01044     if (participant->isBitPublisher() == false) {
01045       // Push this participant to interested observers.
01046       Update::UParticipant updateParticipant(
01047         domain,
01048         participant->owner(),
01049         participantId,
01050         const_cast<DDS::DomainParticipantQos &>(qos));
01051       this->um_->create(updateParticipant);
01052 
01053       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01054         ACE_DEBUG((LM_DEBUG,
01055                    ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01056                    ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
01057                    std::string(converter).c_str(),
01058                    domain));
01059       }
01060     }
01061 
01062     // Update what the last participant id was
01063     um_->updateLastPartId(converter.participantId());
01064   }
01065 
01066   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01067     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01068     ACE_DEBUG((LM_DEBUG,
01069                ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01070                ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"),
01071                domain,
01072                std::string(converter).c_str(),
01073                participant.get()));
01074   }
01075   return value;
01076 }
01077 
01078 bool
01079 TAO_DDS_DCPSInfo_i::add_domain_participant(DDS::DomainId_t domainId
01080                                            , const OpenDDS::DCPS::RepoId& participantId
01081                                            , const DDS::DomainParticipantQos & qos)
01082 {
01083   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01084 
01085   // Grab the domain.
01086   DCPS_IR_Domain* domainPtr = this->domain(domainId);
01087 
01088   if (0 == domainPtr) {
01089     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01090       ACE_DEBUG((LM_WARNING,
01091                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01092                  ACE_TEXT("invalid domain Id: %d\n"),
01093                  domainId));
01094     }
01095 
01096     return false;
01097   }
01098 
01099   // Prepare to manipulate the participant's Id value.
01100   OpenDDS::DCPS::RepoIdConverter converter(participantId);
01101 
01102   // Determine if this is the 'special' repository internal participant
01103   // that publishes the built-in topics for a domain.
01104   bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
01105 
01106   // Grab the participant.
01107   DCPS_IR_Participant* partPtr = domainPtr->participant(participantId);
01108 
01109   if (0 != partPtr) {
01110     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01111       ACE_ERROR((LM_ERROR,
01112                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01113                  ACE_TEXT("participant id %C already exists.\n"),
01114                  std::string(converter).c_str()));
01115     }
01116 
01117     return false;
01118   }
01119 
01120   DCPS_IR_Participant_rch participant =
01121     OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(this->federation_,
01122                                      participantId,
01123                                      domainPtr,
01124                                      qos, um_, isBitPart);
01125 
01126   switch (domainPtr->add_participant(participant)) {
01127   case -1: {
01128     ACE_ERROR((LM_ERROR,
01129                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01130                ACE_TEXT("failed to load participant %C in domain %d.\n"),
01131                std::string(converter).c_str(),
01132                domainId));
01133   }
01134   return false;
01135 
01136   case 1:
01137 
01138     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01139       ACE_DEBUG((LM_WARNING,
01140                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01141                  ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
01142                  std::string(converter).c_str(),
01143                  domainId));
01144     }
01145 
01146     return false;
01147 
01148   case 0:
01149   default:
01150     break;
01151   }
01152 
01153   // See if we are adding a participant that was created within this
01154   // repository or a different repository.
01155   if (converter.federationId() == this->federation_.id()) {
01156     // Ensure the participant GUID values do not conflict.
01157     domainPtr->last_participant_key(converter.participantId());
01158 
01159     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01160       ACE_DEBUG((LM_DEBUG,
01161                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01162                  ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
01163                  converter.participantId()));
01164     }
01165   }
01166 
01167   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01168     ACE_DEBUG((LM_DEBUG,
01169                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01170                ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"),
01171                std::string(converter).c_str(),
01172                participant.in(),
01173                domainId));
01174   }
01175 
01176   return true;
01177 }
01178 
01179 bool
01180 TAO_DDS_DCPSInfo_i::remove_by_owner(
01181   DDS::DomainId_t domain,
01182   long              owner)
01183 {
01184   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01185 
01186   // Grab the domain.
01187   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
01188 
01189   if (where == this->domains_.end()) {
01190     return false;
01191   }
01192 
01193   std::vector<OpenDDS::DCPS::RepoId> candidates;
01194 
01195   for (DCPS_IR_Participant_Map::const_iterator
01196        current = where->second->participants().begin();
01197        current != where->second->participants().end();
01198        ++current) {
01199     if (current->second->owner() == owner) {
01200       candidates.push_back(current->second->get_id());
01201     }
01202   }
01203 
01204   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01205     ACE_DEBUG((LM_DEBUG,
01206                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01207                ACE_TEXT("%d participants to remove from domain %d.\n"),
01208                candidates.size(),
01209                domain));
01210   }
01211 
01212   bool status = true;
01213 
01214   for (unsigned int index = 0; index < candidates.size(); ++index) {
01215     DCPS_IR_Participant* participant
01216     = where->second->participant(candidates[index]);
01217     if (participant) {
01218       std::vector<OpenDDS::DCPS::RepoId> keylist;
01219 
01220       // Remove Subscriptions
01221       for (DCPS_IR_Subscription_Map::const_iterator
01222         current = participant->subscriptions().begin();
01223         current != participant->subscriptions().end();
01224         ++current) {
01225         keylist.push_back(current->second->get_id());
01226       }
01227 
01228       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01229         OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01230         ACE_DEBUG((LM_DEBUG,
01231           ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01232           ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
01233           keylist.size(),
01234           std::string(converter).c_str()));
01235       }
01236 
01237       for (unsigned int key = 0; key < keylist.size(); ++key) {
01238         if (participant->remove_subscription(keylist[key]) != 0) {
01239           status = false;
01240         }
01241       }
01242 
01243       // Remove Publications
01244       keylist.clear();
01245 
01246       for (DCPS_IR_Publication_Map::const_iterator
01247         current = participant->publications().begin();
01248         current != participant->publications().end();
01249         ++current) {
01250         keylist.push_back(current->second->get_id());
01251       }
01252 
01253       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01254         OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01255         ACE_DEBUG((LM_DEBUG,
01256           ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01257           ACE_TEXT("%d publications to remove from participant %C.\n"),
01258           keylist.size(),
01259           std::string(converter).c_str()));
01260       }
01261 
01262       for (unsigned int key = 0; key < keylist.size(); ++key) {
01263         if (participant->remove_publication(keylist[key]) != 0) {
01264           status = false;
01265         }
01266       }
01267 
01268       // Remove Topics
01269       keylist.clear();
01270 
01271       for (DCPS_IR_Topic_Map::const_iterator
01272         current = participant->topics().begin();
01273         current != participant->topics().end();
01274         ++current) {
01275         keylist.push_back(current->second->get_id());
01276       }
01277 
01278       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01279         OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01280         ACE_DEBUG((LM_DEBUG,
01281           ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01282           ACE_TEXT("%d topics to remove from participant %C.\n"),
01283           keylist.size(),
01284           std::string(converter).c_str()));
01285       }
01286 
01287       for (unsigned int key = 0; key < keylist.size(); ++key) {
01288         DCPS_IR_Topic* discard;
01289 
01290         if (participant->remove_topic_reference(keylist[key], discard) != 0) {
01291           status = false;
01292         }
01293       }
01294     }
01295 
01296     // Remove Participant
01297     this->remove_domain_participant(domain, candidates[ index]);
01298   }
01299 
01300   return status;
01301 }
01302 
01303 void
01304 TAO_DDS_DCPSInfo_i::disassociate_participant(
01305   DDS::DomainId_t domainId,
01306   const OpenDDS::DCPS::RepoId& local_id,
01307   const OpenDDS::DCPS::RepoId& remote_id)
01308 {
01309   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01310 
01311   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01312   if (it == this->domains_.end()) {
01313     throw OpenDDS::DCPS::Invalid_Domain();
01314   }
01315 
01316   DCPS_IR_Participant* participant = it->second->participant(local_id);
01317   if (participant == 0) {
01318     throw OpenDDS::DCPS::Invalid_Participant();
01319   }
01320 
01321   // Disassociate from participant temporarily:
01322   const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
01323   for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
01324        sub != subscriptions.end(); ++sub) {
01325     sub->second->disassociate_participant(remote_id, true);
01326   }
01327 
01328   const DCPS_IR_Publication_Map& publications = participant->publications();
01329   for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
01330        pub != publications.end(); ++pub) {
01331     pub->second->disassociate_participant(remote_id, true);
01332   }
01333 
01334   it->second->remove_dead_participants();
01335 }
01336 
01337 void
01338 TAO_DDS_DCPSInfo_i::disassociate_subscription(
01339   DDS::DomainId_t domainId,
01340   const OpenDDS::DCPS::RepoId& participantId,
01341   const OpenDDS::DCPS::RepoId& local_id,
01342   const OpenDDS::DCPS::RepoId& remote_id)
01343 {
01344   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01345 
01346   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01347   if (it == this->domains_.end()) {
01348     throw OpenDDS::DCPS::Invalid_Domain();
01349   }
01350 
01351   DCPS_IR_Participant* participant = it->second->participant(participantId);
01352   if (participant == 0) {
01353     throw OpenDDS::DCPS::Invalid_Participant();
01354   }
01355 
01356   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01357     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
01358   }
01359 
01360   DCPS_IR_Subscription* subscription;
01361   if (participant->find_subscription_reference(local_id, subscription)
01362       != 0 || subscription == 0) {
01363     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01364     OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
01365     ACE_ERROR((LM_ERROR,
01366                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
01367                ACE_TEXT("participant %C could not find subscription %C.\n"),
01368                std::string(part_converter).c_str(),
01369                std::string(sub_converter).c_str()));
01370     throw OpenDDS::DCPS::Invalid_Subscription();
01371   }
01372 
01373   // Disassociate from publication temporarily:
01374   subscription->disassociate_publication(remote_id, true);
01375 
01376   it->second->remove_dead_participants();
01377 }
01378 
01379 void
01380 TAO_DDS_DCPSInfo_i::disassociate_publication(
01381   DDS::DomainId_t domainId,
01382   const OpenDDS::DCPS::RepoId& participantId,
01383   const OpenDDS::DCPS::RepoId& local_id,
01384   const OpenDDS::DCPS::RepoId& remote_id)
01385 {
01386   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01387 
01388   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01389   if (it == this->domains_.end()) {
01390     throw OpenDDS::DCPS::Invalid_Domain();
01391   }
01392 
01393   DCPS_IR_Participant* participant = it->second->participant(participantId);
01394   if (participant == 0) {
01395     throw OpenDDS::DCPS::Invalid_Participant();
01396   }
01397 
01398   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01399     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
01400   }
01401 
01402   DCPS_IR_Publication* publication;
01403   if (participant->find_publication_reference(local_id, publication)
01404       != 0 || publication == 0) {
01405     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01406     OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
01407     ACE_ERROR((LM_ERROR,
01408                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
01409                ACE_TEXT("participant %C could not find publication %C.\n"),
01410                std::string(part_converter).c_str(),
01411                std::string(pub_converter).c_str()));
01412     throw OpenDDS::DCPS::Invalid_Publication();
01413   }
01414 
01415   // Disassociate from subscription temporarily:
01416   publication->disassociate_subscription(remote_id, true);
01417 
01418   it->second->remove_dead_participants();
01419 }
01420 
01421 void TAO_DDS_DCPSInfo_i::remove_domain_participant(
01422   DDS::DomainId_t domainId,
01423   const OpenDDS::DCPS::RepoId& participantId)
01424 {
01425   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01426 
01427   // Grab the domain.
01428   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01429 
01430   if (where == this->domains_.end()) {
01431     throw OpenDDS::DCPS::Invalid_Domain();
01432   }
01433 
01434   DCPS_IR_Participant* participant = where->second->participant(participantId);
01435 
01436   if (participant == 0) {
01437     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01438     ACE_ERROR((LM_ERROR,
01439                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01440                ACE_TEXT("failed to locate participant %C in domain %d.\n"),
01441                std::string(converter).c_str(),
01442                domainId));
01443     throw OpenDDS::DCPS::Invalid_Participant();
01444   }
01445 
01446   // Determine if we should propagate this event;  we need to cache this
01447   // result as the participant will be gone by the time we use the result.
01448   bool sendUpdate = (participant->isOwner() == true)
01449                     && (participant->isBitPublisher() == false);
01450 
01451   CORBA::Boolean dont_notify_lost = 0;
01452   int status = where->second->remove_participant(participantId, dont_notify_lost);
01453 
01454   if (0 != status) {
01455     // Removing the participant failed
01456     throw OpenDDS::DCPS::Invalid_Participant();
01457   }
01458 
01459   // Update any concerned observers that the participant was destroyed.
01460   if (this->um_ && sendUpdate) {
01461     Update::IdPath path(
01462       where->second->get_id(),
01463       participantId,
01464       participantId);
01465     this->um_->destroy(path, Update::Participant);
01466 
01467     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01468       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01469       ACE_DEBUG((LM_DEBUG,
01470                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01471                  ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
01472                  std::string(converter).c_str(),
01473                  domainId));
01474     }
01475   }
01476 
01477   if (where->second->participants().empty()) {
01478     domains_.erase(where);
01479   }
01480 
01481 #ifndef DDS_HAS_MINIMUM_BIT
01482   else if (where->second->useBIT() &&
01483            where->second->participants().size() == 1) {
01484     // The only participant left is the one we created to publish BITs.
01485     // It can be removed now since no user participants exist in this domain,
01486     // but it has to be removed on the Service Participant's reactor thread
01487     // in order to make the locking work properly in delete_participant().
01488     const ACE_Event_Handler_var eh = new BIT_Cleanup_Handler(this, domainId);
01489     TheServiceParticipant->reactor()->notify(eh.handler());
01490   }
01491 #endif
01492 }
01493 
01494 #ifndef DDS_HAS_MINIMUM_BIT
01495 int TAO_DDS_DCPSInfo_i::BIT_Cleanup_Handler::handle_exception(ACE_HANDLE)
01496 {
01497   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, parent_->lock_, 0);
01498 
01499   const DCPS_IR_Domain_Map::iterator where = parent_->domains_.find(domain_);
01500 
01501   if (where == parent_->domains_.end()) {
01502     return 0;
01503   }
01504 
01505   if (where->second->participants().size() == 1) {
01506     where->second->cleanup_built_in_topics();
01507   }
01508 
01509   return 0;
01510 }
01511 #endif
01512 
01513 void TAO_DDS_DCPSInfo_i::association_complete(DDS::DomainId_t domainId,
01514   const OpenDDS::DCPS::RepoId& participantId,
01515   const OpenDDS::DCPS::RepoId& localId,
01516   const OpenDDS::DCPS::RepoId& remoteId)
01517 {
01518   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01519 
01520   DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId);
01521   if (dom_iter == this->domains_.end()) {
01522     return;
01523   }
01524 
01525   DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId);
01526   if (0 == partPtr) {
01527     return;
01528   }
01529 
01530   // localId could be pub or sub (initial implementation will only use sub
01531   // since the DataReader is the passive peer)
01532   DCPS_IR_Subscription* sub = 0;
01533   DCPS_IR_Publication* pub = 0;
01534   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01535     ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n"));
01536   }
01537   if (0 == partPtr->find_subscription_reference(localId, sub)) {
01538     sub->association_complete(remoteId);
01539   } else if (0 == partPtr->find_publication_reference(localId, pub)) {
01540     pub->association_complete(remoteId);
01541   } else {
01542     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01543       OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01544       OpenDDS::DCPS::RepoIdConverter local_converter(localId);
01545       OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId);
01546       ACE_DEBUG((LM_WARNING,
01547                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ")
01548                  ACE_TEXT("participant %C could not find subscription or publication %C ")
01549                  ACE_TEXT("to complete association with remote %C.\n"),
01550                  std::string(part_converter).c_str(),
01551                  std::string(local_converter).c_str(),
01552                  std::string(remote_converter).c_str()));
01553     }
01554   }
01555 }
01556 
01557 void TAO_DDS_DCPSInfo_i::ignore_domain_participant(
01558   DDS::DomainId_t domainId,
01559   const OpenDDS::DCPS::RepoId& myParticipantId,
01560   const OpenDDS::DCPS::RepoId& ignoreId)
01561 {
01562   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01563 
01564   // Grab the domain.
01565   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01566 
01567   if (where == this->domains_.end()) {
01568     throw OpenDDS::DCPS::Invalid_Domain();
01569   }
01570 
01571   // Grab the participant.
01572   DCPS_IR_Participant* partPtr
01573   = where->second->participant(myParticipantId);
01574 
01575   if (0 == partPtr) {
01576     throw OpenDDS::DCPS::Invalid_Participant();
01577   }
01578 
01579   partPtr->ignore_participant(ignoreId);
01580 
01581   where->second->remove_dead_participants();
01582 }
01583 
01584 void TAO_DDS_DCPSInfo_i::ignore_topic(
01585   DDS::DomainId_t domainId,
01586   const OpenDDS::DCPS::RepoId& myParticipantId,
01587   const OpenDDS::DCPS::RepoId& ignoreId)
01588 {
01589   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01590 
01591   // Grab the domain.
01592   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01593 
01594   if (where == this->domains_.end()) {
01595     throw OpenDDS::DCPS::Invalid_Domain();
01596   }
01597 
01598   // Grab the participant.
01599   DCPS_IR_Participant* partPtr
01600   = where->second->participant(myParticipantId);
01601 
01602   if (0 == partPtr) {
01603     throw OpenDDS::DCPS::Invalid_Participant();
01604   }
01605 
01606   partPtr->ignore_topic(ignoreId);
01607 
01608   where->second->remove_dead_participants();
01609 }
01610 
01611 void TAO_DDS_DCPSInfo_i::ignore_subscription(
01612   DDS::DomainId_t domainId,
01613   const OpenDDS::DCPS::RepoId& myParticipantId,
01614   const OpenDDS::DCPS::RepoId& ignoreId)
01615 {
01616   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01617 
01618   // Grab the domain.
01619   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01620 
01621   if (where == this->domains_.end()) {
01622     throw OpenDDS::DCPS::Invalid_Domain();
01623   }
01624 
01625   // Grab the participant.
01626   DCPS_IR_Participant* partPtr
01627   = where->second->participant(myParticipantId);
01628 
01629   if (0 == partPtr) {
01630     throw OpenDDS::DCPS::Invalid_Participant();
01631   }
01632 
01633   partPtr->ignore_subscription(ignoreId);
01634 
01635   where->second->remove_dead_participants();
01636 }
01637 
01638 void TAO_DDS_DCPSInfo_i::ignore_publication(
01639   DDS::DomainId_t domainId,
01640   const OpenDDS::DCPS::RepoId& myParticipantId,
01641   const OpenDDS::DCPS::RepoId& ignoreId)
01642 {
01643   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01644 
01645   // Grab the domain.
01646   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01647 
01648   if (where == this->domains_.end()) {
01649     throw OpenDDS::DCPS::Invalid_Domain();
01650   }
01651 
01652   // Grab the participant.
01653   DCPS_IR_Participant* partPtr
01654   = where->second->participant(myParticipantId);
01655 
01656   if (0 == partPtr) {
01657     throw OpenDDS::DCPS::Invalid_Participant();
01658   }
01659 
01660   partPtr->ignore_publication(ignoreId);
01661 
01662   where->second->remove_dead_participants();
01663 }
01664 
01665 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos(
01666   DDS::DomainId_t domainId,
01667   const OpenDDS::DCPS::RepoId& partId,
01668   const OpenDDS::DCPS::RepoId& dwId,
01669   const DDS::DataWriterQos & qos,
01670   const DDS::PublisherQos & publisherQos)
01671 {
01672   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01673 
01674   // Grab the domain.
01675   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01676 
01677   if (where == this->domains_.end()) {
01678     throw OpenDDS::DCPS::Invalid_Domain();
01679   }
01680 
01681   // Grab the participant.
01682   DCPS_IR_Participant* partPtr
01683   = where->second->participant(partId);
01684 
01685   if (0 == partPtr) {
01686     throw OpenDDS::DCPS::Invalid_Participant();
01687   }
01688 
01689   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01690     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 1\n"));
01691   }
01692 
01693   DCPS_IR_Publication* pub;
01694 
01695   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01696     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01697     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01698     ACE_ERROR((LM_ERROR,
01699                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01700                ACE_TEXT("participant %C could not find publication %C.\n"),
01701                std::string(part_converter).c_str(),
01702                std::string(pub_converter).c_str()));
01703     throw OpenDDS::DCPS::Invalid_Publication();
01704   }
01705 
01706   Update::SpecificQos qosType;
01707 
01708   if (pub->set_qos(qos, publisherQos, qosType) == false)  // failed
01709     return 0;
01710 
01711   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01712     Update::IdPath path(domainId, partId, dwId);
01713 
01714     switch (qosType) {
01715     case Update::DataWriterQos:
01716       this->um_->update(path, qos);
01717       break;
01718 
01719     case Update::PublisherQos:
01720       this->um_->update(path, publisherQos);
01721       break;
01722 
01723     case Update::NoQos:
01724     default:
01725       break;
01726     }
01727 
01728     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01729       OpenDDS::DCPS::RepoIdConverter converter(dwId);
01730       ACE_DEBUG((LM_DEBUG,
01731                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01732                  ACE_TEXT("pushing update of publication %C in domain %d.\n"),
01733                  std::string(converter).c_str(),
01734                  domainId));
01735     }
01736   }
01737 
01738   return 1;
01739 }
01740 
01741 void
01742 TAO_DDS_DCPSInfo_i::update_publication_qos(
01743   DDS::DomainId_t            domainId,
01744   const OpenDDS::DCPS::RepoId& partId,
01745   const OpenDDS::DCPS::RepoId& dwId,
01746   const DDS::DataWriterQos&  qos)
01747 {
01748   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01749 
01750   // Grab the domain.
01751   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01752 
01753   if (where == this->domains_.end()) {
01754     throw OpenDDS::DCPS::Invalid_Domain();
01755   }
01756 
01757   // Grab the participant.
01758   DCPS_IR_Participant* partPtr
01759   = where->second->participant(partId);
01760 
01761   if (0 == partPtr) {
01762     throw OpenDDS::DCPS::Invalid_Participant();
01763   }
01764 
01765   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01766     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 2\n"));
01767   }
01768 
01769   DCPS_IR_Publication* pub;
01770 
01771   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01772     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01773     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01774     ACE_ERROR((LM_ERROR,
01775                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01776                ACE_TEXT("participant %C could not find publication %C.\n"),
01777                std::string(part_converter).c_str(),
01778                std::string(pub_converter).c_str()));
01779     throw OpenDDS::DCPS::Invalid_Publication();
01780   }
01781 
01782   pub->set_qos(qos);
01783 }
01784 
01785 void
01786 TAO_DDS_DCPSInfo_i::update_publication_qos(
01787   DDS::DomainId_t            domainId,
01788   const OpenDDS::DCPS::RepoId& partId,
01789   const OpenDDS::DCPS::RepoId& dwId,
01790   const DDS::PublisherQos&   qos)
01791 {
01792   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01793 
01794   // Grab the domain.
01795   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01796 
01797   if (where == this->domains_.end()) {
01798     throw OpenDDS::DCPS::Invalid_Domain();
01799   }
01800 
01801   // Grab the participant.
01802   DCPS_IR_Participant* partPtr
01803   = where->second->participant(partId);
01804 
01805   if (0 == partPtr) {
01806     throw OpenDDS::DCPS::Invalid_Participant();
01807   }
01808 
01809   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01810     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 3\n"));
01811   }
01812 
01813   DCPS_IR_Publication* pub;
01814 
01815   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01816     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01817     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01818     ACE_ERROR((LM_ERROR,
01819                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01820                ACE_TEXT("participant %C could not find publication %C.\n"),
01821                std::string(part_converter).c_str(),
01822                std::string(pub_converter).c_str()));
01823     throw OpenDDS::DCPS::Invalid_Publication();
01824   }
01825 
01826   pub->set_qos(qos);
01827 }
01828 
01829 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos(
01830   DDS::DomainId_t domainId,
01831   const OpenDDS::DCPS::RepoId& partId,
01832   const OpenDDS::DCPS::RepoId& drId,
01833   const DDS::DataReaderQos & qos,
01834   const DDS::SubscriberQos & subscriberQos)
01835 {
01836   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01837 
01838   // Grab the domain.
01839   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01840 
01841   if (where == this->domains_.end()) {
01842     throw OpenDDS::DCPS::Invalid_Domain();
01843   }
01844 
01845   // Grab the participant.
01846   DCPS_IR_Participant* partPtr
01847   = where->second->participant(partId);
01848 
01849   if (0 == partPtr) {
01850     throw OpenDDS::DCPS::Invalid_Participant();
01851   }
01852 
01853   DCPS_IR_Subscription* sub;
01854 
01855   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01856     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
01857   }
01858 
01859   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01860     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01861     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01862     ACE_ERROR((LM_ERROR,
01863                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01864                ACE_TEXT("participant %C could not find subscription %C.\n"),
01865                std::string(part_converter).c_str(),
01866                std::string(sub_converter).c_str()));
01867     throw OpenDDS::DCPS::Invalid_Subscription();
01868   }
01869 
01870   Update::SpecificQos qosType;
01871 
01872   if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed
01873     return 0;
01874 
01875   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01876     Update::IdPath path(domainId, partId, drId);
01877 
01878     switch (qosType) {
01879     case Update::DataReaderQos:
01880       this->um_->update(path, qos);
01881       break;
01882 
01883     case Update::SubscriberQos:
01884       this->um_->update(path, subscriberQos);
01885       break;
01886 
01887     case Update::NoQos:
01888     default:
01889       break;
01890     }
01891 
01892     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01893       OpenDDS::DCPS::RepoIdConverter converter(drId);
01894       ACE_DEBUG((LM_DEBUG,
01895                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01896                  ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
01897                  std::string(converter).c_str(),
01898                  domainId));
01899     }
01900   }
01901 
01902   return 1;
01903 }
01904 
01905 void
01906 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01907   DDS::DomainId_t            domainId,
01908   const OpenDDS::DCPS::RepoId& partId,
01909   const OpenDDS::DCPS::RepoId& drId,
01910   const DDS::DataReaderQos&  qos)
01911 {
01912   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01913 
01914   // Grab the domain.
01915   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01916 
01917   if (where == this->domains_.end()) {
01918     throw OpenDDS::DCPS::Invalid_Domain();
01919   }
01920 
01921   // Grab the participant.
01922   DCPS_IR_Participant* partPtr
01923   = where->second->participant(partId);
01924 
01925   if (0 == partPtr) {
01926     throw OpenDDS::DCPS::Invalid_Participant();
01927   }
01928 
01929   DCPS_IR_Subscription* sub;
01930 
01931   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01932     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
01933   }
01934 
01935   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01936     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01937     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01938     ACE_ERROR((LM_ERROR,
01939                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01940                ACE_TEXT("participant %C could not find subscription %C.\n"),
01941                std::string(part_converter).c_str(),
01942                std::string(sub_converter).c_str()));
01943     throw OpenDDS::DCPS::Invalid_Subscription();
01944   }
01945 
01946   sub->set_qos(qos);
01947 }
01948 
01949 void
01950 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01951   DDS::DomainId_t            domainId,
01952   const OpenDDS::DCPS::RepoId& partId,
01953   const OpenDDS::DCPS::RepoId& drId,
01954   const DDS::SubscriberQos&  qos)
01955 {
01956   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01957 
01958   // Grab the domain.
01959   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01960 
01961   if (where == this->domains_.end()) {
01962     throw OpenDDS::DCPS::Invalid_Domain();
01963   }
01964 
01965   // Grab the participant.
01966   DCPS_IR_Participant* partPtr
01967   = where->second->participant(partId);
01968 
01969   if (0 == partPtr) {
01970     throw OpenDDS::DCPS::Invalid_Participant();
01971   }
01972 
01973   DCPS_IR_Subscription* sub;
01974 
01975   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01976     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
01977   }
01978 
01979   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01980     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01981     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01982     ACE_ERROR((LM_ERROR,
01983                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01984                ACE_TEXT("participant %C could not find subscription %C.\n"),
01985                std::string(part_converter).c_str(),
01986                std::string(sub_converter).c_str()));
01987     throw OpenDDS::DCPS::Invalid_Subscription();
01988   }
01989 
01990   sub->set_qos(qos);
01991 }
01992 
01993 CORBA::Boolean
01994 TAO_DDS_DCPSInfo_i::update_subscription_params(
01995     DDS::DomainId_t domainId,
01996     const OpenDDS::DCPS::RepoId& participantId,
01997     const OpenDDS::DCPS::RepoId& subscriptionId,
01998     const DDS::StringSeq& params)
01999 {
02000   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02001 
02002   DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
02003   if (domain == this->domains_.end()) {
02004     throw OpenDDS::DCPS::Invalid_Domain();
02005   }
02006 
02007   DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
02008   if (0 == partPtr) {
02009     throw OpenDDS::DCPS::Invalid_Participant();
02010   }
02011 
02012   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
02013     ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
02014   }
02015 
02016   DCPS_IR_Subscription* sub;
02017   if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
02018     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
02019     OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
02020     ACE_ERROR((LM_ERROR,
02021                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
02022                ACE_TEXT("participant %C could not find subscription %C.\n"),
02023                std::string(part_converter).c_str(),
02024                std::string(sub_converter).c_str()));
02025     throw OpenDDS::DCPS::Invalid_Subscription();
02026   }
02027 
02028   sub->update_expr_params(params);  // calls writers via DataWriterRemote
02029 
02030   if (this->um_ && !partPtr->isBitPublisher()) {
02031     Update::IdPath path(domainId, participantId, subscriptionId);
02032     this->um_->update(path, params);
02033   }
02034 
02035   return true;
02036 }
02037 
02038 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos(
02039   const OpenDDS::DCPS::RepoId& topicId,
02040   DDS::DomainId_t domainId,
02041   const OpenDDS::DCPS::RepoId& participantId,
02042   const DDS::TopicQos & qos)
02043 {
02044   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02045 
02046   // Grab the domain.
02047   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02048 
02049   if (where == this->domains_.end()) {
02050     throw OpenDDS::DCPS::Invalid_Domain();
02051   }
02052 
02053   // Grab the participant.
02054   DCPS_IR_Participant* partPtr
02055   = where->second->participant(participantId);
02056 
02057   if (0 == partPtr) {
02058     throw OpenDDS::DCPS::Invalid_Participant();
02059   }
02060 
02061   DCPS_IR_Topic* topic;
02062 
02063   if (partPtr->find_topic_reference(topicId, topic) != 0) {
02064     throw OpenDDS::DCPS::Invalid_Topic();
02065   }
02066 
02067   if (topic->set_topic_qos(qos) == false)
02068     return 0;
02069 
02070   if (this->um_
02071       && (partPtr->isOwner() == true)
02072       && (partPtr->isBitPublisher() == false)) {
02073     Update::IdPath path(domainId, participantId, topicId);
02074     this->um_->update(path, qos);
02075 
02076     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02077       OpenDDS::DCPS::RepoIdConverter converter(topicId);
02078       ACE_DEBUG((LM_DEBUG,
02079                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
02080                  ACE_TEXT("pushing update of topic %C in domain %d.\n"),
02081                  std::string(converter).c_str(),
02082                  domainId));
02083     }
02084   }
02085 
02086   return 1;
02087 }
02088 
02089 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos(
02090   DDS::DomainId_t domainId,
02091   const OpenDDS::DCPS::RepoId& participantId,
02092   const DDS::DomainParticipantQos & qos)
02093 {
02094   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02095 
02096   // Grab the domain.
02097   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02098 
02099   if (where == this->domains_.end()) {
02100     throw OpenDDS::DCPS::Invalid_Domain();
02101   }
02102 
02103   // Grab the participant.
02104   DCPS_IR_Participant* partPtr
02105   = where->second->participant(participantId);
02106 
02107   if (0 == partPtr) {
02108     throw OpenDDS::DCPS::Invalid_Participant();
02109   }
02110 
02111   if (partPtr->set_qos(qos) == false)
02112     return 0;
02113 
02114   if (this->um_
02115       && (partPtr->isOwner() == true)
02116       && (partPtr->isBitPublisher() == false)) {
02117     Update::IdPath path(domainId, participantId, participantId);
02118     this->um_->update(path, qos);
02119 
02120     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02121       OpenDDS::DCPS::RepoIdConverter converter(participantId);
02122       ACE_DEBUG((LM_DEBUG,
02123                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
02124                  ACE_TEXT("pushing update of participant %C in domain %d.\n"),
02125                  std::string(converter).c_str(),
02126                  domainId));
02127     }
02128   }
02129 
02130   return 1;
02131 }
02132 
02133 DCPS_IR_Domain*
02134 TAO_DDS_DCPSInfo_i::domain(DDS::DomainId_t domain)
02135 {
02136   if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) {
02137     ACE_ERROR((LM_ERROR,
02138                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02139                ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
02140     return 0;
02141   }
02142 
02143   // Check if the domain is already in the map.
02144   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
02145 
02146   if (where == this->domains_.end()) {
02147     // We will attempt to insert a new domain, go ahead and allocate it.
02148     OpenDDS::DCPS::unique_ptr<DCPS_IR_Domain> domain_uptr( new
02149                    DCPS_IR_Domain(domain, this->participantIdGenerator_));
02150 
02151     DCPS_IR_Domain* domainPtr = domain_uptr.get();
02152 
02153     // We need to insert the domain into the map at this time since it
02154     // might be looked up during the init_built_in_topics() call.
02155     this->domains_.insert(
02156       where,
02157       DCPS_IR_Domain_Map::value_type(domain, OpenDDS::DCPS::move(domain_uptr)));
02158 
02159 #ifndef DDS_HAS_MINIMUM_BIT
02160     if (TheServiceParticipant->get_BIT() && !domainPtr->useBIT() &&
02161       domainPtr->init_built_in_topics(federation_.overridden(), reincarnate_)
02162     ) {
02163       ACE_ERROR((LM_ERROR,
02164                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02165                  ACE_TEXT("failed to initialize the Built-In Topics ")
02166                  ACE_TEXT("when loading domain %d.\n"),
02167                  domain));
02168       this->domains_.erase(domain);
02169       return 0;
02170     }
02171 #endif
02172 
02173     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02174       ACE_DEBUG((LM_DEBUG,
02175                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
02176                  ACE_TEXT("successfully loaded domain %d at %x.\n"),
02177                  domain,
02178                  domainPtr));
02179     }
02180     return domainPtr;
02181 
02182   } else {
02183     return where->second.get();
02184   }
02185 }
02186 
02187 int TAO_DDS_DCPSInfo_i::init_transport(int listen_address_given,
02188                                        const char* listen_str)
02189 {
02190   int status = 0;
02191 
02192   try {
02193 
02194 #ifndef ACE_AS_STATIC_LIBS
02195     if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
02196         < 0 /* not found (-1) or suspended (-2) */) {
02197       static const ACE_TCHAR directive[] =
02198         ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
02199         ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
02200       ACE_Service_Config::process_directive(directive);
02201     }
02202 #endif
02203 
02204     std::string config_name =
02205       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02206       + std::string("InfoRepoBITTransportConfig");
02207     OpenDDS::DCPS::TransportConfig_rch config =
02208       OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
02209 
02210     std::string inst_name =
02211       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02212       + std::string("InfoRepoBITTCPTransportInst");
02213     OpenDDS::DCPS::TransportInst_rch inst =
02214       OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
02215                                                                "tcp");
02216     config->instances_.push_back(inst);
02217 
02218     OpenDDS::DCPS::TcpInst_rch tcp_inst =
02219       OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst);
02220     inst->datalink_release_delay_ = 0;
02221 
02222     tcp_inst->conn_retry_attempts_ = 0;
02223 
02224     if (listen_address_given) {
02225       tcp_inst->local_address(listen_str);
02226     }
02227 
02228   } catch (...) {
02229     // TransportRegistry is extremely varied in the exceptions that
02230     // it throws on failure; do not allow exceptions to bubble up
02231     // beyond this point.
02232     status = 1;
02233   }
02234   return status;
02235 }
02236 
02237 bool
02238 TAO_DDS_DCPSInfo_i::receive_image(const Update::UImage& image)
02239 {
02240   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02241     ACE_DEBUG((LM_DEBUG,
02242                ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02243                ACE_TEXT("processing persistent data.\n")));
02244   }
02245 
02246   // Initialize builtin topics first so that they always have the same IDs
02247 #ifndef DDS_HAS_MINIMUM_BIT
02248   if (TheServiceParticipant->get_BIT()) {
02249     for (Update::UImage::ParticipantSeq::const_iterator
02250          iter = image.participants.begin();
02251          iter != image.participants.end(); iter++) {
02252       const Update::UParticipant* part = *iter;
02253       if (!domain(part->domainId)) {
02254         if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02255           ACE_DEBUG((LM_WARNING,
02256                      ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::receive_image: ")
02257                      ACE_TEXT("invalid domain Id: %d\n"),
02258                      part->domainId));
02259         }
02260         return false;
02261       }
02262     }
02263   }
02264 #endif
02265 
02266   // Ensure that new non-BIT participants do not reuse an id
02267   participantIdGenerator_.last(image.lastPartId);
02268 
02269   for (Update::UImage::ParticipantSeq::const_iterator
02270        iter = image.participants.begin();
02271        iter != image.participants.end(); iter++) {
02272     const Update::UParticipant* part = *iter;
02273 
02274     if (!this->add_domain_participant(part->domainId, part->participantId
02275                                       , part->participantQos)) {
02276       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02277       ACE_ERROR((LM_ERROR,
02278                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02279                  ACE_TEXT("failed to add participant %C to domain %d.\n"),
02280                  std::string(converter).c_str(),
02281                  part->domainId));
02282       return false;
02283 
02284     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02285       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02286       ACE_DEBUG((LM_DEBUG,
02287                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02288                  ACE_TEXT("added participant %C to domain %d.\n"),
02289                  std::string(converter).c_str(),
02290                  part->domainId));
02291     }
02292   }
02293 
02294   for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
02295        iter != image.topics.end(); iter++) {
02296     const Update::UTopic* topic = *iter;
02297 
02298     if (!this->add_topic(topic->topicId, topic->domainId
02299                          , topic->participantId, topic->name.c_str()
02300                          , topic->dataType.c_str(), topic->topicQos)) {
02301       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02302       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02303       ACE_ERROR((LM_ERROR,
02304                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02305                  ACE_TEXT("failed to add topic %C to participant %C.\n"),
02306                  std::string(topic_converter).c_str(),
02307                  std::string(part_converter).c_str()));
02308       return false;
02309 
02310     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02311       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02312       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02313       ACE_DEBUG((LM_DEBUG,
02314                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02315                  ACE_TEXT("added topic %C to participant %C.\n"),
02316                  std::string(topic_converter).c_str(),
02317                  std::string(part_converter).c_str()));
02318     }
02319   }
02320 
02321   for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
02322        iter != image.actors.end(); iter++) {
02323     const Update::URActor* sub = *iter;
02324 
02325     // no reason to associate, there are no publishers yet to associate with
02326     if (!this->add_subscription(sub->domainId, sub->participantId
02327                                 , sub->topicId, sub->actorId
02328                                 , sub->callback.c_str(), sub->drdwQos
02329                                 , sub->transportInterfaceInfo
02330                                 , sub->pubsubQos
02331                                 , sub->contentSubscriptionProfile.filterClassName
02332                                 , sub->contentSubscriptionProfile.filterExpr
02333                                 , sub->contentSubscriptionProfile.exprParams)) {
02334       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02335       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02336       ACE_ERROR((LM_ERROR,
02337                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02338                  ACE_TEXT("failed to add subscription %C to participant %C.\n"),
02339                  std::string(sub_converter).c_str(),
02340                  std::string(part_converter).c_str()));
02341       return false;
02342 
02343     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02344       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02345       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02346       ACE_DEBUG((LM_DEBUG,
02347                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02348                  ACE_TEXT("added subscription %C to participant %C.\n"),
02349                  std::string(sub_converter).c_str(),
02350                  std::string(part_converter).c_str()));
02351     }
02352   }
02353 
02354   for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
02355        iter != image.wActors.end(); iter++) {
02356     const Update::UWActor* pub = *iter;
02357 
02358     // try to associate with any persisted subscriptions to track any expected
02359     // existing associations
02360     if (!this->add_publication(pub->domainId, pub->participantId
02361                                , pub->topicId, pub->actorId
02362                                , pub->callback.c_str() , pub->drdwQos
02363                                , pub->transportInterfaceInfo
02364                                , pub->pubsubQos
02365                                , true)) {
02366       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02367       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02368       ACE_ERROR((LM_ERROR,
02369                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02370                  ACE_TEXT("failed to add publication %C to participant %C.\n"),
02371                  std::string(pub_converter).c_str(),
02372                  std::string(part_converter).c_str()));
02373       return false;
02374 
02375     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02376       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02377       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02378       ACE_DEBUG((LM_DEBUG,
02379                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02380                  ACE_TEXT("added publication %C to participant %C.\n"),
02381                  std::string(pub_converter).c_str(),
02382                  std::string(part_converter).c_str()));
02383     }
02384   }
02385 
02386 #ifndef DDS_HAS_MINIMUM_BIT
02387   if (TheServiceParticipant->get_BIT()) {
02388     for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
02389          currentDomain != domains_.end();
02390          ++currentDomain) {
02391       currentDomain->second->reassociate_built_in_topic_pubs();
02392     }
02393   }
02394 #endif
02395 
02396   return true;
02397 }
02398 
02399 void
02400 TAO_DDS_DCPSInfo_i::add(Update::Updater* updater)
02401 {
02402   if (this->um_) {
02403     this->um_->add(updater);
02404   }
02405 }
02406 
02407 bool
02408 TAO_DDS_DCPSInfo_i::init_persistence()
02409 {
02410   um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance
02411         ("UpdateManagerSvc");
02412 
02413   if (um_ != 0) {
02414     um_->add(this);
02415 
02416     // Request persistent image.
02417     if (reincarnate_) {
02418       um_->requestImage();
02419     }
02420 
02421   } else {
02422     ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
02423                       ACE_TEXT("UpdateManagerSvc.\n")), false);
02424   }
02425 
02426   return true;
02427 }
02428 
02429 bool
02430 TAO_DDS_DCPSInfo_i::init_reassociation(const ACE_Time_Value& delay)
02431 {
02432   if (this->reassociate_timer_id_ != -1) return false;  // already scheduled
02433 
02434   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02435 
02436   this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
02437   return this->reassociate_timer_id_ != -1;
02438 }
02439 
02440 bool
02441 TAO_DDS_DCPSInfo_i::init_dispatchChecking(const ACE_Time_Value& delay)
02442 {
02443   if (this->dispatch_check_timer_id_ != -1) return false;  // already scheduled
02444 
02445   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02446 
02447   this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
02448   return this->dispatch_check_timer_id_ != -1;
02449 }
02450 
02451 void
02452 TAO_DDS_DCPSInfo_i::finalize()
02453 {
02454   if (reassociate_timer_id_ != -1) {
02455     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02456 
02457     reactor->cancel_timer(this->reassociate_timer_id_);
02458     this->reassociate_timer_id_ = -1;
02459   }
02460 
02461   if (dispatch_check_timer_id_ != -1) {
02462     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02463 
02464     reactor->cancel_timer(this->dispatch_check_timer_id_);
02465     this->dispatch_check_timer_id_ = -1;
02466   }
02467 }
02468 
02469 const DCPS_IR_Domain_Map&
02470 TAO_DDS_DCPSInfo_i::domains() const
02471 {
02472   return this->domains_;
02473 }
02474 
02475 
02476 char*
02477 TAO_DDS_DCPSInfo_i::dump_to_string()
02478 {
02479   std::string dump;
02480 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02481   std::string indent ("    ");
02482 
02483   for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
02484        dm != domains_.end();
02485        dm++)
02486   {
02487     dump += dm->second->dump_to_string(indent, 0);
02488   }
02489 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02490   return CORBA::string_dup(dump.c_str());
02491 
02492 }
02493 
02494 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1