TAO_DDS_DCPSInfo_i Class Reference

Implementation of the DCPSInfo. More...

#include <DCPSInfo_i.h>

Inheritance diagram for TAO_DDS_DCPSInfo_i:
Inheritance graph
[legend]
Collaboration diagram for TAO_DDS_DCPSInfo_i:
Collaboration graph
[legend]

List of all members.

Classes

struct  BIT_Cleanup_Handler

Public Member Functions

 TAO_DDS_DCPSInfo_i (CORBA::ORB_ptr orb, bool reincarnate, ShutdownInterface *shutdown, const TAO_DDS_DCPSFederationId &federation)
virtual ~TAO_DDS_DCPSInfo_i ()
virtual int handle_timeout (const ACE_Time_Value &now, const void *arg)
virtual CORBA::Boolean attach_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId)
virtual OpenDDS::DCPS::TopicStatus assert_topic (OpenDDS::DCPS::RepoId_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey)
bool add_topic (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
 Add a previously existing topic to the repository.
virtual OpenDDS::DCPS::TopicStatus find_topic (DDS::DomainId_t domainId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::RepoId_out topicId)
virtual OpenDDS::DCPS::TopicStatus remove_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId)
virtual OpenDDS::DCPS::RepoId add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos)
bool add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &pubId, const char *pub_str, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, bool associate=false)
 Add a previously existing publication to the repository.
virtual void remove_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &publicationId)
virtual OpenDDS::DCPS::RepoId add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams)
bool add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &subId, const char *sub_str, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, bool associate=false)
 Add a previously existing subscription to the repository.
virtual void remove_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId)
virtual
OpenDDS::DCPS::AddDomainStatus 
add_domain_participant (DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
bool add_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos)
 Add a previously existing participant to the repository.
virtual void remove_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId)
virtual void association_complete (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &localId, const OpenDDS::DCPS::RepoId &remoteId)
bool remove_by_owner (DDS::DomainId_t domain, long owner)
virtual void disassociate_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id)
virtual void disassociate_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id)
virtual void disassociate_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id)
virtual void ignore_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual void ignore_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual void ignore_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual void ignore_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId)
virtual CORBA::Boolean update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
void update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos)
 Entry for federation updates of DataWriterQos values.
void update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::PublisherQos &qos)
 Entry for federation updates of PublisherQos values.
virtual CORBA::Boolean update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
void update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos)
 Entry for federation updates of DataReaderQos values.
void update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::SubscriberQos &qos)
 Entry for federation updates of SubscriberQos values.
virtual ::CORBA::Boolean update_subscription_params (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId, const DDS::StringSeq &params)
virtual CORBA::Boolean update_topic_qos (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::TopicQos &qos)
virtual CORBA::Boolean update_domain_participant_qos (DDS::DomainId_t domain, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos)
virtual void shutdown ()
 Cause the entire repository to exit.
virtual char * dump_to_string ()
 Dump the Repos state to string.
bool changeOwnership (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, long sender, long owner)
 assert new ownership for a participant and its contained entities.
int init_transport (int listen_address_given, const char *listen_str)
bool receive_image (const Update::UImage &image)
void add (Update::Updater *updater)
 Add an additional Updater interface.
DCPS_IR_Domaindomain (DDS::DomainId_t domain)
 Convert a domain Id into a reference to a DCPS_IR_Domain object.
const DCPS_IR_Domain_Mapdomains () const
 Expose a readable reference of the domain map.
CORBA::ORB_ptr orb ()
 Expose the ORB.
bool init_persistence ()
bool init_reassociation (const ACE_Time_Value &delay)
bool init_dispatchChecking (const ACE_Time_Value &delay)
void finalize ()
 Cleanup state for shutdown.

Private Attributes

DCPS_IR_Domain_Map domains_
CORBA::ORB_var orb_
CORBA::ORB_var dispatchingOrb_
const TAO_DDS_DCPSFederationIdfederation_
OpenDDS::DCPS::RepoIdGenerator participantIdGenerator_
Update::Managerum_
bool reincarnate_
ShutdownInterfaceshutdown_
 Interface to effect shutdown of the process.
ACE_Recursive_Thread_Mutex lock_
long reassociate_timer_id_
long dispatch_check_timer_id_

Detailed Description

Implementation of the DCPSInfo.

This is the Information Repository object. Clients of the system will use the CORBA reference of this object.

Definition at line 54 of file DCPSInfo_i.h.


Constructor & Destructor Documentation

DCPSInfo_i h OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i ( CORBA::ORB_ptr  orb,
bool  reincarnate,
ShutdownInterface shutdown,
const TAO_DDS_DCPSFederationId federation 
)

Definition at line 38 of file DCPSInfo_i.cpp.

References _duplicate(), dispatchingOrb_, CORBA::ORB_init(), and TheServiceParticipant.

00042   : orb_(CORBA::ORB::_duplicate(orb))
00043   , federation_(federation)
00044   , participantIdGenerator_(federation.id())
00045   , um_(0)
00046   , reincarnate_(reincarnate)
00047   , shutdown_(shutdown)
00048   , reassociate_timer_id_(-1)
00049   , dispatch_check_timer_id_(-1)
00050 {
00051   if (!TheServiceParticipant->use_bidir_giop()) {
00052     int argc = 0;
00053     char** no_argv = 0;
00054     dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly");
00055   }
00056 }

Here is the call graph for this function:

TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i (  )  [virtual]

Definition at line 58 of file DCPSInfo_i.cpp.

00059 {
00060 }


Member Function Documentation

void TAO_DDS_DCPSInfo_i::add ( Update::Updater updater  ) 

Add an additional Updater interface.

Definition at line 2400 of file DCPSInfo_i.cpp.

References Update::Manager::add(), and um_.

02401 {
02402   if (this->um_) {
02403     this->um_->add(updater);
02404   }
02405 }

Here is the call graph for this function:

bool TAO_DDS_DCPSInfo_i::add_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const DDS::DomainParticipantQos qos 
)

Add a previously existing participant to the repository.

Parameters:
domainId the Domain in which the Participant is contained.
participantId the GUID Id value to use for the Participant.
qos the QoS value of the Participant.

Adds a Participant to the repository using a specified Participant GUID Id value. If the ParticipantId indicates that this Participant was created by within this repository (the federation Id is the current repositories federation Id), this method will ensure that any subsequent calls to add a Publication and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 1079 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, DCPS_IR_Domain::participant(), OpenDDS::DCPS::RepoIdConverter::participantId(), DCPS_IR_Domain::participants(), TheServiceParticipant, and um_.

01082 {
01083   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01084 
01085   // Grab the domain.
01086   DCPS_IR_Domain* domainPtr = this->domain(domainId);
01087 
01088   if (0 == domainPtr) {
01089     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01090       ACE_DEBUG((LM_WARNING,
01091                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01092                  ACE_TEXT("invalid domain Id: %d\n"),
01093                  domainId));
01094     }
01095 
01096     return false;
01097   }
01098 
01099   // Prepare to manipulate the participant's Id value.
01100   OpenDDS::DCPS::RepoIdConverter converter(participantId);
01101 
01102   // Determine if this is the 'special' repository internal participant
01103   // that publishes the built-in topics for a domain.
01104   bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
01105 
01106   // Grab the participant.
01107   DCPS_IR_Participant* partPtr = domainPtr->participant(participantId);
01108 
01109   if (0 != partPtr) {
01110     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01111       ACE_ERROR((LM_ERROR,
01112                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01113                  ACE_TEXT("participant id %C already exists.\n"),
01114                  std::string(converter).c_str()));
01115     }
01116 
01117     return false;
01118   }
01119 
01120   DCPS_IR_Participant_rch participant =
01121     OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(this->federation_,
01122                                      participantId,
01123                                      domainPtr,
01124                                      qos, um_, isBitPart);
01125 
01126   switch (domainPtr->add_participant(participant)) {
01127   case -1: {
01128     ACE_ERROR((LM_ERROR,
01129                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01130                ACE_TEXT("failed to load participant %C in domain %d.\n"),
01131                std::string(converter).c_str(),
01132                domainId));
01133   }
01134   return false;
01135 
01136   case 1:
01137 
01138     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01139       ACE_DEBUG((LM_WARNING,
01140                  ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01141                  ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
01142                  std::string(converter).c_str(),
01143                  domainId));
01144     }
01145 
01146     return false;
01147 
01148   case 0:
01149   default:
01150     break;
01151   }
01152 
01153   // See if we are adding a participant that was created within this
01154   // repository or a different repository.
01155   if (converter.federationId() == this->federation_.id()) {
01156     // Ensure the participant GUID values do not conflict.
01157     domainPtr->last_participant_key(converter.participantId());
01158 
01159     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01160       ACE_DEBUG((LM_DEBUG,
01161                  ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01162                  ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
01163                  converter.participantId()));
01164     }
01165   }
01166 
01167   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01168     ACE_DEBUG((LM_DEBUG,
01169                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01170                ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"),
01171                std::string(converter).c_str(),
01172                participant.in(),
01173                domainId));
01174   }
01175 
01176   return true;
01177 }

Here is the call graph for this function:

OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant ( DDS::DomainId_t  domain,
const DDS::DomainParticipantQos qos 
) [virtual]

Definition at line 984 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), OpenDDS::DCPS::AddDomainStatus::federated, federation_, OpenDDS::DCPS::RcHandle< T >::get(), DCPS_IR_Domain::get_next_participant_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::AddDomainStatus::id, LM_DEBUG, lock_, TAO_DDS_DCPSFederationId::overridden(), OpenDDS::DCPS::RepoIdConverter::participantId(), DCPS_IR_Domain::participants(), status, TheServiceParticipant, and um_.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), and receive_image().

