TAO_DDS_DCPSInfo_i Class Reference

Implementation of the DCPSInfo. More...

#include <DCPSInfo_i.h>

Collaboration diagram for TAO_DDS_DCPSInfo_i:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TAO_DDS_DCPSInfo_i (CORBA::ORB_ptr orb, bool reincarnate, ShutdownInterface *shutdown, const TAO_DDS_DCPSFederationId &federation)
virtual ~TAO_DDS_DCPSInfo_i ()
virtual int handle_timeout (const ACE_Time_Value &now, const void *arg)
virtual CORBA::Boolean attach_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId)
virtual OpenDDS::DCPS::TopicStatus assert_topic (OpenDDS::DCPS::RepoId_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey)
bool add_topic (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
 Add a previously existing topic to the repository.
virtual OpenDDS::DCPS::TopicStatus find_topic (DDS::DomainId_t domainId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::RepoId_out topicId)
virtual OpenDDS::DCPS::TopicStatus remove_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId)
virtual OpenDDS::DCPS::RepoId add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos)
bool add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &pubId, const char *pub_str, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, bool associate=false)
 Add a previously existing publication to the repository.
virtual void remove_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &publicationId)
virtual OpenDDS::DCPS::RepoId add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams)
bool add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &subId, const char *sub_str, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, bool associate=false)
 Add a previously existing subscription to the repository.
virtual void remove_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId)
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant (DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
bool add_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos)
 Add a previously existing participant to the repository.
virtual void remove_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId)
virtual void association_complete (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &localId, const OpenDDS::DCPS::RepoId &remoteId)
bool remove_by_owner (DDS::DomainId_t domain, long owner)
virtual void disassociate_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id)
virtual void disassociate_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id)
virtual void disassociate_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id)
virtual void ignore_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual void ignore_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual void ignore_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual void ignore_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual CORBA::Boolean update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
void update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos)
 Entry for federation updates of DataWriterQos values.
void update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::PublisherQos &qos)
 Entry for federation updates of PublisherQos values.
virtual CORBA::Boolean update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
void update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos)
 Entry for federation updates of DataReaderQos values.
void update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::SubscriberQos &qos)
 Entry for federation updates of SubscriberQos values.
virtual ::CORBA::Boolean update_subscription_params (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId, const DDS::StringSeq &params)
virtual CORBA::Boolean update_topic_qos (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::TopicQos &qos)
virtual CORBA::Boolean update_domain_participant_qos (DDS::DomainId_t domain, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos)
virtual void shutdown ()
 Cause the entire repository to exit.
virtual char * dump_to_string ()
 Dump the Repos state to string.
bool changeOwnership (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, long sender, long owner)
 assert new ownership for a participant and its contained entities.
int init_transport (int listen_address_given, const char *listen_str)
bool receive_image (const Update::UImage &image)
void add (Update::Updater *updater)
 Add an additional Updater interface.
DCPS_IR_Domain * domain (DDS::DomainId_t domain)
 Convert a domain Id into a reference to a DCPS_IR_Domain object.
const DCPS_IR_Domain_Map & domains () const
 Expose a readable reference of the domain map.
CORBA::ORB_ptr orb ()
 Expose the ORB.
bool init_persistence ()
bool init_reassociation (const ACE_Time_Value &delay)
bool init_dispatchChecking (const ACE_Time_Value &delay)
void finalize ()
 Cleanup state for shutdown.

Private Attributes

DCPS_IR_Domain_Map domains_
CORBA::ORB_var orb_
CORBA::ORB_var dispatchingOrb_
const TAO_DDS_DCPSFederationId & federation_
RepoIdGenerator participantIdGenerator_
Update::Manager * um_
bool reincarnate_
ShutdownInterface * shutdown_
 Interface to effect shutdown of the process.
ACE_Recursive_Thread_Mutex lock_
long reassociate_timer_id_
long dispatch_check_timer_id_

Detailed Description

Implementation of the DCPSInfo.

This is the Information Repository object. Clients of the system will use the CORBA reference of this object.

Definition at line 52 of file DCPSInfo_i.h.


Constructor & Destructor Documentation

TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i ( CORBA::ORB_ptr  orb,
bool  reincarnate,
ShutdownInterface *  shutdown,
const TAO_DDS_DCPSFederationId &  federation 
)

Definition at line 34 of file DCPSInfo_i.cpp.

References dispatchingOrb_.

00038   : orb_(CORBA::ORB::_duplicate(orb))
00039   , federation_(federation)
00040   , participantIdGenerator_(federation.id())
00041   , um_(0)
00042   , reincarnate_(reincarnate)
00043   , shutdown_(shutdown)
00044   , reassociate_timer_id_(-1)
00045   , dispatch_check_timer_id_(-1)
00046 {
00047   // Create the second ORB, named "dispatchingOnly", handing ORB_init()
00048   // an empty argument vector.
00049   int orbArgc = 0;
00050   char** orbArgv = 0;
00051   this->dispatchingOrb_ = CORBA::ORB_init(orbArgc, orbArgv, "dispatchingOnly");
00052 }

TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i (  )  [virtual]

Definition at line 53 of file DCPSInfo_i.cpp.

00054 {
00055 } // Empty body: this destructor performs no explicit cleanup.


Member Function Documentation

void TAO_DDS_DCPSInfo_i::add ( Update::Updater *  updater  ) 

Add an additional Updater interface.

Definition at line 2349 of file DCPSInfo_i.cpp.

References Update::Manager::add(), and um_.

02350 {
02351   // Without an update manager there is nothing to hand the updater to.
02352   if (0 == this->um_) {
02353     return;
02354   }
02355 
02356   this->um_->add(updater);
02357 }

bool TAO_DDS_DCPSInfo_i::add_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &  participantId,
const DDS::DomainParticipantQos &  qos 
)

Add a previously existing participant to the repository.

Parameters:
domainId the Domain in which the Participant is contained.
participantId the GUID Id value to use for the Participant.
qos the QoS value of the Participant.
Adds a Participant to the repository using a specified Participant GUID Id value. If the ParticipantId indicates that this Participant was created within this repository (its federation Id is the current repository's federation Id), this method will ensure that any subsequent call that adds a Participant and obtains a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 1071 of file DCPSInfo_i.cpp.

References DCPS_IR_Domain::add_participant(), OpenDDS::DCPS::DCPS_debug_level, domain(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Domain::last_participant_key(), DCPS_IR_Domain::participant(), OpenDDS::DCPS::RepoIdConverter::participantId(), and um_.

01074 {
01075   // Loads a participant that already has a GUID (participantId) into the
01076   // domain identified by domainId, using the supplied qos. Returns true
01077   // when the participant was added. Returns false when the lock cannot be
01078   // acquired, the domain is unknown, the participant already exists, the
01079   // allocation fails, or the domain does not accept the participant.
01080   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01081 
01082   // Grab the domain.
01083   DCPS_IR_Domain* domainPtr = this->domain(domainId);
01084 
01085   if (0 == domainPtr) {
01086     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01087       ACE_DEBUG((LM_WARNING,
01088                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01089                  ACE_TEXT("invalid domain Id: %d\n"),
01090                  domainId));
01091     }
01092 
01093     return false;
01094   }
01095 
01096   // Prepare to manipulate the participant's Id value.
01097   OpenDDS::DCPS::RepoIdConverter converter(participantId);
01098 
01099   // Grab the participant.
01100   DCPS_IR_Participant* partPtr
01101   = domainPtr->participant(participantId);
01102 
01103   if (0 != partPtr) {
01104     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01105       ACE_ERROR((LM_ERROR,
01106                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01107                  ACE_TEXT("participant id %C already exists.\n"),
01108                  std::string(converter).c_str()));
01109     }
01110 
01111     return false;
01112   }
01113 
01114   // This function returns bool, so an allocation failure reports false.
01115   DCPS_IR_Participant* participant;
01116   ACE_NEW_RETURN(participant,
01117                  DCPS_IR_Participant(this->federation_,
01118                                      participantId,
01119                                      domainPtr,
01120                                      qos, um_), false);
01121 
01122   switch (domainPtr->add_participant(participant)) {
01123   case -1:
01124     ACE_ERROR((LM_ERROR,
01125                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01126                ACE_TEXT("failed to load participant %C in domain %d.\n"),
01127                std::string(converter).c_str(),
01128                domainId));
01129     // The participant was not added, so it is released here.
01130     delete participant;
01131     return false;
01132 
01133   case 1:
01134     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01135       ACE_DEBUG((LM_WARNING,
01136                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01137                  ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
01138                  std::string(converter).c_str(),
01139                  domainId));
01140     }
01141 
01142     delete participant;
01143     return false;
01144 
01145   case 0:
01146   default:
01147     break;
01148   }
01149 
01150   // See if we are adding a participant that was created within this
01151   // repository or a different repository.
01152   if (converter.federationId() == this->federation_.id()) {
01153     // Ensure the participant GUID values do not conflict.
01154     domainPtr->last_participant_key(converter.participantId());
01155 
01156     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01157       ACE_DEBUG((LM_DEBUG,
01158                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01159                  ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
01160                  converter.participantId()));
01161     }
01162   }
01163 
01164   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01165     // %@ is the ACE_Log_Msg conversion for a pointer argument.
01166     ACE_DEBUG((LM_DEBUG,
01167                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01168                ACE_TEXT("loaded participant %C at %@ in domain %d.\n"),
01169                std::string(converter).c_str(),
01170                participant,
01171                domainId));
01172   }
01173 
01174   return true;
01175 }

OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant ( DDS::DomainId_t  domain,
const DDS::DomainParticipantQos &  qos 
) [virtual]

Definition at line 979 of file DCPSInfo_i.cpp.

References DCPS_IR_Domain::add_participant(), Update::Manager::create(), OpenDDS::DCPS::DCPS_debug_level, domain(), OpenDDS::DCPS::AddDomainStatus::federated, federation_, DCPS_IR_Domain::get_next_participant_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::AddDomainStatus::id, DCPS_IR_Participant::isBitPublisher(), TAO_DDS_DCPSFederationId::overridden(), DCPS_IR_Participant::owner(), DCPS_IR_Domain::participants(), DCPS_IR_Participant::takeOwnership(), TheServiceParticipant, and um_.

Referenced by Update::Manager::add(), and OpenDDS::Federator::ManagerImpl::processCreate().

00982 {
00983   // Creates a new participant in the given domain under a freshly generated
00984   // Id. The returned status carries that Id, or GUID_UNKNOWN when the lock
00985   // cannot be acquired, the allocation fails or the domain rejects the
00986   // participant; 'federated' is copied from federation_.overridden().
00987   // Throws OpenDDS::DCPS::Invalid_Domain when the domain is not known.
00988   OpenDDS::DCPS::AddDomainStatus value;
00989   value.id        = OpenDDS::DCPS::GUID_UNKNOWN;
00990   value.federated = this->federation_.overridden();
00991 
00992   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
00993 
00994   // Grab the domain.
00995   DCPS_IR_Domain* domainPtr = this->domain(domain);
00996 
00997   if (0 == domainPtr) {
00998     throw OpenDDS::DCPS::Invalid_Domain();
00999   }
01000 
01001   // Obtain a shiny new GUID value.
01002   OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id();
01003 
01004   DCPS_IR_Participant* participant;
01005   ACE_NEW_RETURN(participant,
01006                  DCPS_IR_Participant(
01007                    this->federation_,
01008                    participantId,
01009                    domainPtr,
01010                    qos, um_),
01011                  value);
01012 
01013   // We created the participant, now we can return the Id value (eventually).
01014   value.id = participantId;
01015 
01016   // Determine if this is the 'special' repository internal participant
01017   // that publishes the built-in topics for a domain.
01018   if (domainPtr->participants().empty() && TheServiceParticipant->get_BIT()) {
01019     participant->isBitPublisher() = true;
01020 
01021     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01022       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01023       ACE_DEBUG((LM_DEBUG,
01024                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01025                  ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
01026                  std::string(converter).c_str(),
01027                  domain));
01028     }
01029   }
01030 
01031   // Assume responsibility for writing back to the participant.
01032   participant->takeOwnership();
01033 
01034   int status = domainPtr->add_participant(participant);
01035 
01036   if (0 != status) {
01037     // Adding the participant failed: report the invalid participant Id,
01038     // both in the returned status and in the log message below.
01039     participantId = OpenDDS::DCPS::GUID_UNKNOWN;
01040     value.id = participantId;
01041     delete participant;
01042     participant = 0;
01043 
01044   } else if (this->um_ && (participant->isBitPublisher() == false)) {
01045     // Push this participant to interested observers.
01046     Update::UParticipant updateParticipant(
01047       domain,
01048       participant->owner(),
01049       participantId,
01050       const_cast<DDS::DomainParticipantQos &>(qos));
01051     this->um_->create(updateParticipant);
01052 
01053     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01054       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01055       ACE_DEBUG((LM_DEBUG,
01056                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01057                  ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
01058                  std::string(converter).c_str(),
01059                  domain));
01060     }
01061   }
01062 
01063   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01064     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01065     // %@ is the ACE_Log_Msg conversion for a pointer argument.
01066     ACE_DEBUG((LM_DEBUG,
01067                ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01068                ACE_TEXT("domain %d loaded participant %C at %@.\n"),
01069                domain,
01070                std::string(converter).c_str(),
01071                participant));
01072   }
01073   return value;
01074 }

