DCPSInfo_i.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include /**/ "DCPSInfo_i.h"
00011 
00012 #include "dds/DCPS/InfoRepoDiscovery/InfoC.h"
00013 #include "dds/DCPS/transport/tcp/TcpInst.h"
00014 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00015 #include "dds/DCPS/transport/framework/TransportInst.h"
00016 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00017 #include "dds/DCPS/transport/tcp/TcpInst.h"
00018 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00019 #include "UpdateManager.h"
00020 #include "ShutdownInterface.h"
00021 
00022 #include "dds/DCPS/BuiltInTopicUtils.h"
00023 #include "dds/DCPS/GuidUtils.h"
00024 #include "dds/DCPS/RepoIdConverter.h"
00025 
00026 #include /**/ "tao/debug.h"
00027 
00028 #include /**/ "ace/Read_Buffer.h"
00029 #include /**/ "ace/OS_NS_stdio.h"
00030 #include "ace/Dynamic_Service.h"
00031 #include "ace/Reactor.h"
00032 
00033 // constructor
00034 TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i(CORBA::ORB_ptr orb
00035                                        , bool reincarnate
00036                                        , ShutdownInterface* shutdown
00037                                        , const TAO_DDS_DCPSFederationId& federation)
00038   : orb_(CORBA::ORB::_duplicate(orb))
00039   , federation_(federation)
00040   , participantIdGenerator_(federation.id())
00041   , um_(0)
00042   , reincarnate_(reincarnate)
00043   , shutdown_(shutdown)
00044   , reassociate_timer_id_(-1)
00045   , dispatch_check_timer_id_(-1)
00046 {
00047   int argc = 0;
00048   char** no_argv = 0;
00049   dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly");
00050 }
00051 
00052 //  destructor
00053 TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i()
00054 {
00055 }
00056 
00057 int
00058 TAO_DDS_DCPSInfo_i::handle_timeout(const ACE_Time_Value& /*now*/,
00059                                    const void* arg)
00060 {
00061   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00062 
00063   if (arg == this) {
00064     if ( !CORBA::is_nil(this->dispatchingOrb_.in())){
00065       if (this->dispatchingOrb_->work_pending())
00066       {
00067         // Ten microseconds
00068         ACE_Time_Value small(0,10);
00069         this->dispatchingOrb_->perform_work(small);
00070       }
00071     }
00072   }
00073   else {
00074   // NOTE: This is a purposefully naive approach to addressing defunct
00075   // associations.  In the future, it may be worthwhile to introduce a
00076   // callback model to fix the heinous runtime cost below:
00077   for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
00078        dom != this->domains_.end(); ++dom) {
00079 
00080     const DCPS_IR_Participant_Map& participants(dom->second->participants());
00081     for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
00082          part != participants.end(); ++part) {
00083 
00084       const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
00085       for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
00086            sub != subscriptions.end(); ++sub) {
00087         sub->second->reevaluate_defunct_associations();
00088       }
00089 
00090       const DCPS_IR_Publication_Map& publications(part->second->publications());
00091       for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
00092            pub != publications.end(); ++pub) {
00093         pub->second->reevaluate_defunct_associations();
00094       }
00095     }
00096   }
00097   }
00098 
00099   return 0;
00100 }
00101 
00102 void
00103 TAO_DDS_DCPSInfo_i::shutdown()
00104 {
00105   this->shutdown_->shutdown();
00106 }
00107 
00108 CORBA::ORB_ptr
00109 TAO_DDS_DCPSInfo_i::orb()
00110 {
00111   return CORBA::ORB::_duplicate(this->orb_.in());
00112 }
00113 
00114 CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant(
00115   DDS::DomainId_t            domainId,
00116   const OpenDDS::DCPS::RepoId& participantId)
00117 {
00118   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00119 
00120   // Grab the domain.
00121   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00122 
00123   if (where == this->domains_.end()) {
00124     throw OpenDDS::DCPS::Invalid_Domain();
00125   }
00126 
00127   // Grab the participant.
00128   DCPS_IR_Participant* participant
00129   = where->second->participant(participantId);
00130 
00131   if (0 == participant) {
00132     throw OpenDDS::DCPS::Invalid_Participant();
00133   }
00134 
00135   // Establish ownership within the local repository.
00136   participant->takeOwnership();
00137 
00138   return false;
00139 }
00140 
00141 bool
00142 TAO_DDS_DCPSInfo_i::changeOwnership(
00143   DDS::DomainId_t              domainId,
00144   const OpenDDS::DCPS::RepoId& participantId,
00145   long                           sender,
00146   long                           owner)
00147 {
00148   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00149 
00150   // Grab the domain.
00151   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00152 
00153   if (where == this->domains_.end()) {
00154     return false;
00155   }
00156 
00157   // Grab the participant.
00158   DCPS_IR_Participant* participant
00159   = where->second->participant(participantId);
00160 
00161   if (0 == participant) {
00162     return false;
00163   }
00164 
00165   // Establish the ownership.
00166   participant->changeOwner(sender, owner);
00167   return true;
00168 }
00169 
00170 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic(
00171   OpenDDS::DCPS::RepoId_out topicId,
00172   DDS::DomainId_t domainId,
00173   const OpenDDS::DCPS::RepoId& participantId,
00174   const char * topicName,
00175   const char * dataTypeName,
00176   const DDS::TopicQos & qos,
00177   bool /*hasDcpsKey -- only used for RTPS Discovery*/)
00178 {
00179   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00180   // Grab the domain.
00181   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00182 
00183   if (where == this->domains_.end()) {
00184     throw OpenDDS::DCPS::Invalid_Domain();
00185   }
00186 
00187   // Grab the participant.
00188   DCPS_IR_Participant* participantPtr
00189   = where->second->participant(participantId);
00190 
00191   if (0 == participantPtr) {
00192     throw OpenDDS::DCPS::Invalid_Participant();
00193   }
00194 
00195   OpenDDS::DCPS::TopicStatus topicStatus
00196   = where->second->add_topic(
00197       topicId,
00198       topicName,
00199       dataTypeName,
00200       qos,
00201       participantPtr);
00202 
00203   if (this->um_ && (participantPtr->isBitPublisher() == false)) {
00204     Update::UTopic topic(domainId, topicId, participantId
00205                          , topicName, dataTypeName
00206                          , const_cast<DDS::TopicQos &>(qos));
00207     this->um_->create(topic);
00208 
00209     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00210       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00211       ACE_DEBUG((LM_DEBUG,
00212                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
00213                  ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
00214                  std::string(converter).c_str(),
00215                  domainId));
00216     }
00217   }
00218   return topicStatus;
00219 }
00220 
00221 bool
00222 TAO_DDS_DCPSInfo_i::add_topic(const OpenDDS::DCPS::RepoId& topicId,
00223                               DDS::DomainId_t domainId,
00224                               const OpenDDS::DCPS::RepoId& participantId,
00225                               const char* topicName,
00226                               const char* dataTypeName,
00227                               const DDS::TopicQos& qos)
00228 {
00229   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00230 
00231   // Grab the domain.
00232   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00233 
00234   if (where == this->domains_.end()) {
00235     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00236       ACE_DEBUG((LM_WARNING,
00237                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00238                  ACE_TEXT("invalid domain %d.\n"),
00239                  domainId));
00240     }
00241 
00242     return false;
00243   }
00244 
00245   // Grab the participant.
00246   DCPS_IR_Participant* participantPtr
00247   = where->second->participant(participantId);
00248 
00249   if (0 == participantPtr) {
00250     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00251       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00252       ACE_DEBUG((LM_WARNING,
00253                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00254                  ACE_TEXT("invalid participant %C.\n"),
00255                  std::string(converter).c_str()));
00256     }
00257 
00258     return false;
00259   }
00260 
00261   OpenDDS::DCPS::TopicStatus topicStatus
00262   = where->second->force_add_topic(topicId, topicName, dataTypeName,
00263                                    qos, participantPtr);
00264 
00265   if (topicStatus != OpenDDS::DCPS::CREATED) {
00266     return false;
00267   }
00268 
00269   OpenDDS::DCPS::RepoIdConverter converter(topicId);
00270 
00271   // See if we are adding a topic that was created within this
00272   // repository or a different repository.
00273   if (converter.federationId() == federation_.id()) {
00274     // Ensure the topic RepoId values do not conflict.
00275     participantPtr->last_topic_key(converter.entityKey());
00276   }
00277 
00278   return true;
00279 }
00280 
00281 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic(
00282   DDS::DomainId_t domainId,
00283   const char * topicName,
00284   CORBA::String_out dataTypeName,
00285   DDS::TopicQos_out qos,
00286   OpenDDS::DCPS::RepoId_out topicId)
00287 {
00288   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00289 
00290   // Grab the domain.
00291   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00292 
00293   if (where == this->domains_.end()) {
00294     throw OpenDDS::DCPS::Invalid_Domain();
00295   }
00296 
00297   OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND;
00298 
00299   DCPS_IR_Topic* topic = 0;
00300   qos = new DDS::TopicQos;
00301 
00302   status = where->second->find_topic(topicName, topic);
00303 
00304   if (0 != topic) {
00305     status = OpenDDS::DCPS::FOUND;
00306     const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
00307     dataTypeName = desc->get_dataTypeName();
00308     *qos = *(topic->get_topic_qos());
00309     topicId = topic->get_id();
00310   }
00311 
00312   return status;
00313 }
00314 
00315 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic(
00316   DDS::DomainId_t domainId,
00317   const OpenDDS::DCPS::RepoId& participantId,
00318   const OpenDDS::DCPS::RepoId& topicId)
00319 {
00320   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00321 
00322   // Grab the domain.
00323   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00324 
00325   if (where == this->domains_.end()) {
00326     throw OpenDDS::DCPS::Invalid_Domain();
00327   }
00328 
00329   // Grab the participant.
00330   DCPS_IR_Participant* partPtr
00331   = where->second->participant(participantId);
00332 
00333   if (0 == partPtr) {
00334     throw OpenDDS::DCPS::Invalid_Participant();
00335   }
00336 
00337   DCPS_IR_Topic* topic;
00338 
00339   if (partPtr->find_topic_reference(topicId, topic) != 0) {
00340     throw OpenDDS::DCPS::Invalid_Topic();
00341   }
00342 
00343   OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
00344 
00345   if (this->um_
00346       && (partPtr->isOwner() == true)
00347       && (partPtr->isBitPublisher() == false)) {
00348     Update::IdPath path(domainId, participantId, topicId);
00349     this->um_->destroy(path, Update::Topic);
00350 
00351     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00352       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00353       ACE_DEBUG((LM_DEBUG,
00354                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
00355                  ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00356                  std::string(converter).c_str(),
00357                  domainId));
00358     }
00359   }
00360 
00361   return removedStatus;
00362 }
00363 
00364 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication(
00365   DDS::DomainId_t domainId,
00366   const OpenDDS::DCPS::RepoId& participantId,
00367   const OpenDDS::DCPS::RepoId& topicId,
00368   OpenDDS::DCPS::DataWriterRemote_ptr publication,
00369   const DDS::DataWriterQos & qos,
00370   const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00371   const DDS::PublisherQos & publisherQos)
00372 {
00373   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00374 
00375   // Grab the domain.
00376   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00377 
00378   if (where == this->domains_.end()) {
00379     throw OpenDDS::DCPS::Invalid_Domain();
00380   }
00381 
00382   // Grab the participant.
00383   DCPS_IR_Participant* partPtr
00384   = where->second->participant(participantId);
00385 
00386   if (0 == partPtr) {
00387     throw OpenDDS::DCPS::Invalid_Participant();
00388   }
00389 
00390   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00391 
00392   if (topic == 0) {
00393     throw OpenDDS::DCPS::Invalid_Topic();
00394   }
00395 
00396   OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id();
00397 
00398   // Remarshall the remote reference onto the dispatching orb.
00399   OpenDDS::DCPS::DataWriterRemote_var marshalledPub(OpenDDS::DCPS::DataWriterRemote::_duplicate(publication));
00400   if (CORBA::is_nil(marshalledPub.in())) {
00401     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00402       ACE_DEBUG((LM_WARNING,
00403                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00404                  ACE_TEXT("invalid publication reference.\n")));
00405     }
00406     return OpenDDS::DCPS::GUID_UNKNOWN;
00407   }
00408   CORBA::String_var pubStr = orb_->object_to_string(marshalledPub.in());
00409   CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr.in());
00410   if (CORBA::is_nil(pubObj.in()))  {
00411     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00412       ACE_DEBUG((LM_WARNING,
00413                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00414                  ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
00415     }
00416     return OpenDDS::DCPS::GUID_UNKNOWN;
00417   }
00418   OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication
00419   = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj.in());
00420 
00421   DCPS_IR_Publication* pubPtr;
00422   ACE_NEW_RETURN(pubPtr,
00423                  DCPS_IR_Publication(
00424                    pubId,
00425                    partPtr,
00426                    topic,
00427                    dispatchingPublication.in(),
00428                    qos,
00429                    transInfo,
00430                    publisherQos),
00431                  OpenDDS::DCPS::GUID_UNKNOWN);
00432 
00433   if (partPtr->add_publication(pubPtr) != 0) {
00434     // failed to add.  we are responsible for the memory.
00435     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00436     delete pubPtr;
00437     pubPtr = 0;
00438 
00439   } else if (topic->add_publication_reference(pubPtr) != 0) {
00440     // Failed to add to the topic
00441     // so remove from participant and fail.
00442     partPtr->remove_publication(pubId);
00443     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00444   }
00445 
00446   if (this->um_ && (partPtr->isBitPublisher() == false)) {
00447     CORBA::String_var callback = orb_->object_to_string(publication);
00448     Update::ContentSubscriptionInfo csi;
00449 
00450     Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
00451                           , callback.in()
00452                           , const_cast<DDS::PublisherQos &>(publisherQos)
00453                           , const_cast<DDS::DataWriterQos &>(qos)
00454                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00455                           (transInfo), csi);
00456     this->um_->create(actor);
00457 
00458     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00459       OpenDDS::DCPS::RepoIdConverter converter(pubId);
00460       ACE_DEBUG((LM_DEBUG,
00461                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ")
00462                  ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
00463                  std::string(converter).c_str(),
00464                  domainId));
00465     }
00466   }
00467 
00468   where->second->remove_dead_participants();
00469   return pubId;
00470 }
00471 
00472 bool
00473 TAO_DDS_DCPSInfo_i::add_publication(DDS::DomainId_t domainId,
00474                                     const OpenDDS::DCPS::RepoId& participantId,
00475                                     const OpenDDS::DCPS::RepoId& topicId,
00476                                     const OpenDDS::DCPS::RepoId& pubId,
00477                                     const char* pub_str,
00478                                     const DDS::DataWriterQos & qos,
00479                                     const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00480                                     const DDS::PublisherQos & publisherQos,
00481                                     bool associate)
00482 {
00483   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00484 
00485   // Grab the domain.
00486   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00487 
00488   if (where == this->domains_.end()) {
00489     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00490       ACE_DEBUG((LM_WARNING,
00491                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00492                  ACE_TEXT("invalid domain %d.\n"),
00493                  domainId));
00494     }
00495 
00496     return false;
00497   }
00498 
00499   // Grab the participant.
00500   DCPS_IR_Participant* partPtr
00501   = where->second->participant(participantId);
00502 
00503   if (0 == partPtr) {
00504     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00505       OpenDDS::DCPS::RepoIdConverter converter(pubId);
00506       ACE_DEBUG((LM_WARNING,
00507                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00508                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00509                  std::string(converter).c_str(),
00510                  domainId));
00511     }
00512 
00513     return false;
00514   }
00515 
00516   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00517 
00518   if (topic == 0) {
00519     OpenDDS::DCPS::RepoIdConverter converter(topicId);
00520     ACE_DEBUG((LM_WARNING,
00521                ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00522                ACE_TEXT("invalid topic %C in domain %d.\n"),
00523                std::string(converter).c_str(),
00524                domainId));
00525     return false;
00526   }
00527 
00528   /// @TODO: Check if this is already stored.  If so, just clear the callback IOR.
00529 
00530   CORBA::Object_var obj = dispatchingOrb_->string_to_object(pub_str);
00531   if (CORBA::is_nil(obj.in())) {
00532     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00533       ACE_DEBUG((LM_WARNING,
00534                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00535                  ACE_TEXT("failure marshalling publication %s on dispatching orb.\n"),
00536                  pub_str));
00537     }
00538     return false;
00539   }
00540   OpenDDS::DCPS::DataWriterRemote_var publication
00541   = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
00542 
00543   DCPS_IR_Publication* pubPtr;
00544   ACE_NEW_RETURN(pubPtr,
00545                  DCPS_IR_Publication(
00546                    pubId,
00547                    partPtr,
00548                    topic,
00549                    publication.in(),
00550                    qos,
00551                    transInfo,
00552                    publisherQos),
00553                  0);
00554 
00555   switch (partPtr->add_publication(pubPtr)) {
00556   case -1: {
00557     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00558     ACE_ERROR((LM_ERROR,
00559                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00560                ACE_TEXT("failed to add publication to participant %C.\n"),
00561                std::string(converter).c_str()));
00562   }
00563   // Deliberate fall through to next case.
00564 
00565   case 1:
00566     delete pubPtr;
00567     return false;
00568 
00569   case 0:
00570   default:
00571     break;
00572   }
00573 
00574   switch (topic->add_publication_reference(pubPtr, associate)) {
00575   case -1: {
00576     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00577     ACE_ERROR((LM_ERROR,
00578                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00579                ACE_TEXT("failed to add publication to participant %C topic list.\n"),
00580                std::string(converter).c_str()));
00581 
00582     // Remove the publication.
00583     partPtr->remove_publication(pubId);
00584 
00585   }
00586   return false;
00587 
00588   case 1: // This is actually a really really bad place to jump to.
00589     // This means that we successfully added the new publication
00590     // to the participant (it had not been inserted before) but
00591     // that we are adding a duplicate publication to the topic
00592     // list - which should never ever be able to happen.
00593     return false;
00594 
00595   case 0:
00596   default:
00597     break;
00598   }
00599 
00600   OpenDDS::DCPS::RepoIdConverter converter(pubId);
00601 
00602   // See if we are adding a publication that was created within this
00603   // repository or a different repository.
00604   if (converter.federationId() == federation_.id()) {
00605     // Ensure the publication RepoId values do not conflict.
00606     partPtr->last_publication_key(converter.entityKey());
00607   }
00608 
00609   return true;
00610 }
00611 
00612 void TAO_DDS_DCPSInfo_i::remove_publication(
00613   DDS::DomainId_t domainId,
00614   const OpenDDS::DCPS::RepoId& participantId,
00615   const OpenDDS::DCPS::RepoId& publicationId)
00616 {
00617   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00618 
00619   // Grab the domain.
00620   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00621 
00622   if (where == this->domains_.end()) {
00623     throw OpenDDS::DCPS::Invalid_Domain();
00624   }
00625 
00626   // Grab the participant.
00627   DCPS_IR_Participant* partPtr
00628   = where->second->participant(participantId);
00629 
00630   if (0 == partPtr) {
00631     throw OpenDDS::DCPS::Invalid_Participant();
00632   }
00633 
00634   if (partPtr->remove_publication(publicationId) != 0) {
00635     where->second->remove_dead_participants();
00636 
00637     // throw exception because the publication was not removed!
00638     throw OpenDDS::DCPS::Invalid_Publication();
00639   }
00640 
00641   where->second->remove_dead_participants();
00642 
00643   if (this->um_
00644       && (partPtr->isOwner() == true)
00645       && (partPtr->isBitPublisher() == false)) {
00646     Update::IdPath path(domainId, participantId, publicationId);
00647     this->um_->destroy(path, Update::Actor, Update::DataWriter);
00648 
00649     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00650       OpenDDS::DCPS::RepoIdConverter converter(publicationId);
00651       ACE_DEBUG((LM_DEBUG,
00652                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
00653                  ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00654                  std::string(converter).c_str(),
00655                  domainId));
00656     }
00657   }
00658 }
00659 
00660 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription(
00661   DDS::DomainId_t domainId,
00662   const OpenDDS::DCPS::RepoId& participantId,
00663   const OpenDDS::DCPS::RepoId& topicId,
00664   OpenDDS::DCPS::DataReaderRemote_ptr subscription,
00665   const DDS::DataReaderQos & qos,
00666   const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00667   const DDS::SubscriberQos & subscriberQos,
00668   const char* filterClassName,
00669   const char* filterExpression,
00670   const DDS::StringSeq& exprParams)
00671 {
00672   DCPS_IR_Domain* domainPtr;
00673   DCPS_IR_Participant* partPtr;
00674   DCPS_IR_Subscription* subPtr;
00675   DCPS_IR_Topic* topic;
00676   OpenDDS::DCPS::RepoId subId;
00677   {
00678     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00679 
00680     // Grab the domain.
00681     DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00682 
00683     if (where == this->domains_.end()) {
00684       throw OpenDDS::DCPS::Invalid_Domain();
00685     }
00686 
00687     // Grab the domain and participant.
00688     domainPtr = where->second;
00689     partPtr = domainPtr->participant(participantId);
00690 
00691     if (0 == partPtr) {
00692       throw OpenDDS::DCPS::Invalid_Participant();
00693     }
00694 
00695     topic = where->second->find_topic(topicId);
00696 
00697     if (topic == 0) {
00698       throw OpenDDS::DCPS::Invalid_Topic();
00699     }
00700 
00701     subId = partPtr->get_next_subscription_id();
00702 
00703     // Remarshall the remote reference onto the dispatching orb.
00704     OpenDDS::DCPS::DataReaderRemote_var marshalledSub (
00705       OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
00706     if (CORBA::is_nil(marshalledSub.in())) {
00707       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00708         ACE_DEBUG((LM_WARNING,
00709                    ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00710                    ACE_TEXT("invalid subscription reference.\n")));
00711       }
00712       return OpenDDS::DCPS::GUID_UNKNOWN;
00713     }
00714 
00715     CORBA::String_var subStr = orb_->object_to_string(marshalledSub.in());
00716     CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr.in());
00717     if (CORBA::is_nil(subObj.in())) {
00718       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00719         ACE_DEBUG((LM_WARNING,
00720                    ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00721                    ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
00722       }
00723       return OpenDDS::DCPS::GUID_UNKNOWN;
00724     }
00725     OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription
00726     = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj.in());
00727 
00728     ACE_NEW_RETURN(subPtr,
00729                    DCPS_IR_Subscription(
00730                      subId,
00731                      partPtr,
00732                      topic,
00733                      dispatchingSubscription.in(),
00734                      qos,
00735                      transInfo,
00736                      subscriberQos,
00737                      filterClassName,
00738                      filterExpression,
00739                      exprParams),
00740                    OpenDDS::DCPS::GUID_UNKNOWN);
00741 
00742     // Release lock
00743   }
00744   if (partPtr->add_subscription(subPtr) != 0) {
00745     // failed to add.  we are responsible for the memory.
00746     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00747     delete subPtr;
00748     subPtr = 0;
00749 
00750   } else if (topic->add_subscription_reference(subPtr) != 0) {
00751     ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
00752     // No associations were made so remove and fail.
00753     partPtr->remove_subscription(subId);
00754     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00755   }
00756 
00757   if (this->um_ && (partPtr->isBitPublisher() == false)) {
00758     CORBA::String_var callback = orb_->object_to_string(subscription);
00759     Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
00760 
00761     Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
00762                           , callback.in()
00763                           , const_cast<DDS::SubscriberQos &>(subscriberQos)
00764                           , const_cast<DDS::DataReaderQos &>(qos)
00765                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00766                           (transInfo), csi);
00767 
00768     this->um_->create(actor);
00769 
00770     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00771       OpenDDS::DCPS::RepoIdConverter converter(subId);
00772       ACE_DEBUG((LM_DEBUG,
00773                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ")
00774                  ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
00775                  std::string(converter).c_str(),
00776                  domainId));
00777     }
00778   }
00779 
00780   domainPtr->remove_dead_participants();
00781 
00782   return subId;
00783 }
00784 
00785 bool
00786 TAO_DDS_DCPSInfo_i::add_subscription(
00787   DDS::DomainId_t domainId,
00788   const OpenDDS::DCPS::RepoId& participantId,
00789   const OpenDDS::DCPS::RepoId& topicId,
00790   const OpenDDS::DCPS::RepoId& subId,
00791   const char* sub_str,
00792   const DDS::DataReaderQos & qos,
00793   const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00794   const DDS::SubscriberQos & subscriberQos,
00795   const char* filterClassName,
00796   const char* filterExpression,
00797   const DDS::StringSeq& exprParams,
00798   bool associate)
00799 {
00800   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00801 
00802   // Grab the domain.
00803   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00804 
00805   if (where == this->domains_.end()) {
00806     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00807       ACE_DEBUG((LM_WARNING,
00808                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00809                  ACE_TEXT("invalid domain %d.\n"),
00810                  domainId));
00811     }
00812 
00813     return false;
00814   }
00815 
00816   // Grab the participant.
00817   DCPS_IR_Participant* partPtr
00818   = where->second->participant(participantId);
00819 
00820   if (0 == partPtr) {
00821     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00822       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00823       ACE_DEBUG((LM_WARNING,
00824                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00825                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00826                  std::string(converter).c_str(),
00827                  domainId));
00828     }
00829 
00830     return false;
00831   }
00832 
00833   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00834 
00835   if (topic == 0) {
00836     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00837       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00838       ACE_DEBUG((LM_WARNING,
00839                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00840                  ACE_TEXT("invalid topic %C in domain %d.\n"),
00841                  std::string(converter).c_str(),
00842                  domainId));
00843     }
00844 
00845     return false;
00846   }
00847 
00848   CORBA::Object_var obj = dispatchingOrb_->string_to_object(sub_str);
00849   if (CORBA::is_nil(obj.in())) {
00850     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00851       ACE_DEBUG((LM_WARNING,
00852                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00853                  ACE_TEXT("failure marshalling subscription %s on dispatching orb.\n"),
00854                  sub_str));
00855     }
00856     return false;
00857   }
00858   OpenDDS::DCPS::DataReaderRemote_var subscription
00859   = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
00860 
00861   DCPS_IR_Subscription* subPtr;
00862   ACE_NEW_RETURN(subPtr,
00863                  DCPS_IR_Subscription(
00864                    subId,
00865                    partPtr,
00866                    topic,
00867                    subscription.in(),
00868                    qos,
00869                    transInfo,
00870                    subscriberQos,
00871                    filterClassName,
00872                    filterExpression,
00873                    exprParams),
00874                  0);
00875 
00876   switch (partPtr->add_subscription(subPtr)) {
00877   case -1: {
00878     OpenDDS::DCPS::RepoIdConverter converter(subId);
00879     ACE_ERROR((LM_ERROR,
00880                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00881                ACE_TEXT("failed to add subscription to participant %C.\n"),
00882                std::string(converter).c_str()));
00883   }
00884   // Deliberate fall through to next case.
00885 
00886   case 1:
00887     delete subPtr;
00888     return false;
00889 
00890   case 0:
00891   default:
00892     break;
00893   }
00894 
00895   switch (topic->add_subscription_reference(subPtr, associate)) {
00896   case -1: {
00897     OpenDDS::DCPS::RepoIdConverter converter(subId);
00898     ACE_ERROR((LM_ERROR,
00899                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00900                ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
00901                std::string(converter).c_str()));
00902 
00903     // Remove the subscription.
00904     partPtr->remove_subscription(subId);
00905 
00906   }
00907   return false;
00908 
00909   case 1: // This is actually a really really bad place to jump to.
00910     // This means that we successfully added the new subscription
00911     // to the participant (it had not been inserted before) but
00912     // that we are adding a duplicate subscription to the topic
00913     // list - which should never ever be able to happen.
00914     return false;
00915 
00916   case 0:
00917   default:
00918     break;
00919   }
00920 
00921   OpenDDS::DCPS::RepoIdConverter converter(subId);
00922 
00923   // See if we are adding a subscription that was created within this
00924   // repository or a different repository.
00925   if (converter.federationId() == federation_.id()) {
00926     // Ensure the subscription RepoId values do not conflict.
00927     partPtr->last_subscription_key(converter.entityKey());
00928   }
00929 
00930   return true;
00931 }
00932 
00933 void TAO_DDS_DCPSInfo_i::remove_subscription(
00934   DDS::DomainId_t domainId,
00935   const OpenDDS::DCPS::RepoId& participantId,
00936   const OpenDDS::DCPS::RepoId& subscriptionId)
00937 {
00938   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00939 
00940   // Grab the domain.
00941   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00942 
00943   if (where == this->domains_.end()) {
00944     throw OpenDDS::DCPS::Invalid_Domain();
00945   }
00946 
00947   // Grab the participant.
00948   DCPS_IR_Participant* partPtr
00949   = where->second->participant(participantId);
00950 
00951   if (0 == partPtr) {
00952     throw OpenDDS::DCPS::Invalid_Participant();
00953   }
00954 
00955   if (partPtr->remove_subscription(subscriptionId) != 0) {
00956     // throw exception because the subscription was not removed!
00957     throw OpenDDS::DCPS::Invalid_Subscription();
00958   }
00959 
00960   where->second->remove_dead_participants();
00961 
00962   if (this->um_
00963       && (partPtr->isOwner() == true)
00964       && (partPtr->isBitPublisher() == false)) {
00965     Update::IdPath path(domainId, participantId, subscriptionId);
00966     this->um_->destroy(path, Update::Actor, Update::DataReader);
00967 
00968     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00969       OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
00970       ACE_DEBUG((LM_DEBUG,
00971                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
00972                  ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00973                  std::string(converter).c_str(),
00974                  domainId));
00975     }
00976   }
00977 }
00978 
00979 OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant(
00980   DDS::DomainId_t domain,
00981   const DDS::DomainParticipantQos & qos)
00982 {
00983   // A value to return.
00984   OpenDDS::DCPS::AddDomainStatus value;
00985   value.id        = OpenDDS::DCPS::GUID_UNKNOWN;
00986   value.federated = this->federation_.overridden();
00987 
00988   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
00989 
00990   // Grab the domain.
00991   DCPS_IR_Domain* domainPtr = this->domain(domain);
00992 
00993   if (0 == domainPtr) {
00994     throw OpenDDS::DCPS::Invalid_Domain();
00995   }
00996 
00997   // Obtain a shiny new GUID value.
00998   OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id();
00999 
01000   DCPS_IR_Participant* participant;
01001   ACE_NEW_RETURN(participant,
01002                  DCPS_IR_Participant(
01003                    this->federation_,
01004                    participantId,
01005                    domainPtr,
01006                    qos, um_),
01007                  value);
01008 
01009   // We created the participant, now we can return the Id value (eventually).
01010   value.id = participantId;
01011 
01012   // Determine if this is the 'special' repository internal participant
01013   // that publishes the built-in topics for a domain.
01014   if (domainPtr->participants().empty() && TheServiceParticipant->get_BIT()) {
01015     participant->isBitPublisher() = true;
01016 
01017     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01018       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01019       ACE_DEBUG((LM_DEBUG,
01020                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01021                  ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
01022                  std::string(converter).c_str(),
01023                  domain));
01024     }
01025   }
01026 
01027   // Assume responsibilty for writing back to the participant.
01028   participant->takeOwnership();
01029 
01030   int status = domainPtr->add_participant(participant);
01031 
01032   if (0 != status) {
01033     // Adding the participant failed return the invalid
01034     // pariticipant Id number.
01035     participantId = OpenDDS::DCPS::GUID_UNKNOWN;
01036     delete participant;
01037     participant = 0;
01038 
01039   } else if (this->um_ && (participant->isBitPublisher() == false)) {
01040     // Push this participant to interested observers.
01041     Update::UParticipant updateParticipant(
01042       domain,
01043       participant->owner(),
01044       participantId,
01045       const_cast<DDS::DomainParticipantQos &>(qos));
01046     this->um_->create(updateParticipant);
01047 
01048     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01049       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01050       ACE_DEBUG((LM_DEBUG,
01051                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01052                  ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
01053                  std::string(converter).c_str(),
01054                  domain));
01055     }
01056   }
01057 
01058   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01059     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01060     ACE_DEBUG((LM_DEBUG,
01061                ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01062                ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"),
01063                domain,
01064                std::string(converter).c_str(),
01065                participant));
01066   }
01067   return value;
01068 }
01069 
01070 bool
01071 TAO_DDS_DCPSInfo_i::add_domain_participant(DDS::DomainId_t domainId
01072                                            , const OpenDDS::DCPS::RepoId& participantId
01073                                            , const DDS::DomainParticipantQos & qos)
01074 {
01075   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01076 
01077   // Grab the domain.
01078   DCPS_IR_Domain* domainPtr = this->domain(domainId);
01079 
01080   if (0 == domainPtr) {
01081     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01082       ACE_DEBUG((LM_WARNING,
01083                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01084                  ACE_TEXT("invalid domain Id: %d\n"),
01085                  domainId));
01086     }
01087 
01088     return false;
01089   }
01090 
01091   // Prepare to manipulate the participant's Id value.
01092   OpenDDS::DCPS::RepoIdConverter converter(participantId);
01093 
01094   // Grab the participant.
01095   DCPS_IR_Participant* partPtr
01096   = domainPtr->participant(participantId);
01097 
01098   if (0 != partPtr) {
01099     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01100       ACE_ERROR((LM_ERROR,
01101                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01102                  ACE_TEXT("participant id %C already exists.\n"),
01103                  std::string(converter).c_str()));
01104     }
01105 
01106     return false;
01107   }
01108 
01109   DCPS_IR_Participant* participant;
01110   ACE_NEW_RETURN(participant,
01111                  DCPS_IR_Participant(this->federation_,
01112                                      participantId,
01113                                      domainPtr,
01114                                      qos, um_), 0);
01115 
01116   switch (domainPtr->add_participant(participant)) {
01117   case -1: {
01118     ACE_ERROR((LM_ERROR,
01119                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01120                ACE_TEXT("failed to load participant %C in domain %d.\n"),
01121                std::string(converter).c_str(),
01122                domainId));
01123   }
01124   delete participant;
01125   return false;
01126 
01127   case 1:
01128 
01129     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01130       ACE_DEBUG((LM_WARNING,
01131                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01132                  ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
01133                  std::string(converter).c_str(),
01134                  domainId));
01135     }
01136 
01137     delete participant;
01138     return false;
01139 
01140   case 0:
01141   default:
01142     break;
01143   }
01144 
01145   // See if we are adding a participant that was created within this
01146   // repository or a different repository.
01147   if (converter.federationId() == this->federation_.id()) {
01148     // Ensure the participant GUID values do not conflict.
01149     domainPtr->last_participant_key(converter.participantId());
01150 
01151     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01152       ACE_DEBUG((LM_DEBUG,
01153                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01154                  ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
01155                  converter.participantId()));
01156     }
01157   }
01158 
01159   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01160     ACE_DEBUG((LM_DEBUG,
01161                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01162                ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"),
01163                std::string(converter).c_str(),
01164                participant,
01165                domainId));
01166   }
01167 
01168   return true;
01169 }
01170 
01171 bool
01172 TAO_DDS_DCPSInfo_i::remove_by_owner(
01173   DDS::DomainId_t domain,
01174   long              owner)
01175 {
01176   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01177 
01178   // Grab the domain.
01179   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
01180 
01181   if (where == this->domains_.end()) {
01182     return false;
01183   }
01184 
01185   std::vector<OpenDDS::DCPS::RepoId> candidates;
01186 
01187   for (DCPS_IR_Participant_Map::const_iterator
01188        current = where->second->participants().begin();
01189        current != where->second->participants().end();
01190        ++current) {
01191     if (current->second->owner() == owner) {
01192       candidates.push_back(current->second->get_id());
01193     }
01194   }
01195 
01196   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01197     ACE_DEBUG((LM_DEBUG,
01198                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01199                ACE_TEXT("%d participants to remove from domain %d.\n"),
01200                candidates.size(),
01201                domain));
01202   }
01203 
01204   bool status = true;
01205 
01206   for (unsigned int index = 0; index < candidates.size(); ++index) {
01207     DCPS_IR_Participant* participant
01208     = where->second->participant(candidates[index]);
01209 
01210     std::vector<OpenDDS::DCPS::RepoId> keylist;
01211 
01212     // Remove Subscriptions
01213     for (DCPS_IR_Subscription_Map::const_iterator
01214          current = participant->subscriptions().begin();
01215          current != participant->subscriptions().end();
01216          ++current) {
01217       keylist.push_back(current->second->get_id());
01218     }
01219 
01220     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01221       OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01222       ACE_DEBUG((LM_DEBUG,
01223                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01224                  ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
01225                  keylist.size(),
01226                  std::string(converter).c_str()));
01227     }
01228 
01229     for (unsigned int key = 0; key < keylist.size(); ++key) {
01230       if (participant->remove_subscription(keylist[ key]) != 0) {
01231         status = false;
01232       }
01233     }
01234 
01235     // Remove Publications
01236     keylist.clear();
01237 
01238     for (DCPS_IR_Publication_Map::const_iterator
01239          current = participant->publications().begin();
01240          current != participant->publications().end();
01241          ++current) {
01242       keylist.push_back(current->second->get_id());
01243     }
01244 
01245     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01246       OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01247       ACE_DEBUG((LM_DEBUG,
01248                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01249                  ACE_TEXT("%d publications to remove from participant %C.\n"),
01250                  keylist.size(),
01251                  std::string(converter).c_str()));
01252     }
01253 
01254     for (unsigned int key = 0; key < keylist.size(); ++key) {
01255       if (participant->remove_publication(keylist[ key]) != 0) {
01256         status = false;
01257       }
01258     }
01259 
01260     // Remove Topics
01261     keylist.clear();
01262 
01263     for (DCPS_IR_Topic_Map::const_iterator
01264          current = participant->topics().begin();
01265          current != participant->topics().end();
01266          ++current) {
01267       keylist.push_back(current->second->get_id());
01268     }
01269 
01270     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01271       OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01272       ACE_DEBUG((LM_DEBUG,
01273                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01274                  ACE_TEXT("%d topics to remove from participant %C.\n"),
01275                  keylist.size(),
01276                  std::string(converter).c_str()));
01277     }
01278 
01279     for (unsigned int key = 0; key < keylist.size(); ++key) {
01280       DCPS_IR_Topic* discard;
01281 
01282       if (participant->remove_topic_reference(keylist[ key], discard) != 0) {
01283         status = false;
01284       }
01285     }
01286 
01287     // Remove Participant
01288     this->remove_domain_participant(domain, candidates[ index]);
01289   }
01290 
01291   return status;
01292 }
01293 
01294 void
01295 TAO_DDS_DCPSInfo_i::disassociate_participant(
01296   DDS::DomainId_t domainId,
01297   const OpenDDS::DCPS::RepoId& local_id,
01298   const OpenDDS::DCPS::RepoId& remote_id)
01299 {
01300   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01301 
01302   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01303   if (it == this->domains_.end()) {
01304     throw OpenDDS::DCPS::Invalid_Domain();
01305   }
01306 
01307   DCPS_IR_Participant* participant = it->second->participant(local_id);
01308   if (participant == 0) {
01309     throw OpenDDS::DCPS::Invalid_Participant();
01310   }
01311 
01312   // Disassociate from participant temporarily:
01313   const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
01314   for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
01315        sub != subscriptions.end(); ++sub) {
01316     sub->second->disassociate_participant(remote_id, true);
01317   }
01318 
01319   const DCPS_IR_Publication_Map& publications = participant->publications();
01320   for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
01321        pub != publications.end(); ++pub) {
01322     pub->second->disassociate_participant(remote_id, true);
01323   }
01324 
01325   it->second->remove_dead_participants();
01326 }
01327 
01328 void
01329 TAO_DDS_DCPSInfo_i::disassociate_subscription(
01330   DDS::DomainId_t domainId,
01331   const OpenDDS::DCPS::RepoId& participantId,
01332   const OpenDDS::DCPS::RepoId& local_id,
01333   const OpenDDS::DCPS::RepoId& remote_id)
01334 {
01335   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01336 
01337   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01338   if (it == this->domains_.end()) {
01339     throw OpenDDS::DCPS::Invalid_Domain();
01340   }
01341 
01342   DCPS_IR_Participant* participant = it->second->participant(participantId);
01343   if (participant == 0) {
01344     throw OpenDDS::DCPS::Invalid_Participant();
01345   }
01346 
01347   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01348     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
01349   }
01350 
01351   DCPS_IR_Subscription* subscription;
01352   if (participant->find_subscription_reference(local_id, subscription)
01353       != 0 || subscription == 0) {
01354     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01355     OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
01356     ACE_ERROR((LM_ERROR,
01357                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
01358                ACE_TEXT("participant %C could not find subscription %C.\n"),
01359                std::string(part_converter).c_str(),
01360                std::string(sub_converter).c_str()));
01361     throw OpenDDS::DCPS::Invalid_Subscription();
01362   }
01363 
01364   // Disassociate from publication temporarily:
01365   subscription->disassociate_publication(remote_id, true);
01366 
01367   it->second->remove_dead_participants();
01368 }
01369 
01370 void
01371 TAO_DDS_DCPSInfo_i::disassociate_publication(
01372   DDS::DomainId_t domainId,
01373   const OpenDDS::DCPS::RepoId& participantId,
01374   const OpenDDS::DCPS::RepoId& local_id,
01375   const OpenDDS::DCPS::RepoId& remote_id)
01376 {
01377   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01378 
01379   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01380   if (it == this->domains_.end()) {
01381     throw OpenDDS::DCPS::Invalid_Domain();
01382   }
01383 
01384   DCPS_IR_Participant* participant = it->second->participant(participantId);
01385   if (participant == 0) {
01386     throw OpenDDS::DCPS::Invalid_Participant();
01387   }
01388 
01389   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01390     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
01391   }
01392 
01393   DCPS_IR_Publication* publication;
01394   if (participant->find_publication_reference(local_id, publication)
01395       != 0 || publication == 0) {
01396     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01397     OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
01398     ACE_ERROR((LM_ERROR,
01399                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
01400                ACE_TEXT("participant %C could not find publication %C.\n"),
01401                std::string(part_converter).c_str(),
01402                std::string(pub_converter).c_str()));
01403     throw OpenDDS::DCPS::Invalid_Publication();
01404   }
01405 
01406   // Disassociate from subscription temporarily:
01407   publication->disassociate_subscription(remote_id, true);
01408 
01409   it->second->remove_dead_participants();
01410 }
01411 
01412 void TAO_DDS_DCPSInfo_i::remove_domain_participant(
01413   DDS::DomainId_t domainId,
01414   const OpenDDS::DCPS::RepoId& participantId)
01415 {
01416   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01417 
01418   // Grab the domain.
01419   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01420 
01421   if (where == this->domains_.end()) {
01422     throw OpenDDS::DCPS::Invalid_Domain();
01423   }
01424 
01425   DCPS_IR_Participant* participant = where->second->participant(participantId);
01426 
01427   if (participant == 0) {
01428     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01429     ACE_ERROR((LM_ERROR,
01430                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01431                ACE_TEXT("failed to locate participant %C in domain %d.\n"),
01432                std::string(converter).c_str(),
01433                domainId));
01434     throw OpenDDS::DCPS::Invalid_Participant();
01435   }
01436 
01437   // Determine if we should propagate this event;  we need to cache this
01438   // result as the participant will be gone by the time we use the result.
01439   bool sendUpdate = (participant->isOwner() == true)
01440                     && (participant->isBitPublisher() == false);
01441 
01442   CORBA::Boolean dont_notify_lost = 0;
01443   int status = where->second->remove_participant(participantId, dont_notify_lost);
01444 
01445   if (0 != status) {
01446     // Removing the participant failed
01447     throw OpenDDS::DCPS::Invalid_Participant();
01448   }
01449 
01450   // Update any concerned observers that the participant was destroyed.
01451   if (this->um_ && sendUpdate) {
01452     Update::IdPath path(
01453       where->second->get_id(),
01454       participantId,
01455       participantId);
01456     this->um_->destroy(path, Update::Participant);
01457 
01458     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01459       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01460       ACE_DEBUG((LM_DEBUG,
01461                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01462                  ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
01463                  std::string(converter).c_str(),
01464                  domainId));
01465     }
01466   }
01467 }
01468 
01469 void TAO_DDS_DCPSInfo_i::association_complete(DDS::DomainId_t domainId,
01470   const OpenDDS::DCPS::RepoId& participantId,
01471   const OpenDDS::DCPS::RepoId& localId,
01472   const OpenDDS::DCPS::RepoId& remoteId)
01473 {
01474   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01475 
01476   DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId);
01477   if (dom_iter == this->domains_.end()) {
01478     return;
01479   }
01480 
01481   DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId);
01482   if (0 == partPtr) {
01483     return;
01484   }
01485 
01486   // localId could be pub or sub (initial implementation will only use sub
01487   // since the DataReader is the passive peer)
01488   DCPS_IR_Subscription* sub = 0;
01489   DCPS_IR_Publication* pub = 0;
01490   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01491     ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n"));
01492   }
01493   if (0 == partPtr->find_subscription_reference(localId, sub)) {
01494     sub->association_complete(remoteId);
01495   } else if (0 == partPtr->find_publication_reference(localId, pub)) {
01496     pub->association_complete(remoteId);
01497   } else {
01498     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01499       OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01500       OpenDDS::DCPS::RepoIdConverter local_converter(localId);
01501       OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId);
01502       ACE_DEBUG((LM_WARNING,
01503                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ")
01504                  ACE_TEXT("participant %C could not find subscription or publication %C ")
01505                  ACE_TEXT("to complete association with remote %C.\n"),
01506                  std::string(part_converter).c_str(),
01507                  std::string(local_converter).c_str(),
01508                  std::string(remote_converter).c_str()));
01509     }
01510   }
01511 }
01512 
01513 void TAO_DDS_DCPSInfo_i::ignore_domain_participant(
01514   DDS::DomainId_t domainId,
01515   const OpenDDS::DCPS::RepoId& myParticipantId,
01516   const OpenDDS::DCPS::RepoId& ignoreId)
01517 {
01518   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01519 
01520   // Grab the domain.
01521   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01522 
01523   if (where == this->domains_.end()) {
01524     throw OpenDDS::DCPS::Invalid_Domain();
01525   }
01526 
01527   // Grab the participant.
01528   DCPS_IR_Participant* partPtr
01529   = where->second->participant(myParticipantId);
01530 
01531   if (0 == partPtr) {
01532     throw OpenDDS::DCPS::Invalid_Participant();
01533   }
01534 
01535   partPtr->ignore_participant(ignoreId);
01536 
01537   where->second->remove_dead_participants();
01538 }
01539 
01540 void TAO_DDS_DCPSInfo_i::ignore_topic(
01541   DDS::DomainId_t domainId,
01542   const OpenDDS::DCPS::RepoId& myParticipantId,
01543   const OpenDDS::DCPS::RepoId& ignoreId)
01544 {
01545   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01546 
01547   // Grab the domain.
01548   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01549 
01550   if (where == this->domains_.end()) {
01551     throw OpenDDS::DCPS::Invalid_Domain();
01552   }
01553 
01554   // Grab the participant.
01555   DCPS_IR_Participant* partPtr
01556   = where->second->participant(myParticipantId);
01557 
01558   if (0 == partPtr) {
01559     throw OpenDDS::DCPS::Invalid_Participant();
01560   }
01561 
01562   partPtr->ignore_topic(ignoreId);
01563 
01564   where->second->remove_dead_participants();
01565 }
01566 
01567 void TAO_DDS_DCPSInfo_i::ignore_subscription(
01568   DDS::DomainId_t domainId,
01569   const OpenDDS::DCPS::RepoId& myParticipantId,
01570   const OpenDDS::DCPS::RepoId& ignoreId)
01571 {
01572   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01573 
01574   // Grab the domain.
01575   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01576 
01577   if (where == this->domains_.end()) {
01578     throw OpenDDS::DCPS::Invalid_Domain();
01579   }
01580 
01581   // Grab the participant.
01582   DCPS_IR_Participant* partPtr
01583   = where->second->participant(myParticipantId);
01584 
01585   if (0 == partPtr) {
01586     throw OpenDDS::DCPS::Invalid_Participant();
01587   }
01588 
01589   partPtr->ignore_subscription(ignoreId);
01590 
01591   where->second->remove_dead_participants();
01592 }
01593 
01594 void TAO_DDS_DCPSInfo_i::ignore_publication(
01595   DDS::DomainId_t domainId,
01596   const OpenDDS::DCPS::RepoId& myParticipantId,
01597   const OpenDDS::DCPS::RepoId& ignoreId)
01598 {
01599   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01600 
01601   // Grab the domain.
01602   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01603 
01604   if (where == this->domains_.end()) {
01605     throw OpenDDS::DCPS::Invalid_Domain();
01606   }
01607 
01608   // Grab the participant.
01609   DCPS_IR_Participant* partPtr
01610   = where->second->participant(myParticipantId);
01611 
01612   if (0 == partPtr) {
01613     throw OpenDDS::DCPS::Invalid_Participant();
01614   }
01615 
01616   partPtr->ignore_publication(ignoreId);
01617 
01618   where->second->remove_dead_participants();
01619 }
01620 
01621 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos(
01622   DDS::DomainId_t domainId,
01623   const OpenDDS::DCPS::RepoId& partId,
01624   const OpenDDS::DCPS::RepoId& dwId,
01625   const DDS::DataWriterQos & qos,
01626   const DDS::PublisherQos & publisherQos)
01627 {
01628   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01629 
01630   // Grab the domain.
01631   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01632 
01633   if (where == this->domains_.end()) {
01634     throw OpenDDS::DCPS::Invalid_Domain();
01635   }
01636 
01637   // Grab the participant.
01638   DCPS_IR_Participant* partPtr
01639   = where->second->participant(partId);
01640 
01641   if (0 == partPtr) {
01642     throw OpenDDS::DCPS::Invalid_Participant();
01643   }
01644 
01645   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01646     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 1\n"));
01647   }
01648 
01649   DCPS_IR_Publication* pub;
01650 
01651   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01652     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01653     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01654     ACE_ERROR((LM_ERROR,
01655                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01656                ACE_TEXT("participant %C could not find publication %C.\n"),
01657                std::string(part_converter).c_str(),
01658                std::string(pub_converter).c_str()));
01659     throw OpenDDS::DCPS::Invalid_Publication();
01660   }
01661 
01662   Update::SpecificQos qosType;
01663 
01664   if (pub->set_qos(qos, publisherQos, qosType) == false)  // failed
01665     return 0;
01666 
01667   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01668     Update::IdPath path(domainId, partId, dwId);
01669 
01670     switch (qosType) {
01671     case Update::DataWriterQos:
01672       this->um_->update(path, qos);
01673       break;
01674 
01675     case Update::PublisherQos:
01676       this->um_->update(path, publisherQos);
01677       break;
01678 
01679     case Update::NoQos:
01680     default:
01681       break;
01682     }
01683 
01684     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01685       OpenDDS::DCPS::RepoIdConverter converter(dwId);
01686       ACE_DEBUG((LM_DEBUG,
01687                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01688                  ACE_TEXT("pushing update of publication %C in domain %d.\n"),
01689                  std::string(converter).c_str(),
01690                  domainId));
01691     }
01692   }
01693 
01694   return 1;
01695 }
01696 
01697 void
01698 TAO_DDS_DCPSInfo_i::update_publication_qos(
01699   DDS::DomainId_t            domainId,
01700   const OpenDDS::DCPS::RepoId& partId,
01701   const OpenDDS::DCPS::RepoId& dwId,
01702   const DDS::DataWriterQos&  qos)
01703 {
01704   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01705 
01706   // Grab the domain.
01707   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01708 
01709   if (where == this->domains_.end()) {
01710     throw OpenDDS::DCPS::Invalid_Domain();
01711   }
01712 
01713   // Grab the participant.
01714   DCPS_IR_Participant* partPtr
01715   = where->second->participant(partId);
01716 
01717   if (0 == partPtr) {
01718     throw OpenDDS::DCPS::Invalid_Participant();
01719   }
01720 
01721   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01722     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 2\n"));
01723   }
01724 
01725   DCPS_IR_Publication* pub;
01726 
01727   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01728     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01729     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01730     ACE_ERROR((LM_ERROR,
01731                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01732                ACE_TEXT("participant %C could not find publication %C.\n"),
01733                std::string(part_converter).c_str(),
01734                std::string(pub_converter).c_str()));
01735     throw OpenDDS::DCPS::Invalid_Publication();
01736   }
01737 
01738   pub->set_qos(qos);
01739 }
01740 
01741 void
01742 TAO_DDS_DCPSInfo_i::update_publication_qos(
01743   DDS::DomainId_t            domainId,
01744   const OpenDDS::DCPS::RepoId& partId,
01745   const OpenDDS::DCPS::RepoId& dwId,
01746   const DDS::PublisherQos&   qos)
01747 {
01748   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01749 
01750   // Grab the domain.
01751   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01752 
01753   if (where == this->domains_.end()) {
01754     throw OpenDDS::DCPS::Invalid_Domain();
01755   }
01756 
01757   // Grab the participant.
01758   DCPS_IR_Participant* partPtr
01759   = where->second->participant(partId);
01760 
01761   if (0 == partPtr) {
01762     throw OpenDDS::DCPS::Invalid_Participant();
01763   }
01764 
01765   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01766     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 3\n"));
01767   }
01768 
01769   DCPS_IR_Publication* pub;
01770 
01771   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01772     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01773     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01774     ACE_ERROR((LM_ERROR,
01775                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01776                ACE_TEXT("participant %C could not find publication %C.\n"),
01777                std::string(part_converter).c_str(),
01778                std::string(pub_converter).c_str()));
01779     throw OpenDDS::DCPS::Invalid_Publication();
01780   }
01781 
01782   pub->set_qos(qos);
01783 }
01784 
01785 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos(
01786   DDS::DomainId_t domainId,
01787   const OpenDDS::DCPS::RepoId& partId,
01788   const OpenDDS::DCPS::RepoId& drId,
01789   const DDS::DataReaderQos & qos,
01790   const DDS::SubscriberQos & subscriberQos)
01791 {
01792   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01793 
01794   // Grab the domain.
01795   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01796 
01797   if (where == this->domains_.end()) {
01798     throw OpenDDS::DCPS::Invalid_Domain();
01799   }
01800 
01801   // Grab the participant.
01802   DCPS_IR_Participant* partPtr
01803   = where->second->participant(partId);
01804 
01805   if (0 == partPtr) {
01806     throw OpenDDS::DCPS::Invalid_Participant();
01807   }
01808 
01809   DCPS_IR_Subscription* sub;
01810 
01811   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01812     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
01813   }
01814 
01815   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01816     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01817     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01818     ACE_ERROR((LM_ERROR,
01819                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01820                ACE_TEXT("participant %C could not find subscription %C.\n"),
01821                std::string(part_converter).c_str(),
01822                std::string(sub_converter).c_str()));
01823     throw OpenDDS::DCPS::Invalid_Subscription();
01824   }
01825 
01826   Update::SpecificQos qosType;
01827 
01828   if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed
01829     return 0;
01830 
01831   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01832     Update::IdPath path(domainId, partId, drId);
01833 
01834     switch (qosType) {
01835     case Update::DataReaderQos:
01836       this->um_->update(path, qos);
01837       break;
01838 
01839     case Update::SubscriberQos:
01840       this->um_->update(path, subscriberQos);
01841       break;
01842 
01843     case Update::NoQos:
01844     default:
01845       break;
01846     }
01847 
01848     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01849       OpenDDS::DCPS::RepoIdConverter converter(drId);
01850       ACE_DEBUG((LM_DEBUG,
01851                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01852                  ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
01853                  std::string(converter).c_str(),
01854                  domainId));
01855     }
01856   }
01857 
01858   return 1;
01859 }
01860 
01861 void
01862 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01863   DDS::DomainId_t            domainId,
01864   const OpenDDS::DCPS::RepoId& partId,
01865   const OpenDDS::DCPS::RepoId& drId,
01866   const DDS::DataReaderQos&  qos)
01867 {
01868   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01869 
01870   // Grab the domain.
01871   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01872 
01873   if (where == this->domains_.end()) {
01874     throw OpenDDS::DCPS::Invalid_Domain();
01875   }
01876 
01877   // Grab the participant.
01878   DCPS_IR_Participant* partPtr
01879   = where->second->participant(partId);
01880 
01881   if (0 == partPtr) {
01882     throw OpenDDS::DCPS::Invalid_Participant();
01883   }
01884 
01885   DCPS_IR_Subscription* sub;
01886 
01887   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01888     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
01889   }
01890 
01891   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01892     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01893     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01894     ACE_ERROR((LM_ERROR,
01895                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01896                ACE_TEXT("participant %C could not find subscription %C.\n"),
01897                std::string(part_converter).c_str(),
01898                std::string(sub_converter).c_str()));
01899     throw OpenDDS::DCPS::Invalid_Subscription();
01900   }
01901 
01902   sub->set_qos(qos);
01903 }
01904 
01905 void
01906 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01907   DDS::DomainId_t            domainId,
01908   const OpenDDS::DCPS::RepoId& partId,
01909   const OpenDDS::DCPS::RepoId& drId,
01910   const DDS::SubscriberQos&  qos)
01911 {
01912   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01913 
01914   // Grab the domain.
01915   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01916 
01917   if (where == this->domains_.end()) {
01918     throw OpenDDS::DCPS::Invalid_Domain();
01919   }
01920 
01921   // Grab the participant.
01922   DCPS_IR_Participant* partPtr
01923   = where->second->participant(partId);
01924 
01925   if (0 == partPtr) {
01926     throw OpenDDS::DCPS::Invalid_Participant();
01927   }
01928 
01929   DCPS_IR_Subscription* sub;
01930 
01931   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01932     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
01933   }
01934 
01935   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01936     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01937     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01938     ACE_ERROR((LM_ERROR,
01939                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01940                ACE_TEXT("participant %C could not find subscription %C.\n"),
01941                std::string(part_converter).c_str(),
01942                std::string(sub_converter).c_str()));
01943     throw OpenDDS::DCPS::Invalid_Subscription();
01944   }
01945 
01946   sub->set_qos(qos);
01947 }
01948 
01949 CORBA::Boolean
01950 TAO_DDS_DCPSInfo_i::update_subscription_params(
01951     DDS::DomainId_t domainId,
01952     const OpenDDS::DCPS::RepoId& participantId,
01953     const OpenDDS::DCPS::RepoId& subscriptionId,
01954     const DDS::StringSeq& params)
01955 {
01956   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01957 
01958   DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
01959   if (domain == this->domains_.end()) {
01960     throw OpenDDS::DCPS::Invalid_Domain();
01961   }
01962 
01963   DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
01964   if (0 == partPtr) {
01965     throw OpenDDS::DCPS::Invalid_Participant();
01966   }
01967 
01968   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01969     ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
01970   }
01971 
01972   DCPS_IR_Subscription* sub;
01973   if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
01974     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01975     OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
01976     ACE_ERROR((LM_ERROR,
01977                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
01978                ACE_TEXT("participant %C could not find subscription %C.\n"),
01979                std::string(part_converter).c_str(),
01980                std::string(sub_converter).c_str()));
01981     throw OpenDDS::DCPS::Invalid_Subscription();
01982   }
01983 
01984   sub->update_expr_params(params);  // calls writers via DataWriterRemote
01985 
01986   if (this->um_ && !partPtr->isBitPublisher()) {
01987     Update::IdPath path(domainId, participantId, subscriptionId);
01988     this->um_->update(path, params);
01989   }
01990 
01991   return true;
01992 }
01993 
01994 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos(
01995   const OpenDDS::DCPS::RepoId& topicId,
01996   DDS::DomainId_t domainId,
01997   const OpenDDS::DCPS::RepoId& participantId,
01998   const DDS::TopicQos & qos)
01999 {
02000   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02001 
02002   // Grab the domain.
02003   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02004 
02005   if (where == this->domains_.end()) {
02006     throw OpenDDS::DCPS::Invalid_Domain();
02007   }
02008 
02009   // Grab the participant.
02010   DCPS_IR_Participant* partPtr
02011   = where->second->participant(participantId);
02012 
02013   if (0 == partPtr) {
02014     throw OpenDDS::DCPS::Invalid_Participant();
02015   }
02016 
02017   DCPS_IR_Topic* topic;
02018 
02019   if (partPtr->find_topic_reference(topicId, topic) != 0) {
02020     throw OpenDDS::DCPS::Invalid_Topic();
02021   }
02022 
02023   if (topic->set_topic_qos(qos) == false)
02024     return 0;
02025 
02026   if (this->um_
02027       && (partPtr->isOwner() == true)
02028       && (partPtr->isBitPublisher() == false)) {
02029     Update::IdPath path(domainId, participantId, topicId);
02030     this->um_->update(path, qos);
02031 
02032     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02033       OpenDDS::DCPS::RepoIdConverter converter(topicId);
02034       ACE_DEBUG((LM_DEBUG,
02035                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
02036                  ACE_TEXT("pushing update of topic %C in domain %d.\n"),
02037                  std::string(converter).c_str(),
02038                  domainId));
02039     }
02040   }
02041 
02042   return 1;
02043 }
02044 
02045 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos(
02046   DDS::DomainId_t domainId,
02047   const OpenDDS::DCPS::RepoId& participantId,
02048   const DDS::DomainParticipantQos & qos)
02049 {
02050   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02051 
02052   // Grab the domain.
02053   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02054 
02055   if (where == this->domains_.end()) {
02056     throw OpenDDS::DCPS::Invalid_Domain();
02057   }
02058 
02059   // Grab the participant.
02060   DCPS_IR_Participant* partPtr
02061   = where->second->participant(participantId);
02062 
02063   if (0 == partPtr) {
02064     throw OpenDDS::DCPS::Invalid_Participant();
02065   }
02066 
02067   if (partPtr->set_qos(qos) == false)
02068     return 0;
02069 
02070   if (this->um_
02071       && (partPtr->isOwner() == true)
02072       && (partPtr->isBitPublisher() == false)) {
02073     Update::IdPath path(domainId, participantId, participantId);
02074     this->um_->update(path, qos);
02075 
02076     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02077       OpenDDS::DCPS::RepoIdConverter converter(participantId);
02078       ACE_DEBUG((LM_DEBUG,
02079                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
02080                  ACE_TEXT("pushing update of participant %C in domain %d.\n"),
02081                  std::string(converter).c_str(),
02082                  domainId));
02083     }
02084   }
02085 
02086   return 1;
02087 }
02088 
02089 DCPS_IR_Domain*
02090 TAO_DDS_DCPSInfo_i::domain(DDS::DomainId_t domain)
02091 {
02092   if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) {
02093     ACE_ERROR((LM_ERROR,
02094                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02095                ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
02096     return 0;
02097   }
02098 
02099   // Check if the domain is already in the map.
02100   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
02101 
02102   if (where == this->domains_.end()) {
02103     // We will attempt to insert a new domain, go ahead and allocate it.
02104     DCPS_IR_Domain* domainPtr;
02105     ACE_NEW_RETURN(domainPtr,
02106                    DCPS_IR_Domain(domain, this->participantIdGenerator_),
02107                    0);
02108 
02109     // We need to insert the domain into the map at this time since it
02110     // might be looked up during the init_built_in_topics() call.
02111     this->domains_.insert(
02112       where,
02113       DCPS_IR_Domain_Map::value_type(domain, domainPtr));
02114 
02115     int bit_status = 0;
02116 
02117     if (TheServiceParticipant->get_BIT()) {
02118 #if !defined (DDS_HAS_MINIMUM_BIT)
02119       bit_status = domainPtr->init_built_in_topics(this->federation_.overridden());
02120 #endif // !defined (DDS_HAS_MINIMUM_BIT)
02121     }
02122 
02123     if (0 != bit_status) {
02124       ACE_ERROR((LM_ERROR,
02125                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02126                  ACE_TEXT("failed to initialize the Built-In Topics ")
02127                  ACE_TEXT("when loading domain %d.\n"),
02128                  domain));
02129       this->domains_.erase(domain);
02130       delete domainPtr;
02131       return 0;
02132     }
02133 
02134     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02135       ACE_DEBUG((LM_DEBUG,
02136                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
02137                  ACE_TEXT("successfully loaded domain %d at %x.\n"),
02138                  domain,
02139                  domainPtr));
02140     }
02141     return domainPtr;
02142 
02143   } else {
02144     return where->second;
02145   }
02146 }
02147 
02148 int TAO_DDS_DCPSInfo_i::init_transport(int listen_address_given,
02149                                        const char* listen_str)
02150 {
02151   int status = 0;
02152 
02153   try {
02154 
02155 #ifndef ACE_AS_STATIC_LIBS
02156     if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
02157         < 0 /* not found (-1) or suspended (-2) */) {
02158       static const ACE_TCHAR directive[] =
02159         ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
02160         ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
02161       ACE_Service_Config::process_directive(directive);
02162     }
02163 #endif
02164 
02165     std::string config_name =
02166       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02167       + std::string("InfoRepoBITTransportConfig");
02168     OpenDDS::DCPS::TransportConfig_rch config =
02169       OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
02170 
02171     std::string inst_name =
02172       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02173       + std::string("InfoRepoBITTCPTransportInst");
02174     OpenDDS::DCPS::TransportInst_rch inst =
02175       OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
02176                                                                "tcp");
02177     config->instances_.push_back(inst);
02178 
02179     OpenDDS::DCPS::TcpInst_rch tcp_inst =
02180       OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst);
02181     inst->datalink_release_delay_ = 0;
02182 
02183     tcp_inst->conn_retry_attempts_ = 0;
02184 
02185     if (listen_address_given) {
02186       tcp_inst->local_address(listen_str);
02187     }
02188 
02189   } catch (...) {
02190     // TransportRegistry is extremely varied in the exceptions that
02191     // it throws on failure; do not allow exceptions to bubble up
02192     // beyond this point.
02193     status = 1;
02194   }
02195   return status;
02196 }
02197 
02198 bool
02199 TAO_DDS_DCPSInfo_i::receive_image(const Update::UImage& image)
02200 {
02201   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02202     ACE_DEBUG((LM_DEBUG,
02203                ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02204                ACE_TEXT("processing persistent data.\n")));
02205   }
02206 
02207   // Ensure that new BIT participants do not reuse an id
02208   for (Update::UImage::ParticipantSeq::const_iterator
02209        iter = image.participants.begin();
02210        iter != image.participants.end(); iter++) {
02211     const Update::UParticipant* part = *iter;
02212     OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02213     if (converter.federationId() == this->federation_.id()) {
02214       participantIdGenerator_.last(converter.participantId());
02215     }
02216   }
02217 
02218   for (Update::UImage::ParticipantSeq::const_iterator
02219        iter = image.participants.begin();
02220        iter != image.participants.end(); iter++) {
02221     const Update::UParticipant* part = *iter;
02222 
02223     if (!this->add_domain_participant(part->domainId, part->participantId
02224                                       , part->participantQos)) {
02225       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02226       ACE_ERROR((LM_ERROR,
02227                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02228                  ACE_TEXT("failed to add participant %C to domain %d.\n"),
02229                  std::string(converter).c_str(),
02230                  part->domainId));
02231       return false;
02232 
02233     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02234       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02235       ACE_DEBUG((LM_DEBUG,
02236                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02237                  ACE_TEXT("added participant %C to domain %d.\n"),
02238                  std::string(converter).c_str(),
02239                  part->domainId));
02240     }
02241   }
02242 
02243   for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
02244        iter != image.topics.end(); iter++) {
02245     const Update::UTopic* topic = *iter;
02246 
02247     if (!this->add_topic(topic->topicId, topic->domainId
02248                          , topic->participantId, topic->name.c_str()
02249                          , topic->dataType.c_str(), topic->topicQos)) {
02250       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02251       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02252       ACE_ERROR((LM_ERROR,
02253                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02254                  ACE_TEXT("failed to add topic %C to participant %C.\n"),
02255                  std::string(topic_converter).c_str(),
02256                  std::string(part_converter).c_str()));
02257       return false;
02258 
02259     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02260       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02261       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02262       ACE_DEBUG((LM_DEBUG,
02263                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02264                  ACE_TEXT("added topic %C to participant %C.\n"),
02265                  std::string(topic_converter).c_str(),
02266                  std::string(part_converter).c_str()));
02267     }
02268   }
02269 
02270   for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
02271        iter != image.actors.end(); iter++) {
02272     const Update::URActor* sub = *iter;
02273 
02274     // no reason to associate, there are no publishers yet to associate with
02275     if (!this->add_subscription(sub->domainId, sub->participantId
02276                                 , sub->topicId, sub->actorId
02277                                 , sub->callback.c_str(), sub->drdwQos
02278                                 , sub->transportInterfaceInfo
02279                                 , sub->pubsubQos
02280                                 , sub->contentSubscriptionProfile.filterClassName
02281                                 , sub->contentSubscriptionProfile.filterExpr
02282                                 , sub->contentSubscriptionProfile.exprParams)) {
02283       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02284       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02285       ACE_ERROR((LM_ERROR,
02286                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02287                  ACE_TEXT("failed to add subscription %C to participant %C.\n"),
02288                  std::string(sub_converter).c_str(),
02289                  std::string(part_converter).c_str()));
02290       return false;
02291 
02292     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02293       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02294       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02295       ACE_DEBUG((LM_DEBUG,
02296                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02297                  ACE_TEXT("added subscription %C to participant %C.\n"),
02298                  std::string(sub_converter).c_str(),
02299                  std::string(part_converter).c_str()));
02300     }
02301   }
02302 
02303   for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
02304        iter != image.wActors.end(); iter++) {
02305     const Update::UWActor* pub = *iter;
02306 
02307     // try to associate with any persisted subscriptions to track any expected
02308     // existing associations
02309     if (!this->add_publication(pub->domainId, pub->participantId
02310                                , pub->topicId, pub->actorId
02311                                , pub->callback.c_str() , pub->drdwQos
02312                                , pub->transportInterfaceInfo
02313                                , pub->pubsubQos
02314                                , true)) {
02315       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02316       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02317       ACE_ERROR((LM_ERROR,
02318                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02319                  ACE_TEXT("failed to add publication %C to participant %C.\n"),
02320                  std::string(pub_converter).c_str(),
02321                  std::string(part_converter).c_str()));
02322       return false;
02323 
02324     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02325       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02326       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02327       ACE_DEBUG((LM_DEBUG,
02328                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02329                  ACE_TEXT("added publication %C to participant %C.\n"),
02330                  std::string(pub_converter).c_str(),
02331                  std::string(part_converter).c_str()));
02332     }
02333   }
02334 
02335 #if !defined (DDS_HAS_MINIMUM_BIT)
02336   // reassociate the bit publisher and subscribers
02337   for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
02338        currentDomain != domains_.end();
02339        ++currentDomain) {
02340 
02341     currentDomain->second->reassociate_built_in_topic_pubs();
02342   }
02343 #endif // !defined (DDS_HAS_MINIMUM_BIT)
02344 
02345   return true;
02346 }
02347 
02348 void
02349 TAO_DDS_DCPSInfo_i::add(Update::Updater* updater)
02350 {
02351   if (this->um_) {
02352     this->um_->add(updater);
02353   }
02354 }
02355 
02356 bool
02357 TAO_DDS_DCPSInfo_i::init_persistence()
02358 {
02359   um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance
02360         ("UpdateManagerSvc");
02361 
02362   if (um_ != 0) {
02363     um_->add(this);
02364 
02365     // Request persistent image.
02366     if (reincarnate_) {
02367       um_->requestImage();
02368     }
02369 
02370   } else {
02371     ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
02372                       ACE_TEXT("UpdateManagerSvc.\n")), false);
02373   }
02374 
02375   return true;
02376 }
02377 
02378 bool
02379 TAO_DDS_DCPSInfo_i::init_reassociation(const ACE_Time_Value& delay)
02380 {
02381   if (this->reassociate_timer_id_ != -1) return false;  // already scheduled
02382 
02383   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02384 
02385   this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
02386   return this->reassociate_timer_id_ != -1;
02387 }
02388 
02389 bool
02390 TAO_DDS_DCPSInfo_i::init_dispatchChecking(const ACE_Time_Value& delay)
02391 {
02392   if (this->dispatch_check_timer_id_ != -1) return false;  // already scheduled
02393 
02394   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02395 
02396   this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
02397   return this->dispatch_check_timer_id_ != -1;
02398 }
02399 
02400 void
02401 TAO_DDS_DCPSInfo_i::finalize()
02402 {
02403   if (reassociate_timer_id_ != -1) {
02404     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02405 
02406     reactor->cancel_timer(this->reassociate_timer_id_);
02407     this->reassociate_timer_id_ = -1;
02408   }
02409 
02410   if (dispatch_check_timer_id_ != -1) {
02411     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02412 
02413     reactor->cancel_timer(this->dispatch_check_timer_id_);
02414     this->dispatch_check_timer_id_ = -1;
02415   }
02416 }
02417 
02418 const DCPS_IR_Domain_Map&
02419 TAO_DDS_DCPSInfo_i::domains() const
02420 {
02421   return this->domains_;
02422 }
02423 
02424 
02425 char*
02426 TAO_DDS_DCPSInfo_i::dump_to_string()
02427 {
02428   std::string dump;
02429 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02430   std::string indent ("    ");
02431 
02432   for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
02433        dm != domains_.end();
02434        dm++)
02435   {
02436     dump += dm->second->dump_to_string(indent, 0);
02437   }
02438 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02439   return CORBA::string_dup(dump.c_str());
02440 
02441 }

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7