00987 {
00988   // A value to return.
00989   OpenDDS::DCPS::AddDomainStatus value;
00990   value.id        = OpenDDS::DCPS::GUID_UNKNOWN;
00991   value.federated = this->federation_.overridden();
00992 
00993   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
00994 
00995   // Grab the domain.
00996   DCPS_IR_Domain* domainPtr = this->domain(domain);
00997 
00998   if (0 == domainPtr) {
00999     throw OpenDDS::DCPS::Invalid_Domain();
01000   }
01001 
01002   // Obtain a shiny new GUID value.
01003   OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id();
01004 
01005   // Determine if this is the 'special' repository internal participant
01006   // that publishes the built-in topics for a domain.
01007   bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
01008 
01009   DCPS_IR_Participant_rch participant =
01010     OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(
01011                    this->federation_,
01012                    participantId,
01013                    domainPtr,
01014                    qos, um_, isBitPart);
01015 
01016   // We created the participant, now we can return the Id value (eventually).
01017   value.id = participantId;
01018 
01019   if (isBitPart) {
01020     participant->isBitPublisher() = true;
01021 
01022     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01023       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01024       ACE_DEBUG((LM_DEBUG,
01025                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01026                  ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
01027                  std::string(converter).c_str(),
01028                  domain));
01029     }
01030   }
01031 
01032   // Assume responsibility for writing back to the participant.
01033   participant->takeOwnership();
01034 
01035   int status = domainPtr->add_participant(participant);
01036 
01037   if (0 != status) {
01038     // Adding the participant failed return the invalid
01039     // participant Id number.
01040     participantId = OpenDDS::DCPS::GUID_UNKNOWN;
01041 
01042   } else if (this->um_) {
01043     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01044     if (participant->isBitPublisher() == false) {
01045       // Push this participant to interested observers.
01046       Update::UParticipant updateParticipant(
01047         domain,
01048         participant->owner(),
01049         participantId,
01050         const_cast<DDS::DomainParticipantQos &>(qos));
01051       this->um_->create(updateParticipant);
01052 
01053       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01054         ACE_DEBUG((LM_DEBUG,
01055                    ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01056                    ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
01057                    std::string(converter).c_str(),
01058                    domain));
01059       }
01060     }
01061 
01062     // Update what the last participant id was
01063     um_->updateLastPartId(converter.participantId());
01064   }
01065 
01066   if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01067     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01068     ACE_DEBUG((LM_DEBUG,
01069                ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01070                ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"),
01071                domain,
01072                std::string(converter).c_str(),
01073                participant.get()));
01074   }
01075   return value;
01076 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool TAO_DDS_DCPSInfo_i::add_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId topicId,
const OpenDDS::DCPS::RepoId pubId,
const char *  pub_str,
const DDS::DataWriterQos qos,
const OpenDDS::DCPS::TransportLocatorSeq transInfo,
const DDS::PublisherQos publisherQos,
bool  associate = false 
)

Add a previously existing publication to the repository.

Parameters:
domainId the Domain in which the Publication is contained.
participantId the Participant in which the Publication is contained.
topicId the Topic of the Publication.
pubId the GUID Id value to use for the Publication.
pub_str stringified publication callback to DataWriter.
qos the QoS value of the DataWriter.
transInfo the transport information for the Publication.
publisherQos the QoS value of the Publisher.
associate indicate whether to create new associations.

Adds a Publication to the repository using a specified Publication GUID Id value. If the PublicationId indicates that this Publication was created by within this repository (the federation Id is the current repositories federation Id), this method will ensure that any subsequent calls to add a Publication and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

: Check if this is already stored. If so, just clear the callback IOR.

Definition at line 481 of file DCPSInfo_i.cpp.

References ACE_TEXT(), DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), DCPS_IR_Participant::last_publication_key(), LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, and DCPS_IR_Participant::remove_publication().

00490 {
00491   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00492 
00493   // Grab the domain.
00494   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00495 
00496   if (where == this->domains_.end()) {
00497     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00498       ACE_DEBUG((LM_WARNING,
00499                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00500                  ACE_TEXT("invalid domain %d.\n"),
00501                  domainId));
00502     }
00503 
00504     return false;
00505   }
00506 
00507   // Grab the participant.
00508   DCPS_IR_Participant* partPtr
00509   = where->second->participant(participantId);
00510 
00511   if (0 == partPtr) {
00512     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00513       OpenDDS::DCPS::RepoIdConverter converter(pubId);
00514       ACE_DEBUG((LM_WARNING,
00515                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00516                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00517                  std::string(converter).c_str(),
00518                  domainId));
00519     }
00520 
00521     return false;
00522   }
00523 
00524   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00525 
00526   if (topic == 0) {
00527     OpenDDS::DCPS::RepoIdConverter converter(topicId);
00528     ACE_DEBUG((LM_WARNING,
00529                ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00530                ACE_TEXT("invalid topic %C in domain %d.\n"),
00531                std::string(converter).c_str(),
00532                domainId));
00533     return false;
00534   }
00535 
00536   /// @TODO: Check if this is already stored.  If so, just clear the callback IOR.
00537 
00538   CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_)->string_to_object(pub_str);
00539   if (CORBA::is_nil(obj.in())) {
00540     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00541       ACE_DEBUG((LM_WARNING,
00542                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00543                  ACE_TEXT("failure converting string %C to objref\n"),
00544                  pub_str));
00545     }
00546     return false;
00547   }
00548 
00549   OpenDDS::DCPS::DataWriterRemote_var publication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
00550 
00551   OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr(
00552     new DCPS_IR_Publication(
00553                    pubId,
00554                    partPtr,
00555                    topic,
00556                    publication.in(),
00557                    qos,
00558                    transInfo,
00559                    publisherQos));
00560 
00561   DCPS_IR_Publication* pub = pubPtr.get();
00562   switch (partPtr->add_publication(move(pubPtr))) {
00563   case -1: {
00564     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00565     ACE_ERROR((LM_ERROR,
00566                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00567                ACE_TEXT("failed to add publication to participant %C.\n"),
00568                std::string(converter).c_str()));
00569     return false;
00570   }
00571 
00572   case 1:
00573     return false;
00574   case 0:
00575   default:
00576     break;
00577   }
00578 
00579   switch (topic->add_publication_reference(pub, associate)) {
00580   case -1: {
00581     OpenDDS::DCPS::RepoIdConverter converter(pubId);
00582     ACE_ERROR((LM_ERROR,
00583                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00584                ACE_TEXT("failed to add publication to participant %C topic list.\n"),
00585                std::string(converter).c_str()));
00586 
00587     // Remove the publication.
00588     partPtr->remove_publication(pubId);
00589 
00590   }
00591   return false;
00592 
00593   case 1: // This is actually a really really bad place to jump to.
00594     // This means that we successfully added the new publication
00595     // to the participant (it had not been inserted before) but
00596     // that we are adding a duplicate publication to the topic
00597     // list - which should never ever be able to happen.
00598     return false;
00599 
00600   case 0:
00601   default:
00602     break;
00603   }
00604 
00605   OpenDDS::DCPS::RepoIdConverter converter(pubId);
00606 
00607   // See if we are adding a publication that was created within this
00608   // repository or a different repository.
00609   if (converter.federationId() == federation_.id()) {
00610     // Ensure the publication RepoId values do not conflict.
00611     partPtr->last_publication_key(converter.entityKey());
00612   }
00613 
00614   return true;
00615 }

Here is the call graph for this function:

OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId topicId,
OpenDDS::DCPS::DataWriterRemote_ptr  publication,
const DDS::DataWriterQos qos,
const OpenDDS::DCPS::TransportLocatorSeq transInfo,
const DDS::PublisherQos publisherQos 
) [virtual]

Definition at line 369 of file DCPSInfo_i.cpp.

References ACE_TEXT(), DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), Update::Manager::create(), Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Participant::get_next_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, CORBA::is_nil(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, DCPS_IR_Participant::remove_publication(), and um_.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().

00377 {
00378   if (CORBA::is_nil(publication)) {
00379     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00380       ACE_DEBUG((LM_WARNING,
00381         ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00382         ACE_TEXT("invalid publication reference.\n")));
00383     }
00384     return OpenDDS::DCPS::GUID_UNKNOWN;
00385   }
00386 
00387   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00388 
00389   // Grab the domain.
00390   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00391 
00392   if (where == this->domains_.end()) {
00393     throw OpenDDS::DCPS::Invalid_Domain();
00394   }
00395 
00396   // Grab the participant.
00397   DCPS_IR_Participant* partPtr
00398   = where->second->participant(participantId);
00399 
00400   if (0 == partPtr) {
00401     throw OpenDDS::DCPS::Invalid_Participant();
00402   }
00403 
00404   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00405 
00406   if (topic == 0) {
00407     throw OpenDDS::DCPS::Invalid_Topic();
00408   }
00409 
00410   // Get a Id for the Writer, make it a builtin kind if this is for a BIT
00411   OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id(
00412     OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
00413 
00414   OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication =
00415     OpenDDS::DCPS::DataWriterRemote::_duplicate(publication);
00416 
00417   if (dispatchingOrb_) {
00418     // Remarshall the remote reference onto the dispatching orb.
00419     CORBA::String_var pubStr = orb_->object_to_string(dispatchingPublication);
00420     CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr);
00421     if (CORBA::is_nil(pubObj))  {
00422       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00423         ACE_DEBUG((LM_WARNING,
00424                    ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00425                    ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
00426       }
00427       return OpenDDS::DCPS::GUID_UNKNOWN;
00428     }
00429 
00430     dispatchingPublication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj);
00431   }
00432 
00433   OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr(
00434     new DCPS_IR_Publication(
00435                    pubId,
00436                    partPtr,
00437                    topic,
00438                    dispatchingPublication.in(),
00439                    qos,
00440                    transInfo,
00441                    publisherQos));
00442 
00443   DCPS_IR_Publication* pub = pubPtr.get();
00444   if (partPtr->add_publication(OpenDDS::DCPS::move(pubPtr)) != 0) {
00445     // failed to add.  we are responsible for the memory.
00446     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00447   } else if (topic->add_publication_reference(pub) != 0) {
00448     // Failed to add to the topic
00449     // so remove from participant and fail.
00450     partPtr->remove_publication(pubId);
00451     pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00452   }
00453 
00454   if (this->um_ && (partPtr->isBitPublisher() == false)) {
00455     CORBA::String_var callback = orb_->object_to_string(publication);
00456     Update::ContentSubscriptionInfo csi;
00457 
00458     Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
00459                           , callback.in()
00460                           , const_cast<DDS::PublisherQos &>(publisherQos)
00461                           , const_cast<DDS::DataWriterQos &>(qos)
00462                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00463                           (transInfo), csi);
00464     this->um_->create(actor);
00465 
00466     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00467       OpenDDS::DCPS::RepoIdConverter converter(pubId);
00468       ACE_DEBUG((LM_DEBUG,
00469                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ")
00470                  ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
00471                  std::string(converter).c_str(),
00472                  domainId));
00473     }
00474   }
00475 
00476   where->second->remove_dead_participants();
00477   return pubId;
00478 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool TAO_DDS_DCPSInfo_i::add_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId topicId,
const OpenDDS::DCPS::RepoId subId,
const char *  sub_str,
const DDS::DataReaderQos qos,
const OpenDDS::DCPS::TransportLocatorSeq transInfo,
const DDS::SubscriberQos subscriberQos,
const char *  filterClassName,
const char *  filterExpression,
const DDS::StringSeq exprParams,
bool  associate = false 
)

Add a previously existing subscription to the repository.

Parameters:
domainId the Domain in which the Subscription is contained.
participantId the Participant in which the Subscription is contained.
topicId the Topic of the Subscription.
subId the GUID Id value to use for the Subscription.
sub_str stringified publication callback to DataReader.
qos the QoS value of the DataReader.
transInfo the transport information for the Subscription.
subscriberQos the QoS value of the Subscriber.
associate indicate whether to create new associations.

Adds a Subscription to the repository using a specified Subscription GUID Id value. If the SubscriptionId indicates that this Subscription was created by within this repository (the federation Id is the current repositories federation Id), this method will ensure that any subsequent calls to add a Publication and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 793 of file DCPSInfo_i.cpp.

References ACE_TEXT(), DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), DCPS_IR_Participant::last_subscription_key(), LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, and DCPS_IR_Participant::remove_subscription().