bool TAO_DDS_DCPSInfo_i::add_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &  participantId,
const OpenDDS::DCPS::RepoId &  topicId,
const OpenDDS::DCPS::RepoId &  pubId,
const char *  pub_str,
const DDS::DataWriterQos &  qos,
const OpenDDS::DCPS::TransportLocatorSeq &  transInfo,
const DDS::PublisherQos &  publisherQos,
bool  associate = false 
)

Add a previously existing publication to the repository.

Parameters:
domainId the Domain in which the Publication is contained.
participantId the Participant in which the Publication is contained.
topicId the Topic of the Publication.
pubId the GUID Id value to use for the Publication.
pub_str stringified publication callback to DataWriter.
qos the QoS value of the DataWriter.
transInfo the transport information for the Publication.
publisherQos the QoS value of the Publisher.
associate indicate whether to create new associations.
Adds a Publication to the repository using a specified Publication GUID Id value. If the PublicationId indicates that this Publication was created within this repository (its federation Id is the current repository's federation Id), this method will ensure that any subsequent call that adds a Publication and obtains a newly generated Id value will return an Id value greater than the Id value of the current one.

Todo: Check if this is already stored. If so, just clear the callback IOR.

Definition at line 473 of file DCPSInfo_i.cpp.

References DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Participant::last_publication_key(), and DCPS_IR_Participant::remove_publication().

00482 {
00483   // Loads a publication that already has a GUID (pubId) into the given
00484   // participant and topic. pub_str is converted to an object reference on
00485   // the dispatching ORB. Returns true when the publication was added to
00486   // both the participant and the topic; returns false otherwise.
00487   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00488 
00489   // Grab the domain.
00490   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00491 
00492   if (where == this->domains_.end()) {
00493     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00494       ACE_DEBUG((LM_WARNING,
00495                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00496                  ACE_TEXT("invalid domain %d.\n"),
00497                  domainId));
00498     }
00499 
00500     return false;
00501   }
00502 
00503   // Grab the participant.
00504   DCPS_IR_Participant* partPtr
00505   = where->second->participant(participantId);
00506 
00507   if (0 == partPtr) {
00508     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00509       // Report the participant that could not be found, not the publication.
00510       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00511       ACE_DEBUG((LM_WARNING,
00512                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00513                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00514                  std::string(converter).c_str(),
00515                  domainId));
00516     }
00517 
00518     return false;
00519   }
00520 
00521   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00522 
00523   if (topic == 0) {
00524     OpenDDS::DCPS::RepoIdConverter converter(topicId);
00525     ACE_DEBUG((LM_WARNING,
00526                ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00527                ACE_TEXT("invalid topic %C in domain %d.\n"),
00528                std::string(converter).c_str(),
00529                domainId));
00530     return false;
00531   }
00532 
00533   /// @TODO: Check if this is already stored.  If so, just clear the callback IOR.
00534 
00535   CORBA::Object_var obj = dispatchingOrb_->string_to_object(pub_str);
00536   if (CORBA::is_nil(obj.in())) {
00537     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00538       // %C is the ACE_Log_Msg conversion for a narrow (char*) string.
00539       ACE_DEBUG((LM_WARNING,
00540                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00541                  ACE_TEXT("failure marshalling publication %C on dispatching orb.\n"),
00542                  pub_str));
00543     }
00544     return false;
00545   }
00546   OpenDDS::DCPS::DataWriterRemote_var publication
00547   = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
00548 
00549   // This function returns bool, so an allocation failure reports false.
00550   DCPS_IR_Publication* pubPtr;
00551   ACE_NEW_RETURN(pubPtr,
00552                  DCPS_IR_Publication(
00553                    pubId,
00554                    partPtr,
00555                    topic,
00556                    publication.in(),
00557                    qos,
00558                    transInfo,
00559                    publisherQos),
00560                  false);
00561 
00562   switch (partPtr->add_publication(pubPtr)) {
00563   case -1: {
00564     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00565     ACE_ERROR((LM_ERROR,
00566                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00567                ACE_TEXT("failed to add publication to participant %C.\n"),
00568                std::string(converter).c_str()));
00569   }
00570   // Deliberate fall through to next case.
00571 
00572   case 1:
00573     delete pubPtr;
00574     return false;
00575 
00576   case 0:
00577   default:
00578     break;
00579   }
00580 
00581   switch (topic->add_publication_reference(pubPtr, associate)) {
00582   case -1: {
00583     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00584     ACE_ERROR((LM_ERROR,
00585                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00586                ACE_TEXT("failed to add publication to participant %C topic list.\n"),
00587                std::string(converter).c_str()));
00588 
00589     // Remove the publication.
00590     partPtr->remove_publication(pubId);
00591 
00592   }
00593   return false;
00594 
00595   case 1: // This is actually a really really bad place to jump to.
00596     // This means that we successfully added the new publication
00597     // to the participant (it had not been inserted before) but
00598     // that we are adding a duplicate publication to the topic
00599     // list - which should never ever be able to happen.
00600     return false;
00601 
00602   case 0:
00603   default:
00604     break;
00605   }
00606 
00607   OpenDDS::DCPS::RepoIdConverter converter(pubId);
00608 
00609   // See if we are adding a publication that was created within this
00610   // repository or a different repository.
00611   if (converter.federationId() == federation_.id()) {
00612     // Ensure the publication RepoId values do not conflict.
00613     partPtr->last_publication_key(converter.entityKey());
00614   }
00615 
00616   return true;
00617 }

OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &  participantId,
const OpenDDS::DCPS::RepoId &  topicId,
OpenDDS::DCPS::DataWriterRemote_ptr  publication,
const DDS::DataWriterQos &  qos,
const OpenDDS::DCPS::TransportLocatorSeq &  transInfo,
const DDS::PublisherQos &  publisherQos 
) [virtual]

Definition at line 364 of file DCPSInfo_i.cpp.

References DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), Update::Manager::create(), Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, DCPS_IR_Participant::get_next_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, DCPS_IR_Participant::isBitPublisher(), orb_, DCPS_IR_Participant::remove_publication(), and um_.

Referenced by Update::Manager::add().

00372 {
00373   // Creates a new publication for the given participant and topic under a
00374   // freshly generated Id and returns that Id, or GUID_UNKNOWN on failure.
00375   // Throws Invalid_Domain, Invalid_Participant or Invalid_Topic when the
00376   // corresponding entity cannot be found.
00377   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00378 
00379   // Grab the domain.
00380   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00381 
00382   if (where == this->domains_.end()) {
00383     throw OpenDDS::DCPS::Invalid_Domain();
00384   }
00385 
00386   // Grab the participant.
00387   DCPS_IR_Participant* partPtr
00388   = where->second->participant(participantId);
00389 
00390   if (0 == partPtr) {
00391     throw OpenDDS::DCPS::Invalid_Participant();
00392   }
00393 
00394   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00395 
00396   if (topic == 0) {
00397     throw OpenDDS::DCPS::Invalid_Topic();
00398   }
00399 
00400   OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id();
00401 
00402   // Remarshall the remote reference onto the dispatching orb.
00403   OpenDDS::DCPS::DataWriterRemote_var marshalledPub(OpenDDS::DCPS::DataWriterRemote::_duplicate(publication));
00404   if (CORBA::is_nil(marshalledPub.in())) {
00405     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00406       ACE_DEBUG((LM_WARNING,
00407                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00408                  ACE_TEXT("invalid publication reference.\n")));
00409     }
00410     return OpenDDS::DCPS::GUID_UNKNOWN;
00411   }
00412   CORBA::String_var pubStr = orb_->object_to_string(marshalledPub.in());
00413   CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr.in());
00414   if (CORBA::is_nil(pubObj.in()))  {
00415     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00416       ACE_DEBUG((LM_WARNING,
00417                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00418                  ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
00419     }
00420     return OpenDDS::DCPS::GUID_UNKNOWN;
00421   }
00422   OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication
00423   = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj.in());
00424 
00425   DCPS_IR_Publication* pubPtr;
00426   ACE_NEW_RETURN(pubPtr,
00427                  DCPS_IR_Publication(
00428                    pubId,
00429                    partPtr,
00430                    topic,
00431                    dispatchingPublication.in(),
00432                    qos,
00433                    transInfo,
00434                    publisherQos),
00435                  OpenDDS::DCPS::GUID_UNKNOWN);
00436 
00437   // Set only when the publication is registered with both the participant
00438   // and the topic; a failed add must not be announced to the updaters.
00439   bool added = false;
00440 
00441   if (partPtr->add_publication(pubPtr) != 0) {
00442     // failed to add.  we are responsible for the memory.
00443     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00444     delete pubPtr;
00445     pubPtr = 0;
00446 
00447   } else if (topic->add_publication_reference(pubPtr) != 0) {
00448     // Failed to add to the topic
00449     // so remove from participant and fail.
00450     partPtr->remove_publication(pubId);
00451     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00452 
00453   } else {
00454     added = true;
00455   }
00456 
00457   if (added && this->um_ && (partPtr->isBitPublisher() == false)) {
00458     CORBA::String_var callback = orb_->object_to_string(publication);
00459     Update::ContentSubscriptionInfo csi;
00460 
00461     Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
00462                           , callback.in()
00463                           , const_cast<DDS::PublisherQos &>(publisherQos)
00464                           , const_cast<DDS::DataWriterQos &>(qos)
00465                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00466                           (transInfo), csi);
00467     this->um_->create(actor);
00468 
00469     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00470       OpenDDS::DCPS::RepoIdConverter converter(pubId);
00471       ACE_DEBUG((LM_DEBUG,
00472                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ")
00473                  ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
00474                  std::string(converter).c_str(),
00475                  domainId));
00476     }
00477   }
00478 
00479   where->second->remove_dead_participants();
00480   return pubId;
00481 }

bool TAO_DDS_DCPSInfo_i::add_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &  participantId,
const OpenDDS::DCPS::RepoId &  topicId,
const OpenDDS::DCPS::RepoId &  subId,
const char *  sub_str,
const DDS::DataReaderQos &  qos,
const OpenDDS::DCPS::TransportLocatorSeq &  transInfo,
const DDS::SubscriberQos &  subscriberQos,
const char *  filterClassName,
const char *  filterExpression,
const DDS::StringSeq &  exprParams,
bool  associate = false 
)

Add a previously existing subscription to the repository.

Parameters:
domainId the Domain in which the Subscription is contained.
participantId the Participant in which the Subscription is contained.
topicId the Topic of the Subscription.
subId the GUID Id value to use for the Subscription.
sub_str stringified callback reference to the DataReader of the Subscription.
qos the QoS value of the DataReader.
transInfo the transport information for the Subscription.
subscriberQos the QoS value of the Subscriber.
associate indicates whether to create new associations.
Adds a Subscription to the repository using a specified Subscription GUID Id value. If the SubscriptionId indicates that this Subscription was created within this repository (its federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Subscription and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 786 of file DCPSInfo_i.cpp.

References DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Participant::last_subscription_key(), and DCPS_IR_Participant::remove_subscription().

// Body of the bool-returning add_subscription overload: registers a
// subscription under the caller-supplied subId.  Returns false when the lock
// cannot be taken, when the domain, participant or topic is unknown, when
// sub_str does not resolve to an object, or when the participant or topic
// rejects the subscription; returns true otherwise.
00799 {
00800   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00801 
00802   // Grab the domain.
00803   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00804 
00805   if (where == this->domains_.end()) {
00806     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00807       ACE_DEBUG((LM_WARNING,
00808                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00809                  ACE_TEXT("invalid domain %d.\n"),
00810                  domainId));
00811     }
00812 
00813     return false;
00814   }
00815 
00816   // Grab the participant.
00817   DCPS_IR_Participant* partPtr
00818   = where->second->participant(participantId);
00819 
00820   if (0 == partPtr) {
00821     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00822       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00823       ACE_DEBUG((LM_WARNING,
00824                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00825                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00826                  std::string(converter).c_str(),
00827                  domainId));
00828     }
00829 
00830     return false;
00831   }
00832 
        // Look up the topic the subscription is attached to.
00833   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00834 
00835   if (topic == 0) {
00836     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00837       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00838       ACE_DEBUG((LM_WARNING,
00839                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00840                  ACE_TEXT("invalid topic %C in domain %d.\n"),
00841                  std::string(converter).c_str(),
00842                  domainId));
00843     }
00844 
00845     return false;
00846   }
00847 
        // Turn the stringified reference back into an object on the
        // dispatching orb and treat it as a DataReaderRemote.