00806 {
00807   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00808 
00809   // Grab the domain.
00810   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00811 
00812   if (where == this->domains_.end()) {
00813     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00814       ACE_DEBUG((LM_WARNING,
00815                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00816                  ACE_TEXT("invalid domain %d.\n"),
00817                  domainId));
00818     }
00819 
00820     return false;
00821   }
00822 
00823   // Grab the participant.
00824   DCPS_IR_Participant* partPtr
00825   = where->second->participant(participantId);
00826 
00827   if (0 == partPtr) {
00828     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00829       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00830       ACE_DEBUG((LM_WARNING,
00831                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00832                  ACE_TEXT("invalid participant %C in domain %d.\n"),
00833                  std::string(converter).c_str(),
00834                  domainId));
00835     }
00836 
00837     return false;
00838   }
00839 
00840   DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00841 
00842   if (topic == 0) {
00843     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00844       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00845       ACE_DEBUG((LM_WARNING,
00846                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00847                  ACE_TEXT("invalid topic %C in domain %d.\n"),
00848                  std::string(converter).c_str(),
00849                  domainId));
00850     }
00851 
00852     return false;
00853   }
00854 
00855   CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_) ->string_to_object(sub_str);
00856   if (CORBA::is_nil(obj.in())) {
00857     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00858       ACE_DEBUG((LM_WARNING,
00859                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00860                  ACE_TEXT("failure converting string %C to objref\n"),
00861                  sub_str));
00862     }
00863     return false;
00864   }
00865 
00866   OpenDDS::DCPS::DataReaderRemote_var subscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
00867 
00868   OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr(
00869     new DCPS_IR_Subscription(
00870                    subId,
00871                    partPtr,
00872                    topic,
00873                    subscription.in(),
00874                    qos,
00875                    transInfo,
00876                    subscriberQos,
00877                    filterClassName,
00878                    filterExpression,
00879                    exprParams));
00880 
00881   DCPS_IR_Subscription* sub = subPtr.get();
00882   switch (partPtr->add_subscription(OpenDDS::DCPS::move(subPtr))) {
00883   case -1: {
00884     OpenDDS::DCPS::RepoIdConverter converter(subId);
00885     ACE_ERROR((LM_ERROR,
00886                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00887                ACE_TEXT("failed to add subscription to participant %C.\n"),
00888                std::string(converter).c_str()));
00889     return false;
00890   }
00891 
00892   case 1:
00893     return false;
00894 
00895   case 0:
00896   default:
00897     break;
00898   }
00899 
00900   switch (topic->add_subscription_reference(sub, associate)) {
00901   case -1: {
00902     OpenDDS::DCPS::RepoIdConverter converter(subId);
00903     ACE_ERROR((LM_ERROR,
00904                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00905                ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
00906                std::string(converter).c_str()));
00907 
00908     // Remove the subscription.
00909     partPtr->remove_subscription(subId);
00910 
00911   }
00912   return false;
00913 
00914   case 1: // This is actually a really really bad place to jump to.
00915     // This means that we successfully added the new subscription
00916     // to the participant (it had not been inserted before) but
00917     // that we are adding a duplicate subscription to the topic
00918     // list - which should never ever be able to happen.
00919     return false;
00920 
00921   case 0:
00922   default:
00923     break;
00924   }
00925 
00926   OpenDDS::DCPS::RepoIdConverter converter(subId);
00927 
00928   // See if we are adding a subscription that was created within this
00929   // repository or a different repository.
00930   if (converter.federationId() == federation_.id()) {
00931     // Ensure the subscription RepoId values do not conflict.
00932     partPtr->last_subscription_key(converter.entityKey());
00933   }
00934 
00935   return true;
00936 }

Here is the call graph for this function:

OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId topicId,
OpenDDS::DCPS::DataReaderRemote_ptr  subscription,
const DDS::DataReaderQos qos,
const OpenDDS::DCPS::TransportLocatorSeq transInfo,
const DDS::SubscriberQos subscriberQos,
const char *  filterClassName,
const char *  filterExpression,
const DDS::StringSeq exprParams 
) [virtual]

Definition at line 665 of file DCPSInfo_i.cpp.

References ACE_TEXT(), DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), Update::Manager::create(), Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Participant::get_next_subscription_id(), OpenDDS::DCPS::GUID_UNKNOWN, TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, DCPS_IR_Domain::participant(), DCPS_IR_Domain::remove_dead_participants(), DCPS_IR_Participant::remove_subscription(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), and um_.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().

00676 {
00677   if (CORBA::is_nil(subscription)) {
00678     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00679       ACE_DEBUG((LM_WARNING,
00680         ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00681         ACE_TEXT("invalid subscription reference.\n")));
00682     }
00683     return OpenDDS::DCPS::GUID_UNKNOWN;
00684   }
00685 
00686   DCPS_IR_Domain* domainPtr;
00687   DCPS_IR_Participant* partPtr;
00688   DCPS_IR_Topic* topic;
00689   OpenDDS::DCPS::RepoId subId;
00690   OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr;
00691   {
00692     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00693 
00694     // Grab the domain.
00695     DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00696 
00697     if (where == this->domains_.end()) {
00698       throw OpenDDS::DCPS::Invalid_Domain();
00699     }
00700 
00701     // Grab the domain and participant.
00702     domainPtr = where->second.get();
00703     partPtr = domainPtr->participant(participantId);
00704 
00705     if (0 == partPtr) {
00706       throw OpenDDS::DCPS::Invalid_Participant();
00707     }
00708 
00709     topic = where->second->find_topic(topicId);
00710 
00711     if (topic == 0) {
00712       throw OpenDDS::DCPS::Invalid_Topic();
00713     }
00714 
00715     // Get a Id for the Reader, make it a builtin kind if this is for a BIT
00716     subId = partPtr->get_next_subscription_id(
00717       OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
00718 
00719     OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription (
00720       OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
00721 
00722     if (dispatchingOrb_) {
00723       // Remarshall the remote reference onto the dispatching orb.
00724       CORBA::String_var subStr = orb_->object_to_string(dispatchingSubscription);
00725       CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr);
00726       if (CORBA::is_nil(subObj.in())) {
00727         if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00728           ACE_DEBUG((LM_WARNING,
00729                      ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00730                      ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
00731         }
00732         return OpenDDS::DCPS::GUID_UNKNOWN;
00733       }
00734       dispatchingSubscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj);
00735     }
00736 
00737     subPtr.reset(
00738       new DCPS_IR_Subscription(
00739                      subId,
00740                      partPtr,
00741                      topic,
00742                      dispatchingSubscription.in(),
00743                      qos,
00744                      transInfo,
00745                      subscriberQos,
00746                      filterClassName,
00747                      filterExpression,
00748                      exprParams));
00749 
00750     // Release lock
00751   }
00752 
00753   DCPS_IR_Subscription* sub = subPtr.get();
00754   if (partPtr->add_subscription(move(subPtr)) != 0) {
00755     // failed to add.  we are responsible for the memory.
00756     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00757   } else if (topic->add_subscription_reference(sub) != 0) {
00758     ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
00759     // No associations were made so remove and fail.
00760     partPtr->remove_subscription(subId);
00761     subId = OpenDDS::DCPS::GUID_UNKNOWN;
00762   }
00763 
00764   if (this->um_ && (partPtr->isBitPublisher() == false)) {
00765     CORBA::String_var callback = orb_->object_to_string(subscription);
00766     Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
00767 
00768     Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
00769                           , callback.in()
00770                           , const_cast<DDS::SubscriberQos &>(subscriberQos)
00771                           , const_cast<DDS::DataReaderQos &>(qos)
00772                           , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00773                           (transInfo), csi);
00774 
00775     this->um_->create(actor);
00776 
00777     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00778       OpenDDS::DCPS::RepoIdConverter converter(subId);
00779       ACE_DEBUG((LM_DEBUG,
00780                  ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ")
00781                  ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
00782                  std::string(converter).c_str(),
00783                  domainId));
00784     }
00785   }
00786 
00787   domainPtr->remove_dead_participants();
00788 
00789   return subId;
00790 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool TAO_DDS_DCPSInfo_i::add_topic ( const OpenDDS::DCPS::RepoId topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos 
)

Add a previously existing topic to the repository.

Parameters:
topicId the Topic Entity GUID Id to use.
domainId the Domain in which the Topic is contained.
participantId the Participant in which the Topic is contained.
topicName the name of the Topic.
dataTypeName the name of the data type.
qos the QoS value to use for the Topic.

Adds a Topic Entity to the repository using a specified TopicId value. If the TopicId indicates that this Topic was created by within this repository (the federation Id is the current repositories federation Id), this method will ensure that any subsequent calls to add a Topic and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.

Definition at line 227 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::CREATED, OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Participant::last_topic_key(), LM_WARNING, and lock_.

Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().

00233 {
00234   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00235 
00236   // Grab the domain.
00237   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00238 
00239   if (where == this->domains_.end()) {
00240     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00241       ACE_DEBUG((LM_WARNING,
00242                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00243                  ACE_TEXT("invalid domain %d.\n"),
00244                  domainId));
00245     }
00246 
00247     return false;
00248   }
00249 
00250   // Grab the participant.
00251   DCPS_IR_Participant* participantPtr
00252   = where->second->participant(participantId);
00253 
00254   if (0 == participantPtr) {
00255     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00256       OpenDDS::DCPS::RepoIdConverter converter(participantId);
00257       ACE_DEBUG((LM_WARNING,
00258                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00259                  ACE_TEXT("invalid participant %C.\n"),
00260                  std::string(converter).c_str()));
00261     }
00262 
00263     return false;
00264   }
00265 
00266   OpenDDS::DCPS::TopicStatus topicStatus
00267   = where->second->force_add_topic(topicId, topicName, dataTypeName,
00268                                    qos, participantPtr);
00269 
00270   if (topicStatus != OpenDDS::DCPS::CREATED) {
00271     return false;
00272   }
00273 
00274   OpenDDS::DCPS::RepoIdConverter converter(topicId);
00275 
00276   // See if we are adding a topic that was created within this
00277   // repository or a different repository.
00278   if (converter.federationId() == federation_.id()) {
00279     // Ensure the topic RepoId values do not conflict.
00280     participantPtr->last_topic_key(converter.entityKey());
00281   }
00282 
00283   return true;
00284 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic ( OpenDDS::DCPS::RepoId_out  topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos,
bool  hasDcpsKey 
) [virtual]

Definition at line 175 of file DCPSInfo_i.cpp.

References ACE_TEXT(), Update::Manager::create(), OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, lock_, and um_.

00183 {
00184   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00185   // Grab the domain.
00186   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00187 
00188   if (where == this->domains_.end()) {
00189     throw OpenDDS::DCPS::Invalid_Domain();
00190   }
00191 
00192   // Grab the participant.
00193   DCPS_IR_Participant* participantPtr
00194   = where->second->participant(participantId);
00195 
00196   if (0 == participantPtr) {
00197     throw OpenDDS::DCPS::Invalid_Participant();
00198   }
00199 
00200   OpenDDS::DCPS::TopicStatus topicStatus
00201   = where->second->add_topic(
00202       topicId,
00203       topicName,
00204       dataTypeName,
00205       qos,
00206       participantPtr);
00207 
00208   if (this->um_ && (participantPtr->isBitPublisher() == false)) {
00209     Update::UTopic topic(domainId, topicId, participantId
00210                          , topicName, dataTypeName
00211                          , const_cast<DDS::TopicQos &>(qos));
00212     this->um_->create(topic);
00213 
00214     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00215       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00216       ACE_DEBUG((LM_DEBUG,
00217                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
00218                  ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
00219                  std::string(converter).c_str(),
00220                  domainId));
00221     }
00222   }
00223   return topicStatus;
00224 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::association_complete ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId localId,
const OpenDDS::DCPS::RepoId remoteId 
) [virtual]

Definition at line 1513 of file DCPSInfo_i.cpp.

References ACE_TEXT(), DCPS_IR_Publication::association_complete(), DCPS_IR_Subscription::association_complete(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), DCPS_IR_Participant::find_subscription_reference(), LM_INFO, LM_WARNING, and lock_.

01517 {
01518   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01519 
01520   DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId);
01521   if (dom_iter == this->domains_.end()) {
01522     return;
01523   }
01524 
01525   DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId);
01526   if (0 == partPtr) {
01527     return;
01528   }
01529 
01530   // localId could be pub or sub (initial implementation will only use sub
01531   // since the DataReader is the passive peer)
01532   DCPS_IR_Subscription* sub = 0;
01533   DCPS_IR_Publication* pub = 0;
01534   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01535     ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n"));
01536   }
01537   if (0 == partPtr->find_subscription_reference(localId, sub)) {
01538     sub->association_complete(remoteId);
01539   } else if (0 == partPtr->find_publication_reference(localId, pub)) {
01540     pub->association_complete(remoteId);
01541   } else {
01542     if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01543       OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01544       OpenDDS::DCPS::RepoIdConverter local_converter(localId);
01545       OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId);
01546       ACE_DEBUG((LM_WARNING,
01547                  ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ")
01548                  ACE_TEXT("participant %C could not find subscription or publication %C ")
01549                  ACE_TEXT("to complete association with remote %C.\n"),
01550                  std::string(part_converter).c_str(),
01551                  std::string(local_converter).c_str(),
01552                  std::string(remote_converter).c_str()));
01553     }
01554   }
01555 }

Here is the call graph for this function:

CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId 
) [virtual]

Definition at line 119 of file DCPSInfo_i.cpp.

References domains_, lock_, and DCPS_IR_Participant::takeOwnership().

00122 {
00123   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00124 
00125   // Grab the domain.
00126   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00127 
00128   if (where == this->domains_.end()) {
00129     throw OpenDDS::DCPS::Invalid_Domain();
00130   }
00131 
00132   // Grab the participant.
00133   DCPS_IR_Participant* participant
00134   = where->second->participant(participantId);
00135 
00136   if (0 == participant) {
00137     throw OpenDDS::DCPS::Invalid_Participant();
00138   }
00139 
00140   // Establish ownership within the local repository.
00141   participant->takeOwnership();
00142 
00143   return false;
00144 }

Here is the call graph for this function:

bool TAO_DDS_DCPSInfo_i::changeOwnership ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
long  sender,
long  owner 
)

assert new ownership for a participant and its contained entities.

Parameters:
domainId the domain in which the participant resides.
participantId the participant to be owned.
sender the repository sending the update data.
owner the repository which is to make callbacks for entities within the participant.
Returns:
boolean indicating that ownership has been assigned.

This establishes owner as the new owner of the participant. Ownership consists of calling back to the reader and writer remote interfaces when associations are established and removed from a publication or subscription. Owner may be the special value of OWNER_NONE to indicate that the previous owner is no longer available to make callbacks and the application has not indicated which repository is to replace it in this capacity.

The sender of the update is included so that the participant can check that transitions to OWNER_NONE are only honored when initiated by the current owner of the participant.

A return value of false indicates that the ownership was specified for a domain or participant which could not be found.

Definition at line 147 of file DCPSInfo_i.cpp.

References DCPS_IR_Participant::changeOwner(), domains_, and lock_.

Referenced by OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), OpenDDS::Federator::ManagerImpl::processDelete(), and OpenDDS::Federator::ManagerImpl::processUpdateQos1().

00152 {
00153   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00154 
00155   // Grab the domain.
00156   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00157 
00158   if (where == this->domains_.end()) {
00159     return false;
00160   }
00161 
00162   // Grab the participant.
00163   DCPS_IR_Participant* participant
00164   = where->second->participant(participantId);
00165 
00166   if (0 == participant) {
00167     return false;
00168   }
00169 
00170   // Establish the ownership.
00171   participant->changeOwner(sender, owner);
00172   return true;
00173 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TAO_DDS_DCPSInfo_i::disassociate_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId local_id,
const OpenDDS::DCPS::RepoId remote_id 
) [virtual]

Definition at line 1304 of file DCPSInfo_i.cpp.

References domains_, lock_, DCPS_IR_Participant::publications(), and DCPS_IR_Participant::subscriptions().

01308 {
01309   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01310 
01311   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01312   if (it == this->domains_.end()) {
01313     throw OpenDDS::DCPS::Invalid_Domain();
01314   }
01315 
01316   DCPS_IR_Participant* participant = it->second->participant(local_id);
01317   if (participant == 0) {
01318     throw OpenDDS::DCPS::Invalid_Participant();
01319   }
01320 
01321   // Disassociate from participant temporarily:
01322   const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
01323   for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
01324        sub != subscriptions.end(); ++sub) {
01325     sub->second->disassociate_participant(remote_id, true);
01326   }
01327 
01328   const DCPS_IR_Publication_Map& publications = participant->publications();
01329   for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
01330        pub != publications.end(); ++pub) {
01331     pub->second->disassociate_participant(remote_id, true);
01332   }
01333 
01334   it->second->remove_dead_participants();
01335 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::disassociate_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId local_id,
const OpenDDS::DCPS::RepoId remote_id 
) [virtual]

Definition at line 1380 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::disassociate_subscription(), domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, and lock_.

01385 {
01386   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01387 
01388   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01389   if (it == this->domains_.end()) {
01390     throw OpenDDS::DCPS::Invalid_Domain();
01391   }
01392 
01393   DCPS_IR_Participant* participant = it->second->participant(participantId);
01394   if (participant == 0) {
01395     throw OpenDDS::DCPS::Invalid_Participant();
01396   }
01397 
01398   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01399     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
01400   }
01401 
01402   DCPS_IR_Publication* publication;
01403   if (participant->find_publication_reference(local_id, publication)
01404       != 0 || publication == 0) {
01405     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01406     OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
01407     ACE_ERROR((LM_ERROR,
01408                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
01409                ACE_TEXT("participant %C could not find publication %C.\n"),
01410                std::string(part_converter).c_str(),
01411                std::string(pub_converter).c_str()));
01412     throw OpenDDS::DCPS::Invalid_Publication();
01413   }
01414 
01415   // Disassociate from subscription temporarily:
01416   publication->disassociate_subscription(remote_id, true);
01417 
01418   it->second->remove_dead_participants();
01419 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::disassociate_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId local_id,
const OpenDDS::DCPS::RepoId remote_id 
) [virtual]

Definition at line 1338 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::disassociate_publication(), domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, and lock_.

01343 {
01344   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01345 
01346   DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01347   if (it == this->domains_.end()) {
01348     throw OpenDDS::DCPS::Invalid_Domain();
01349   }
01350 
01351   DCPS_IR_Participant* participant = it->second->participant(participantId);
01352   if (participant == 0) {
01353     throw OpenDDS::DCPS::Invalid_Participant();
01354   }
01355 
01356   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01357     ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
01358   }
01359 
01360   DCPS_IR_Subscription* subscription;
01361   if (participant->find_subscription_reference(local_id, subscription)
01362       != 0 || subscription == 0) {
01363     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01364     OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
01365     ACE_ERROR((LM_ERROR,
01366                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
01367                ACE_TEXT("participant %C could not find subscription %C.\n"),
01368                std::string(part_converter).c_str(),
01369                std::string(sub_converter).c_str()));
01370     throw OpenDDS::DCPS::Invalid_Subscription();
01371   }
01372 
01373   // Disassociate from publication temporarily:
01374   subscription->disassociate_publication(remote_id, true);
01375 
01376   it->second->remove_dead_participants();
01377 }

Here is the call graph for this function:

DCPS_IR_Domain * TAO_DDS_DCPSInfo_i::domain ( DDS::DomainId_t  domain  ) 

Convert a domain Id into a reference to a DCPS_IR_Domain object.