00848   CORBA::Object_var obj = dispatchingOrb_->string_to_object(sub_str);
00849   if (CORBA::is_nil(obj.in())) {
00850     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
            // sub_str is a narrow const char*, so it is printed with %C like
            // the other narrow strings in this file (%s expects ACE_TCHAR*).
00851       ACE_DEBUG((LM_WARNING,
00852                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00853                  ACE_TEXT("failure marshalling subscription %C on dispatching orb.\n"),
00854                  sub_str));
00855     }
00856     return false;
00857   }
00858   OpenDDS::DCPS::DataReaderRemote_var subscription
00859   = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
00860 
        // Allocate the repository-side record; the trailing 0 is the value
        // returned (i.e. false) if the allocation fails.
00861   DCPS_IR_Subscription* subPtr;
00862   ACE_NEW_RETURN(subPtr,
00863                  DCPS_IR_Subscription(
00864                    subId,
00865                    partPtr,
00866                    topic,
00867                    subscription.in(),
00868                    qos,
00869                    transInfo,
00870                    subscriberQos,
00871                    filterClassName,
00872                    filterExpression,
00873                    exprParams),
00874                  0);
00875 
        // Hand the record to the participant.  On -1 or 1 the record was not
        // accepted, so it is deleted here; only -1 is logged as an error.
00876   switch (partPtr->add_subscription(subPtr)) {
00877   case -1: {
00878     OpenDDS::DCPS::RepoIdConverter converter(subId);
00879     ACE_ERROR((LM_ERROR,
00880                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00881                ACE_TEXT("failed to add subscription to participant %C.\n"),
00882                std::string(converter).c_str()));
00883   }
00884   // Deliberate fall through to next case.
00885 
00886   case 1:
00887     delete subPtr;
00888     return false;
00889 
00890   case 0:
00891   default:
00892     break;
00893   }
00894 
        // Register the subscription with the topic; 'associate' is forwarded
        // unchanged.
00895   switch (topic->add_subscription_reference(subPtr, associate)) {
00896   case -1: {
00897     OpenDDS::DCPS::RepoIdConverter converter(subId);
00898     ACE_ERROR((LM_ERROR,
00899                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00900                ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
00901                std::string(converter).c_str()));
00902 
00903     // Remove the subscription.
00904     partPtr->remove_subscription(subId);
00905 
00906   }
00907   return false;
00908 
00909   case 1: // This is actually a really really bad place to jump to.
00910     // This means that we successfully added the new subscription
00911     // to the participant (it had not been inserted before) but
00912     // that we are adding a duplicate subscription to the topic
00913     // list - which should never ever be able to happen.
00914     return false;
00915 
00916   case 0:
00917   default:
00918     break;
00919   }
00920 
00921   OpenDDS::DCPS::RepoIdConverter converter(subId);
00922 
00923   // See if we are adding a subscription that was created within this
00924   // repository or a different repository.
00925   if (converter.federationId() == federation_.id()) {
00926     // Ensure the subscription RepoId values do not conflict.
00927     partPtr->last_subscription_key(converter.entityKey());
00928   }
00929 
00930   return true;
00931 }

OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId topicId,
OpenDDS::DCPS::DataReaderRemote_ptr  subscription,
const DDS::DataReaderQos qos,
const OpenDDS::DCPS::TransportLocatorSeq transInfo,
const DDS::SubscriberQos subscriberQos,
const char *  filterClassName,
const char *  filterExpression,
const DDS::StringSeq exprParams 
) [virtual]

Definition at line 660 of file DCPSInfo_i.cpp.

References DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), Update::Manager::create(), Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, DCPS_IR_Participant::get_next_subscription_id(), OpenDDS::DCPS::GUID_UNKNOWN, DCPS_IR_Participant::isBitPublisher(), orb_, DCPS_IR_Domain::participant(), DCPS_IR_Domain::remove_dead_participants(), DCPS_IR_Participant::remove_subscription(), and um_.

Referenced by Update::Manager::add().

// Body of the RepoId-returning add_subscription overload: obtains a new
// subscription id from the participant, registers the subscription with the
// participant and the topic, and pushes the creation to the update manager
// when one is set.  Throws Invalid_Domain / Invalid_Participant /
// Invalid_Topic for unknown ids; returns GUID_UNKNOWN on the other failure
// paths and the new subscription id otherwise.
00671 {
00672   DCPS_IR_Domain* domainPtr;
00673   DCPS_IR_Participant* partPtr;
00674   DCPS_IR_Subscription* subPtr;
00675   DCPS_IR_Topic* topic;
00676   OpenDDS::DCPS::RepoId subId;
00677   {
          // The guard is scoped to this inner block only.
00678     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00679 
00680     // Grab the domain.
00681     DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00682 
00683     if (where == this->domains_.end()) {
00684       throw OpenDDS::DCPS::Invalid_Domain();
00685     }
00686 
00687     // Grab the domain and participant.
00688     domainPtr = where->second;
00689     partPtr = domainPtr->participant(participantId);
00690 
00691     if (0 == partPtr) {
00692       throw OpenDDS::DCPS::Invalid_Participant();
00693     }
00694 
00695     topic = where->second->find_topic(topicId);
00696 
00697     if (topic == 0) {
00698       throw OpenDDS::DCPS::Invalid_Topic();
00699     }
00700 
          // NOTE(review): the id is requested before the reference checks
          // below, so it is taken even when those checks fail — confirm that
          // this is acceptable.
00701     subId = partPtr->get_next_subscription_id();
00702 
00703     // Remarshall the remote reference onto the dispatching orb.
00704     OpenDDS::DCPS::DataReaderRemote_var marshalledSub (
00705       OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
00706     if (CORBA::is_nil(marshalledSub.in())) {
00707       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00708         ACE_DEBUG((LM_WARNING,
00709                    ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00710                    ACE_TEXT("invalid subscription reference.\n")));
00711       }
00712       return OpenDDS::DCPS::GUID_UNKNOWN;
00713     }
00714 
          // Stringify with orb_ and re-resolve with dispatchingOrb_.
00715     CORBA::String_var subStr = orb_->object_to_string(marshalledSub.in());
00716     CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr.in());
00717     if (CORBA::is_nil(subObj.in())) {
00718       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00719         ACE_DEBUG((LM_WARNING,
00720                    ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00721                    ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
00722       }
00723       return OpenDDS::DCPS::GUID_UNKNOWN;
00724     }
00725     OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription
00726     = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj.in());
00727 
          // Allocate the repository-side record; GUID_UNKNOWN is returned if
          // the allocation fails.
00728     ACE_NEW_RETURN(subPtr,
00729                    DCPS_IR_Subscription(
00730                      subId,
00731                      partPtr,
00732                      topic,
00733                      dispatchingSubscription.in(),
00734                      qos,
00735                      transInfo,
00736                      subscriberQos,
00737                      filterClassName,
00738                      filterExpression,
00739                      exprParams),
00740                    OpenDDS::DCPS::GUID_UNKNOWN);
00741 
00742     // Release lock
00743   }
        // NOTE(review): everything below runs after the guard above has gone
        // out of scope — confirm that the participant/topic calls are meant
        // to run without this->lock_ held.
00744   if (partPtr->add_subscription(subPtr) != 0) {
00745     // failed to add.  we are responsible for the memory.
00746     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00747     delete subPtr;
00748     subPtr = 0;
00749 
00750   } else if (topic->add_subscription_reference(subPtr) != 0) {
00751     ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
00752     // No associations were made so remove and fail.
00753     partPtr->remove_subscription(subId);
00754     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00755   }
00756 
        // NOTE(review): this update is pushed even when subId was reset to
        // GUID_UNKNOWN by a failure above — confirm that is intended.
00757   if (this->um_ && (partPtr->isBitPublisher() == false)) {
00758     CORBA::String_var callback = orb_->object_to_string(subscription);
00759     Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
00760 
00761     Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
00762                           , callback.in()
00763                           , const_cast<DDS::SubscriberQos &>(subscriberQos)
00764                           , const_cast<DDS::DataReaderQos &>(qos)
00765                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00766                           (transInfo), csi);
00767 
00768     this->um_->create(actor);
00769 
00770     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00771       OpenDDS::DCPS::RepoIdConverter converter(subId);
00772       ACE_DEBUG((LM_DEBUG,
00773                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ")
00774                  ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
00775                  std::string(converter).c_str(),
00776                  domainId));
00777     }
00778   }
00779 
00780   domainPtr->remove_dead_participants();
00781 
00782   return subId;
00783 }

bool TAO_DDS_DCPSInfo_i::add_topic ( const OpenDDS::DCPS::RepoId topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos 
)

Add a previously existing topic to the repository.

Parameters:
topicId the Topic Entity GUID Id to use.
domainId the Domain in which the Topic is contained.
participantId the Participant in which the Topic is contained.
topicName the name of the Topic.
dataTypeName the name of the data type.
qos the QoS value to use for the Topic.
Adds a Topic Entity to the repository using a specified TopicId value. If the TopicId indicates that this Topic was created within this repository (its federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Topic and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 222 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::CREATED, OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), and DCPS_IR_Participant::last_topic_key().

Referenced by Update::Manager::add().

// Body of add_topic: adds a topic under the caller-supplied topicId.
// Returns false when the lock cannot be taken, when the domain or
// participant is unknown, or when force_add_topic() does not report CREATED;
// returns true otherwise.
00228 {
00229   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00230 
00231   // Grab the domain.
00232   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00233 
00234   if (where == this->domains_.end()) {
00235     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00236       ACE_DEBUG((LM_WARNING,
00237                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00238                  ACE_TEXT("invalid domain %d.\n"),
00239                  domainId));
00240     }
00241 
00242     return false;
00243   }
00244 
00245   // Grab the participant.
00246   DCPS_IR_Participant* participantPtr
00247   = where->second->participant(participantId);
00248 
00249   if (0 == participantPtr) {
00250     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00251       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00252       ACE_DEBUG((LM_WARNING,
00253                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00254                  ACE_TEXT("invalid participant %C.\n"),
00255                  std::string(converter).c_str()));
00256     }
00257 
00258     return false;
00259   }
00260 
        // Add the topic to the domain under the supplied id; any status
        // other than CREATED is reported to the caller as failure.
00261   OpenDDS::DCPS::TopicStatus topicStatus
00262   = where->second->force_add_topic(topicId, topicName, dataTypeName,
00263                                    qos, participantPtr);
00264 
00265   if (topicStatus != OpenDDS::DCPS::CREATED) {
00266     return false;
00267   }
00268 
00269   OpenDDS::DCPS::RepoIdConverter converter(topicId);
00270 
00271   // See if we are adding a topic that was created within this
00272   // repository or a different repository.
00273   if (converter.federationId() == federation_.id()) {
00274     // Ensure the topic RepoId values do not conflict.
00275     participantPtr->last_topic_key(converter.entityKey());
00276   }
00277 
00278   return true;
00279 }

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic ( OpenDDS::DCPS::RepoId_out  topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos,
bool  hasDcpsKey 
) [virtual]

Definition at line 170 of file DCPSInfo_i.cpp.

References Update::Manager::create(), OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), and um_.

// Body of assert_topic: asks the domain to add the topic for the given
// participant and returns the resulting TopicStatus.  Throws Invalid_Domain
// or Invalid_Participant for unknown ids and returns INTERNAL_ERROR when the
// lock cannot be taken.
00178 {
00179   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00180   // Grab the domain.
00181   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00182 
00183   if (where == this->domains_.end()) {
00184     throw OpenDDS::DCPS::Invalid_Domain();
00185   }
00186 
00187   // Grab the participant.
00188   DCPS_IR_Participant* participantPtr
00189   = where->second->participant(participantId);
00190 
00191   if (0 == participantPtr) {
00192     throw OpenDDS::DCPS::Invalid_Participant();
00193   }
00194 
        // topicId is an out parameter handed to the domain's add_topic().
00195   OpenDDS::DCPS::TopicStatus topicStatus
00196   = where->second->add_topic(
00197       topicId,
00198       topicName,
00199       dataTypeName,
00200       qos,
00201       participantPtr);
00202 
        // Push the creation to the update manager when one is set, except
        // for participants flagged as BIT publishers.
        // NOTE(review): the update is pushed regardless of topicStatus —
        // confirm that is intended.
00203   if (this->um_ && (participantPtr->isBitPublisher() == false)) {
00204     Update::UTopic topic(domainId, topicId, participantId
00205                          , topicName, dataTypeName
00206                          , const_cast<DDS::TopicQos &>(qos));
00207     this->um_->create(topic);
00208 
00209     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00210       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00211       ACE_DEBUG((LM_DEBUG,
00212                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
00213                  ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
00214                  std::string(converter).c_str(),
00215                  domainId));
00216     }
00217   }
00218   return topicStatus;
00219 }

void TAO_DDS_DCPSInfo_i::association_complete ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId localId,
const OpenDDS::DCPS::RepoId remoteId 
) [virtual]