Definition at line 2134 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Service_Participant::ANY_DOMAIN, OpenDDS::DCPS::DCPS_debug_level, domains_, federation_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Domain::init_built_in_topics(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::move(), TAO_DDS_DCPSFederationId::overridden(), participantIdGenerator_, reincarnate_, TheServiceParticipant, and DCPS_IR_Domain::useBIT().

Referenced by add_domain_participant(), receive_image(), and update_subscription_params().

02135 {
02136   if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) {
02137     ACE_ERROR((LM_ERROR,
02138                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02139                ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
02140     return 0;
02141   }
02142 
02143   // Check if the domain is already in the map.
02144   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
02145 
02146   if (where == this->domains_.end()) {
02147     // We will attempt to insert a new domain, go ahead and allocate it.
02148     OpenDDS::DCPS::unique_ptr<DCPS_IR_Domain> domain_uptr( new
02149                    DCPS_IR_Domain(domain, this->participantIdGenerator_));
02150 
02151     DCPS_IR_Domain* domainPtr = domain_uptr.get();
02152 
02153     // We need to insert the domain into the map at this time since it
02154     // might be looked up during the init_built_in_topics() call.
02155     this->domains_.insert(
02156       where,
02157       DCPS_IR_Domain_Map::value_type(domain, OpenDDS::DCPS::move(domain_uptr)));
02158 
02159 #ifndef DDS_HAS_MINIMUM_BIT
02160     if (TheServiceParticipant->get_BIT() && !domainPtr->useBIT() &&
02161       domainPtr->init_built_in_topics(federation_.overridden(), reincarnate_)
02162     ) {
02163       ACE_ERROR((LM_ERROR,
02164                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02165                  ACE_TEXT("failed to initialize the Built-In Topics ")
02166                  ACE_TEXT("when loading domain %d.\n"),
02167                  domain));
02168       this->domains_.erase(domain);
02169       return 0;
02170     }
02171 #endif
02172 
02173     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02174       ACE_DEBUG((LM_DEBUG,
02175                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
02176                  ACE_TEXT("successfully loaded domain %d at %x.\n"),
02177                  domain,
02178                  domainPtr));
02179     }
02180     return domainPtr;
02181 
02182   } else {
02183     return where->second.get();
02184   }
02185 }

Here is the call graph for this function:

Here is the caller graph for this function:

const DCPS_IR_Domain_Map & TAO_DDS_DCPSInfo_i::domains (  )  const

Expose a readable reference of the domain map.

Definition at line 2470 of file DCPSInfo_i.cpp.

References domains_.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

02471 {
02472   return this->domains_;
02473 }

Here is the caller graph for this function:

char * TAO_DDS_DCPSInfo_i::dump_to_string (  )  [virtual]

Dump the Repos state to string.

Definition at line 2477 of file DCPSInfo_i.cpp.

References domains_, and FACE::string_dup().

02478 {
02479   std::string dump;
02480 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02481   std::string indent ("    ");
02482 
02483   for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
02484        dm != domains_.end();
02485        dm++)
02486   {
02487     dump += dm->second->dump_to_string(indent, 0);
02488   }
02489 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02490   return CORBA::string_dup(dump.c_str());
02491 
02492 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::finalize ( void   ) 

Cleanup state for shutdown.

Definition at line 2452 of file DCPSInfo_i.cpp.

References ACE_Reactor::cancel_timer(), dispatch_check_timer_id_, orb_, ACE_Event_Handler::reactor(), and reassociate_timer_id_.

02453 {
02454   if (reassociate_timer_id_ != -1) {
02455     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02456 
02457     reactor->cancel_timer(this->reassociate_timer_id_);
02458     this->reassociate_timer_id_ = -1;
02459   }
02460 
02461   if (dispatch_check_timer_id_ != -1) {
02462     ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02463 
02464     reactor->cancel_timer(this->dispatch_check_timer_id_);
02465     this->dispatch_check_timer_id_ = -1;
02466   }
02467 }

Here is the call graph for this function:

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic ( DDS::DomainId_t  domainId,
const char *  topicName,
CORBA::String_out  dataTypeName,
DDS::TopicQos_out  qos,
OpenDDS::DCPS::RepoId_out  topicId 
) [virtual]

Definition at line 286 of file DCPSInfo_i.cpp.

References domains_, OpenDDS::DCPS::FOUND, DCPS_IR_Topic_Description::get_dataTypeName(), DCPS_IR_Topic::get_id(), DCPS_IR_Topic::get_topic_description(), DCPS_IR_Topic::get_topic_qos(), OpenDDS::DCPS::INTERNAL_ERROR, lock_, OpenDDS::DCPS::NOT_FOUND, and status.

00292 {
00293   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00294 
00295   // Grab the domain.
00296   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00297 
00298   if (where == this->domains_.end()) {
00299     throw OpenDDS::DCPS::Invalid_Domain();
00300   }
00301 
00302   OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND;
00303 
00304   DCPS_IR_Topic* topic = 0;
00305   qos = new DDS::TopicQos;
00306 
00307   status = where->second->find_topic(topicName, topic);
00308 
00309   if (0 != topic) {
00310     status = OpenDDS::DCPS::FOUND;
00311     const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
00312     dataTypeName = desc->get_dataTypeName();
00313     *qos = *(topic->get_topic_qos());
00314     topicId = topic->get_id();
00315   }
00316 
00317   return status;
00318 }

Here is the call graph for this function:

int TAO_DDS_DCPSInfo_i::handle_timeout ( const ACE_Time_Value now,
const void *  arg 
) [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 63 of file DCPSInfo_i.cpp.

References dispatchingOrb_, domains_, TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), and lock_.

00065 {
00066   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00067 
00068   if (arg == this) {
00069     if ( !CORBA::is_nil(this->dispatchingOrb_.in())){
00070       if (this->dispatchingOrb_->work_pending())
00071       {
00072         // Ten microseconds
00073         ACE_Time_Value small(0,10);
00074         this->dispatchingOrb_->perform_work(small);
00075       }
00076     }
00077   }
00078   else {
00079   // NOTE: This is a purposefully naive approach to addressing defunct
00080   // associations.  In the future, it may be worthwhile to introduce a
00081   // callback model to fix the heinous runtime cost below:
00082   for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
00083        dom != this->domains_.end(); ++dom) {
00084 
00085     const DCPS_IR_Participant_Map& participants(dom->second->participants());
00086     for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
00087          part != participants.end(); ++part) {
00088 
00089       const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
00090       for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
00091            sub != subscriptions.end(); ++sub) {
00092         sub->second->reevaluate_defunct_associations();
00093       }
00094 
00095       const DCPS_IR_Publication_Map& publications(part->second->publications());
00096       for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
00097            pub != publications.end(); ++pub) {
00098         pub->second->reevaluate_defunct_associations();
00099       }
00100     }
00101   }
00102   }
00103 
00104   return 0;
00105 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::ignore_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1557 of file DCPSInfo_i.cpp.

References domains_, DCPS_IR_Participant::ignore_participant(), and lock_.

01561 {
01562   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01563 
01564   // Grab the domain.
01565   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01566 
01567   if (where == this->domains_.end()) {
01568     throw OpenDDS::DCPS::Invalid_Domain();
01569   }
01570 
01571   // Grab the participant.
01572   DCPS_IR_Participant* partPtr
01573   = where->second->participant(myParticipantId);
01574 
01575   if (0 == partPtr) {
01576     throw OpenDDS::DCPS::Invalid_Participant();
01577   }
01578 
01579   partPtr->ignore_participant(ignoreId);
01580 
01581   where->second->remove_dead_participants();
01582 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::ignore_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1638 of file DCPSInfo_i.cpp.

References domains_, DCPS_IR_Participant::ignore_publication(), and lock_.

01642 {
01643   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01644 
01645   // Grab the domain.
01646   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01647 
01648   if (where == this->domains_.end()) {
01649     throw OpenDDS::DCPS::Invalid_Domain();
01650   }
01651 
01652   // Grab the participant.
01653   DCPS_IR_Participant* partPtr
01654   = where->second->participant(myParticipantId);
01655 
01656   if (0 == partPtr) {
01657     throw OpenDDS::DCPS::Invalid_Participant();
01658   }
01659 
01660   partPtr->ignore_publication(ignoreId);
01661 
01662   where->second->remove_dead_participants();
01663 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::ignore_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1611 of file DCPSInfo_i.cpp.

References domains_, DCPS_IR_Participant::ignore_subscription(), and lock_.

01615 {
01616   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01617 
01618   // Grab the domain.
01619   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01620 
01621   if (where == this->domains_.end()) {
01622     throw OpenDDS::DCPS::Invalid_Domain();
01623   }
01624 
01625   // Grab the participant.
01626   DCPS_IR_Participant* partPtr
01627   = where->second->participant(myParticipantId);
01628 
01629   if (0 == partPtr) {
01630     throw OpenDDS::DCPS::Invalid_Participant();
01631   }
01632 
01633   partPtr->ignore_subscription(ignoreId);
01634 
01635   where->second->remove_dead_participants();
01636 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::ignore_topic ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId myParticipantId,
const OpenDDS::DCPS::RepoId ignoreId 
) [virtual]

Definition at line 1584 of file DCPSInfo_i.cpp.

References domains_, DCPS_IR_Participant::ignore_topic(), and lock_.

01588 {
01589   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01590 
01591   // Grab the domain.
01592   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01593 
01594   if (where == this->domains_.end()) {
01595     throw OpenDDS::DCPS::Invalid_Domain();
01596   }
01597 
01598   // Grab the participant.
01599   DCPS_IR_Participant* partPtr
01600   = where->second->participant(myParticipantId);
01601 
01602   if (0 == partPtr) {
01603     throw OpenDDS::DCPS::Invalid_Participant();
01604   }
01605 
01606   partPtr->ignore_topic(ignoreId);
01607 
01608   where->second->remove_dead_participants();
01609 }

Here is the call graph for this function:

bool TAO_DDS_DCPSInfo_i::init_dispatchChecking ( const ACE_Time_Value delay  ) 

Definition at line 2441 of file DCPSInfo_i.cpp.

References dispatch_check_timer_id_, orb_, ACE_Event_Handler::reactor(), and ACE_Reactor::schedule_timer().

02442 {
02443   if (this->dispatch_check_timer_id_ != -1) return false;  // already scheduled
02444 
02445   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02446 
02447   this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
02448   return this->dispatch_check_timer_id_ != -1;
02449 }

Here is the call graph for this function:

bool TAO_DDS_DCPSInfo_i::init_persistence (  ) 

Definition at line 2408 of file DCPSInfo_i.cpp.

References ACE_TEXT(), Update::Manager::add(), LM_ERROR, reincarnate_, Update::Manager::requestImage(), and um_.

02409 {
02410   um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance
02411         ("UpdateManagerSvc");
02412 
02413   if (um_ != 0) {
02414     um_->add(this);
02415 
02416     // Request persistent image.
02417     if (reincarnate_) {
02418       um_->requestImage();
02419     }
02420 
02421   } else {
02422     ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
02423                       ACE_TEXT("UpdateManagerSvc.\n")), false);
02424   }
02425 
02426   return true;
02427 }

Here is the call graph for this function:

bool TAO_DDS_DCPSInfo_i::init_reassociation ( const ACE_Time_Value delay  ) 

Definition at line 2430 of file DCPSInfo_i.cpp.

References orb_, ACE_Event_Handler::reactor(), reassociate_timer_id_, and ACE_Reactor::schedule_timer().

02431 {
02432   if (this->reassociate_timer_id_ != -1) return false;  // already scheduled
02433 
02434   ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02435 
02436   this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
02437   return this->reassociate_timer_id_ != -1;
02438 }

Here is the call graph for this function:

int TAO_DDS_DCPSInfo_i::init_transport ( int  listen_address_given,
const char *  listen_str 
)

Initialize the transport for the Built-In Topics Returns 0 (zero) if succeeds

Definition at line 2187 of file DCPSInfo_i.cpp.

References ACE_TEXT(), ACE_Service_Config::current(), OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::dynamic_rchandle_cast(), Util::find(), OpenDDS::DCPS::TransportRegistry::instance(), ACE_Service_Config::process_directive(), and status.

02189 {
02190   int status = 0;
02191 
02192   try {
02193 
02194 #ifndef ACE_AS_STATIC_LIBS
02195     if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
02196         < 0 /* not found (-1) or suspended (-2) */) {
02197       static const ACE_TCHAR directive[] =
02198         ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
02199         ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
02200       ACE_Service_Config::process_directive(directive);
02201     }
02202 #endif
02203 
02204     std::string config_name =
02205       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02206       + std::string("InfoRepoBITTransportConfig");
02207     OpenDDS::DCPS::TransportConfig_rch config =
02208       OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
02209 
02210     std::string inst_name =
02211       OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02212       + std::string("InfoRepoBITTCPTransportInst");
02213     OpenDDS::DCPS::TransportInst_rch inst =
02214       OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
02215                                                                "tcp");
02216     config->instances_.push_back(inst);
02217 
02218     OpenDDS::DCPS::TcpInst_rch tcp_inst =
02219       OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst);
02220     inst->datalink_release_delay_ = 0;
02221 
02222     tcp_inst->conn_retry_attempts_ = 0;
02223 
02224     if (listen_address_given) {
02225       tcp_inst->local_address(listen_str);
02226     }
02227 
02228   } catch (...) {
02229     // TransportRegistry is extremely varied in the exceptions that
02230     // it throws on failure; do not allow exceptions to bubble up
02231     // beyond this point.
02232     status = 1;
02233   }
02234   return status;
02235 }

Here is the call graph for this function:

CORBA::ORB_ptr TAO_DDS_DCPSInfo_i::orb ( void   ) 

Expose the ORB.

Definition at line 114 of file DCPSInfo_i.cpp.

References CORBA::ORB::_duplicate(), TAO_Pseudo_Var_T< T >::in(), and orb_.

Referenced by OpenDDS::Federator::ManagerImpl::pushState().

00115 {
00116   return CORBA::ORB::_duplicate(this->orb_.in());
00117 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool TAO_DDS_DCPSInfo_i::receive_image ( const Update::UImage image  ) 

Definition at line 2238 of file DCPSInfo_i.cpp.

References ACE_TEXT(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, Update::ImageData< T, P, A, W >::actors, add_domain_participant(), add_publication(), add_subscription(), add_topic(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::TopicStrt< Q, S >::dataType, OpenDDS::DCPS::DCPS_debug_level, domain(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::TopicStrt< Q, S >::domainId, Update::ParticipantStrt< Q >::domainId, domains_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::DCPS::RepoIdGenerator::last(), Update::ImageData< T, P, A, W >::lastPartId, LM_DEBUG, LM_ERROR, LM_WARNING, Update::TopicStrt< Q, S >::name, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::TopicStrt< Q, S >::participantId, Update::ParticipantStrt< Q >::participantId, participantIdGenerator_, Update::ParticipantStrt< Q >::participantQos, Update::ImageData< T, P, A, W >::participants, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, TheServiceParticipant, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, Update::ImageData< T, P, A, W >::topics, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo, and Update::ImageData< T, P, A, W >::wActors.

Referenced by Update::Manager::pushImage().

02239 {
02240   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02241     ACE_DEBUG((LM_DEBUG,
02242                ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02243                ACE_TEXT("processing persistent data.\n")));
02244   }
02245 
02246   // Initialize builtin topics first so that they always have the same IDs
02247 #ifndef DDS_HAS_MINIMUM_BIT
02248   if (TheServiceParticipant->get_BIT()) {
02249     for (Update::UImage::ParticipantSeq::const_iterator
02250          iter = image.participants.begin();
02251          iter != image.participants.end(); iter++) {
02252       const Update::UParticipant* part = *iter;
02253       if (!domain(part->domainId)) {
02254         if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02255           ACE_DEBUG((LM_WARNING,
02256                      ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::receive_image: ")
02257                      ACE_TEXT("invalid domain Id: %d\n"),
02258                      part->domainId));
02259         }
02260         return false;
02261       }
02262     }
02263   }
02264 #endif
02265 
02266   // Ensure that new non-BIT participants do not reuse an id
02267   participantIdGenerator_.last(image.lastPartId);
02268 
02269   for (Update::UImage::ParticipantSeq::const_iterator
02270        iter = image.participants.begin();
02271        iter != image.participants.end(); iter++) {
02272     const Update::UParticipant* part = *iter;
02273 
02274     if (!this->add_domain_participant(part->domainId, part->participantId
02275                                       , part->participantQos)) {
02276       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02277       ACE_ERROR((LM_ERROR,
02278                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02279                  ACE_TEXT("failed to add participant %C to domain %d.\n"),
02280                  std::string(converter).c_str(),
02281                  part->domainId));
02282       return false;
02283 
02284     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02285       OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02286       ACE_DEBUG((LM_DEBUG,
02287                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02288                  ACE_TEXT("added participant %C to domain %d.\n"),
02289                  std::string(converter).c_str(),
02290                  part->domainId));
02291     }
02292   }
02293 
02294   for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
02295        iter != image.topics.end(); iter++) {
02296     const Update::UTopic* topic = *iter;
02297 
02298     if (!this->add_topic(topic->topicId, topic->domainId
02299                          , topic->participantId, topic->name.c_str()
02300                          , topic->dataType.c_str(), topic->topicQos)) {
02301       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02302       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02303       ACE_ERROR((LM_ERROR,
02304                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02305                  ACE_TEXT("failed to add topic %C to participant %C.\n"),
02306                  std::string(topic_converter).c_str(),
02307                  std::string(part_converter).c_str()));
02308       return false;
02309 
02310     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02311       OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02312       OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02313       ACE_DEBUG((LM_DEBUG,
02314                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02315                  ACE_TEXT("added topic %C to participant %C.\n"),
02316                  std::string(topic_converter).c_str(),
02317                  std::string(part_converter).c_str()));
02318     }
02319   }
02320 
02321   for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
02322        iter != image.actors.end(); iter++) {
02323     const Update::URActor* sub = *iter;
02324 
02325     // no reason to associate, there are no publishers yet to associate with
02326     if (!this->add_subscription(sub->domainId, sub->participantId
02327                                 , sub->topicId, sub->actorId
02328                                 , sub->callback.c_str(), sub->drdwQos
02329                                 , sub->transportInterfaceInfo
02330                                 , sub->pubsubQos
02331                                 , sub->contentSubscriptionProfile.filterClassName
02332                                 , sub->contentSubscriptionProfile.filterExpr
02333                                 , sub->contentSubscriptionProfile.exprParams)) {
02334       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02335       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02336       ACE_ERROR((LM_ERROR,
02337                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02338                  ACE_TEXT("failed to add subscription %C to participant %C.\n"),
02339                  std::string(sub_converter).c_str(),
02340                  std::string(part_converter).c_str()));
02341       return false;
02342 
02343     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02344       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02345       OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02346       ACE_DEBUG((LM_DEBUG,
02347                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02348                  ACE_TEXT("added subscription %C to participant %C.\n"),
02349                  std::string(sub_converter).c_str(),
02350                  std::string(part_converter).c_str()));
02351     }
02352   }
02353 
02354   for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
02355        iter != image.wActors.end(); iter++) {
02356     const Update::UWActor* pub = *iter;
02357 
02358     // try to associate with any persisted subscriptions to track any expected
02359     // existing associations
02360     if (!this->add_publication(pub->domainId, pub->participantId
02361                                , pub->topicId, pub->actorId
02362                                , pub->callback.c_str() , pub->drdwQos
02363                                , pub->transportInterfaceInfo
02364                                , pub->pubsubQos
02365                                , true)) {
02366       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02367       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02368       ACE_ERROR((LM_ERROR,
02369                  ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02370                  ACE_TEXT("failed to add publication %C to participant %C.\n"),
02371                  std::string(pub_converter).c_str(),
02372                  std::string(part_converter).c_str()));
02373       return false;
02374 
02375     } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02376       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02377       OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02378       ACE_DEBUG((LM_DEBUG,
02379                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02380                  ACE_TEXT("added publication %C to participant %C.\n"),
02381                  std::string(pub_converter).c_str(),
02382                  std::string(part_converter).c_str()));
02383     }
02384   }
02385 
02386 #ifndef DDS_HAS_MINIMUM_BIT
02387   if (TheServiceParticipant->get_BIT()) {
02388     for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
02389          currentDomain != domains_.end();
02390          ++currentDomain) {
02391       currentDomain->second->reassociate_built_in_topic_pubs();
02392     }
02393   }
02394 #endif
02395 
02396   return true;
02397 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool TAO_DDS_DCPSInfo_i::remove_by_owner ( DDS::DomainId_t  domain,
long  owner 
)

Definition at line 1180 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, LM_DEBUG, lock_, DCPS_IR_Participant::publications(), remove_domain_participant(), DCPS_IR_Participant::remove_publication(), DCPS_IR_Participant::remove_subscription(), DCPS_IR_Participant::remove_topic_reference(), status, DCPS_IR_Participant::subscriptions(), and DCPS_IR_Participant::topics().

Referenced by OpenDDS::Federator::ManagerImpl::leave_federation().

01183 {
01184   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01185 
01186   // Grab the domain.
01187   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
01188 
01189   if (where == this->domains_.end()) {
01190     return false;
01191   }
01192 
01193   std::vector<OpenDDS::DCPS::RepoId> candidates;
01194 
01195   for (DCPS_IR_Participant_Map::const_iterator
01196        current = where->second->participants().begin();
01197        current != where->second->participants().end();
01198        ++current) {
01199     if (current->second->owner() == owner) {
01200       candidates.push_back(current->second->get_id());
01201     }
01202   }
01203 
01204   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01205     ACE_DEBUG((LM_DEBUG,
01206                ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01207                ACE_TEXT("%d participants to remove from domain %d.\n"),
01208                candidates.size(),
01209                domain));
01210   }
01211 
01212   bool status = true;
01213 
01214   for (unsigned int index = 0; index < candidates.size(); ++index) {
01215     DCPS_IR_Participant* participant
01216     = where->second->participant(candidates[index]);
01217     if (participant) {
01218       std::vector<OpenDDS::DCPS::RepoId> keylist;
01219 
01220       // Remove Subscriptions
01221       for (DCPS_IR_Subscription_Map::const_iterator
01222         current = participant->subscriptions().begin();
01223         current != participant->subscriptions().end();
01224         ++current) {
01225         keylist.push_back(current->second->get_id());
01226       }
01227 
01228       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01229         OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01230         ACE_DEBUG((LM_DEBUG,
01231           ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01232           ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
01233           keylist.size(),
01234           std::string(converter).c_str()));
01235       }
01236 
01237       for (unsigned int key = 0; key < keylist.size(); ++key) {
01238         if (participant->remove_subscription(keylist[key]) != 0) {
01239           status = false;
01240         }
01241       }
01242 
01243       // Remove Publications
01244       keylist.clear();
01245 
01246       for (DCPS_IR_Publication_Map::const_iterator
01247         current = participant->publications().begin();
01248         current != participant->publications().end();
01249         ++current) {
01250         keylist.push_back(current->second->get_id());
01251       }
01252 
01253       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01254         OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01255         ACE_DEBUG((LM_DEBUG,
01256           ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01257           ACE_TEXT("%d publications to remove from participant %C.\n"),
01258           keylist.size(),
01259           std::string(converter).c_str()));
01260       }
01261 
01262       for (unsigned int key = 0; key < keylist.size(); ++key) {
01263         if (participant->remove_publication(keylist[key]) != 0) {
01264           status = false;
01265         }
01266       }
01267 
01268       // Remove Topics
01269       keylist.clear();
01270 
01271       for (DCPS_IR_Topic_Map::const_iterator
01272         current = participant->topics().begin();
01273         current != participant->topics().end();
01274         ++current) {
01275         keylist.push_back(current->second->get_id());
01276       }
01277 
01278       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01279         OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01280         ACE_DEBUG((LM_DEBUG,
01281           ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01282           ACE_TEXT("%d topics to remove from participant %C.\n"),
01283           keylist.size(),
01284           std::string(converter).c_str()));
01285       }
01286 
01287       for (unsigned int key = 0; key < keylist.size(); ++key) {
01288         DCPS_IR_Topic* discard;
01289 
01290         if (participant->remove_topic_reference(keylist[key], discard) != 0) {
01291           status = false;
01292         }
01293       }
01294     }
01295 
01296     // Remove Participant
01297     this->remove_domain_participant(domain, candidates[ index]);
01298   }
01299 
01300   return status;
01301 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TAO_DDS_DCPSInfo_i::remove_domain_participant ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId 
) [virtual]