Definition at line 1469 of file DCPSInfo_i.cpp.

References DCPS_IR_Publication::association_complete(), DCPS_IR_Subscription::association_complete(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), and DCPS_IR_Participant::find_subscription_reference().

// Body of association_complete: forwards remoteId to the subscription or
// publication identified by localId within the given participant.  Returns
// silently when the domain or participant is unknown; logs a warning (at
// debug level > 3) when localId matches neither kind of entity.
01473 {
01474   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01475 
01476   DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId);
01477   if (dom_iter == this->domains_.end()) {
01478     return;
01479   }
01480 
01481   DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId);
01482   if (0 == partPtr) {
01483     return;
01484   }
01485 
01486   // localId could be pub or sub (initial implementation will only use sub
01487   // since the DataReader is the passive peer)
01488   DCPS_IR_Subscription* sub = 0;
01489   DCPS_IR_Publication* pub = 0;
01490   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01491     ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n"));
01492   }
        // Try the subscription lookup first, then the publication lookup.
01493   if (0 == partPtr->find_subscription_reference(localId, sub)) {
01494     sub->association_complete(remoteId);
01495   } else if (0 == partPtr->find_publication_reference(localId, pub)) {
01496     pub->association_complete(remoteId);
01497   } else {
01498     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01499       OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01500       OpenDDS::DCPS::RepoIdConverter local_converter(localId);
01501       OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId);
01502       ACE_DEBUG((LM_WARNING,
01503                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ")
01504                  ACE_TEXT("participant %C could not find subscription or publication %C ")
01505                  ACE_TEXT("to complete association with remote %C.\n"),
01506                  std::string(part_converter).c_str(),
01507                  std::string(local_converter).c_str(),
01508                  std::string(remote_converter).c_str()));
01509     }
01510   }
01511 }

CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId 
) [virtual]

Definition at line 114 of file DCPSInfo_i.cpp.

References domains_, and DCPS_IR_Participant::takeOwnership().

// Body of attach_participant: looks up the participant and calls
// takeOwnership() on it.  Throws Invalid_Domain or Invalid_Participant for
// unknown ids; returns 0 when the lock cannot be taken.
00117 {
00118   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00119 
00120   // Grab the domain.
00121   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00122 
00123   if (where == this->domains_.end()) {
00124     throw OpenDDS::DCPS::Invalid_Domain();
00125   }
00126 
00127   // Grab the participant.
00128   DCPS_IR_Participant* participant
00129   = where->second->participant(participantId);
00130 
00131   if (0 == participant) {
00132     throw OpenDDS::DCPS::Invalid_Participant();
00133   }
00134 
00135   // Establish ownership within the local repository.
00136   participant->takeOwnership();
00137 
        // NOTE(review): false is returned on the success path as well; the
        // meaning of the return value is not visible here — confirm against
        // the interface definition.
00138   return false;
00139 }

bool TAO_DDS_DCPSInfo_i::changeOwnership ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
long  sender,
long  owner 
)

Assert new ownership for a participant and its contained entities.

Parameters:
domainId the domain in which the participant resides.
participantId the participant to be owned.
sender the repository sending the update data.
owner the repository which is to make callbacks for entities within the participant.
Returns:
boolean indicating that ownership has been assigned.
This establishes owner as the new owner of the participant. Ownership consists of calling back to the reader and writer remote interfaces when associations are established and removed from a publication or subscription. Owner may be the special value of OWNER_NONE to indicate that the previous owner is no longer available to make callbacks and the application has not indicated which repository is to replace it in this capacity.

The sender of the update is included so that the participant can check that transitions to OWNER_NONE are only honored when initiated by the current owner of the participant.

A return value of false indicates that the ownership was specified for a domain or participant which could not be found.

Definition at line 142 of file DCPSInfo_i.cpp.

References DCPS_IR_Participant::changeOwner(), and domains_.

Referenced by OpenDDS::Federator::ManagerImpl::processCreate().

// Body of changeOwnership: forwards (sender, owner) to the participant's
// changeOwner().  Returns false when the lock cannot be taken or when the
// domain or participant is unknown; returns true otherwise.
00147 {
00148   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00149 
00150   // Grab the domain.
00151   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00152 
00153   if (where == this->domains_.end()) {
00154     return false;
00155   }
00156 
00157   // Grab the participant.
00158   DCPS_IR_Participant* participant
00159   = where->second->participant(participantId);
00160 
00161   if (0 == participant) {
00162     return false;
00163   }
00164 
00165   // Establish the ownership.
00166   participant->changeOwner(sender, owner);
00167   return true;
00168 }

void TAO_DDS_DCPSInfo_i::disassociate_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId local_id,
const OpenDDS::DCPS::RepoId remote_id 
) [virtual]

Definition at line 1295 of file DCPSInfo_i.cpp.

References DCPS_IR_Participant::publications(), and DCPS_IR_Participant::subscriptions().

// Body of disassociate_participant: calls disassociate_participant(remote_id,
// true) on every subscription and every publication of the local
// participant, then asks the domain to remove dead participants.  Throws
// Invalid_Domain or Invalid_Participant for unknown ids.
01299 {
01300   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01301 
01302   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01303   if (it == this->domains_.end()) {
01304     throw OpenDDS::DCPS::Invalid_Domain();
01305   }
01306 
01307   DCPS_IR_Participant* participant = it->second->participant(local_id);
01308   if (participant == 0) {
01309     throw OpenDDS::DCPS::Invalid_Participant();
01310   }
01311 
01312   // Disassociate from participant temporarily:
01313   const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
01314   for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
01315        sub != subscriptions.end(); ++sub) {
01316     sub->second->disassociate_participant(remote_id, true);
01317   }
01318 
01319   const DCPS_IR_Publication_Map& publications = participant->publications();
01320   for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
01321        pub != publications.end(); ++pub) {
01322     pub->second->disassociate_participant(remote_id, true);
01323   }
01324 
01325   it->second->remove_dead_participants();
01326 }

void TAO_DDS_DCPSInfo_i::disassociate_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId local_id,
const OpenDDS::DCPS::RepoId remote_id 
) [virtual]

Definition at line 1371 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::disassociate_subscription(), and DCPS_IR_Participant::find_publication_reference().

// Body of disassociate_publication: finds the publication local_id within the
// participant and calls disassociate_subscription(remote_id, true) on it,
// then asks the domain to remove dead participants.  Throws Invalid_Domain,
// Invalid_Participant or Invalid_Publication when the lookup fails.
01376 {
01377   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01378 
01379   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01380   if (it == this->domains_.end()) {
01381     throw OpenDDS::DCPS::Invalid_Domain();
01382   }
01383 
01384   DCPS_IR_Participant* participant = it->second->participant(participantId);
01385   if (participant == 0) {
01386     throw OpenDDS::DCPS::Invalid_Participant();
01387   }
01388 
01389   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01390     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
01391   }
01392 
        // A non-zero lookup result or a null pointer both count as "not
        // found".
01393   DCPS_IR_Publication* publication;
01394   if (participant->find_publication_reference(local_id, publication)
01395       != 0 || publication == 0) {
01396     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01397     OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
01398     ACE_ERROR((LM_ERROR,
01399                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
01400                ACE_TEXT("participant %C could not find publication %C.\n"),
01401                std::string(part_converter).c_str(),
01402                std::string(pub_converter).c_str()));
01403     throw OpenDDS::DCPS::Invalid_Publication();
01404   }
01405 
01406   // Disassociate from subscription temporarily:
01407   publication->disassociate_subscription(remote_id, true);
01408 
01409   it->second->remove_dead_participants();
01410 }

void TAO_DDS_DCPSInfo_i::disassociate_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId local_id,
const OpenDDS::DCPS::RepoId remote_id 
) [virtual]

Definition at line 1329 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::disassociate_publication(), and DCPS_IR_Participant::find_subscription_reference().

// Body of disassociate_subscription: finds the subscription local_id within
// the participant and calls disassociate_publication(remote_id, true) on it,
// then asks the domain to remove dead participants.  Throws Invalid_Domain,
// Invalid_Participant or Invalid_Subscription when the lookup fails.
01334 {
01335   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01336 
01337   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01338   if (it == this->domains_.end()) {
01339     throw OpenDDS::DCPS::Invalid_Domain();
01340   }
01341 
01342   DCPS_IR_Participant* participant = it->second->participant(participantId);
01343   if (participant == 0) {
01344     throw OpenDDS::DCPS::Invalid_Participant();
01345   }
01346 
01347   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01348     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
01349   }
01350 
        // A non-zero lookup result or a null pointer both count as "not
        // found".
01351   DCPS_IR_Subscription* subscription;
01352   if (participant->find_subscription_reference(local_id, subscription)
01353       != 0 || subscription == 0) {
01354     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01355     OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
01356     ACE_ERROR((LM_ERROR,
01357                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
01358                ACE_TEXT("participant %C could not find subscription %C.\n"),
01359                std::string(part_converter).c_str(),
01360                std::string(sub_converter).c_str()));
01361     throw OpenDDS::DCPS::Invalid_Subscription();
01362   }
01363 
01364   // Disassociate from publication temporarily:
01365   subscription->disassociate_publication(remote_id, true);
01366 
01367   it->second->remove_dead_participants();
01368 }

DCPS_IR_Domain * TAO_DDS_DCPSInfo_i::domain ( DDS::DomainId_t  domain  ) 

Convert a domain Id into a reference to a DCPS_IR_Domain object.

Definition at line 2090 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::Service_Participant::ANY_DOMAIN, OpenDDS::DCPS::DCPS_debug_level, domains_, and TheServiceParticipant.

Referenced by add_domain_participant(), and update_subscription_params().

// Body of domain(): returns the DCPS_IR_Domain for the given id, creating and
// registering it on first use.  Returns 0 for ANY_DOMAIN, when allocation
// fails, or when Built-In Topic initialization fails for a new domain.
02091 {
02092   if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) {
02093     ACE_ERROR((LM_ERROR,
02094                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02095                ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
02096     return 0;
02097   }
02098 
02099   // Check if the domain is already in the map.
02100   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
02101 
02102   if (where == this->domains_.end()) {
02103     // We will attempt to insert a new domain, go ahead and allocate it.
02104     DCPS_IR_Domain* domainPtr;
02105     ACE_NEW_RETURN(domainPtr,
02106                    DCPS_IR_Domain(domain, this->participantIdGenerator_),
02107                    0);
02108 
02109     // We need to insert the domain into the map at this time since it
02110     // might be looked up during the init_built_in_topics() call.
02111     this->domains_.insert(
02112       where,
02113       DCPS_IR_Domain_Map::value_type(domain, domainPtr));
02114 
          // bit_status stays 0 when BIT is disabled at run time or compiled
          // out via DDS_HAS_MINIMUM_BIT.
02115     int bit_status = 0;
02116 
02117     if (TheServiceParticipant->get_BIT()) {
02118 #if !defined (DDS_HAS_MINIMUM_BIT)
02119       bit_status = domainPtr->init_built_in_topics(this->federation_.overridden());
02120 #endif // !defined (DDS_HAS_MINIMUM_BIT)
02121     }
02122 
02123     if (0 != bit_status) {
02124       ACE_ERROR((LM_ERROR,
02125                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02126                  ACE_TEXT("failed to initialize the Built-In Topics ")
02127                  ACE_TEXT("when loading domain %d.\n"),
02128                  domain));
            // Undo the early insertion before releasing the object.
02129       this->domains_.erase(domain);
02130       delete domainPtr;
02131       return 0;
02132     }
02133 
02134     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
            // domainPtr is a pointer, so it is printed with ACE's pointer
            // conversion %@ rather than the int-sized %x.
02135       ACE_DEBUG((LM_DEBUG,
02136                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
02137                  ACE_TEXT("successfully loaded domain %d at %@.\n"),
02138                  domain,
02139                  domainPtr));
02140     }
02141     return domainPtr;
02142 
02143   } else {
02144     return where->second;
02145   }
02146 }

const DCPS_IR_Domain_Map & TAO_DDS_DCPSInfo_i::domains (  )  const

Expose a readable reference of the domain map.

Definition at line 2419 of file DCPSInfo_i.cpp.

References domains_.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

// Body of domains(): returns a const reference to the domains_ member.
02420 {
02421   return this->domains_;
02422 }

char * TAO_DDS_DCPSInfo_i::dump_to_string (  )  [virtual]

Dump the repository's state to a string.

Definition at line 2426 of file DCPSInfo_i.cpp.

References domains_, and CORBA::string_dup().

// Body of dump_to_string(): concatenates the dump of every domain in
// domains_ and returns a copy made with CORBA::string_dup().  When
// OPENDDS_INFOREPO_REDUCED_FOOTPRINT is defined the loop is compiled out and
// a copy of the empty string is returned.
02427 {
02428   std::string dump;
02429 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02430   std::string indent ("    ");
02431 
02432   for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
02433        dm != domains_.end();
02434        dm++)
02435   {
02436     dump += dm->second->dump_to_string(indent, 0);
02437   }
02438 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02439   return CORBA::string_dup(dump.c_str());
02440 
02441 }

void TAO_DDS_DCPSInfo_i::finalize (  ) 

Cleanup state for shutdown.

Definition at line 2401 of file DCPSInfo_i.cpp.

References dispatch_check_timer_id_, orb_, and reassociate_timer_id_.

// Body of finalize(): cancels the reassociate and dispatch-check timers on
// the orb's reactor if their ids are set, and resets each id to -1 (the
// value tested here as "no timer").
02402 {
02403   if (reassociate_timer_id_ != -1) {
02404     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02405 
02406     reactor->cancel_timer(this->reassociate_timer_id_);
02407     this->reassociate_timer_id_ = -1;
02408   }
02409 
02410   if (dispatch_check_timer_id_ != -1) {
02411     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02412 
02413     reactor->cancel_timer(this->dispatch_check_timer_id_);
02414     this->dispatch_check_timer_id_ = -1;
02415   }
02416 }

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic ( DDS::DomainId_t  domainId,
const char *  topicName,
CORBA::String_out  dataTypeName,
DDS::TopicQos_out  qos,
OpenDDS::DCPS::RepoId_out  topicId 
) [virtual]

Definition at line 281 of file DCPSInfo_i.cpp.

References domains_, OpenDDS::DCPS::FOUND, DCPS_IR_Topic_Description::get_dataTypeName(), DCPS_IR_Topic::get_id(), DCPS_IR_Topic::get_topic_description(), DCPS_IR_Topic::get_topic_qos(), OpenDDS::DCPS::INTERNAL_ERROR, and OpenDDS::DCPS::NOT_FOUND.

00287 {
        // Looks up topicName in the domain identified by domainId.
        //  - Returns INTERNAL_ERROR if the guard on lock_ cannot be acquired.
        //  - Throws OpenDDS::DCPS::Invalid_Domain if domainId is not in domains_.
        //  - Returns FOUND when the domain hands back a topic pointer,
        //    otherwise whatever status the domain's find_topic() produced.
        // dataTypeName and topicId are assigned only on the FOUND path; qos
        // always receives a newly allocated DDS::TopicQos.
00288   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00289 
00290   // Grab the domain.
00291   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00292 
00293   if (where == this->domains_.end()) {
00294     throw OpenDDS::DCPS::Invalid_Domain();
00295   }
00296 
        // NOTE(review): this initial NOT_FOUND is overwritten by the
        // find_topic() call below before it is ever read.
00297   OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND;
00298 
00299   DCPS_IR_Topic* topic = 0;
        // The out-parameter is populated up front, so it is valid even when
        // no topic is found.
00300   qos = new DDS::TopicQos;
00301 
00302   status = where->second->find_topic(topicName, topic);
00303 
00304   if (0 != topic) {
          // A non-null topic forces the status to FOUND regardless of what
          // the domain returned.
00305     status = OpenDDS::DCPS::FOUND;
00306     const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
00307     dataTypeName = desc->get_dataTypeName();
00308     *qos = *(topic->get_topic_qos());
00309     topicId = topic->get_id();
00310   }
00311 
00312   return status;
00313 }

int TAO_DDS_DCPSInfo_i::handle_timeout ( const ACE_Time_Value &  now,
const void *  arg 
) [virtual]

Definition at line 58 of file DCPSInfo_i.cpp.

References dispatchingOrb_, and domains_.

00060 {
        // Reactor timer callback shared by two timers; the arg value tells
        // them apart. init_dispatchChecking() schedules with arg == this,
        // init_reassociation() schedules with arg == 0.
        // Always returns 0, including when lock_ cannot be acquired.
00061   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00062 
00063   if (arg == this) {
          // Dispatch-check timer: give the dispatching ORB (if one is set) a
          // short slice of time to process pending work.
00064     if ( !CORBA::is_nil(this->dispatchingOrb_.in())){
00065       if (this->dispatchingOrb_->work_pending())
00066       {
00067         // Ten microseconds
00068         ACE_Time_Value small(0,10);
00069         this->dispatchingOrb_->perform_work(small);
00070       }
00071     }
00072   }
00073   else {
        // Any other arg: walk every subscription and publication of every
        // participant in every domain and re-evaluate defunct associations.
00074   // NOTE: This is a purposefully naive approach to addressing defunct
00075   // associations.  In the future, it may be worthwhile to introduce a
00076   // callback model to fix the heinous runtime cost below:
00077   for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
00078        dom != this->domains_.end(); ++dom) {
00079 
00080     const DCPS_IR_Participant_Map& participants(dom->second->participants());
00081     for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
00082          part != participants.end(); ++part) {
00083 
00084       const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
00085       for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
00086            sub != subscriptions.end(); ++sub) {
00087         sub->second->reevaluate_defunct_associations();
00088       }
00089 
00090       const DCPS_IR_Publication_Map& publications(part->second->publications());
00091       for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
00092            pub != publications.end(); ++pub) {
00093         pub->second->reevaluate_defunct_associations();
00094       }
00095     }
00096   }
00097   }
00098 
00099   return 0;
00100 }

void TAO_DDS_DCPSInfo_i::ignore_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1513 of file DCPSInfo_i.cpp.

References domains_, and DCPS_IR_Participant::ignore_participant().

01517 {
        // Records on participant myParticipantId (in domain domainId) that the
        // participant ignoreId is to be ignored.
        // Throws Invalid_Domain / Invalid_Participant when the domain or the
        // calling participant cannot be found. Returns without doing anything
        // if the guard on lock_ cannot be acquired.
01518   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01519 
01520   // Grab the domain.
01521   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01522 
01523   if (where == this->domains_.end()) {
01524     throw OpenDDS::DCPS::Invalid_Domain();
01525   }
01526 
01527   // Grab the participant.
01528   DCPS_IR_Participant* partPtr
01529   = where->second->participant(myParticipantId);
01530 
01531   if (0 == partPtr) {
01532     throw OpenDDS::DCPS::Invalid_Participant();
01533   }
01534 
01535   partPtr->ignore_participant(ignoreId);
01536 
        // Domain-level cleanup pass run after the ignore has been applied.
        // NOTE(review): presumably purges participants flagged dead while
        // processing the ignore -- confirm in DCPS_IR_Domain.
01537   where->second->remove_dead_participants();
01538 }

void TAO_DDS_DCPSInfo_i::ignore_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1594 of file DCPSInfo_i.cpp.

References domains_, and DCPS_IR_Participant::ignore_publication().

01598 {
        // Records on participant myParticipantId (in domain domainId) that the
        // publication ignoreId is to be ignored.
        // Throws Invalid_Domain / Invalid_Participant when the domain or the
        // calling participant cannot be found. Returns without doing anything
        // if the guard on lock_ cannot be acquired.
01599   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01600 
01601   // Grab the domain.
01602   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01603 
01604   if (where == this->domains_.end()) {
01605     throw OpenDDS::DCPS::Invalid_Domain();
01606   }
01607 
01608   // Grab the participant.
01609   DCPS_IR_Participant* partPtr
01610   = where->second->participant(myParticipantId);
01611 
01612   if (0 == partPtr) {
01613     throw OpenDDS::DCPS::Invalid_Participant();
01614   }
01615 
01616   partPtr->ignore_publication(ignoreId);
01617 
        // Domain-level cleanup pass run after the ignore has been applied.
        // NOTE(review): presumably purges participants flagged dead while
        // processing the ignore -- confirm in DCPS_IR_Domain.
01618   where->second->remove_dead_participants();
01619 }

void TAO_DDS_DCPSInfo_i::ignore_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1567 of file DCPSInfo_i.cpp.

References domains_, and DCPS_IR_Participant::ignore_subscription().

01571 {
        // Records on participant myParticipantId (in domain domainId) that the
        // subscription ignoreId is to be ignored.
        // Throws Invalid_Domain / Invalid_Participant when the domain or the
        // calling participant cannot be found. Returns without doing anything
        // if the guard on lock_ cannot be acquired.
01572   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01573 
01574   // Grab the domain.
01575   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01576 
01577   if (where == this->domains_.end()) {
01578     throw OpenDDS::DCPS::Invalid_Domain();
01579   }
01580 
01581   // Grab the participant.
01582   DCPS_IR_Participant* partPtr
01583   = where->second->participant(myParticipantId);
01584 
01585   if (0 == partPtr) {
01586     throw OpenDDS::DCPS::Invalid_Participant();
01587   }
01588 
01589   partPtr->ignore_subscription(ignoreId);
01590 
        // Domain-level cleanup pass run after the ignore has been applied.
        // NOTE(review): presumably purges participants flagged dead while
        // processing the ignore -- confirm in DCPS_IR_Domain.
01591   where->second->remove_dead_participants();
01592 }

void TAO_DDS_DCPSInfo_i::ignore_topic ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1540 of file DCPSInfo_i.cpp.

References domains_, and DCPS_IR_Participant::ignore_topic().

01544 {
        // Records on participant myParticipantId (in domain domainId) that the
        // topic ignoreId is to be ignored.
        // Throws Invalid_Domain / Invalid_Participant when the domain or the
        // calling participant cannot be found. Returns without doing anything
        // if the guard on lock_ cannot be acquired.
01545   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01546 
01547   // Grab the domain.
01548   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01549 
01550   if (where == this->domains_.end()) {
01551     throw OpenDDS::DCPS::Invalid_Domain();
01552   }
01553 
01554   // Grab the participant.
01555   DCPS_IR_Participant* partPtr
01556   = where->second->participant(myParticipantId);
01557 
01558   if (0 == partPtr) {
01559     throw OpenDDS::DCPS::Invalid_Participant();
01560   }
01561 
01562   partPtr->ignore_topic(ignoreId);
01563 
        // Domain-level cleanup pass run after the ignore has been applied.
        // NOTE(review): presumably purges participants flagged dead while
        // processing the ignore -- confirm in DCPS_IR_Domain.
01564   where->second->remove_dead_participants();
01565 }

bool TAO_DDS_DCPSInfo_i::init_dispatchChecking ( const ACE_Time_Value &  delay  ) 

Definition at line 2390 of file DCPSInfo_i.cpp.

References dispatch_check_timer_id_, and orb_.

02391 {
        // Schedules the dispatch-check timer on the ORB core's reactor.
        // Returns false if the timer is already scheduled or if scheduling
        // fails (schedule_timer() yields -1); true otherwise.
02392   if (this->dispatch_check_timer_id_ != -1) return false;  // already scheduled
02393 
02394   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02395 
        // `this` is passed both as the handler and as the timer argument;
        // handle_timeout() compares arg against this to recognise this timer.
        // delay is used for both the initial delay and the repeat interval.
02396   this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
02397   return this->dispatch_check_timer_id_ != -1;
02398 }

bool TAO_DDS_DCPSInfo_i::init_persistence (  ) 

Definition at line 2357 of file DCPSInfo_i.cpp.

References Update::Manager::add(), reincarnate_, Update::Manager::requestImage(), and um_.

02358 {
        // Looks up the "UpdateManagerSvc" dynamic service and stores it in um_.
        // Returns true when the service was found (after registering this
        // object with it); logs an error and returns false otherwise.
02359   um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance
02360         ("UpdateManagerSvc");
02361 
02362   if (um_ != 0) {
02363     um_->add(this);
02364 
02365     // Request persistent image.
          // Only done when this repository was started with reincarnate_ set.
02366     if (reincarnate_) {
02367       um_->requestImage();
02368     }
02369 
02370   } else {
02371     ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
02372                       ACE_TEXT("UpdateManagerSvc.\n")), false);
02373   }
02374 
02375   return true;
02376 }

bool TAO_DDS_DCPSInfo_i::init_reassociation ( const ACE_Time_Value &  delay  ) 

Definition at line 2379 of file DCPSInfo_i.cpp.

References orb_, and reassociate_timer_id_.

02380 {
        // Schedules the reassociation timer on the ORB core's reactor.
        // Returns false if the timer is already scheduled or if scheduling
        // fails (schedule_timer() yields -1); true otherwise.
02381   if (this->reassociate_timer_id_ != -1) return false;  // already scheduled
02382 
02383   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02384 
        // The timer argument is 0, which routes handle_timeout() to its
        // "re-evaluate defunct associations" branch (arg != this).
        // delay is used for both the initial delay and the repeat interval.
02385   this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
02386   return this->reassociate_timer_id_ != -1;
02387 }

int TAO_DDS_DCPSInfo_i::init_transport ( int  listen_address_given,
const char *  listen_str 
)

Initialize the transport for the Built-In Topics. Returns 0 (zero) on success and 1 if an exception was caught during setup.

Definition at line 2148 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, Util::find(), and OpenDDS::DCPS::TransportRegistry::instance().