Definition at line 1421 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, ACE_Event_Handler_var::handler(), DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, lock_, Update::Participant, status, TheServiceParticipant, and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete(), and remove_by_owner().

01424 {
01425   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01426 
01427   // Grab the domain.
01428   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01429 
01430   if (where == this->domains_.end()) {
01431     throw OpenDDS::DCPS::Invalid_Domain();
01432   }
01433 
01434   DCPS_IR_Participant* participant = where->second->participant(participantId);
01435 
01436   if (participant == 0) {
01437     OpenDDS::DCPS::RepoIdConverter converter(participantId);
01438     ACE_ERROR((LM_ERROR,
01439                ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01440                ACE_TEXT("failed to locate participant %C in domain %d.\n"),
01441                std::string(converter).c_str(),
01442                domainId));
01443     throw OpenDDS::DCPS::Invalid_Participant();
01444   }
01445 
01446   // Determine if we should propagate this event;  we need to cache this
01447   // result as the participant will be gone by the time we use the result.
01448   bool sendUpdate = (participant->isOwner() == true)
01449                     && (participant->isBitPublisher() == false);
01450 
01451   CORBA::Boolean dont_notify_lost = 0;
01452   int status = where->second->remove_participant(participantId, dont_notify_lost);
01453 
01454   if (0 != status) {
01455     // Removing the participant failed
01456     throw OpenDDS::DCPS::Invalid_Participant();
01457   }
01458 
01459   // Update any concerned observers that the participant was destroyed.
01460   if (this->um_ && sendUpdate) {
01461     Update::IdPath path(
01462       where->second->get_id(),
01463       participantId,
01464       participantId);
01465     this->um_->destroy(path, Update::Participant);
01466 
01467     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01468       OpenDDS::DCPS::RepoIdConverter converter(participantId);
01469       ACE_DEBUG((LM_DEBUG,
01470                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01471                  ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
01472                  std::string(converter).c_str(),
01473                  domainId));
01474     }
01475   }
01476 
01477   if (where->second->participants().empty()) {
01478     domains_.erase(where);
01479   }
01480 
01481 #ifndef DDS_HAS_MINIMUM_BIT
01482   else if (where->second->useBIT() &&
01483            where->second->participants().size() == 1) {
01484     // The only participant left is the one we created to publish BITs.
01485     // It can be removed now since no user participants exist in this domain,
01486     // but it has to be removed on the Service Participant's reactor thread
01487     // in order to make the locking work properly in delete_participant().
01488     const ACE_Event_Handler_var eh = new BIT_Cleanup_Handler(this, domainId);
01489     TheServiceParticipant->reactor()->notify(eh.handler());
01490   }
01491 #endif
01492 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TAO_DDS_DCPSInfo_i::remove_publication ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId publicationId 
) [virtual]

Definition at line 617 of file DCPSInfo_i.cpp.

References ACE_TEXT(), Update::Actor, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::remove_publication(), and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

00621 {
00622   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00623 
00624   // Grab the domain.
00625   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00626 
00627   if (where == this->domains_.end()) {
00628     throw OpenDDS::DCPS::Invalid_Domain();
00629   }
00630 
00631   // Grab the participant.
00632   DCPS_IR_Participant* partPtr
00633   = where->second->participant(participantId);
00634 
00635   if (0 == partPtr) {
00636     throw OpenDDS::DCPS::Invalid_Participant();
00637   }
00638 
00639   if (partPtr->remove_publication(publicationId) != 0) {
00640     where->second->remove_dead_participants();
00641 
00642     // throw exception because the publication was not removed!
00643     throw OpenDDS::DCPS::Invalid_Publication();
00644   }
00645 
00646   where->second->remove_dead_participants();
00647 
00648   if (this->um_
00649       && (partPtr->isOwner() == true)
00650       && (partPtr->isBitPublisher() == false)) {
00651     Update::IdPath path(domainId, participantId, publicationId);
00652     this->um_->destroy(path, Update::Actor, Update::DataWriter);
00653 
00654     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00655       OpenDDS::DCPS::RepoIdConverter converter(publicationId);
00656       ACE_DEBUG((LM_DEBUG,
00657                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
00658                  ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00659                  std::string(converter).c_str(),
00660                  domainId));
00661     }
00662   }
00663 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TAO_DDS_DCPSInfo_i::remove_subscription ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId subscriptionId 
) [virtual]

Definition at line 938 of file DCPSInfo_i.cpp.

References ACE_TEXT(), Update::Actor, Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::remove_subscription(), and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

00942 {
00943   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00944 
00945   // Grab the domain.
00946   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00947 
00948   if (where == this->domains_.end()) {
00949     throw OpenDDS::DCPS::Invalid_Domain();
00950   }
00951 
00952   // Grab the participant.
00953   DCPS_IR_Participant* partPtr
00954   = where->second->participant(participantId);
00955 
00956   if (0 == partPtr) {
00957     throw OpenDDS::DCPS::Invalid_Participant();
00958   }
00959 
00960   if (partPtr->remove_subscription(subscriptionId) != 0) {
00961     // throw exception because the subscription was not removed!
00962     throw OpenDDS::DCPS::Invalid_Subscription();
00963   }
00964 
00965   where->second->remove_dead_participants();
00966 
00967   if (this->um_
00968       && (partPtr->isOwner() == true)
00969       && (partPtr->isBitPublisher() == false)) {
00970     Update::IdPath path(domainId, participantId, subscriptionId);
00971     this->um_->destroy(path, Update::Actor, Update::DataReader);
00972 
00973     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00974       OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
00975       ACE_DEBUG((LM_DEBUG,
00976                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
00977                  ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00978                  std::string(converter).c_str(),
00979                  domainId));
00980     }
00981   }
00982 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId topicId 
) [virtual]

Definition at line 320 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::find_topic_reference(), OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, Update::Topic, and um_.

Referenced by OpenDDS::Federator::ManagerImpl::processDelete().

00324 {
00325   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00326 
00327   // Grab the domain.
00328   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00329 
00330   if (where == this->domains_.end()) {
00331     throw OpenDDS::DCPS::Invalid_Domain();
00332   }
00333 
00334   // Grab the participant.
00335   DCPS_IR_Participant* partPtr
00336   = where->second->participant(participantId);
00337 
00338   if (0 == partPtr) {
00339     throw OpenDDS::DCPS::Invalid_Participant();
00340   }
00341 
00342   DCPS_IR_Topic* topic;
00343 
00344   if (partPtr->find_topic_reference(topicId, topic) != 0) {
00345     throw OpenDDS::DCPS::Invalid_Topic();
00346   }
00347 
00348   OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
00349 
00350   if (this->um_
00351       && (partPtr->isOwner() == true)
00352       && (partPtr->isBitPublisher() == false)) {
00353     Update::IdPath path(domainId, participantId, topicId);
00354     this->um_->destroy(path, Update::Topic);
00355 
00356     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00357       OpenDDS::DCPS::RepoIdConverter converter(topicId);
00358       ACE_DEBUG((LM_DEBUG,
00359                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
00360                  ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00361                  std::string(converter).c_str(),
00362                  domainId));
00363     }
00364   }
00365 
00366   return removedStatus;
00367 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TAO_DDS_DCPSInfo_i::shutdown ( void   )  [virtual]

Cause the entire repository to exit.

Definition at line 108 of file DCPSInfo_i.cpp.

References ShutdownInterface::shutdown(), and shutdown_.

Referenced by OpenDDS::Federator::ManagerImpl::leave_and_shutdown(), and OpenDDS::Federator::ManagerImpl::shutdown().

00109 {
00110   this->shutdown_->shutdown();
00111 }

Here is the call graph for this function:

Here is the caller graph for this function:

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos ( DDS::DomainId_t  domain,
const OpenDDS::DCPS::RepoId participantId,
const DDS::DomainParticipantQos qos 
) [virtual]

Definition at line 2089 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::set_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().

02093 {
02094   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02095 
02096   // Grab the domain.
02097   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02098 
02099   if (where == this->domains_.end()) {
02100     throw OpenDDS::DCPS::Invalid_Domain();
02101   }
02102 
02103   // Grab the participant.
02104   DCPS_IR_Participant* partPtr
02105   = where->second->participant(participantId);
02106 
02107   if (0 == partPtr) {
02108     throw OpenDDS::DCPS::Invalid_Participant();
02109   }
02110 
02111   if (partPtr->set_qos(qos) == false)
02112     return 0;
02113 
02114   if (this->um_
02115       && (partPtr->isOwner() == true)
02116       && (partPtr->isBitPublisher() == false)) {
02117     Update::IdPath path(domainId, participantId, participantId);
02118     this->um_->update(path, qos);
02119 
02120     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02121       OpenDDS::DCPS::RepoIdConverter converter(participantId);
02122       ACE_DEBUG((LM_DEBUG,
02123                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
02124                  ACE_TEXT("pushing update of participant %C in domain %d.\n"),
02125                  std::string(converter).c_str(),
02126                  domainId));
02127     }
02128   }
02129 
02130   return 1;
02131 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId partId,
const OpenDDS::DCPS::RepoId dwId,
const DDS::PublisherQos qos 
)

Entry for federation updates of PublisherQos values.

Definition at line 1786 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Publication::set_qos().

01791 {
01792   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01793 
01794   // Grab the domain.
01795   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01796 
01797   if (where == this->domains_.end()) {
01798     throw OpenDDS::DCPS::Invalid_Domain();
01799   }
01800 
01801   // Grab the participant.
01802   DCPS_IR_Participant* partPtr
01803   = where->second->participant(partId);
01804 
01805   if (0 == partPtr) {
01806     throw OpenDDS::DCPS::Invalid_Participant();
01807   }
01808 
01809   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01810     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 3\n"));
01811   }
01812 
01813   DCPS_IR_Publication* pub;
01814 
01815   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01816     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01817     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01818     ACE_ERROR((LM_ERROR,
01819                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01820                ACE_TEXT("participant %C could not find publication %C.\n"),
01821                std::string(part_converter).c_str(),
01822                std::string(pub_converter).c_str()));
01823     throw OpenDDS::DCPS::Invalid_Publication();
01824   }
01825 
01826   pub->set_qos(qos);
01827 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId partId,
const OpenDDS::DCPS::RepoId dwId,
const DDS::DataWriterQos qos 
)

Entry for federation updates of DataWriterQos values.

Definition at line 1742 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Publication::set_qos().

01747 {
01748   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01749 
01750   // Grab the domain.
01751   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01752 
01753   if (where == this->domains_.end()) {
01754     throw OpenDDS::DCPS::Invalid_Domain();
01755   }
01756 
01757   // Grab the participant.
01758   DCPS_IR_Participant* partPtr
01759   = where->second->participant(partId);
01760 
01761   if (0 == partPtr) {
01762     throw OpenDDS::DCPS::Invalid_Participant();
01763   }
01764 
01765   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01766     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 2\n"));
01767   }
01768 
01769   DCPS_IR_Publication* pub;
01770 
01771   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01772     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01773     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01774     ACE_ERROR((LM_ERROR,
01775                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01776                ACE_TEXT("participant %C could not find publication %C.\n"),
01777                std::string(part_converter).c_str(),
01778                std::string(pub_converter).c_str()));
01779     throw OpenDDS::DCPS::Invalid_Publication();
01780   }
01781 
01782   pub->set_qos(qos);
01783 }