02150 {
        // Creates a transport configuration and a "tcp" transport instance in
        // the TransportRegistry for the InfoRepo's Built-In Topics.
        // listen_address_given is used as a boolean: when non-zero, listen_str
        // is applied as the instance's local address.
        // Returns 0 on success, 1 if any exception was thrown during setup.
02151   int status = 0;
02152 
02153   try {
02154 
          // In non-static builds, load the OpenDDS_Tcp service on demand if it
          // is not already registered with the service configurator.
02155 #ifndef ACE_AS_STATIC_LIBS
02156     if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
02157         < 0 /* not found (-1) or suspended (-2) */) {
02158       static const ACE_TCHAR directive[] =
02159         ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
02160         ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
02161       ACE_Service_Config::process_directive(directive);
02162     }
02163 #endif
02164 
02165     std::string config_name =
02166       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02167       + std::string("InfoRepoBITTransportConfig");
02168     OpenDDS::DCPS::TransportConfig_rch config =
02169       OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
02170 
02171     std::string inst_name =
02172       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02173       + std::string("InfoRepoBITTCPTransportInst");
02174     OpenDDS::DCPS::TransportInst_rch inst =
02175       OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
02176                                                                "tcp");
02177     config->instances_.push_back(inst);
02178 
          // NOTE(review): tcp_inst is dereferenced below without a null check;
          // confirm what dynamic_rchandle_cast yields if inst is not a TcpInst
          // (a resulting exception would be absorbed by the catch-all below).
02179     OpenDDS::DCPS::TcpInst_rch tcp_inst =
02180       OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst);
02181     inst->datalink_release_delay_ = 0;
02182 
02183     tcp_inst->conn_retry_attempts_ = 0;
02184 
02185     if (listen_address_given) {
02186       tcp_inst->local_address(listen_str);
02187     }
02188 
02189   } catch (...) {
02190     // TransportRegistry is extremely varied in the exceptions that
02191     // it throws on failure; do not allow exceptions to bubble up
02192     // beyond this point.
02193     status = 1;
02194   }
02195   return status;
02196 }

CORBA::ORB_ptr TAO_DDS_DCPSInfo_i::orb (  ) 

Expose the ORB.

Definition at line 109 of file DCPSInfo_i.cpp.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

00110 {
        // Returns a duplicated reference to the ORB held in orb_.
        // NOTE(review): per the usual CORBA _duplicate convention the caller
        // presumably owns the returned reference and must release it -- confirm.
00111   return CORBA::ORB::_duplicate(this->orb_.in());
00112 }

bool TAO_DDS_DCPSInfo_i::receive_image ( const Update::UImage image  ) 

Definition at line 2199 of file DCPSInfo_i.cpp.

References Update::ImageData< T, P, A, W >::actors, OpenDDS::DCPS::DCPS_debug_level, domains_, federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), RepoIdGenerator::last(), OpenDDS::DCPS::RepoIdConverter::participantId(), participantIdGenerator_, Update::ImageData< T, P, A, W >::participants, Update::ImageData< T, P, A, W >::topics, and Update::ImageData< T, P, A, W >::wActors.

Referenced by Update::Manager::pushImage().

02200 {
        // Rebuilds repository state from a persisted image, in this order:
        // participants, topics, subscriptions (image.actors), publications
        // (image.wActors). Returns false as soon as any entity fails to be
        // added, true once everything has been loaded.
        // NOTE(review): a failure returns immediately; entities added earlier
        // in the same call are not rolled back here.
02201   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02202     ACE_DEBUG((LM_DEBUG,
02203                ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02204                ACE_TEXT("processing persistent data.\n")));
02205   }
02206 
02207   // Ensure that new BIT participants do not reuse an id
        // Only participants whose federation id matches this repository's
        // federation_ are fed to the participant id generator.
02208   for (Update::UImage::ParticipantSeq::const_iterator
02209        iter = image.participants.begin();
02210        iter != image.participants.end(); iter++) {
02211     const Update::UParticipant* part = *iter;
02212     OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02213     if (converter.federationId() == this->federation_.id()) {
02214       participantIdGenerator_.last(converter.participantId());
02215     }
02216   }
02217 
        // Pass 1: participants.
02218   for (Update::UImage::ParticipantSeq::const_iterator
02219        iter = image.participants.begin();
02220        iter != image.participants.end(); iter++) {
02221     const Update::UParticipant* part = *iter;
02222 
02223     if (!this->add_domain_participant(part->domainId, part->participantId
02224                                       , part->participantQos)) {
02225       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02226       ACE_ERROR((LM_ERROR,
02227                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02228                  ACE_TEXT("failed to add participant %C to domain %d.\n"),
02229                  std::string(converter).c_str(),
02230                  part->domainId));
02231       return false;
02232 
02233     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02234       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02235       ACE_DEBUG((LM_DEBUG,
02236                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02237                  ACE_TEXT("added participant %C to domain %d.\n"),
02238                  std::string(converter).c_str(),
02239                  part->domainId));
02240     }
02241   }
02242 
        // Pass 2: topics, re-added under their persisted ids.
02243   for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
02244        iter != image.topics.end(); iter++) {
02245     const Update::UTopic* topic = *iter;
02246 
02247     if (!this->add_topic(topic->topicId, topic->domainId
02248                          , topic->participantId, topic->name.c_str()
02249                          , topic->dataType.c_str(), topic->topicQos)) {
02250       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02251       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02252       ACE_ERROR((LM_ERROR,
02253                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02254                  ACE_TEXT("failed to add topic %C to participant %C.\n"),
02255                  std::string(topic_converter).c_str(),
02256                  std::string(part_converter).c_str()));
02257       return false;
02258 
02259     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02260       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02261       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02262       ACE_DEBUG((LM_DEBUG,
02263                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02264                  ACE_TEXT("added topic %C to participant %C.\n"),
02265                  std::string(topic_converter).c_str(),
02266                  std::string(part_converter).c_str()));
02267     }
02268   }
02269 
        // Pass 3: subscriptions (readers).
02270   for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
02271        iter != image.actors.end(); iter++) {
02272     const Update::URActor* sub = *iter;
02273 
02274     // no reason to associate, there are no publishers yet to associate with
02275     if (!this->add_subscription(sub->domainId, sub->participantId
02276                                 , sub->topicId, sub->actorId
02277                                 , sub->callback.c_str(), sub->drdwQos
02278                                 , sub->transportInterfaceInfo
02279                                 , sub->pubsubQos
02280                                 , sub->contentSubscriptionProfile.filterClassName
02281                                 , sub->contentSubscriptionProfile.filterExpr
02282                                 , sub->contentSubscriptionProfile.exprParams)) {
02283       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02284       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02285       ACE_ERROR((LM_ERROR,
02286                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02287                  ACE_TEXT("failed to add subscription %C to participant %C.\n"),
02288                  std::string(sub_converter).c_str(),
02289                  std::string(part_converter).c_str()));
02290       return false;
02291 
02292     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02293       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02294       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02295       ACE_DEBUG((LM_DEBUG,
02296                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02297                  ACE_TEXT("added subscription %C to participant %C.\n"),
02298                  std::string(sub_converter).c_str(),
02299                  std::string(part_converter).c_str()));
02300     }
02301   }
02302 
        // Pass 4: publications (writers). The trailing `true` is the
        // `associate` argument of the add_publication() overload.
02303   for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
02304        iter != image.wActors.end(); iter++) {
02305     const Update::UWActor* pub = *iter;
02306 
02307     // try to associate with any persisted subscriptions to track any expected
02308     // existing associations
02309     if (!this->add_publication(pub->domainId, pub->participantId
02310                                , pub->topicId, pub->actorId
02311                                , pub->callback.c_str() , pub->drdwQos
02312                                , pub->transportInterfaceInfo
02313                                , pub->pubsubQos
02314                                , true)) {
02315       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02316       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02317       ACE_ERROR((LM_ERROR,
02318                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02319                  ACE_TEXT("failed to add publication %C to participant %C.\n"),
02320                  std::string(pub_converter).c_str(),
02321                  std::string(part_converter).c_str()));
02322       return false;
02323 
02324     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02325       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02326       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02327       ACE_DEBUG((LM_DEBUG,
02328                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02329                  ACE_TEXT("added publication %C to participant %C.\n"),
02330                  std::string(pub_converter).c_str(),
02331                  std::string(part_converter).c_str()));
02332     }
02333   }
02334 
        // Skipped entirely when built with DDS_HAS_MINIMUM_BIT.
02335 #if !defined (DDS_HAS_MINIMUM_BIT)
02336   // reassociate the bit publisher and subscribers
02337   for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
02338        currentDomain != domains_.end();
02339        ++currentDomain) {
02340 
02341     currentDomain->second->reassociate_built_in_topic_pubs();
02342   }
02343 #endif // !defined (DDS_HAS_MINIMUM_BIT)
02344 
02345   return true;
02346 }

bool TAO_DDS_DCPSInfo_i::remove_by_owner ( DDS::DomainId_t  domain,
long  owner 
)

Definition at line 1172 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domains_, and remove_domain_participant().

01175 {
        // Removes every participant in `domain` whose owner() equals `owner`,
        // first stripping each one's subscriptions, publications and topic
        // references.
        // Returns false if the domain is unknown, if lock_ cannot be acquired,
        // or if any subscription/publication/topic removal reports non-zero;
        // true otherwise.
        // NOTE(review): remove_domain_participant() (called at the end of each
        // iteration) throws on failure and is not caught here, so an exception
        // can escape instead of a false return.
01176   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01177 
01178   // Grab the domain.
01179   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
01180 
01181   if (where == this->domains_.end()) {
01182     return false;
01183   }
01184 
        // Ids are collected first and removed in a second pass.
        // NOTE(review): presumably so the participant map is not modified
        // while it is being iterated -- confirm.
01185   std::vector<OpenDDS::DCPS::RepoId> candidates;
01186 
01187   for (DCPS_IR_Participant_Map::const_iterator
01188        current = where->second->participants().begin();
01189        current != where->second->participants().end();
01190        ++current) {
01191     if (current->second->owner() == owner) {
01192       candidates.push_back(current->second->get_id());
01193     }
01194   }
01195 
        // NOTE(review): candidates.size() / keylist.size() are size_t but are
        // logged with %d here and below; confirm this is safe where size_t is
        // wider than int.
01196   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01197     ACE_DEBUG((LM_DEBUG,
01198                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01199                ACE_TEXT("%d participants to remove from domain %d.\n"),
01200                candidates.size(),
01201                domain));
01202   }
01203 
01204   bool status = true;
01205 
01206   for (unsigned int index = 0; index < candidates.size(); ++index) {
          // NOTE(review): participant is dereferenced below without a null
          // check; it was listed from this same domain under the same lock.
01207     DCPS_IR_Participant* participant
01208     = where->second->participant(candidates[index]);
01209 
          // Reused for subscriptions, publications and topics in turn.
01210     std::vector<OpenDDS::DCPS::RepoId> keylist;
01211 
01212     // Remove Subscriptions
01213     for (DCPS_IR_Subscription_Map::const_iterator
01214          current = participant->subscriptions().begin();
01215          current != participant->subscriptions().end();
01216          ++current) {
01217       keylist.push_back(current->second->get_id());
01218     }
01219 
01220     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01221       OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01222       ACE_DEBUG((LM_DEBUG,
01223                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01224                  ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
01225                  keylist.size(),
01226                  std::string(converter).c_str()));
01227     }
01228 
          // A failed removal only flips status; processing continues.
01229     for (unsigned int key = 0; key < keylist.size(); ++key) {
01230       if (participant->remove_subscription(keylist[ key]) != 0) {
01231         status = false;
01232       }
01233     }
01234 
01235     // Remove Publications
01236     keylist.clear();
01237 
01238     for (DCPS_IR_Publication_Map::const_iterator
01239          current = participant->publications().begin();
01240          current != participant->publications().end();
01241          ++current) {
01242       keylist.push_back(current->second->get_id());
01243     }
01244 
01245     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01246       OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01247       ACE_DEBUG((LM_DEBUG,
01248                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01249                  ACE_TEXT("%d publications to remove from participant %C.\n"),
01250                  keylist.size(),
01251                  std::string(converter).c_str()));
01252     }
01253 
01254     for (unsigned int key = 0; key < keylist.size(); ++key) {
01255       if (participant->remove_publication(keylist[ key]) != 0) {
01256         status = false;
01257       }
01258     }
01259 
01260     // Remove Topics
01261     keylist.clear();
01262 
01263     for (DCPS_IR_Topic_Map::const_iterator
01264          current = participant->topics().begin();
01265          current != participant->topics().end();
01266          ++current) {
01267       keylist.push_back(current->second->get_id());
01268     }
01269 
01270     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01271       OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01272       ACE_DEBUG((LM_DEBUG,
01273                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01274                  ACE_TEXT("%d topics to remove from participant %C.\n"),
01275                  keylist.size(),
01276                  std::string(converter).c_str()));
01277     }
01278 
01279     for (unsigned int key = 0; key < keylist.size(); ++key) {
            // Out-argument required by remove_topic_reference(); its value is
            // never read here.
01280       DCPS_IR_Topic* discard;
01281 
01282       if (participant->remove_topic_reference(keylist[ key], discard) != 0) {
01283         status = false;
01284       }
01285     }
01286 
01287     // Remove Participant
          // Re-enters lock_ (a recursive mutex) on the same thread.
01288     this->remove_domain_participant(domain, candidates[ index]);
01289   }
01290 
01291   return status;
01292 }

void TAO_DDS_DCPSInfo_i::remove_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId 
) [virtual]

Definition at line 1412 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), Update::Participant, and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete(), and remove_by_owner().

01415 {
        // Removes participant participantId from domain domainId and, when an
        // update manager is present and the participant qualified, pushes the
        // deletion to it.
        // Throws Invalid_Domain if the domain is unknown, and
        // Invalid_Participant if the participant is not found or the domain
        // reports a non-zero status from remove_participant().
        // Returns without doing anything if lock_ cannot be acquired.
01416   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01417 
01418   // Grab the domain.
01419   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01420 
01421   if (where == this->domains_.end()) {
01422     throw OpenDDS::DCPS::Invalid_Domain();
01423   }
01424 
01425   DCPS_IR_Participant* participant = where->second->participant(participantId);
01426 
01427   if (participant == 0) {
          // NOTE(review): the log tag below says "(bool)" although this listing
          // returns no value on any path -- looks like a leftover; confirm.
01428     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01429     ACE_ERROR((LM_ERROR,
01430                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01431                ACE_TEXT("failed to locate participant %C in domain %d.\n"),
01432                std::string(converter).c_str(),
01433                domainId));
01434     throw OpenDDS::DCPS::Invalid_Participant();
01435   }
01436 
01437   // Determine if we should propagate this event;  we need to cache this
01438   // result as the participant will be gone by the time we use the result.
01439   bool sendUpdate = (participant->isOwner() == true)
01440                     && (participant->isBitPublisher() == false);
01441 
        // Passed to remove_participant() as its second argument with value 0.
01442   CORBA::Boolean dont_notify_lost = 0;
01443   int status = where->second->remove_participant(participantId, dont_notify_lost);
01444 
01445   if (0 != status) {
01446     // Removing the participant failed
01447     throw OpenDDS::DCPS::Invalid_Participant();
01448   }
01449 
01450   // Update any concerned observers that the participant was destroyed.
01451   if (this->um_ && sendUpdate) {
          // The path uses the domain object's own id, with participantId in
          // both the participant and the entity position.
01452     Update::IdPath path(
01453       where->second->get_id(),
01454       participantId,
01455       participantId);
01456     this->um_->destroy(path, Update::Participant);
01457 
01458     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01459       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01460       ACE_DEBUG((LM_DEBUG,
01461                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01462                  ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
01463                  std::string(converter).c_str(),
01464                  domainId));
01465     }
01466   }
01467 }

void TAO_DDS_DCPSInfo_i::remove_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId publicationId 
) [virtual]

Definition at line 612 of file DCPSInfo_i.cpp.

References Update::Actor, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::remove_publication(), and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

00616 {
        // Removes publication publicationId from participant participantId in
        // domain domainId, then pushes the deletion to the update manager when
        // one is present and the participant is an owner and not the BIT
        // publisher.
        // Throws Invalid_Domain, Invalid_Participant or Invalid_Publication
        // when the corresponding lookup/removal fails.
        // Returns without doing anything if lock_ cannot be acquired.
00617   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00618 
00619   // Grab the domain.
00620   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00621 
00622   if (where == this->domains_.end()) {
00623     throw OpenDDS::DCPS::Invalid_Domain();
00624   }
00625 
00626   // Grab the participant.
00627   DCPS_IR_Participant* partPtr
00628   = where->second->participant(participantId);
00629 
00630   if (0 == partPtr) {
00631     throw OpenDDS::DCPS::Invalid_Participant();
00632   }
00633 
00634   if (partPtr->remove_publication(publicationId) != 0) {
          // The dead-participant cleanup is run on the failure path too.
00635     where->second->remove_dead_participants();
00636 
00637     // throw exception because the publication was not removed!
00638     throw OpenDDS::DCPS::Invalid_Publication();
00639   }
00640 
00641   where->second->remove_dead_participants();
00642 
        // NOTE(review): partPtr is dereferenced after remove_dead_participants().
        // remove_domain_participant() caches isOwner()/isBitPublisher() before
        // removal because the participant "will be gone"; confirm partPtr cannot
        // be among the participants removed above.
00643   if (this->um_
00644       && (partPtr->isOwner() == true)
00645       && (partPtr->isBitPublisher() == false)) {
00646     Update::IdPath path(domainId, participantId, publicationId);
00647     this->um_->destroy(path, Update::Actor, Update::DataWriter);
00648 
00649     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00650       OpenDDS::DCPS::RepoIdConverter converter(publicationId);
00651       ACE_DEBUG((LM_DEBUG,
00652                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
00653                  ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00654                  std::string(converter).c_str(),
00655                  domainId));
00656     }
00657   }
00658 }

void TAO_DDS_DCPSInfo_i::remove_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId subscriptionId 
) [virtual]

Definition at line 933 of file DCPSInfo_i.cpp.

References Update::Actor, Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::remove_subscription(), and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

00937 {
        // Removes subscription subscriptionId from participant participantId
        // in domain domainId, then pushes the deletion to the update manager
        // when one is present and the participant is an owner and not the BIT
        // publisher.
        // Throws Invalid_Domain, Invalid_Participant or Invalid_Subscription
        // when the corresponding lookup/removal fails.
        // Returns without doing anything if lock_ cannot be acquired.
00938   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00939 
00940   // Grab the domain.
00941   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00942 
00943   if (where == this->domains_.end()) {
00944     throw OpenDDS::DCPS::Invalid_Domain();
00945   }
00946 
00947   // Grab the participant.
00948   DCPS_IR_Participant* partPtr
00949   = where->second->participant(participantId);
00950 
00951   if (0 == partPtr) {
00952     throw OpenDDS::DCPS::Invalid_Participant();
00953   }
00954 
        // NOTE(review): unlike remove_publication(), this failure path throws
        // without first calling remove_dead_participants(); confirm whether
        // the difference is intentional.
00955   if (partPtr->remove_subscription(subscriptionId) != 0) {
00956     // throw exception because the subscription was not removed!
00957     throw OpenDDS::DCPS::Invalid_Subscription();
00958   }
00959 
00960   where->second->remove_dead_participants();
00961 
        // NOTE(review): partPtr is dereferenced after remove_dead_participants();
        // confirm it cannot be among the participants removed above.
00962   if (this->um_
00963       && (partPtr->isOwner() == true)
00964       && (partPtr->isBitPublisher() == false)) {
00965     Update::IdPath path(domainId, participantId, subscriptionId);
00966     this->um_->destroy(path, Update::Actor, Update::DataReader);
00967 
00968     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00969       OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
00970       ACE_DEBUG((LM_DEBUG,
00971                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
00972                  ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00973                  std::string(converter).c_str(),
00974                  domainId));
00975     }
00976   }
00977 }

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId topicId 
) [virtual]

Definition at line 315 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::find_topic_reference(), OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), Update::Topic, and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

00319 {
        // Removes the topic topicId referenced by participant participantId in
        // domain domainId and returns the status reported by the domain's
        // remove_topic().
        // Returns INTERNAL_ERROR if lock_ cannot be acquired.
        // Throws Invalid_Domain, Invalid_Participant or Invalid_Topic when the
        // corresponding lookup fails.
00320   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00321 
00322   // Grab the domain.
00323   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00324 
00325   if (where == this->domains_.end()) {
00326     throw OpenDDS::DCPS::Invalid_Domain();
00327   }
00328 
00329   // Grab the participant.
00330   DCPS_IR_Participant* partPtr
00331   = where->second->participant(participantId);
00332 
00333   if (0 == partPtr) {
00334     throw OpenDDS::DCPS::Invalid_Participant();
00335   }
00336 
        // Left uninitialized; find_topic_reference() is relied on to set it
        // when it returns 0.
00337   DCPS_IR_Topic* topic;
00338 
00339   if (partPtr->find_topic_reference(topicId, topic) != 0) {
00340     throw OpenDDS::DCPS::Invalid_Topic();
00341   }
00342 
00343   OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
00344 
        // NOTE(review): the deletion is pushed to the update manager without
        // inspecting removedStatus; confirm that is intended when the removal
        // did not succeed.
00345   if (this->um_
00346       && (partPtr->isOwner() == true)
00347       && (partPtr->isBitPublisher() == false)) {
00348     Update::IdPath path(domainId, participantId, topicId);
00349     this->um_->destroy(path, Update::Topic);
00350 
00351     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00352       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00353       ACE_DEBUG((LM_DEBUG,
00354                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
00355                  ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00356                  std::string(converter).c_str(),
00357                  domainId));
00358     }
00359   }
00360 
00361   return removedStatus;
00362 }

void TAO_DDS_DCPSInfo_i::shutdown (  )  [virtual]

Cause the entire repository to exit.

Definition at line 103 of file DCPSInfo_i.cpp.

References ShutdownInterface::shutdown(), and shutdown_.

Referenced by OpenDDS::Federator::ManagerImpl::leave_and_shutdown(), and OpenDDS::Federator::ManagerImpl::shutdown().

00104 {
00105   this->shutdown_->shutdown();
00106 }

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos ( DDS::DomainId_t  domain,
const OpenDDS::DCPS::RepoId &participantId,
const DDS::DomainParticipantQos &qos
) [virtual]

Definition at line 2045 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::set_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().

02049 {
02050   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02051 
02052   // Grab the domain.
02053   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02054 
02055   if (where == this->domains_.end()) {
02056     throw OpenDDS::DCPS::Invalid_Domain();
02057   }
02058 
02059   // Grab the participant.
02060   DCPS_IR_Participant* partPtr
02061   = where->second->participant(participantId);
02062 
02063   if (0 == partPtr) {
02064     throw OpenDDS::DCPS::Invalid_Participant();
02065   }
02066 
02067   if (partPtr->set_qos(qos) == false)
02068     return 0;
02069 
02070   if (this->um_
02071       && (partPtr->isOwner() == true)
02072       && (partPtr->isBitPublisher() == false)) {
02073     Update::IdPath path(domainId, participantId, participantId);
02074     this->um_->update(path, qos);
02075 
02076     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02077       OpenDDS::DCPS::RepoIdConverter converter(participantId);
02078       ACE_DEBUG((LM_DEBUG,
02079                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
02080                  ACE_TEXT("pushing update of participant %C in domain %d.\n"),
02081                  std::string(converter).c_str(),
02082                  domainId));
02083     }
02084   }
02085 
02086   return 1;
02087 }

void TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &partId,
const OpenDDS::DCPS::RepoId &dwId,
const DDS::PublisherQos &qos
)

Entry for federation updates of PublisherQos values.

Definition at line 1742 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), and DCPS_IR_Publication::set_qos().

01747 {
01748   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01749 
01750   // Grab the domain.
01751   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01752 
01753   if (where == this->domains_.end()) {
01754     throw OpenDDS::DCPS::Invalid_Domain();
01755   }
01756 
01757   // Grab the participant.
01758   DCPS_IR_Participant* partPtr
01759   = where->second->participant(partId);
01760 
01761   if (0 == partPtr) {
01762     throw OpenDDS::DCPS::Invalid_Participant();
01763   }
01764 
01765   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01766     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 3\n"));
01767   }
01768 
01769   DCPS_IR_Publication* pub;
01770 
01771   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01772     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01773     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01774     ACE_ERROR((LM_ERROR,
01775                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01776                ACE_TEXT("participant %C could not find publication %C.\n"),
01777                std::string(part_converter).c_str(),
01778                std::string(pub_converter).c_str()));
01779     throw OpenDDS::DCPS::Invalid_Publication();
01780   }
01781 
01782   pub->set_qos(qos);
01783 }

void TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &partId,
const OpenDDS::DCPS::RepoId &dwId,
const DDS::DataWriterQos &qos
)

Entry for federation updates of DataWriterQos values.

Definition at line 1698 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), and DCPS_IR_Publication::set_qos().

01703 {
01704   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01705 
01706   // Grab the domain.
01707   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01708 
01709   if (where == this->domains_.end()) {
01710     throw OpenDDS::DCPS::Invalid_Domain();
01711   }
01712 
01713   // Grab the participant.
01714   DCPS_IR_Participant* partPtr
01715   = where->second->participant(partId);
01716 
01717   if (0 == partPtr) {
01718     throw OpenDDS::DCPS::Invalid_Participant();
01719   }
01720 
01721   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01722     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 2\n"));
01723   }
01724 
01725   DCPS_IR_Publication* pub;
01726 
01727   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01728     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01729     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01730     ACE_ERROR((LM_ERROR,
01731                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01732                ACE_TEXT("participant %C could not find publication %C.\n"),
01733                std::string(part_converter).c_str(),
01734                std::string(pub_converter).c_str()));
01735     throw OpenDDS::DCPS::Invalid_Publication();
01736   }
01737 
01738   pub->set_qos(qos);
01739 }

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &partId,
const OpenDDS::DCPS::RepoId &dwId,
const DDS::DataWriterQos &qos,
const DDS::PublisherQos &publisherQos
) [virtual]