Here is the call graph for this function:

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId partId,
const OpenDDS::DCPS::RepoId dwId,
const DDS::DataWriterQos qos,
const DDS::PublisherQos publisherQos 
) [virtual]

Definition at line 1665 of file DCPSInfo_i.cpp.

References ACE_TEXT(), Update::DataWriterQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, Update::NoQos, Update::PublisherQos, DCPS_IR_Publication::set_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().

01671 {
01672   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01673 
01674   // Grab the domain.
01675   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01676 
01677   if (where == this->domains_.end()) {
01678     throw OpenDDS::DCPS::Invalid_Domain();
01679   }
01680 
01681   // Grab the participant.
01682   DCPS_IR_Participant* partPtr
01683   = where->second->participant(partId);
01684 
01685   if (0 == partPtr) {
01686     throw OpenDDS::DCPS::Invalid_Participant();
01687   }
01688 
01689   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01690     ACE_DEBUG((LM_INFO, "(%P|%t) updating  publication qos 1\n"));
01691   }
01692 
01693   DCPS_IR_Publication* pub;
01694 
01695   if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01696     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01697     OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01698     ACE_ERROR((LM_ERROR,
01699                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01700                ACE_TEXT("participant %C could not find publication %C.\n"),
01701                std::string(part_converter).c_str(),
01702                std::string(pub_converter).c_str()));
01703     throw OpenDDS::DCPS::Invalid_Publication();
01704   }
01705 
01706   Update::SpecificQos qosType;
01707 
01708   if (pub->set_qos(qos, publisherQos, qosType) == false)  // failed
01709     return 0;
01710 
01711   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01712     Update::IdPath path(domainId, partId, dwId);
01713 
01714     switch (qosType) {
01715     case Update::DataWriterQos:
01716       this->um_->update(path, qos);
01717       break;
01718 
01719     case Update::PublisherQos:
01720       this->um_->update(path, publisherQos);
01721       break;
01722 
01723     case Update::NoQos:
01724     default:
01725       break;
01726     }
01727 
01728     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01729       OpenDDS::DCPS::RepoIdConverter converter(dwId);
01730       ACE_DEBUG((LM_DEBUG,
01731                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01732                  ACE_TEXT("pushing update of publication %C in domain %d.\n"),
01733                  std::string(converter).c_str(),
01734                  domainId));
01735     }
01736   }
01737 
01738   return 1;
01739 }

Here is the call graph for this function:

Here is the caller graph for this function:

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_params ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const OpenDDS::DCPS::RepoId subscriptionId,
const DDS::StringSeq params 
)

Definition at line 1994 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), LM_ERROR, LM_INFO, lock_, um_, Update::Manager::update(), and DCPS_IR_Subscription::update_expr_params().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams().

01999 {
02000   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02001 
02002   DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
02003   if (domain == this->domains_.end()) {
02004     throw OpenDDS::DCPS::Invalid_Domain();
02005   }
02006 
02007   DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
02008   if (0 == partPtr) {
02009     throw OpenDDS::DCPS::Invalid_Participant();
02010   }
02011 
02012   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
02013     ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
02014   }
02015 
02016   DCPS_IR_Subscription* sub;
02017   if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
02018     OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
02019     OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
02020     ACE_ERROR((LM_ERROR,
02021                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
02022                ACE_TEXT("participant %C could not find subscription %C.\n"),
02023                std::string(part_converter).c_str(),
02024                std::string(sub_converter).c_str()));
02025     throw OpenDDS::DCPS::Invalid_Subscription();
02026   }
02027 
02028   sub->update_expr_params(params);  // calls writers via DataWriterRemote
02029 
02030   if (this->um_ && !partPtr->isBitPublisher()) {
02031     Update::IdPath path(domainId, participantId, subscriptionId);
02032     this->um_->update(path, params);
02033   }
02034 
02035   return true;
02036 }

Here is the call graph for this function:

Here is the caller graph for this function:

void TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId partId,
const OpenDDS::DCPS::RepoId drId,
const DDS::SubscriberQos qos 
)

Entry for federation updates of SubscriberQos values.

Definition at line 1950 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Subscription::set_qos().

01955 {
01956   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01957 
01958   // Grab the domain.
01959   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01960 
01961   if (where == this->domains_.end()) {
01962     throw OpenDDS::DCPS::Invalid_Domain();
01963   }
01964 
01965   // Grab the participant.
01966   DCPS_IR_Participant* partPtr
01967   = where->second->participant(partId);
01968 
01969   if (0 == partPtr) {
01970     throw OpenDDS::DCPS::Invalid_Participant();
01971   }
01972 
01973   DCPS_IR_Subscription* sub;
01974 
01975   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01976     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
01977   }
01978 
01979   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01980     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01981     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01982     ACE_ERROR((LM_ERROR,
01983                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01984                ACE_TEXT("participant %C could not find subscription %C.\n"),
01985                std::string(part_converter).c_str(),
01986                std::string(sub_converter).c_str()));
01987     throw OpenDDS::DCPS::Invalid_Subscription();
01988   }
01989 
01990   sub->set_qos(qos);
01991 }

Here is the call graph for this function:

void TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId partId,
const OpenDDS::DCPS::RepoId drId,
const DDS::DataReaderQos qos 
)

Entry for federation updates of DataReaderQos values.

Definition at line 1906 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Subscription::set_qos().

01911 {
01912   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01913 
01914   // Grab the domain.
01915   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01916 
01917   if (where == this->domains_.end()) {
01918     throw OpenDDS::DCPS::Invalid_Domain();
01919   }
01920 
01921   // Grab the participant.
01922   DCPS_IR_Participant* partPtr
01923   = where->second->participant(partId);
01924 
01925   if (0 == partPtr) {
01926     throw OpenDDS::DCPS::Invalid_Participant();
01927   }
01928 
01929   DCPS_IR_Subscription* sub;
01930 
01931   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01932     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
01933   }
01934 
01935   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01936     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01937     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01938     ACE_ERROR((LM_ERROR,
01939                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01940                ACE_TEXT("participant %C could not find subscription %C.\n"),
01941                std::string(part_converter).c_str(),
01942                std::string(sub_converter).c_str()));
01943     throw OpenDDS::DCPS::Invalid_Subscription();
01944   }
01945 
01946   sub->set_qos(qos);
01947 }

Here is the call graph for this function:

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos ( DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId partId,
const OpenDDS::DCPS::RepoId drId,
const DDS::DataReaderQos qos,
const DDS::SubscriberQos subscriberQos 
) [virtual]

Definition at line 1829 of file DCPSInfo_i.cpp.

References ACE_TEXT(), Update::DataReaderQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, Update::NoQos, DCPS_IR_Subscription::set_qos(), Update::SubscriberQos, um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().

01835 {
01836   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01837 
01838   // Grab the domain.
01839   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01840 
01841   if (where == this->domains_.end()) {
01842     throw OpenDDS::DCPS::Invalid_Domain();
01843   }
01844 
01845   // Grab the participant.
01846   DCPS_IR_Participant* partPtr
01847   = where->second->participant(partId);
01848 
01849   if (0 == partPtr) {
01850     throw OpenDDS::DCPS::Invalid_Participant();
01851   }
01852 
01853   DCPS_IR_Subscription* sub;
01854 
01855   if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01856     ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
01857   }
01858 
01859   if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01860     OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01861     OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01862     ACE_ERROR((LM_ERROR,
01863                ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01864                ACE_TEXT("participant %C could not find subscription %C.\n"),
01865                std::string(part_converter).c_str(),
01866                std::string(sub_converter).c_str()));
01867     throw OpenDDS::DCPS::Invalid_Subscription();
01868   }
01869 
01870   Update::SpecificQos qosType;
01871 
01872   if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed
01873     return 0;
01874 
01875   if (this->um_ && (partPtr->isBitPublisher() == false)) {
01876     Update::IdPath path(domainId, partId, drId);
01877 
01878     switch (qosType) {
01879     case Update::DataReaderQos:
01880       this->um_->update(path, qos);
01881       break;
01882 
01883     case Update::SubscriberQos:
01884       this->um_->update(path, subscriberQos);
01885       break;
01886 
01887     case Update::NoQos:
01888     default:
01889       break;
01890     }
01891 
01892     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01893       OpenDDS::DCPS::RepoIdConverter converter(drId);
01894       ACE_DEBUG((LM_DEBUG,
01895                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01896                  ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
01897                  std::string(converter).c_str(),
01898                  domainId));
01899     }
01900   }
01901 
01902   return 1;
01903 }

Here is the call graph for this function:

Here is the caller graph for this function:

CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos ( const OpenDDS::DCPS::RepoId topicId,
DDS::DomainId_t  domainId,
const OpenDDS::DCPS::RepoId participantId,
const DDS::TopicQos qos 
) [virtual]

Definition at line 2038 of file DCPSInfo_i.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_topic_reference(), DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Topic::set_topic_qos(), um_, and Update::Manager::update().

Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().

02043 {
02044   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02045 
02046   // Grab the domain.
02047   DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02048 
02049   if (where == this->domains_.end()) {
02050     throw OpenDDS::DCPS::Invalid_Domain();
02051   }
02052 
02053   // Grab the participant.
02054   DCPS_IR_Participant* partPtr
02055   = where->second->participant(participantId);
02056 
02057   if (0 == partPtr) {
02058     throw OpenDDS::DCPS::Invalid_Participant();
02059   }
02060 
02061   DCPS_IR_Topic* topic;
02062 
02063   if (partPtr->find_topic_reference(topicId, topic) != 0) {
02064     throw OpenDDS::DCPS::Invalid_Topic();
02065   }
02066 
02067   if (topic->set_topic_qos(qos) == false)
02068     return 0;
02069 
02070   if (this->um_
02071       && (partPtr->isOwner() == true)
02072       && (partPtr->isBitPublisher() == false)) {
02073     Update::IdPath path(domainId, participantId, topicId);
02074     this->um_->update(path, qos);
02075 
02076     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02077       OpenDDS::DCPS::RepoIdConverter converter(topicId);
02078       ACE_DEBUG((LM_DEBUG,
02079                  ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
02080                  ACE_TEXT("pushing update of topic %C in domain %d.\n"),
02081                  std::string(converter).c_str(),
02082                  domainId));
02083     }
02084   }
02085 
02086   return 1;
02087 }

Here is the call graph for this function:

Here is the caller graph for this function:


Member Data Documentation

Definition at line 428 of file DCPSInfo_i.h.

Referenced by finalize(), and init_dispatchChecking().

Definition at line 417 of file DCPSInfo_i.h.

Referenced by domain(), and receive_image().

Definition at line 427 of file DCPSInfo_i.h.

Referenced by finalize(), and init_reassociation().

Definition at line 420 of file DCPSInfo_i.h.

Referenced by domain(), and init_persistence().

Interface to effect shutdown of the process.

Definition at line 423 of file DCPSInfo_i.h.

Referenced by shutdown().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1