Definition at line 1621 of file DCPSInfo_i.cpp.

References Update::DataWriterQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), DCPS_IR_Participant::isBitPublisher(), Update::NoQos, Update::PublisherQos, DCPS_IR_Publication::set_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().

01627 {
01628   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01629 
01630   // Grab the domain.
01631   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01632 
01633   if (where == this->domains_.end()) {
01634     throw OpenDDS::DCPS::Invalid_Domain();
01635   }
01636 
01637   // Grab the participant.
01638   DCPS_IR_Participant* partPtr
01639   = where->second->participant(partId);
01640 
01641   if (0 == partPtr) {
01642     throw OpenDDS::DCPS::Invalid_Participant();
01643   }
01644 
01645   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01646     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 1\n"));
01647   }
01648 
01649   DCPS_IR_Publication* pub;
01650 
01651   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01652     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01653     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01654     ACE_ERROR((LM_ERROR,
01655                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01656                ACE_TEXT("participant %C could not find publication %C.\n"),
01657                std::string(part_converter).c_str(),
01658                std::string(pub_converter).c_str()));
01659     throw OpenDDS::DCPS::Invalid_Publication();
01660   }
01661 
01662   Update::SpecificQos qosType;
01663 
01664   if (pub->set_qos(qos, publisherQos, qosType) == false)  // failed
01665     return 0;
01666 
01667   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01668     Update::IdPath path(domainId, partId, dwId);
01669 
01670     switch (qosType) {
01671     case Update::DataWriterQos:
01672       this->um_->update(path, qos);
01673       break;
01674 
01675     case Update::PublisherQos:
01676       this->um_->update(path, publisherQos);
01677       break;
01678 
01679     case Update::NoQos:
01680     default:
01681       break;
01682     }
01683 
01684     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01685       OpenDDS::DCPS::RepoIdConverter converter(dwId);
01686       ACE_DEBUG((LM_DEBUG,
01687                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01688                  ACE_TEXT("pushing update of publication %C in domain %d.\n"),
01689                  std::string(converter).c_str(),
01690                  domainId));
01691     }
01692   }
01693 
01694   return 1;
01695 }

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_params ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &participantId,
const OpenDDS::DCPS::RepoId &subscriptionId,
const DDS::StringSeq &params
)

Definition at line 1950 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domain(), domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), um_, Update::Manager::update(), and DCPS_IR_Subscription::update_expr_params().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams().

01955 {
01956   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01957 
01958   DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
01959   if (domain == this->domains_.end()) {
01960     throw OpenDDS::DCPS::Invalid_Domain();
01961   }
01962 
01963   DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
01964   if (0 == partPtr) {
01965     throw OpenDDS::DCPS::Invalid_Participant();
01966   }
01967 
01968   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01969     ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
01970   }
01971 
01972   DCPS_IR_Subscription* sub;
01973   if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
01974     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01975     OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
01976     ACE_ERROR((LM_ERROR,
01977                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
01978                ACE_TEXT("participant %C could not find subscription %C.\n"),
01979                std::string(part_converter).c_str(),
01980                std::string(sub_converter).c_str()));
01981     throw OpenDDS::DCPS::Invalid_Subscription();
01982   }
01983 
01984   sub->update_expr_params(params);  // calls writers via DataWriterRemote
01985 
01986   if (this->um_ && !partPtr->isBitPublisher()) {
01987     Update::IdPath path(domainId, participantId, subscriptionId);
01988     this->um_->update(path, params);
01989   }
01990 
01991   return true;
01992 }

void TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &partId,
const OpenDDS::DCPS::RepoId &drId,
const DDS::SubscriberQos &qos
)

Entry for federation updates of SubscriberQos values.

Definition at line 1906 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), and DCPS_IR_Subscription::set_qos().

01911 {
01912   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01913 
01914   // Grab the domain.
01915   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01916 
01917   if (where == this->domains_.end()) {
01918     throw OpenDDS::DCPS::Invalid_Domain();
01919   }
01920 
01921   // Grab the participant.
01922   DCPS_IR_Participant* partPtr
01923   = where->second->participant(partId);
01924 
01925   if (0 == partPtr) {
01926     throw OpenDDS::DCPS::Invalid_Participant();
01927   }
01928 
01929   DCPS_IR_Subscription* sub;
01930 
01931   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01932     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
01933   }
01934 
01935   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01936     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01937     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01938     ACE_ERROR((LM_ERROR,
01939                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01940                ACE_TEXT("participant %C could not find subscription %C.\n"),
01941                std::string(part_converter).c_str(),
01942                std::string(sub_converter).c_str()));
01943     throw OpenDDS::DCPS::Invalid_Subscription();
01944   }
01945 
01946   sub->set_qos(qos);
01947 }

void TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &partId,
const OpenDDS::DCPS::RepoId &drId,
const DDS::DataReaderQos &qos
)

Entry for federation updates of DataReaderQos values.

Definition at line 1862 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), and DCPS_IR_Subscription::set_qos().

01867 {
01868   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01869 
01870   // Grab the domain.
01871   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01872 
01873   if (where == this->domains_.end()) {
01874     throw OpenDDS::DCPS::Invalid_Domain();
01875   }
01876 
01877   // Grab the participant.
01878   DCPS_IR_Participant* partPtr
01879   = where->second->participant(partId);
01880 
01881   if (0 == partPtr) {
01882     throw OpenDDS::DCPS::Invalid_Participant();
01883   }
01884 
01885   DCPS_IR_Subscription* sub;
01886 
01887   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01888     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
01889   }
01890 
01891   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01892     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01893     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01894     ACE_ERROR((LM_ERROR,
01895                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01896                ACE_TEXT("participant %C could not find subscription %C.\n"),
01897                std::string(part_converter).c_str(),
01898                std::string(sub_converter).c_str()));
01899     throw OpenDDS::DCPS::Invalid_Subscription();
01900   }
01901 
01902   sub->set_qos(qos);
01903 }

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &partId,
const OpenDDS::DCPS::RepoId &drId,
const DDS::DataReaderQos &qos,
const DDS::SubscriberQos &subscriberQos
) [virtual]

Definition at line 1785 of file DCPSInfo_i.cpp.

References Update::DataReaderQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), Update::NoQos, DCPS_IR_Subscription::set_qos(), Update::SubscriberQos, um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().

01791 {
01792   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01793 
01794   // Grab the domain.
01795   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01796 
01797   if (where == this->domains_.end()) {
01798     throw OpenDDS::DCPS::Invalid_Domain();
01799   }
01800 
01801   // Grab the participant.
01802   DCPS_IR_Participant* partPtr
01803   = where->second->participant(partId);
01804 
01805   if (0 == partPtr) {
01806     throw OpenDDS::DCPS::Invalid_Participant();
01807   }
01808 
01809   DCPS_IR_Subscription* sub;
01810 
01811   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01812     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
01813   }
01814 
01815   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01816     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01817     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01818     ACE_ERROR((LM_ERROR,
01819                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01820                ACE_TEXT("participant %C could not find subscription %C.\n"),
01821                std::string(part_converter).c_str(),
01822                std::string(sub_converter).c_str()));
01823     throw OpenDDS::DCPS::Invalid_Subscription();
01824   }
01825 
01826   Update::SpecificQos qosType;
01827 
01828   if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed
01829     return 0;
01830 
01831   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01832     Update::IdPath path(domainId, partId, drId);
01833 
01834     switch (qosType) {
01835     case Update::DataReaderQos:
01836       this->um_->update(path, qos);
01837       break;
01838 
01839     case Update::SubscriberQos:
01840       this->um_->update(path, subscriberQos);
01841       break;
01842 
01843     case Update::NoQos:
01844     default:
01845       break;
01846     }
01847 
01848     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01849       OpenDDS::DCPS::RepoIdConverter converter(drId);
01850       ACE_DEBUG((LM_DEBUG,
01851                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01852                  ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
01853                  std::string(converter).c_str(),
01854                  domainId));
01855     }
01856   }
01857 
01858   return 1;
01859 }

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos ( const OpenDDS::DCPS::RepoId &topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId &participantId,
const DDS::TopicQos &qos
) [virtual]

Definition at line 1994 of file DCPSInfo_i.cpp.

References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_topic_reference(), DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), DCPS_IR_Topic::set_topic_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().

01999 {
02000   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02001 
02002   // Grab the domain.
02003   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02004 
02005   if (where == this->domains_.end()) {
02006     throw OpenDDS::DCPS::Invalid_Domain();
02007   }
02008 
02009   // Grab the participant.
02010   DCPS_IR_Participant* partPtr
02011   = where->second->participant(participantId);
02012 
02013   if (0 == partPtr) {
02014     throw OpenDDS::DCPS::Invalid_Participant();
02015   }
02016 
02017   DCPS_IR_Topic* topic;
02018 
02019   if (partPtr->find_topic_reference(topicId, topic) != 0) {
02020     throw OpenDDS::DCPS::Invalid_Topic();
02021   }
02022 
02023   if (topic->set_topic_qos(qos) == false)
02024     return 0;
02025 
02026   if (this->um_
02027       && (partPtr->isOwner() == true)
02028       && (partPtr->isBitPublisher() == false)) {
02029     Update::IdPath path(domainId, participantId, topicId);
02030     this->um_->update(path, qos);
02031 
02032     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02033       OpenDDS::DCPS::RepoIdConverter converter(topicId);
02034       ACE_DEBUG((LM_DEBUG,
02035                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
02036                  ACE_TEXT("pushing update of topic %C in domain %d.\n"),
02037                  std::string(converter).c_str(),
02038                  domainId));
02039     }
02040   }
02041 
02042   return 1;
02043 }


Member Data Documentation

long TAO_DDS_DCPSInfo_i::dispatch_check_timer_id_ [private]

Definition at line 428 of file DCPSInfo_i.h.

Referenced by finalize(), and init_dispatchChecking().

CORBA::ORB_var TAO_DDS_DCPSInfo_i::dispatchingOrb_ [private]

Definition at line 414 of file DCPSInfo_i.h.

Referenced by add_publication(), add_subscription(), handle_timeout(), and TAO_DDS_DCPSInfo_i().

DCPS_IR_Domain_Map TAO_DDS_DCPSInfo_i::domains_ [private]

Definition at line 412 of file DCPSInfo_i.h.

Referenced by add_publication(), add_subscription(), add_topic(), assert_topic(), association_complete(), attach_participant(), changeOwnership(), domain(), domains(), dump_to_string(), find_topic(), handle_timeout(), ignore_domain_participant(), ignore_publication(), ignore_subscription(), ignore_topic(), receive_image(), remove_by_owner(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().

const TAO_DDS_DCPSFederationId& TAO_DDS_DCPSInfo_i::federation_ [private]

Definition at line 416 of file DCPSInfo_i.h.

Referenced by add_domain_participant(), add_publication(), add_subscription(), add_topic(), and receive_image().

ACE_Recursive_Thread_Mutex TAO_DDS_DCPSInfo_i::lock_ [private]

Definition at line 425 of file DCPSInfo_i.h.

CORBA::ORB_var TAO_DDS_DCPSInfo_i::orb_ [private]

Definition at line 413 of file DCPSInfo_i.h.

Referenced by add_publication(), add_subscription(), finalize(), init_dispatchChecking(), and init_reassociation().

RepoIdGenerator TAO_DDS_DCPSInfo_i::participantIdGenerator_ [private]

Definition at line 417 of file DCPSInfo_i.h.

Referenced by receive_image().

long TAO_DDS_DCPSInfo_i::reassociate_timer_id_ [private]

Definition at line 427 of file DCPSInfo_i.h.

Referenced by finalize(), and init_reassociation().

bool TAO_DDS_DCPSInfo_i::reincarnate_ [private]

Definition at line 420 of file DCPSInfo_i.h.

Referenced by init_persistence().

ShutdownInterface* TAO_DDS_DCPSInfo_i::shutdown_ [private]

Interface to effect shutdown of the process.

Definition at line 423 of file DCPSInfo_i.h.

Referenced by shutdown().

Update::Manager* TAO_DDS_DCPSInfo_i::um_ [private]

Definition at line 419 of file DCPSInfo_i.h.

Referenced by add(), add_domain_participant(), add_publication(), add_subscription(), assert_topic(), init_persistence(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().


The documentation for this class was generated from the following files:
  * DCPSInfo_i.h
  * DCPSInfo_i.cpp

Generated on Fri Feb 12 20:05:58 2016 for OpenDDS by  doxygen 1.4.7