Implementation of the DCPSInfo. More...
#include <DCPSInfo_i.h>
Classes | |
struct | BIT_Cleanup_Handler |
Public Member Functions | |
TAO_DDS_DCPSInfo_i (CORBA::ORB_ptr orb, bool reincarnate, ShutdownInterface *shutdown, const TAO_DDS_DCPSFederationId &federation) | |
virtual | ~TAO_DDS_DCPSInfo_i () |
virtual int | handle_timeout (const ACE_Time_Value &now, const void *arg) |
virtual CORBA::Boolean | attach_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId) |
virtual OpenDDS::DCPS::TopicStatus | assert_topic (OpenDDS::DCPS::RepoId_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey) |
bool | add_topic (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos) |
Add a previously existing topic to the repository. | |
virtual OpenDDS::DCPS::TopicStatus | find_topic (DDS::DomainId_t domainId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::RepoId_out topicId) |
virtual OpenDDS::DCPS::TopicStatus | remove_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId) |
virtual OpenDDS::DCPS::RepoId | add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos) |
bool | add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &pubId, const char *pub_str, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, bool associate=false) |
Add a previously existing publication to the repository. | |
virtual void | remove_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &publicationId) |
virtual OpenDDS::DCPS::RepoId | add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams) |
bool | add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &subId, const char *sub_str, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, bool associate=false) |
Add a previously existing subscription to the repository. | |
virtual void | remove_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId) |
virtual OpenDDS::DCPS::AddDomainStatus | add_domain_participant (DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos) |
bool | add_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos) |
Add a previously existing participant to the repository. | |
virtual void | remove_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId) |
virtual void | association_complete (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &localId, const OpenDDS::DCPS::RepoId &remoteId) |
bool | remove_by_owner (DDS::DomainId_t domain, long owner) |
virtual void | disassociate_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id) |
virtual void | disassociate_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id) |
virtual void | disassociate_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id) |
virtual void | ignore_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual void | ignore_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual void | ignore_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual void | ignore_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual CORBA::Boolean | update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos) |
void | update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos) |
Entry for federation updates of DataWriterQos values. | |
void | update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::PublisherQos &qos) |
Entry for federation updates of PublisherQos values. | |
virtual CORBA::Boolean | update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos) |
void | update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos) |
Entry for federation updates of DataReaderQos values. | |
void | update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::SubscriberQos &qos) |
Entry for federation updates of SubscriberQos values. | |
virtual ::CORBA::Boolean | update_subscription_params (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId, const DDS::StringSeq &params) |
virtual CORBA::Boolean | update_topic_qos (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::TopicQos &qos) |
virtual CORBA::Boolean | update_domain_participant_qos (DDS::DomainId_t domain, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos) |
virtual void | shutdown () |
Cause the entire repository to exit. | |
virtual char * | dump_to_string () |
Dump the repository's state to a string. | |
bool | changeOwnership (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, long sender, long owner) |
Assert new ownership for a participant and its contained entities. | |
int | init_transport (int listen_address_given, const char *listen_str) |
bool | receive_image (const Update::UImage &image) |
void | add (Update::Updater *updater) |
Add an additional Updater interface. | |
DCPS_IR_Domain * | domain (DDS::DomainId_t domain) |
Convert a domain Id into a reference to a DCPS_IR_Domain object. | |
const DCPS_IR_Domain_Map & | domains () const |
Expose a readable reference of the domain map. | |
CORBA::ORB_ptr | orb () |
Expose the ORB. | |
bool | init_persistence () |
bool | init_reassociation (const ACE_Time_Value &delay) |
bool | init_dispatchChecking (const ACE_Time_Value &delay) |
void | finalize () |
Cleanup state for shutdown. | |
Private Attributes | |
DCPS_IR_Domain_Map | domains_ |
CORBA::ORB_var | orb_ |
CORBA::ORB_var | dispatchingOrb_ |
const TAO_DDS_DCPSFederationId & | federation_ |
OpenDDS::DCPS::RepoIdGenerator | participantIdGenerator_ |
Update::Manager * | um_ |
bool | reincarnate_ |
ShutdownInterface * | shutdown_ |
Interface to effect shutdown of the process. | |
ACE_Recursive_Thread_Mutex | lock_ |
long | reassociate_timer_id_ |
long | dispatch_check_timer_id_ |
Implementation of the DCPSInfo.
This is the Information Repository object. Clients of the system will use the CORBA reference of this object.
Definition at line 54 of file DCPSInfo_i.h.
TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i | ( | CORBA::ORB_ptr | orb, | |
bool | reincarnate, | |||
ShutdownInterface * | shutdown, | |||
const TAO_DDS_DCPSFederationId & | federation | |||
) |
Definition at line 38 of file DCPSInfo_i.cpp.
References _duplicate(), dispatchingOrb_, CORBA::ORB_init(), and TheServiceParticipant.
00042 : orb_(CORBA::ORB::_duplicate(orb)) 00043 , federation_(federation) 00044 , participantIdGenerator_(federation.id()) 00045 , um_(0) 00046 , reincarnate_(reincarnate) 00047 , shutdown_(shutdown) 00048 , reassociate_timer_id_(-1) 00049 , dispatch_check_timer_id_(-1) 00050 { 00051 if (!TheServiceParticipant->use_bidir_giop()) { 00052 int argc = 0; 00053 char** no_argv = 0; 00054 dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly"); 00055 } 00056 }
TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i | ( | ) | [virtual] |
Definition at line 58 of file DCPSInfo_i.cpp.
void TAO_DDS_DCPSInfo_i::add | ( | Update::Updater * | updater | ) |
Add an additional Updater interface.
Definition at line 2400 of file DCPSInfo_i.cpp.
References Update::Manager::add(), and um_.
bool TAO_DDS_DCPSInfo_i::add_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const DDS::DomainParticipantQos & | qos | |||
) |
Add a previously existing participant to the repository.
domainId | the Domain in which the Participant is contained. | |
participantId | the GUID Id value to use for the Participant. | |
qos | the QoS value of the Participant. |
Adds a Participant to the repository using a specified Participant GUID Id value. If the ParticipantId indicates that this Participant was created within this repository (the federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Participant and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.
Definition at line 1079 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, DCPS_IR_Domain::participant(), OpenDDS::DCPS::RepoIdConverter::participantId(), DCPS_IR_Domain::participants(), TheServiceParticipant, and um_.
01082 { 01083 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 01084 01085 // Grab the domain. 01086 DCPS_IR_Domain* domainPtr = this->domain(domainId); 01087 01088 if (0 == domainPtr) { 01089 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01090 ACE_DEBUG((LM_WARNING, 01091 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01092 ACE_TEXT("invalid domain Id: %d\n"), 01093 domainId)); 01094 } 01095 01096 return false; 01097 } 01098 01099 // Prepare to manipulate the participant's Id value. 01100 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01101 01102 // Determine if this is the 'special' repository internal participant 01103 // that publishes the built-in topics for a domain. 01104 bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT(); 01105 01106 // Grab the participant. 01107 DCPS_IR_Participant* partPtr = domainPtr->participant(participantId); 01108 01109 if (0 != partPtr) { 01110 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01111 ACE_ERROR((LM_ERROR, 01112 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01113 ACE_TEXT("participant id %C already exists.\n"), 01114 std::string(converter).c_str())); 01115 } 01116 01117 return false; 01118 } 01119 01120 DCPS_IR_Participant_rch participant = 01121 OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(this->federation_, 01122 participantId, 01123 domainPtr, 01124 qos, um_, isBitPart); 01125 01126 switch (domainPtr->add_participant(participant)) { 01127 case -1: { 01128 ACE_ERROR((LM_ERROR, 01129 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01130 ACE_TEXT("failed to load participant %C in domain %d.\n"), 01131 std::string(converter).c_str(), 01132 domainId)); 01133 } 01134 return false; 01135 01136 case 1: 01137 01138 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01139 ACE_DEBUG((LM_WARNING, 01140 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01141 
ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"), 01142 std::string(converter).c_str(), 01143 domainId)); 01144 } 01145 01146 return false; 01147 01148 case 0: 01149 default: 01150 break; 01151 } 01152 01153 // See if we are adding a participant that was created within this 01154 // repository or a different repository. 01155 if (converter.federationId() == this->federation_.id()) { 01156 // Ensure the participant GUID values do not conflict. 01157 domainPtr->last_participant_key(converter.participantId()); 01158 01159 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01160 ACE_DEBUG((LM_DEBUG, 01161 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01162 ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"), 01163 converter.participantId())); 01164 } 01165 } 01166 01167 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01168 ACE_DEBUG((LM_DEBUG, 01169 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01170 ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"), 01171 std::string(converter).c_str(), 01172 participant.in(), 01173 domainId)); 01174 } 01175 01176 return true; 01177 }
OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant | ( | DDS::DomainId_t | domain, | |
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Definition at line 984 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), OpenDDS::DCPS::AddDomainStatus::federated, federation_, OpenDDS::DCPS::RcHandle< T >::get(), DCPS_IR_Domain::get_next_participant_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::AddDomainStatus::id, LM_DEBUG, lock_, TAO_DDS_DCPSFederationId::overridden(), OpenDDS::DCPS::RepoIdConverter::participantId(), DCPS_IR_Domain::participants(), status, TheServiceParticipant, and um_.
Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), and receive_image().
00987 { 00988 // A value to return. 00989 OpenDDS::DCPS::AddDomainStatus value; 00990 value.id = OpenDDS::DCPS::GUID_UNKNOWN; 00991 value.federated = this->federation_.overridden(); 00992 00993 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value); 00994 00995 // Grab the domain. 00996 DCPS_IR_Domain* domainPtr = this->domain(domain); 00997 00998 if (0 == domainPtr) { 00999 throw OpenDDS::DCPS::Invalid_Domain(); 01000 } 01001 01002 // Obtain a shiny new GUID value. 01003 OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id(); 01004 01005 // Determine if this is the 'special' repository internal participant 01006 // that publishes the built-in topics for a domain. 01007 bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT(); 01008 01009 DCPS_IR_Participant_rch participant = 01010 OpenDDS::DCPS::make_rch<DCPS_IR_Participant>( 01011 this->federation_, 01012 participantId, 01013 domainPtr, 01014 qos, um_, isBitPart); 01015 01016 // We created the participant, now we can return the Id value (eventually). 01017 value.id = participantId; 01018 01019 if (isBitPart) { 01020 participant->isBitPublisher() = true; 01021 01022 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01023 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01024 ACE_DEBUG((LM_DEBUG, 01025 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01026 ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"), 01027 std::string(converter).c_str(), 01028 domain)); 01029 } 01030 } 01031 01032 // Assume responsibility for writing back to the participant. 01033 participant->takeOwnership(); 01034 01035 int status = domainPtr->add_participant(participant); 01036 01037 if (0 != status) { 01038 // Adding the participant failed return the invalid 01039 // participant Id number. 
01040 participantId = OpenDDS::DCPS::GUID_UNKNOWN; 01041 01042 } else if (this->um_) { 01043 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01044 if (participant->isBitPublisher() == false) { 01045 // Push this participant to interested observers. 01046 Update::UParticipant updateParticipant( 01047 domain, 01048 participant->owner(), 01049 participantId, 01050 const_cast<DDS::DomainParticipantQos &>(qos)); 01051 this->um_->create(updateParticipant); 01052 01053 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01054 ACE_DEBUG((LM_DEBUG, 01055 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01056 ACE_TEXT("pushing creation of participant %C in domain %d.\n"), 01057 std::string(converter).c_str(), 01058 domain)); 01059 } 01060 } 01061 01062 // Update what the last participant id was 01063 um_->updateLastPartId(converter.participantId()); 01064 } 01065 01066 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01067 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01068 ACE_DEBUG((LM_DEBUG, 01069 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01070 ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"), 01071 domain, 01072 std::string(converter).c_str(), 01073 participant.get())); 01074 } 01075 return value; 01076 }
bool TAO_DDS_DCPSInfo_i::add_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
const OpenDDS::DCPS::RepoId & | pubId, | |||
const char * | pub_str, | |||
const DDS::DataWriterQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::PublisherQos & | publisherQos, | |||
bool | associate = false | |||
) |
Add a previously existing publication to the repository.
domainId | the Domain in which the Publication is contained. | |
participantId | the Participant in which the Publication is contained. | |
topicId | the Topic of the Publication. | |
pubId | the GUID Id value to use for the Publication. | |
pub_str | stringified publication callback to DataWriter. | |
qos | the QoS value of the DataWriter. | |
transInfo | the transport information for the Publication. | |
publisherQos | the QoS value of the Publisher. | |
associate | indicate whether to create new associations. |
Adds a Publication to the repository using a specified Publication GUID Id value. If the PublicationId indicates that this Publication was created within this repository (the federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Publication and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.
Todo: Check if this is already stored. If so, just clear the callback IOR.
Definition at line 481 of file DCPSInfo_i.cpp.
References ACE_TEXT(), DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), DCPS_IR_Participant::last_publication_key(), LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, and DCPS_IR_Participant::remove_publication().
00490 { 00491 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00492 00493 // Grab the domain. 00494 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00495 00496 if (where == this->domains_.end()) { 00497 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00498 ACE_DEBUG((LM_WARNING, 00499 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00500 ACE_TEXT("invalid domain %d.\n"), 00501 domainId)); 00502 } 00503 00504 return false; 00505 } 00506 00507 // Grab the participant. 00508 DCPS_IR_Participant* partPtr 00509 = where->second->participant(participantId); 00510 00511 if (0 == partPtr) { 00512 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00513 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00514 ACE_DEBUG((LM_WARNING, 00515 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00516 ACE_TEXT("invalid participant %C in domain %d.\n"), 00517 std::string(converter).c_str(), 00518 domainId)); 00519 } 00520 00521 return false; 00522 } 00523 00524 DCPS_IR_Topic* topic = where->second->find_topic(topicId); 00525 00526 if (topic == 0) { 00527 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00528 ACE_DEBUG((LM_WARNING, 00529 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00530 ACE_TEXT("invalid topic %C in domain %d.\n"), 00531 std::string(converter).c_str(), 00532 domainId)); 00533 return false; 00534 } 00535 00536 /// @TODO: Check if this is already stored. If so, just clear the callback IOR. 00537 00538 CORBA::Object_var obj = (dispatchingOrb_ ? 
dispatchingOrb_ : orb_)->string_to_object(pub_str); 00539 if (CORBA::is_nil(obj.in())) { 00540 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00541 ACE_DEBUG((LM_WARNING, 00542 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00543 ACE_TEXT("failure converting string %C to objref\n"), 00544 pub_str)); 00545 } 00546 return false; 00547 } 00548 00549 OpenDDS::DCPS::DataWriterRemote_var publication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in()); 00550 00551 OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr( 00552 new DCPS_IR_Publication( 00553 pubId, 00554 partPtr, 00555 topic, 00556 publication.in(), 00557 qos, 00558 transInfo, 00559 publisherQos)); 00560 00561 DCPS_IR_Publication* pub = pubPtr.get(); 00562 switch (partPtr->add_publication(move(pubPtr))) { 00563 case -1: { 00564 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00565 ACE_ERROR((LM_ERROR, 00566 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ") 00567 ACE_TEXT("failed to add publication to participant %C.\n"), 00568 std::string(converter).c_str())); 00569 return false; 00570 } 00571 00572 case 1: 00573 return false; 00574 case 0: 00575 default: 00576 break; 00577 } 00578 00579 switch (topic->add_publication_reference(pub, associate)) { 00580 case -1: { 00581 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00582 ACE_ERROR((LM_ERROR, 00583 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ") 00584 ACE_TEXT("failed to add publication to participant %C topic list.\n"), 00585 std::string(converter).c_str())); 00586 00587 // Remove the publication. 00588 partPtr->remove_publication(pubId); 00589 00590 } 00591 return false; 00592 00593 case 1: // This is actually a really really bad place to jump to. 
00594 // This means that we successfully added the new publication 00595 // to the participant (it had not been inserted before) but 00596 // that we are adding a duplicate publication to the topic 00597 // list - which should never ever be able to happen. 00598 return false; 00599 00600 case 0: 00601 default: 00602 break; 00603 } 00604 00605 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00606 00607 // See if we are adding a publication that was created within this 00608 // repository or a different repository. 00609 if (converter.federationId() == federation_.id()) { 00610 // Ensure the publication RepoId values do not conflict. 00611 partPtr->last_publication_key(converter.entityKey()); 00612 } 00613 00614 return true; 00615 }
OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
OpenDDS::DCPS::DataWriterRemote_ptr | publication, | |||
const DDS::DataWriterQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [virtual] |
Definition at line 369 of file DCPSInfo_i.cpp.
References ACE_TEXT(), DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), Update::Manager::create(), Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Participant::get_next_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, CORBA::is_nil(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, DCPS_IR_Participant::remove_publication(), and um_.
Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().
00377 { 00378 if (CORBA::is_nil(publication)) { 00379 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00380 ACE_DEBUG((LM_WARNING, 00381 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00382 ACE_TEXT("invalid publication reference.\n"))); 00383 } 00384 return OpenDDS::DCPS::GUID_UNKNOWN; 00385 } 00386 00387 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN); 00388 00389 // Grab the domain. 00390 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00391 00392 if (where == this->domains_.end()) { 00393 throw OpenDDS::DCPS::Invalid_Domain(); 00394 } 00395 00396 // Grab the participant. 00397 DCPS_IR_Participant* partPtr 00398 = where->second->participant(participantId); 00399 00400 if (0 == partPtr) { 00401 throw OpenDDS::DCPS::Invalid_Participant(); 00402 } 00403 00404 DCPS_IR_Topic* topic = where->second->find_topic(topicId); 00405 00406 if (topic == 0) { 00407 throw OpenDDS::DCPS::Invalid_Topic(); 00408 } 00409 00410 // Get a Id for the Writer, make it a builtin kind if this is for a BIT 00411 OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id( 00412 OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity()); 00413 00414 OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication = 00415 OpenDDS::DCPS::DataWriterRemote::_duplicate(publication); 00416 00417 if (dispatchingOrb_) { 00418 // Remarshall the remote reference onto the dispatching orb. 
00419 CORBA::String_var pubStr = orb_->object_to_string(dispatchingPublication); 00420 CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr); 00421 if (CORBA::is_nil(pubObj)) { 00422 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00423 ACE_DEBUG((LM_WARNING, 00424 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00425 ACE_TEXT("failure marshalling publication on dispatching orb.\n"))); 00426 } 00427 return OpenDDS::DCPS::GUID_UNKNOWN; 00428 } 00429 00430 dispatchingPublication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj); 00431 } 00432 00433 OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr( 00434 new DCPS_IR_Publication( 00435 pubId, 00436 partPtr, 00437 topic, 00438 dispatchingPublication.in(), 00439 qos, 00440 transInfo, 00441 publisherQos)); 00442 00443 DCPS_IR_Publication* pub = pubPtr.get(); 00444 if (partPtr->add_publication(OpenDDS::DCPS::move(pubPtr)) != 0) { 00445 // failed to add. we are responsible for the memory. 00446 pubId = OpenDDS::DCPS::GUID_UNKNOWN; 00447 } else if (topic->add_publication_reference(pub) != 0) { 00448 // Failed to add to the topic 00449 // so remove from participant and fail. 
00450 partPtr->remove_publication(pubId); 00451 pubId = OpenDDS::DCPS::GUID_UNKNOWN; 00452 } 00453 00454 if (this->um_ && (partPtr->isBitPublisher() == false)) { 00455 CORBA::String_var callback = orb_->object_to_string(publication); 00456 Update::ContentSubscriptionInfo csi; 00457 00458 Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter 00459 , callback.in() 00460 , const_cast<DDS::PublisherQos &>(publisherQos) 00461 , const_cast<DDS::DataWriterQos &>(qos) 00462 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &> 00463 (transInfo), csi); 00464 this->um_->create(actor); 00465 00466 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00467 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00468 ACE_DEBUG((LM_DEBUG, 00469 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ") 00470 ACE_TEXT("pushing creation of publication %C in domain %d.\n"), 00471 std::string(converter).c_str(), 00472 domainId)); 00473 } 00474 } 00475 00476 where->second->remove_dead_participants(); 00477 return pubId; 00478 }
bool TAO_DDS_DCPSInfo_i::add_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
const OpenDDS::DCPS::RepoId & | subId, | |||
const char * | sub_str, | |||
const DDS::DataReaderQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::SubscriberQos & | subscriberQos, | |||
const char * | filterClassName, | |||
const char * | filterExpression, | |||
const DDS::StringSeq & | exprParams, | |||
bool | associate = false | |||
) |
Add a previously existing subscription to the repository.
domainId | the Domain in which the Subscription is contained. | |
participantId | the Participant in which the Subscription is contained. | |
topicId | the Topic of the Subscription. | |
subId | the GUID Id value to use for the Subscription. | |
sub_str | stringified subscription callback to DataReader. | |
qos | the QoS value of the DataReader. | |
transInfo | the transport information for the Subscription. | |
subscriberQos | the QoS value of the Subscriber. | |
associate | indicate whether to create new associations. |
Adds a Subscription to the repository using a specified Subscription GUID Id value. If the SubscriptionId indicates that this Subscription was created within this repository (the federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Subscription and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.
Definition at line 793 of file DCPSInfo_i.cpp.
References ACE_TEXT(), DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), DCPS_IR_Participant::last_subscription_key(), LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, and DCPS_IR_Participant::remove_subscription().
00806 { 00807 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00808 00809 // Grab the domain. 00810 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00811 00812 if (where == this->domains_.end()) { 00813 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00814 ACE_DEBUG((LM_WARNING, 00815 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00816 ACE_TEXT("invalid domain %d.\n"), 00817 domainId)); 00818 } 00819 00820 return false; 00821 } 00822 00823 // Grab the participant. 00824 DCPS_IR_Participant* partPtr 00825 = where->second->participant(participantId); 00826 00827 if (0 == partPtr) { 00828 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00829 OpenDDS::DCPS::RepoIdConverter converter(participantId); 00830 ACE_DEBUG((LM_WARNING, 00831 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00832 ACE_TEXT("invalid participant %C in domain %d.\n"), 00833 std::string(converter).c_str(), 00834 domainId)); 00835 } 00836 00837 return false; 00838 } 00839 00840 DCPS_IR_Topic* topic = where->second->find_topic(topicId); 00841 00842 if (topic == 0) { 00843 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00844 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00845 ACE_DEBUG((LM_WARNING, 00846 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00847 ACE_TEXT("invalid topic %C in domain %d.\n"), 00848 std::string(converter).c_str(), 00849 domainId)); 00850 } 00851 00852 return false; 00853 } 00854 00855 CORBA::Object_var obj = (dispatchingOrb_ ? 
dispatchingOrb_ : orb_) ->string_to_object(sub_str); 00856 if (CORBA::is_nil(obj.in())) { 00857 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00858 ACE_DEBUG((LM_WARNING, 00859 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00860 ACE_TEXT("failure converting string %C to objref\n"), 00861 sub_str)); 00862 } 00863 return false; 00864 } 00865 00866 OpenDDS::DCPS::DataReaderRemote_var subscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in()); 00867 00868 OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr( 00869 new DCPS_IR_Subscription( 00870 subId, 00871 partPtr, 00872 topic, 00873 subscription.in(), 00874 qos, 00875 transInfo, 00876 subscriberQos, 00877 filterClassName, 00878 filterExpression, 00879 exprParams)); 00880 00881 DCPS_IR_Subscription* sub = subPtr.get(); 00882 switch (partPtr->add_subscription(OpenDDS::DCPS::move(subPtr))) { 00883 case -1: { 00884 OpenDDS::DCPS::RepoIdConverter converter(subId); 00885 ACE_ERROR((LM_ERROR, 00886 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ") 00887 ACE_TEXT("failed to add subscription to participant %C.\n"), 00888 std::string(converter).c_str())); 00889 return false; 00890 } 00891 00892 case 1: 00893 return false; 00894 00895 case 0: 00896 default: 00897 break; 00898 } 00899 00900 switch (topic->add_subscription_reference(sub, associate)) { 00901 case -1: { 00902 OpenDDS::DCPS::RepoIdConverter converter(subId); 00903 ACE_ERROR((LM_ERROR, 00904 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ") 00905 ACE_TEXT("failed to add subscription to participant %C topic list.\n"), 00906 std::string(converter).c_str())); 00907 00908 // Remove the subscription. 00909 partPtr->remove_subscription(subId); 00910 00911 } 00912 return false; 00913 00914 case 1: // This is actually a really really bad place to jump to. 
00915 // This means that we successfully added the new subscription 00916 // to the participant (it had not been inserted before) but 00917 // that we are adding a duplicate subscription to the topic 00918 // list - which should never ever be able to happen. 00919 return false; 00920 00921 case 0: 00922 default: 00923 break; 00924 } 00925 00926 OpenDDS::DCPS::RepoIdConverter converter(subId); 00927 00928 // See if we are adding a subscription that was created within this 00929 // repository or a different repository. 00930 if (converter.federationId() == federation_.id()) { 00931 // Ensure the subscription RepoId values do not conflict. 00932 partPtr->last_subscription_key(converter.entityKey()); 00933 } 00934 00935 return true; 00936 }
OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
OpenDDS::DCPS::DataReaderRemote_ptr | subscription, | |||
const DDS::DataReaderQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::SubscriberQos & | subscriberQos, | |||
const char * | filterClassName, | |||
const char * | filterExpression, | |||
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
Definition at line 665 of file DCPSInfo_i.cpp.
References ACE_TEXT(), DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), Update::Manager::create(), Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Participant::get_next_subscription_id(), OpenDDS::DCPS::GUID_UNKNOWN, TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::move(), orb_, DCPS_IR_Domain::participant(), DCPS_IR_Domain::remove_dead_participants(), DCPS_IR_Participant::remove_subscription(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), and um_.
Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().
00676 { 00677 if (CORBA::is_nil(subscription)) { 00678 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00679 ACE_DEBUG((LM_WARNING, 00680 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00681 ACE_TEXT("invalid subscription reference.\n"))); 00682 } 00683 return OpenDDS::DCPS::GUID_UNKNOWN; 00684 } 00685 00686 DCPS_IR_Domain* domainPtr; 00687 DCPS_IR_Participant* partPtr; 00688 DCPS_IR_Topic* topic; 00689 OpenDDS::DCPS::RepoId subId; 00690 OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr; 00691 { 00692 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN); 00693 00694 // Grab the domain. 00695 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00696 00697 if (where == this->domains_.end()) { 00698 throw OpenDDS::DCPS::Invalid_Domain(); 00699 } 00700 00701 // Grab the domain and participant. 00702 domainPtr = where->second.get(); 00703 partPtr = domainPtr->participant(participantId); 00704 00705 if (0 == partPtr) { 00706 throw OpenDDS::DCPS::Invalid_Participant(); 00707 } 00708 00709 topic = where->second->find_topic(topicId); 00710 00711 if (topic == 0) { 00712 throw OpenDDS::DCPS::Invalid_Topic(); 00713 } 00714 00715 // Get a Id for the Reader, make it a builtin kind if this is for a BIT 00716 subId = partPtr->get_next_subscription_id( 00717 OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity()); 00718 00719 OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription ( 00720 OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription)); 00721 00722 if (dispatchingOrb_) { 00723 // Remarshall the remote reference onto the dispatching orb. 
00724 CORBA::String_var subStr = orb_->object_to_string(dispatchingSubscription); 00725 CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr); 00726 if (CORBA::is_nil(subObj.in())) { 00727 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00728 ACE_DEBUG((LM_WARNING, 00729 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00730 ACE_TEXT("failure marshalling subscription on dispatching orb.\n"))); 00731 } 00732 return OpenDDS::DCPS::GUID_UNKNOWN; 00733 } 00734 dispatchingSubscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj); 00735 } 00736 00737 subPtr.reset( 00738 new DCPS_IR_Subscription( 00739 subId, 00740 partPtr, 00741 topic, 00742 dispatchingSubscription.in(), 00743 qos, 00744 transInfo, 00745 subscriberQos, 00746 filterClassName, 00747 filterExpression, 00748 exprParams)); 00749 00750 // Release lock 00751 } 00752 00753 DCPS_IR_Subscription* sub = subPtr.get(); 00754 if (partPtr->add_subscription(move(subPtr)) != 0) { 00755 // failed to add. we are responsible for the memory. 00756 subId = OpenDDS::DCPS::GUID_UNKNOWN; 00757 } else if (topic->add_subscription_reference(sub) != 0) { 00758 ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n"))); 00759 // No associations were made so remove and fail. 
00760 partPtr->remove_subscription(subId); 00761 subId = OpenDDS::DCPS::GUID_UNKNOWN; 00762 } 00763 00764 if (this->um_ && (partPtr->isBitPublisher() == false)) { 00765 CORBA::String_var callback = orb_->object_to_string(subscription); 00766 Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams); 00767 00768 Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader 00769 , callback.in() 00770 , const_cast<DDS::SubscriberQos &>(subscriberQos) 00771 , const_cast<DDS::DataReaderQos &>(qos) 00772 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &> 00773 (transInfo), csi); 00774 00775 this->um_->create(actor); 00776 00777 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00778 OpenDDS::DCPS::RepoIdConverter converter(subId); 00779 ACE_DEBUG((LM_DEBUG, 00780 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ") 00781 ACE_TEXT("pushing creation of subscription %C in domain %d.\n"), 00782 std::string(converter).c_str(), 00783 domainId)); 00784 } 00785 } 00786 00787 domainPtr->remove_dead_participants(); 00788 00789 return subId; 00790 }
bool TAO_DDS_DCPSInfo_i::add_topic | ( | const OpenDDS::DCPS::RepoId & | topicId, | |
DDS::DomainId_t | domainId, | |||
const OpenDDS::DCPS::RepoId & | participantId, | |||
const char * | topicName, | |||
const char * | dataTypeName, | |||
const DDS::TopicQos & | qos | |||
) |
Add a previously existing topic to the repository.
topicId | the Topic Entity GUID Id to use. | |
domainId | the Domain in which the Topic is contained. | |
participantId | the Participant in which the Topic is contained. | |
topicName | the name of the Topic. | |
dataTypeName | the name of the data type. | |
qos | the QoS value to use for the Topic. |
Adds a Topic Entity to the repository using a specified TopicId value. If the TopicId indicates that this Topic was created within this repository (the federation Id is the current repository's federation Id), this method will ensure that any subsequent calls to add a Topic and obtain a newly generated Id value will return an Id value greater than the Id value of the current one.
Definition at line 227 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::CREATED, OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Participant::last_topic_key(), LM_WARNING, and lock_.
Referenced by Update::Manager::add(), OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), and receive_image().
00233 { 00234 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00235 00236 // Grab the domain. 00237 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00238 00239 if (where == this->domains_.end()) { 00240 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00241 ACE_DEBUG((LM_WARNING, 00242 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ") 00243 ACE_TEXT("invalid domain %d.\n"), 00244 domainId)); 00245 } 00246 00247 return false; 00248 } 00249 00250 // Grab the participant. 00251 DCPS_IR_Participant* participantPtr 00252 = where->second->participant(participantId); 00253 00254 if (0 == participantPtr) { 00255 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00256 OpenDDS::DCPS::RepoIdConverter converter(participantId); 00257 ACE_DEBUG((LM_WARNING, 00258 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ") 00259 ACE_TEXT("invalid participant %C.\n"), 00260 std::string(converter).c_str())); 00261 } 00262 00263 return false; 00264 } 00265 00266 OpenDDS::DCPS::TopicStatus topicStatus 00267 = where->second->force_add_topic(topicId, topicName, dataTypeName, 00268 qos, participantPtr); 00269 00270 if (topicStatus != OpenDDS::DCPS::CREATED) { 00271 return false; 00272 } 00273 00274 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00275 00276 // See if we are adding a topic that was created within this 00277 // repository or a different repository. 00278 if (converter.federationId() == federation_.id()) { 00279 // Ensure the topic RepoId values do not conflict. 00280 participantPtr->last_topic_key(converter.entityKey()); 00281 } 00282 00283 return true; 00284 }
OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic | ( | OpenDDS::DCPS::RepoId_out | topicId, | |
DDS::DomainId_t | domainId, | |||
const OpenDDS::DCPS::RepoId & | participantId, | |||
const char * | topicName, | |||
const char * | dataTypeName, | |||
const DDS::TopicQos & | qos, | |||
bool | hasDcpsKey | |||
) | [virtual] |
Definition at line 175 of file DCPSInfo_i.cpp.
References ACE_TEXT(), Update::Manager::create(), OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, lock_, and um_.
00183 { 00184 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR); 00185 // Grab the domain. 00186 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00187 00188 if (where == this->domains_.end()) { 00189 throw OpenDDS::DCPS::Invalid_Domain(); 00190 } 00191 00192 // Grab the participant. 00193 DCPS_IR_Participant* participantPtr 00194 = where->second->participant(participantId); 00195 00196 if (0 == participantPtr) { 00197 throw OpenDDS::DCPS::Invalid_Participant(); 00198 } 00199 00200 OpenDDS::DCPS::TopicStatus topicStatus 00201 = where->second->add_topic( 00202 topicId, 00203 topicName, 00204 dataTypeName, 00205 qos, 00206 participantPtr); 00207 00208 if (this->um_ && (participantPtr->isBitPublisher() == false)) { 00209 Update::UTopic topic(domainId, topicId, participantId 00210 , topicName, dataTypeName 00211 , const_cast<DDS::TopicQos &>(qos)); 00212 this->um_->create(topic); 00213 00214 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00215 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00216 ACE_DEBUG((LM_DEBUG, 00217 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ") 00218 ACE_TEXT("pushing creation of topic %C in domain %d.\n"), 00219 std::string(converter).c_str(), 00220 domainId)); 00221 } 00222 } 00223 return topicStatus; 00224 }
void TAO_DDS_DCPSInfo_i::association_complete | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | localId, | |||
const OpenDDS::DCPS::RepoId & | remoteId | |||
) | [virtual] |
Definition at line 1513 of file DCPSInfo_i.cpp.
References ACE_TEXT(), DCPS_IR_Publication::association_complete(), DCPS_IR_Subscription::association_complete(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), DCPS_IR_Participant::find_subscription_reference(), LM_INFO, LM_WARNING, and lock_.
01517 { 01518 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01519 01520 DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId); 01521 if (dom_iter == this->domains_.end()) { 01522 return; 01523 } 01524 01525 DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId); 01526 if (0 == partPtr) { 01527 return; 01528 } 01529 01530 // localId could be pub or sub (initial implementation will only use sub 01531 // since the DataReader is the passive peer) 01532 DCPS_IR_Subscription* sub = 0; 01533 DCPS_IR_Publication* pub = 0; 01534 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01535 ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n")); 01536 } 01537 if (0 == partPtr->find_subscription_reference(localId, sub)) { 01538 sub->association_complete(remoteId); 01539 } else if (0 == partPtr->find_publication_reference(localId, pub)) { 01540 pub->association_complete(remoteId); 01541 } else { 01542 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01543 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 01544 OpenDDS::DCPS::RepoIdConverter local_converter(localId); 01545 OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId); 01546 ACE_DEBUG((LM_WARNING, 01547 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ") 01548 ACE_TEXT("participant %C could not find subscription or publication %C ") 01549 ACE_TEXT("to complete association with remote %C.\n"), 01550 std::string(part_converter).c_str(), 01551 std::string(local_converter).c_str(), 01552 std::string(remote_converter).c_str())); 01553 } 01554 } 01555 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId | |||
) | [virtual] |
Definition at line 119 of file DCPSInfo_i.cpp.
References domains_, lock_, and DCPS_IR_Participant::takeOwnership().
00122 { 00123 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 00124 00125 // Grab the domain. 00126 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00127 00128 if (where == this->domains_.end()) { 00129 throw OpenDDS::DCPS::Invalid_Domain(); 00130 } 00131 00132 // Grab the participant. 00133 DCPS_IR_Participant* participant 00134 = where->second->participant(participantId); 00135 00136 if (0 == participant) { 00137 throw OpenDDS::DCPS::Invalid_Participant(); 00138 } 00139 00140 // Establish ownership within the local repository. 00141 participant->takeOwnership(); 00142 00143 return false; 00144 }
bool TAO_DDS_DCPSInfo_i::changeOwnership | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
long | sender, | |||
long | owner | |||
) |
Assert new ownership for a participant and its contained entities.
domainId | the domain in which the participant resides. | |
participantId | the participant to be owned. | |
sender | the repository sending the update data. | |
owner | the repository which is to make callbacks for entities within the participant. |
This establishes owner as the new owner of the participant. Ownership consists of calling back to the reader and writer remote interfaces when associations are established and removed from a publication or subscription. Owner may be the special value of OWNER_NONE to indicate that the previous owner is no longer available to make callbacks and the application has not indicated which repository is to replace it in this capacity.
The sender of the update is included so that the participant can check that transitions to OWNER_NONE are only honored when initiated by the current owner of the participant.
A return value of false indicates that the ownership was specified for a domain or participant which could not be found.
Definition at line 147 of file DCPSInfo_i.cpp.
References DCPS_IR_Participant::changeOwner(), domains_, and lock_.
Referenced by OpenDDS::Federator::ManagerImpl::processCreate(), OpenDDS::Federator::ManagerImpl::processDeferred(), OpenDDS::Federator::ManagerImpl::processDelete(), and OpenDDS::Federator::ManagerImpl::processUpdateQos1().
00152 { 00153 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00154 00155 // Grab the domain. 00156 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00157 00158 if (where == this->domains_.end()) { 00159 return false; 00160 } 00161 00162 // Grab the participant. 00163 DCPS_IR_Participant* participant 00164 = where->second->participant(participantId); 00165 00166 if (0 == participant) { 00167 return false; 00168 } 00169 00170 // Establish the ownership. 00171 participant->changeOwner(sender, owner); 00172 return true; 00173 }
void TAO_DDS_DCPSInfo_i::disassociate_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | local_id, | |||
const OpenDDS::DCPS::RepoId & | remote_id | |||
) | [virtual] |
Definition at line 1304 of file DCPSInfo_i.cpp.
References domains_, lock_, DCPS_IR_Participant::publications(), and DCPS_IR_Participant::subscriptions().
01308 { 01309 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01310 01311 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId)); 01312 if (it == this->domains_.end()) { 01313 throw OpenDDS::DCPS::Invalid_Domain(); 01314 } 01315 01316 DCPS_IR_Participant* participant = it->second->participant(local_id); 01317 if (participant == 0) { 01318 throw OpenDDS::DCPS::Invalid_Participant(); 01319 } 01320 01321 // Disassociate from participant temporarily: 01322 const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions(); 01323 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin()); 01324 sub != subscriptions.end(); ++sub) { 01325 sub->second->disassociate_participant(remote_id, true); 01326 } 01327 01328 const DCPS_IR_Publication_Map& publications = participant->publications(); 01329 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin()); 01330 pub != publications.end(); ++pub) { 01331 pub->second->disassociate_participant(remote_id, true); 01332 } 01333 01334 it->second->remove_dead_participants(); 01335 }
void TAO_DDS_DCPSInfo_i::disassociate_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | local_id, | |||
const OpenDDS::DCPS::RepoId & | remote_id | |||
) | [virtual] |
Definition at line 1380 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::disassociate_subscription(), domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, and lock_.
01385 { 01386 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01387 01388 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId)); 01389 if (it == this->domains_.end()) { 01390 throw OpenDDS::DCPS::Invalid_Domain(); 01391 } 01392 01393 DCPS_IR_Participant* participant = it->second->participant(participantId); 01394 if (participant == 0) { 01395 throw OpenDDS::DCPS::Invalid_Participant(); 01396 } 01397 01398 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01399 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n")); 01400 } 01401 01402 DCPS_IR_Publication* publication; 01403 if (participant->find_publication_reference(local_id, publication) 01404 != 0 || publication == 0) { 01405 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 01406 OpenDDS::DCPS::RepoIdConverter pub_converter(local_id); 01407 ACE_ERROR((LM_ERROR, 01408 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ") 01409 ACE_TEXT("participant %C could not find publication %C.\n"), 01410 std::string(part_converter).c_str(), 01411 std::string(pub_converter).c_str())); 01412 throw OpenDDS::DCPS::Invalid_Publication(); 01413 } 01414 01415 // Disassociate from subscription temporarily: 01416 publication->disassociate_subscription(remote_id, true); 01417 01418 it->second->remove_dead_participants(); 01419 }
void TAO_DDS_DCPSInfo_i::disassociate_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | local_id, | |||
const OpenDDS::DCPS::RepoId & | remote_id | |||
) | [virtual] |
Definition at line 1338 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::disassociate_publication(), domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, and lock_.
01343 { 01344 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01345 01346 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId)); 01347 if (it == this->domains_.end()) { 01348 throw OpenDDS::DCPS::Invalid_Domain(); 01349 } 01350 01351 DCPS_IR_Participant* participant = it->second->participant(participantId); 01352 if (participant == 0) { 01353 throw OpenDDS::DCPS::Invalid_Participant(); 01354 } 01355 01356 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01357 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n")); 01358 } 01359 01360 DCPS_IR_Subscription* subscription; 01361 if (participant->find_subscription_reference(local_id, subscription) 01362 != 0 || subscription == 0) { 01363 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 01364 OpenDDS::DCPS::RepoIdConverter sub_converter(local_id); 01365 ACE_ERROR((LM_ERROR, 01366 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ") 01367 ACE_TEXT("participant %C could not find subscription %C.\n"), 01368 std::string(part_converter).c_str(), 01369 std::string(sub_converter).c_str())); 01370 throw OpenDDS::DCPS::Invalid_Subscription(); 01371 } 01372 01373 // Disassociate from publication temporarily: 01374 subscription->disassociate_publication(remote_id, true); 01375 01376 it->second->remove_dead_participants(); 01377 }
DCPS_IR_Domain * TAO_DDS_DCPSInfo_i::domain | ( | DDS::DomainId_t | domain | ) |
Convert a domain Id into a reference to a DCPS_IR_Domain object.
Definition at line 2134 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Service_Participant::ANY_DOMAIN, OpenDDS::DCPS::DCPS_debug_level, domains_, federation_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DCPS_IR_Domain::init_built_in_topics(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::move(), TAO_DDS_DCPSFederationId::overridden(), participantIdGenerator_, reincarnate_, TheServiceParticipant, and DCPS_IR_Domain::useBIT().
Referenced by add_domain_participant(), receive_image(), and update_subscription_params().
02135 { 02136 if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) { 02137 ACE_ERROR((LM_ERROR, 02138 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ") 02139 ACE_TEXT("ANY_DOMAIN not supported for operations.\n"))); 02140 return 0; 02141 } 02142 02143 // Check if the domain is already in the map. 02144 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain); 02145 02146 if (where == this->domains_.end()) { 02147 // We will attempt to insert a new domain, go ahead and allocate it. 02148 OpenDDS::DCPS::unique_ptr<DCPS_IR_Domain> domain_uptr( new 02149 DCPS_IR_Domain(domain, this->participantIdGenerator_)); 02150 02151 DCPS_IR_Domain* domainPtr = domain_uptr.get(); 02152 02153 // We need to insert the domain into the map at this time since it 02154 // might be looked up during the init_built_in_topics() call. 02155 this->domains_.insert( 02156 where, 02157 DCPS_IR_Domain_Map::value_type(domain, OpenDDS::DCPS::move(domain_uptr))); 02158 02159 #ifndef DDS_HAS_MINIMUM_BIT 02160 if (TheServiceParticipant->get_BIT() && !domainPtr->useBIT() && 02161 domainPtr->init_built_in_topics(federation_.overridden(), reincarnate_) 02162 ) { 02163 ACE_ERROR((LM_ERROR, 02164 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ") 02165 ACE_TEXT("failed to initialize the Built-In Topics ") 02166 ACE_TEXT("when loading domain %d.\n"), 02167 domain)); 02168 this->domains_.erase(domain); 02169 return 0; 02170 } 02171 #endif 02172 02173 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02174 ACE_DEBUG((LM_DEBUG, 02175 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ") 02176 ACE_TEXT("successfully loaded domain %d at %x.\n"), 02177 domain, 02178 domainPtr)); 02179 } 02180 return domainPtr; 02181 02182 } else { 02183 return where->second.get(); 02184 } 02185 }
const DCPS_IR_Domain_Map & TAO_DDS_DCPSInfo_i::domains | ( | ) | const |
Expose a readable reference of the domain map.
Definition at line 2470 of file DCPSInfo_i.cpp.
References domains_.
Referenced by OpenDDS::Federator::ManagerImpl::pushState().
02471 { 02472 return this->domains_; 02473 }
char * TAO_DDS_DCPSInfo_i::dump_to_string | ( | ) | [virtual] |
Dump the Repos state to string.
Definition at line 2477 of file DCPSInfo_i.cpp.
References domains_, and FACE::string_dup().
02478 { 02479 std::string dump; 02480 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 02481 std::string indent (" "); 02482 02483 for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin(); 02484 dm != domains_.end(); 02485 dm++) 02486 { 02487 dump += dm->second->dump_to_string(indent, 0); 02488 } 02489 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 02490 return CORBA::string_dup(dump.c_str()); 02491 02492 }
void TAO_DDS_DCPSInfo_i::finalize | ( | void | ) |
Cleanup state for shutdown.
Definition at line 2452 of file DCPSInfo_i.cpp.
References ACE_Reactor::cancel_timer(), dispatch_check_timer_id_, orb_, ACE_Event_Handler::reactor(), and reassociate_timer_id_.
02453 { 02454 if (reassociate_timer_id_ != -1) { 02455 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02456 02457 reactor->cancel_timer(this->reassociate_timer_id_); 02458 this->reassociate_timer_id_ = -1; 02459 } 02460 02461 if (dispatch_check_timer_id_ != -1) { 02462 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02463 02464 reactor->cancel_timer(this->dispatch_check_timer_id_); 02465 this->dispatch_check_timer_id_ = -1; 02466 } 02467 }
OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic | ( | DDS::DomainId_t | domainId, | |
const char * | topicName, | |||
CORBA::String_out | dataTypeName, | |||
DDS::TopicQos_out | qos, | |||
OpenDDS::DCPS::RepoId_out | topicId | |||
) | [virtual] |
Definition at line 286 of file DCPSInfo_i.cpp.
References domains_, OpenDDS::DCPS::FOUND, DCPS_IR_Topic_Description::get_dataTypeName(), DCPS_IR_Topic::get_id(), DCPS_IR_Topic::get_topic_description(), DCPS_IR_Topic::get_topic_qos(), OpenDDS::DCPS::INTERNAL_ERROR, lock_, OpenDDS::DCPS::NOT_FOUND, and status.
00292 { 00293 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR); 00294 00295 // Grab the domain. 00296 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00297 00298 if (where == this->domains_.end()) { 00299 throw OpenDDS::DCPS::Invalid_Domain(); 00300 } 00301 00302 OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND; 00303 00304 DCPS_IR_Topic* topic = 0; 00305 qos = new DDS::TopicQos; 00306 00307 status = where->second->find_topic(topicName, topic); 00308 00309 if (0 != topic) { 00310 status = OpenDDS::DCPS::FOUND; 00311 const DCPS_IR_Topic_Description* desc = topic->get_topic_description(); 00312 dataTypeName = desc->get_dataTypeName(); 00313 *qos = *(topic->get_topic_qos()); 00314 topicId = topic->get_id(); 00315 } 00316 00317 return status; 00318 }
int TAO_DDS_DCPSInfo_i::handle_timeout | ( | const ACE_Time_Value & | now, | |
const void * | arg | |||
) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 63 of file DCPSInfo_i.cpp.
References dispatchingOrb_, domains_, TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), and lock_.
00065 { 00066 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 00067 00068 if (arg == this) { 00069 if ( !CORBA::is_nil(this->dispatchingOrb_.in())){ 00070 if (this->dispatchingOrb_->work_pending()) 00071 { 00072 // Ten microseconds 00073 ACE_Time_Value small(0,10); 00074 this->dispatchingOrb_->perform_work(small); 00075 } 00076 } 00077 } 00078 else { 00079 // NOTE: This is a purposefully naive approach to addressing defunct 00080 // associations. In the future, it may be worthwhile to introduce a 00081 // callback model to fix the heinous runtime cost below: 00082 for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin()); 00083 dom != this->domains_.end(); ++dom) { 00084 00085 const DCPS_IR_Participant_Map& participants(dom->second->participants()); 00086 for (DCPS_IR_Participant_Map::const_iterator part(participants.begin()); 00087 part != participants.end(); ++part) { 00088 00089 const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions()); 00090 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin()); 00091 sub != subscriptions.end(); ++sub) { 00092 sub->second->reevaluate_defunct_associations(); 00093 } 00094 00095 const DCPS_IR_Publication_Map& publications(part->second->publications()); 00096 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin()); 00097 pub != publications.end(); ++pub) { 00098 pub->second->reevaluate_defunct_associations(); 00099 } 00100 } 00101 } 00102 } 00103 00104 return 0; 00105 }
void TAO_DDS_DCPSInfo_i::ignore_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1557 of file DCPSInfo_i.cpp.
References domains_, DCPS_IR_Participant::ignore_participant(), and lock_.
01561 { 01562 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01563 01564 // Grab the domain. 01565 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01566 01567 if (where == this->domains_.end()) { 01568 throw OpenDDS::DCPS::Invalid_Domain(); 01569 } 01570 01571 // Grab the participant. 01572 DCPS_IR_Participant* partPtr 01573 = where->second->participant(myParticipantId); 01574 01575 if (0 == partPtr) { 01576 throw OpenDDS::DCPS::Invalid_Participant(); 01577 } 01578 01579 partPtr->ignore_participant(ignoreId); 01580 01581 where->second->remove_dead_participants(); 01582 }
void TAO_DDS_DCPSInfo_i::ignore_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1638 of file DCPSInfo_i.cpp.
References domains_, DCPS_IR_Participant::ignore_publication(), and lock_.
01642 { 01643 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01644 01645 // Grab the domain. 01646 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01647 01648 if (where == this->domains_.end()) { 01649 throw OpenDDS::DCPS::Invalid_Domain(); 01650 } 01651 01652 // Grab the participant. 01653 DCPS_IR_Participant* partPtr 01654 = where->second->participant(myParticipantId); 01655 01656 if (0 == partPtr) { 01657 throw OpenDDS::DCPS::Invalid_Participant(); 01658 } 01659 01660 partPtr->ignore_publication(ignoreId); 01661 01662 where->second->remove_dead_participants(); 01663 }
void TAO_DDS_DCPSInfo_i::ignore_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1611 of file DCPSInfo_i.cpp.
References domains_, DCPS_IR_Participant::ignore_subscription(), and lock_.
01615 { 01616 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01617 01618 // Grab the domain. 01619 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01620 01621 if (where == this->domains_.end()) { 01622 throw OpenDDS::DCPS::Invalid_Domain(); 01623 } 01624 01625 // Grab the participant. 01626 DCPS_IR_Participant* partPtr 01627 = where->second->participant(myParticipantId); 01628 01629 if (0 == partPtr) { 01630 throw OpenDDS::DCPS::Invalid_Participant(); 01631 } 01632 01633 partPtr->ignore_subscription(ignoreId); 01634 01635 where->second->remove_dead_participants(); 01636 }
void TAO_DDS_DCPSInfo_i::ignore_topic | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1584 of file DCPSInfo_i.cpp.
References domains_, DCPS_IR_Participant::ignore_topic(), and lock_.
01588 { 01589 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01590 01591 // Grab the domain. 01592 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01593 01594 if (where == this->domains_.end()) { 01595 throw OpenDDS::DCPS::Invalid_Domain(); 01596 } 01597 01598 // Grab the participant. 01599 DCPS_IR_Participant* partPtr 01600 = where->second->participant(myParticipantId); 01601 01602 if (0 == partPtr) { 01603 throw OpenDDS::DCPS::Invalid_Participant(); 01604 } 01605 01606 partPtr->ignore_topic(ignoreId); 01607 01608 where->second->remove_dead_participants(); 01609 }
bool TAO_DDS_DCPSInfo_i::init_dispatchChecking | ( | const ACE_Time_Value & | delay | ) |
Definition at line 2441 of file DCPSInfo_i.cpp.
References dispatch_check_timer_id_, orb_, ACE_Event_Handler::reactor(), and ACE_Reactor::schedule_timer().
02442 { 02443 if (this->dispatch_check_timer_id_ != -1) return false; // already scheduled 02444 02445 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02446 02447 this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay); 02448 return this->dispatch_check_timer_id_ != -1; 02449 }
bool TAO_DDS_DCPSInfo_i::init_persistence | ( | ) |
Definition at line 2408 of file DCPSInfo_i.cpp.
References ACE_TEXT(), Update::Manager::add(), LM_ERROR, reincarnate_, Update::Manager::requestImage(), and um_.
02409 { 02410 um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance 02411 ("UpdateManagerSvc"); 02412 02413 if (um_ != 0) { 02414 um_->add(this); 02415 02416 // Request persistent image. 02417 if (reincarnate_) { 02418 um_->requestImage(); 02419 } 02420 02421 } else { 02422 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ") 02423 ACE_TEXT("UpdateManagerSvc.\n")), false); 02424 } 02425 02426 return true; 02427 }
bool TAO_DDS_DCPSInfo_i::init_reassociation | ( | const ACE_Time_Value & | delay | ) |
Definition at line 2430 of file DCPSInfo_i.cpp.
References orb_, ACE_Event_Handler::reactor(), reassociate_timer_id_, and ACE_Reactor::schedule_timer().
02431 { 02432 if (this->reassociate_timer_id_ != -1) return false; // already scheduled 02433 02434 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02435 02436 this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay); 02437 return this->reassociate_timer_id_ != -1; 02438 }
int TAO_DDS_DCPSInfo_i::init_transport | ( | int | listen_address_given, | |
const char * | listen_str | |||
) |
Initialize the transport for the Built-In Topics. Returns 0 (zero) on success.
Definition at line 2187 of file DCPSInfo_i.cpp.
References ACE_TEXT(), ACE_Service_Config::current(), OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::dynamic_rchandle_cast(), Util::find(), OpenDDS::DCPS::TransportRegistry::instance(), ACE_Service_Config::process_directive(), and status.
02189 { 02190 int status = 0; 02191 02192 try { 02193 02194 #ifndef ACE_AS_STATIC_LIBS 02195 if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp")) 02196 < 0 /* not found (-1) or suspended (-2) */) { 02197 static const ACE_TCHAR directive[] = 02198 ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ") 02199 ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()"); 02200 ACE_Service_Config::process_directive(directive); 02201 } 02202 #endif 02203 02204 std::string config_name = 02205 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 02206 + std::string("InfoRepoBITTransportConfig"); 02207 OpenDDS::DCPS::TransportConfig_rch config = 02208 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name); 02209 02210 std::string inst_name = 02211 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 02212 + std::string("InfoRepoBITTCPTransportInst"); 02213 OpenDDS::DCPS::TransportInst_rch inst = 02214 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name, 02215 "tcp"); 02216 config->instances_.push_back(inst); 02217 02218 OpenDDS::DCPS::TcpInst_rch tcp_inst = 02219 OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst); 02220 inst->datalink_release_delay_ = 0; 02221 02222 tcp_inst->conn_retry_attempts_ = 0; 02223 02224 if (listen_address_given) { 02225 tcp_inst->local_address(listen_str); 02226 } 02227 02228 } catch (...) { 02229 // TransportRegistry is extremely varied in the exceptions that 02230 // it throws on failure; do not allow exceptions to bubble up 02231 // beyond this point. 02232 status = 1; 02233 } 02234 return status; 02235 }
CORBA::ORB_ptr TAO_DDS_DCPSInfo_i::orb | ( | void | ) |
Expose the ORB.
Definition at line 114 of file DCPSInfo_i.cpp.
References CORBA::ORB::_duplicate(), TAO_Pseudo_Var_T< T >::in(), and orb_.
Referenced by OpenDDS::Federator::ManagerImpl::pushState().
00115 { 00116 return CORBA::ORB::_duplicate(this->orb_.in()); 00117 }
bool TAO_DDS_DCPSInfo_i::receive_image | ( | const Update::UImage & | image | ) |
Definition at line 2238 of file DCPSInfo_i.cpp.
References ACE_TEXT(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::actorId, Update::ImageData< T, P, A, W >::actors, add_domain_participant(), add_publication(), add_subscription(), add_topic(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::callback, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::contentSubscriptionProfile, Update::TopicStrt< Q, S >::dataType, OpenDDS::DCPS::DCPS_debug_level, domain(), Update::ActorStrt< PSQ, RWQ, C, T, CSP >::domainId, Update::TopicStrt< Q, S >::domainId, Update::ParticipantStrt< Q >::domainId, domains_, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::drdwQos, OpenDDS::DCPS::RepoIdGenerator::last(), Update::ImageData< T, P, A, W >::lastPartId, LM_DEBUG, LM_ERROR, LM_WARNING, Update::TopicStrt< Q, S >::name, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::participantId, Update::TopicStrt< Q, S >::participantId, Update::ParticipantStrt< Q >::participantId, participantIdGenerator_, Update::ParticipantStrt< Q >::participantQos, Update::ImageData< T, P, A, W >::participants, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::pubsubQos, TheServiceParticipant, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::topicId, Update::TopicStrt< Q, S >::topicId, Update::TopicStrt< Q, S >::topicQos, Update::ImageData< T, P, A, W >::topics, Update::ActorStrt< PSQ, RWQ, C, T, CSP >::transportInterfaceInfo, and Update::ImageData< T, P, A, W >::wActors.
Referenced by Update::Manager::pushImage().
02239 { 02240 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02241 ACE_DEBUG((LM_DEBUG, 02242 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02243 ACE_TEXT("processing persistent data.\n"))); 02244 } 02245 02246 // Initialize builtin topics first so that they always have the same IDs 02247 #ifndef DDS_HAS_MINIMUM_BIT 02248 if (TheServiceParticipant->get_BIT()) { 02249 for (Update::UImage::ParticipantSeq::const_iterator 02250 iter = image.participants.begin(); 02251 iter != image.participants.end(); iter++) { 02252 const Update::UParticipant* part = *iter; 02253 if (!domain(part->domainId)) { 02254 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 02255 ACE_DEBUG((LM_WARNING, 02256 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::receive_image: ") 02257 ACE_TEXT("invalid domain Id: %d\n"), 02258 part->domainId)); 02259 } 02260 return false; 02261 } 02262 } 02263 } 02264 #endif 02265 02266 // Ensure that new non-BIT participants do not reuse an id 02267 participantIdGenerator_.last(image.lastPartId); 02268 02269 for (Update::UImage::ParticipantSeq::const_iterator 02270 iter = image.participants.begin(); 02271 iter != image.participants.end(); iter++) { 02272 const Update::UParticipant* part = *iter; 02273 02274 if (!this->add_domain_participant(part->domainId, part->participantId 02275 , part->participantQos)) { 02276 OpenDDS::DCPS::RepoIdConverter converter(part->participantId); 02277 ACE_ERROR((LM_ERROR, 02278 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02279 ACE_TEXT("failed to add participant %C to domain %d.\n"), 02280 std::string(converter).c_str(), 02281 part->domainId)); 02282 return false; 02283 02284 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02285 OpenDDS::DCPS::RepoIdConverter converter(part->participantId); 02286 ACE_DEBUG((LM_DEBUG, 02287 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02288 ACE_TEXT("added participant %C to domain %d.\n"), 02289 std::string(converter).c_str(), 02290 part->domainId)); 02291 } 02292 } 02293 
02294 for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin(); 02295 iter != image.topics.end(); iter++) { 02296 const Update::UTopic* topic = *iter; 02297 02298 if (!this->add_topic(topic->topicId, topic->domainId 02299 , topic->participantId, topic->name.c_str() 02300 , topic->dataType.c_str(), topic->topicQos)) { 02301 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId); 02302 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId); 02303 ACE_ERROR((LM_ERROR, 02304 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02305 ACE_TEXT("failed to add topic %C to participant %C.\n"), 02306 std::string(topic_converter).c_str(), 02307 std::string(part_converter).c_str())); 02308 return false; 02309 02310 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02311 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId); 02312 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId); 02313 ACE_DEBUG((LM_DEBUG, 02314 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02315 ACE_TEXT("added topic %C to participant %C.\n"), 02316 std::string(topic_converter).c_str(), 02317 std::string(part_converter).c_str())); 02318 } 02319 } 02320 02321 for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin(); 02322 iter != image.actors.end(); iter++) { 02323 const Update::URActor* sub = *iter; 02324 02325 // no reason to associate, there are no publishers yet to associate with 02326 if (!this->add_subscription(sub->domainId, sub->participantId 02327 , sub->topicId, sub->actorId 02328 , sub->callback.c_str(), sub->drdwQos 02329 , sub->transportInterfaceInfo 02330 , sub->pubsubQos 02331 , sub->contentSubscriptionProfile.filterClassName 02332 , sub->contentSubscriptionProfile.filterExpr 02333 , sub->contentSubscriptionProfile.exprParams)) { 02334 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId); 02335 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId); 02336 ACE_ERROR((LM_ERROR, 
02337 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02338 ACE_TEXT("failed to add subscription %C to participant %C.\n"), 02339 std::string(sub_converter).c_str(), 02340 std::string(part_converter).c_str())); 02341 return false; 02342 02343 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02344 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId); 02345 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId); 02346 ACE_DEBUG((LM_DEBUG, 02347 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02348 ACE_TEXT("added subscription %C to participant %C.\n"), 02349 std::string(sub_converter).c_str(), 02350 std::string(part_converter).c_str())); 02351 } 02352 } 02353 02354 for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin(); 02355 iter != image.wActors.end(); iter++) { 02356 const Update::UWActor* pub = *iter; 02357 02358 // try to associate with any persisted subscriptions to track any expected 02359 // existing associations 02360 if (!this->add_publication(pub->domainId, pub->participantId 02361 , pub->topicId, pub->actorId 02362 , pub->callback.c_str() , pub->drdwQos 02363 , pub->transportInterfaceInfo 02364 , pub->pubsubQos 02365 , true)) { 02366 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId); 02367 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId); 02368 ACE_ERROR((LM_ERROR, 02369 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02370 ACE_TEXT("failed to add publication %C to participant %C.\n"), 02371 std::string(pub_converter).c_str(), 02372 std::string(part_converter).c_str())); 02373 return false; 02374 02375 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02376 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId); 02377 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId); 02378 ACE_DEBUG((LM_DEBUG, 02379 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02380 ACE_TEXT("added publication %C to participant %C.\n"), 02381 
std::string(pub_converter).c_str(), 02382 std::string(part_converter).c_str())); 02383 } 02384 } 02385 02386 #ifndef DDS_HAS_MINIMUM_BIT 02387 if (TheServiceParticipant->get_BIT()) { 02388 for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin(); 02389 currentDomain != domains_.end(); 02390 ++currentDomain) { 02391 currentDomain->second->reassociate_built_in_topic_pubs(); 02392 } 02393 } 02394 #endif 02395 02396 return true; 02397 }
bool TAO_DDS_DCPSInfo_i::remove_by_owner | ( | DDS::DomainId_t | domain, | |
long | owner | |||
) |
Definition at line 1180 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, LM_DEBUG, lock_, DCPS_IR_Participant::publications(), remove_domain_participant(), DCPS_IR_Participant::remove_publication(), DCPS_IR_Participant::remove_subscription(), DCPS_IR_Participant::remove_topic_reference(), status, DCPS_IR_Participant::subscriptions(), and DCPS_IR_Participant::topics().
Referenced by OpenDDS::Federator::ManagerImpl::leave_federation().
01183 { 01184 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 01185 01186 // Grab the domain. 01187 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain); 01188 01189 if (where == this->domains_.end()) { 01190 return false; 01191 } 01192 01193 std::vector<OpenDDS::DCPS::RepoId> candidates; 01194 01195 for (DCPS_IR_Participant_Map::const_iterator 01196 current = where->second->participants().begin(); 01197 current != where->second->participants().end(); 01198 ++current) { 01199 if (current->second->owner() == owner) { 01200 candidates.push_back(current->second->get_id()); 01201 } 01202 } 01203 01204 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01205 ACE_DEBUG((LM_DEBUG, 01206 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01207 ACE_TEXT("%d participants to remove from domain %d.\n"), 01208 candidates.size(), 01209 domain)); 01210 } 01211 01212 bool status = true; 01213 01214 for (unsigned int index = 0; index < candidates.size(); ++index) { 01215 DCPS_IR_Participant* participant 01216 = where->second->participant(candidates[index]); 01217 if (participant) { 01218 std::vector<OpenDDS::DCPS::RepoId> keylist; 01219 01220 // Remove Subscriptions 01221 for (DCPS_IR_Subscription_Map::const_iterator 01222 current = participant->subscriptions().begin(); 01223 current != participant->subscriptions().end(); 01224 ++current) { 01225 keylist.push_back(current->second->get_id()); 01226 } 01227 01228 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01229 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]); 01230 ACE_DEBUG((LM_DEBUG, 01231 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01232 ACE_TEXT("%d subscriptions to remove from participant %C.\n"), 01233 keylist.size(), 01234 std::string(converter).c_str())); 01235 } 01236 01237 for (unsigned int key = 0; key < keylist.size(); ++key) { 01238 if (participant->remove_subscription(keylist[key]) != 0) { 01239 status = false; 01240 } 01241 } 01242 01243 // 
Remove Publications 01244 keylist.clear(); 01245 01246 for (DCPS_IR_Publication_Map::const_iterator 01247 current = participant->publications().begin(); 01248 current != participant->publications().end(); 01249 ++current) { 01250 keylist.push_back(current->second->get_id()); 01251 } 01252 01253 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01254 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]); 01255 ACE_DEBUG((LM_DEBUG, 01256 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01257 ACE_TEXT("%d publications to remove from participant %C.\n"), 01258 keylist.size(), 01259 std::string(converter).c_str())); 01260 } 01261 01262 for (unsigned int key = 0; key < keylist.size(); ++key) { 01263 if (participant->remove_publication(keylist[key]) != 0) { 01264 status = false; 01265 } 01266 } 01267 01268 // Remove Topics 01269 keylist.clear(); 01270 01271 for (DCPS_IR_Topic_Map::const_iterator 01272 current = participant->topics().begin(); 01273 current != participant->topics().end(); 01274 ++current) { 01275 keylist.push_back(current->second->get_id()); 01276 } 01277 01278 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01279 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]); 01280 ACE_DEBUG((LM_DEBUG, 01281 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01282 ACE_TEXT("%d topics to remove from participant %C.\n"), 01283 keylist.size(), 01284 std::string(converter).c_str())); 01285 } 01286 01287 for (unsigned int key = 0; key < keylist.size(); ++key) { 01288 DCPS_IR_Topic* discard; 01289 01290 if (participant->remove_topic_reference(keylist[key], discard) != 0) { 01291 status = false; 01292 } 01293 } 01294 } 01295 01296 // Remove Participant 01297 this->remove_domain_participant(domain, candidates[ index]); 01298 } 01299 01300 return status; 01301 }
void TAO_DDS_DCPSInfo_i::remove_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId | |||
) | [virtual] |
Definition at line 1421 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, ACE_Event_Handler_var::handler(), DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, LM_ERROR, lock_, Update::Participant, status, TheServiceParticipant, and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete(), and remove_by_owner().
01424 { 01425 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01426 01427 // Grab the domain. 01428 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01429 01430 if (where == this->domains_.end()) { 01431 throw OpenDDS::DCPS::Invalid_Domain(); 01432 } 01433 01434 DCPS_IR_Participant* participant = where->second->participant(participantId); 01435 01436 if (participant == 0) { 01437 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01438 ACE_ERROR((LM_ERROR, 01439 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ") 01440 ACE_TEXT("failed to locate participant %C in domain %d.\n"), 01441 std::string(converter).c_str(), 01442 domainId)); 01443 throw OpenDDS::DCPS::Invalid_Participant(); 01444 } 01445 01446 // Determine if we should propagate this event; we need to cache this 01447 // result as the participant will be gone by the time we use the result. 01448 bool sendUpdate = (participant->isOwner() == true) 01449 && (participant->isBitPublisher() == false); 01450 01451 CORBA::Boolean dont_notify_lost = 0; 01452 int status = where->second->remove_participant(participantId, dont_notify_lost); 01453 01454 if (0 != status) { 01455 // Removing the participant failed 01456 throw OpenDDS::DCPS::Invalid_Participant(); 01457 } 01458 01459 // Update any concerned observers that the participant was destroyed. 
01460 if (this->um_ && sendUpdate) { 01461 Update::IdPath path( 01462 where->second->get_id(), 01463 participantId, 01464 participantId); 01465 this->um_->destroy(path, Update::Participant); 01466 01467 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01468 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01469 ACE_DEBUG((LM_DEBUG, 01470 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ") 01471 ACE_TEXT("pushing deletion of participant %C in domain %d.\n"), 01472 std::string(converter).c_str(), 01473 domainId)); 01474 } 01475 } 01476 01477 if (where->second->participants().empty()) { 01478 domains_.erase(where); 01479 } 01480 01481 #ifndef DDS_HAS_MINIMUM_BIT 01482 else if (where->second->useBIT() && 01483 where->second->participants().size() == 1) { 01484 // The only participant left is the one we created to publish BITs. 01485 // It can be removed now since no user participants exist in this domain, 01486 // but it has to be removed on the Service Participant's reactor thread 01487 // in order to make the locking work properly in delete_participant(). 01488 const ACE_Event_Handler_var eh = new BIT_Cleanup_Handler(this, domainId); 01489 TheServiceParticipant->reactor()->notify(eh.handler()); 01490 } 01491 #endif 01492 }
void TAO_DDS_DCPSInfo_i::remove_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | publicationId | |||
) | [virtual] |
Definition at line 617 of file DCPSInfo_i.cpp.
References ACE_TEXT(), Update::Actor, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::remove_publication(), and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete().
00621 { 00622 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00623 00624 // Grab the domain. 00625 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00626 00627 if (where == this->domains_.end()) { 00628 throw OpenDDS::DCPS::Invalid_Domain(); 00629 } 00630 00631 // Grab the participant. 00632 DCPS_IR_Participant* partPtr 00633 = where->second->participant(participantId); 00634 00635 if (0 == partPtr) { 00636 throw OpenDDS::DCPS::Invalid_Participant(); 00637 } 00638 00639 if (partPtr->remove_publication(publicationId) != 0) { 00640 where->second->remove_dead_participants(); 00641 00642 // throw exception because the publication was not removed! 00643 throw OpenDDS::DCPS::Invalid_Publication(); 00644 } 00645 00646 where->second->remove_dead_participants(); 00647 00648 if (this->um_ 00649 && (partPtr->isOwner() == true) 00650 && (partPtr->isBitPublisher() == false)) { 00651 Update::IdPath path(domainId, participantId, publicationId); 00652 this->um_->destroy(path, Update::Actor, Update::DataWriter); 00653 00654 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00655 OpenDDS::DCPS::RepoIdConverter converter(publicationId); 00656 ACE_DEBUG((LM_DEBUG, 00657 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ") 00658 ACE_TEXT("pushing deletion of publication %C in domain %d.\n"), 00659 std::string(converter).c_str(), 00660 domainId)); 00661 } 00662 } 00663 }
void TAO_DDS_DCPSInfo_i::remove_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | subscriptionId | |||
) | [virtual] |
Definition at line 938 of file DCPSInfo_i.cpp.
References ACE_TEXT(), Update::Actor, Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::remove_subscription(), and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete().
00942 { 00943 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00944 00945 // Grab the domain. 00946 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00947 00948 if (where == this->domains_.end()) { 00949 throw OpenDDS::DCPS::Invalid_Domain(); 00950 } 00951 00952 // Grab the participant. 00953 DCPS_IR_Participant* partPtr 00954 = where->second->participant(participantId); 00955 00956 if (0 == partPtr) { 00957 throw OpenDDS::DCPS::Invalid_Participant(); 00958 } 00959 00960 if (partPtr->remove_subscription(subscriptionId) != 0) { 00961 // throw exception because the subscription was not removed! 00962 throw OpenDDS::DCPS::Invalid_Subscription(); 00963 } 00964 00965 where->second->remove_dead_participants(); 00966 00967 if (this->um_ 00968 && (partPtr->isOwner() == true) 00969 && (partPtr->isBitPublisher() == false)) { 00970 Update::IdPath path(domainId, participantId, subscriptionId); 00971 this->um_->destroy(path, Update::Actor, Update::DataReader); 00972 00973 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00974 OpenDDS::DCPS::RepoIdConverter converter(subscriptionId); 00975 ACE_DEBUG((LM_DEBUG, 00976 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ") 00977 ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"), 00978 std::string(converter).c_str(), 00979 domainId)); 00980 } 00981 } 00982 }
OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId | |||
) | [virtual] |
Definition at line 320 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::find_topic_reference(), OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, Update::Topic, and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete().
00324 { 00325 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR); 00326 00327 // Grab the domain. 00328 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00329 00330 if (where == this->domains_.end()) { 00331 throw OpenDDS::DCPS::Invalid_Domain(); 00332 } 00333 00334 // Grab the participant. 00335 DCPS_IR_Participant* partPtr 00336 = where->second->participant(participantId); 00337 00338 if (0 == partPtr) { 00339 throw OpenDDS::DCPS::Invalid_Participant(); 00340 } 00341 00342 DCPS_IR_Topic* topic; 00343 00344 if (partPtr->find_topic_reference(topicId, topic) != 0) { 00345 throw OpenDDS::DCPS::Invalid_Topic(); 00346 } 00347 00348 OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic); 00349 00350 if (this->um_ 00351 && (partPtr->isOwner() == true) 00352 && (partPtr->isBitPublisher() == false)) { 00353 Update::IdPath path(domainId, participantId, topicId); 00354 this->um_->destroy(path, Update::Topic); 00355 00356 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00357 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00358 ACE_DEBUG((LM_DEBUG, 00359 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ") 00360 ACE_TEXT("pushing deletion of topic %C in domain %d.\n"), 00361 std::string(converter).c_str(), 00362 domainId)); 00363 } 00364 } 00365 00366 return removedStatus; 00367 }
void TAO_DDS_DCPSInfo_i::shutdown | ( | void | ) | [virtual] |
Cause the entire repository to exit.
Definition at line 108 of file DCPSInfo_i.cpp.
References ShutdownInterface::shutdown(), and shutdown_.
Referenced by OpenDDS::Federator::ManagerImpl::leave_and_shutdown(), and OpenDDS::Federator::ManagerImpl::shutdown().
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos | ( | DDS::DomainId_t | domain, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Definition at line 2089 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Participant::set_qos(), um_, and Update::Manager::update().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().
02093 { 02094 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 02095 02096 // Grab the domain. 02097 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 02098 02099 if (where == this->domains_.end()) { 02100 throw OpenDDS::DCPS::Invalid_Domain(); 02101 } 02102 02103 // Grab the participant. 02104 DCPS_IR_Participant* partPtr 02105 = where->second->participant(participantId); 02106 02107 if (0 == partPtr) { 02108 throw OpenDDS::DCPS::Invalid_Participant(); 02109 } 02110 02111 if (partPtr->set_qos(qos) == false) 02112 return 0; 02113 02114 if (this->um_ 02115 && (partPtr->isOwner() == true) 02116 && (partPtr->isBitPublisher() == false)) { 02117 Update::IdPath path(domainId, participantId, participantId); 02118 this->um_->update(path, qos); 02119 02120 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 02121 OpenDDS::DCPS::RepoIdConverter converter(participantId); 02122 ACE_DEBUG((LM_DEBUG, 02123 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ") 02124 ACE_TEXT("pushing update of participant %C in domain %d.\n"), 02125 std::string(converter).c_str(), 02126 domainId)); 02127 } 02128 } 02129 02130 return 1; 02131 }
void TAO_DDS_DCPSInfo_i::update_publication_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | dwId, | |||
const DDS::PublisherQos & | qos | |||
) |
Entry for federation updates of PublisherQos values.
Definition at line 1786 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Publication::set_qos().
01791 { 01792 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01793 01794 // Grab the domain. 01795 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01796 01797 if (where == this->domains_.end()) { 01798 throw OpenDDS::DCPS::Invalid_Domain(); 01799 } 01800 01801 // Grab the participant. 01802 DCPS_IR_Participant* partPtr 01803 = where->second->participant(partId); 01804 01805 if (0 == partPtr) { 01806 throw OpenDDS::DCPS::Invalid_Participant(); 01807 } 01808 01809 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01810 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 3\n")); 01811 } 01812 01813 DCPS_IR_Publication* pub; 01814 01815 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) { 01816 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01817 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId); 01818 ACE_ERROR((LM_ERROR, 01819 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01820 ACE_TEXT("participant %C could not find publication %C.\n"), 01821 std::string(part_converter).c_str(), 01822 std::string(pub_converter).c_str())); 01823 throw OpenDDS::DCPS::Invalid_Publication(); 01824 } 01825 01826 pub->set_qos(qos); 01827 }
void TAO_DDS_DCPSInfo_i::update_publication_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | dwId, | |||
const DDS::DataWriterQos & | qos | |||
) |
Entry for federation updates of DataWriterQos values.
Definition at line 1742 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Publication::set_qos().
01747 { 01748 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01749 01750 // Grab the domain. 01751 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01752 01753 if (where == this->domains_.end()) { 01754 throw OpenDDS::DCPS::Invalid_Domain(); 01755 } 01756 01757 // Grab the participant. 01758 DCPS_IR_Participant* partPtr 01759 = where->second->participant(partId); 01760 01761 if (0 == partPtr) { 01762 throw OpenDDS::DCPS::Invalid_Participant(); 01763 } 01764 01765 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01766 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 2\n")); 01767 } 01768 01769 DCPS_IR_Publication* pub; 01770 01771 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) { 01772 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01773 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId); 01774 ACE_ERROR((LM_ERROR, 01775 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01776 ACE_TEXT("participant %C could not find publication %C.\n"), 01777 std::string(part_converter).c_str(), 01778 std::string(pub_converter).c_str())); 01779 throw OpenDDS::DCPS::Invalid_Publication(); 01780 } 01781 01782 pub->set_qos(qos); 01783 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | dwId, | |||
const DDS::DataWriterQos & | qos, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [virtual] |
Definition at line 1665 of file DCPSInfo_i.cpp.
References ACE_TEXT(), Update::DataWriterQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, Update::NoQos, Update::PublisherQos, DCPS_IR_Publication::set_qos(), um_, and Update::Manager::update().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().
01671 { 01672 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 01673 01674 // Grab the domain. 01675 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01676 01677 if (where == this->domains_.end()) { 01678 throw OpenDDS::DCPS::Invalid_Domain(); 01679 } 01680 01681 // Grab the participant. 01682 DCPS_IR_Participant* partPtr 01683 = where->second->participant(partId); 01684 01685 if (0 == partPtr) { 01686 throw OpenDDS::DCPS::Invalid_Participant(); 01687 } 01688 01689 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01690 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 1\n")); 01691 } 01692 01693 DCPS_IR_Publication* pub; 01694 01695 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) { 01696 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01697 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId); 01698 ACE_ERROR((LM_ERROR, 01699 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01700 ACE_TEXT("participant %C could not find publication %C.\n"), 01701 std::string(part_converter).c_str(), 01702 std::string(pub_converter).c_str())); 01703 throw OpenDDS::DCPS::Invalid_Publication(); 01704 } 01705 01706 Update::SpecificQos qosType; 01707 01708 if (pub->set_qos(qos, publisherQos, qosType) == false) // failed 01709 return 0; 01710 01711 if (this->um_ && (partPtr->isBitPublisher() == false)) { 01712 Update::IdPath path(domainId, partId, dwId); 01713 01714 switch (qosType) { 01715 case Update::DataWriterQos: 01716 this->um_->update(path, qos); 01717 break; 01718 01719 case Update::PublisherQos: 01720 this->um_->update(path, publisherQos); 01721 break; 01722 01723 case Update::NoQos: 01724 default: 01725 break; 01726 } 01727 01728 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01729 OpenDDS::DCPS::RepoIdConverter converter(dwId); 01730 ACE_DEBUG((LM_DEBUG, 01731 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01732 ACE_TEXT("pushing update of publication %C in domain %d.\n"), 01733 
std::string(converter).c_str(), 01734 domainId)); 01735 } 01736 } 01737 01738 return 1; 01739 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_params | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | subscriptionId, | |||
const DDS::StringSeq & | params | |||
) |
Definition at line 1994 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domain(), domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), LM_ERROR, LM_INFO, lock_, um_, Update::Manager::update(), and DCPS_IR_Subscription::update_expr_params().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams().
01999 { 02000 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 02001 02002 DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId); 02003 if (domain == this->domains_.end()) { 02004 throw OpenDDS::DCPS::Invalid_Domain(); 02005 } 02006 02007 DCPS_IR_Participant* partPtr = domain->second->participant(participantId); 02008 if (0 == partPtr) { 02009 throw OpenDDS::DCPS::Invalid_Participant(); 02010 } 02011 02012 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 02013 ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n")); 02014 } 02015 02016 DCPS_IR_Subscription* sub; 02017 if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) { 02018 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 02019 OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId); 02020 ACE_ERROR((LM_ERROR, 02021 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ") 02022 ACE_TEXT("participant %C could not find subscription %C.\n"), 02023 std::string(part_converter).c_str(), 02024 std::string(sub_converter).c_str())); 02025 throw OpenDDS::DCPS::Invalid_Subscription(); 02026 } 02027 02028 sub->update_expr_params(params); // calls writers via DataWriterRemote 02029 02030 if (this->um_ && !partPtr->isBitPublisher()) { 02031 Update::IdPath path(domainId, participantId, subscriptionId); 02032 this->um_->update(path, params); 02033 } 02034 02035 return true; 02036 }
void TAO_DDS_DCPSInfo_i::update_subscription_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | drId, | |||
const DDS::SubscriberQos & | qos | |||
) |
Entry for federation updates of SubscriberQos values.
Definition at line 1950 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Subscription::set_qos().
01955 { 01956 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01957 01958 // Grab the domain. 01959 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01960 01961 if (where == this->domains_.end()) { 01962 throw OpenDDS::DCPS::Invalid_Domain(); 01963 } 01964 01965 // Grab the participant. 01966 DCPS_IR_Participant* partPtr 01967 = where->second->participant(partId); 01968 01969 if (0 == partPtr) { 01970 throw OpenDDS::DCPS::Invalid_Participant(); 01971 } 01972 01973 DCPS_IR_Subscription* sub; 01974 01975 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01976 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n")); 01977 } 01978 01979 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) { 01980 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01981 OpenDDS::DCPS::RepoIdConverter sub_converter(drId); 01982 ACE_ERROR((LM_ERROR, 01983 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01984 ACE_TEXT("participant %C could not find subscription %C.\n"), 01985 std::string(part_converter).c_str(), 01986 std::string(sub_converter).c_str())); 01987 throw OpenDDS::DCPS::Invalid_Subscription(); 01988 } 01989 01990 sub->set_qos(qos); 01991 }
void TAO_DDS_DCPSInfo_i::update_subscription_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | drId, | |||
const DDS::DataReaderQos & | qos | |||
) |
Entry for federation updates of DataReaderQos values.
Definition at line 1906 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), LM_ERROR, LM_INFO, lock_, and DCPS_IR_Subscription::set_qos().
01911 { 01912 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01913 01914 // Grab the domain. 01915 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01916 01917 if (where == this->domains_.end()) { 01918 throw OpenDDS::DCPS::Invalid_Domain(); 01919 } 01920 01921 // Grab the participant. 01922 DCPS_IR_Participant* partPtr 01923 = where->second->participant(partId); 01924 01925 if (0 == partPtr) { 01926 throw OpenDDS::DCPS::Invalid_Participant(); 01927 } 01928 01929 DCPS_IR_Subscription* sub; 01930 01931 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01932 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n")); 01933 } 01934 01935 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) { 01936 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01937 OpenDDS::DCPS::RepoIdConverter sub_converter(drId); 01938 ACE_ERROR((LM_ERROR, 01939 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01940 ACE_TEXT("participant %C could not find subscription %C.\n"), 01941 std::string(part_converter).c_str(), 01942 std::string(sub_converter).c_str())); 01943 throw OpenDDS::DCPS::Invalid_Subscription(); 01944 } 01945 01946 sub->set_qos(qos); 01947 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | drId, | |||
const DDS::DataReaderQos & | qos, | |||
const DDS::SubscriberQos & | subscriberQos | |||
) | [virtual] |
Definition at line 1829 of file DCPSInfo_i.cpp.
References ACE_TEXT(), Update::DataReaderQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, Update::NoQos, DCPS_IR_Subscription::set_qos(), Update::SubscriberQos, um_, and Update::Manager::update().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().
01835 { 01836 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 01837 01838 // Grab the domain. 01839 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01840 01841 if (where == this->domains_.end()) { 01842 throw OpenDDS::DCPS::Invalid_Domain(); 01843 } 01844 01845 // Grab the participant. 01846 DCPS_IR_Participant* partPtr 01847 = where->second->participant(partId); 01848 01849 if (0 == partPtr) { 01850 throw OpenDDS::DCPS::Invalid_Participant(); 01851 } 01852 01853 DCPS_IR_Subscription* sub; 01854 01855 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01856 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n")); 01857 } 01858 01859 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) { 01860 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01861 OpenDDS::DCPS::RepoIdConverter sub_converter(drId); 01862 ACE_ERROR((LM_ERROR, 01863 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01864 ACE_TEXT("participant %C could not find subscription %C.\n"), 01865 std::string(part_converter).c_str(), 01866 std::string(sub_converter).c_str())); 01867 throw OpenDDS::DCPS::Invalid_Subscription(); 01868 } 01869 01870 Update::SpecificQos qosType; 01871 01872 if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed 01873 return 0; 01874 01875 if (this->um_ && (partPtr->isBitPublisher() == false)) { 01876 Update::IdPath path(domainId, partId, drId); 01877 01878 switch (qosType) { 01879 case Update::DataReaderQos: 01880 this->um_->update(path, qos); 01881 break; 01882 01883 case Update::SubscriberQos: 01884 this->um_->update(path, subscriberQos); 01885 break; 01886 01887 case Update::NoQos: 01888 default: 01889 break; 01890 } 01891 01892 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01893 OpenDDS::DCPS::RepoIdConverter converter(drId); 01894 ACE_DEBUG((LM_DEBUG, 01895 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01896 ACE_TEXT("pushing update of subscription %C in domain %d.\n"), 01897 std::string(converter).c_str(), 01898 domainId)); 01899 } 01900 } 01901 01902 return 1; 01903 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos | ( | const OpenDDS::DCPS::RepoId & | topicId, | |
DDS::DomainId_t | domainId, | |||
const OpenDDS::DCPS::RepoId & | participantId, | |||
const DDS::TopicQos & | qos | |||
) | [virtual] |
Definition at line 2038 of file DCPSInfo_i.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_topic_reference(), DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), LM_DEBUG, lock_, DCPS_IR_Topic::set_topic_qos(), um_, and Update::Manager::update().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().
02043 { 02044 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 02045 02046 // Grab the domain. 02047 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 02048 02049 if (where == this->domains_.end()) { 02050 throw OpenDDS::DCPS::Invalid_Domain(); 02051 } 02052 02053 // Grab the participant. 02054 DCPS_IR_Participant* partPtr 02055 = where->second->participant(participantId); 02056 02057 if (0 == partPtr) { 02058 throw OpenDDS::DCPS::Invalid_Participant(); 02059 } 02060 02061 DCPS_IR_Topic* topic; 02062 02063 if (partPtr->find_topic_reference(topicId, topic) != 0) { 02064 throw OpenDDS::DCPS::Invalid_Topic(); 02065 } 02066 02067 if (topic->set_topic_qos(qos) == false) 02068 return 0; 02069 02070 if (this->um_ 02071 && (partPtr->isOwner() == true) 02072 && (partPtr->isBitPublisher() == false)) { 02073 Update::IdPath path(domainId, participantId, topicId); 02074 this->um_->update(path, qos); 02075 02076 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 02077 OpenDDS::DCPS::RepoIdConverter converter(topicId); 02078 ACE_DEBUG((LM_DEBUG, 02079 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ") 02080 ACE_TEXT("pushing update of topic %C in domain %d.\n"), 02081 std::string(converter).c_str(), 02082 domainId)); 02083 } 02084 } 02085 02086 return 1; 02087 }
long TAO_DDS_DCPSInfo_i::dispatch_check_timer_id_ [private] |
Definition at line 428 of file DCPSInfo_i.h.
Referenced by finalize(), and init_dispatchChecking().
Definition at line 414 of file DCPSInfo_i.h.
Referenced by add_publication(), add_subscription(), handle_timeout(), and TAO_DDS_DCPSInfo_i().
DCPS_IR_Domain_Map TAO_DDS_DCPSInfo_i::domains_ [private] |
Definition at line 412 of file DCPSInfo_i.h.
Referenced by add_publication(), add_subscription(), add_topic(), assert_topic(), association_complete(), attach_participant(), changeOwnership(), disassociate_participant(), disassociate_publication(), disassociate_subscription(), domain(), domains(), dump_to_string(), find_topic(), handle_timeout(), ignore_domain_participant(), ignore_publication(), ignore_subscription(), ignore_topic(), receive_image(), remove_by_owner(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().
const TAO_DDS_DCPSFederationId& TAO_DDS_DCPSInfo_i::federation_ [private] |
Definition at line 416 of file DCPSInfo_i.h.
Referenced by add_domain_participant(), add_publication(), add_subscription(), add_topic(), and domain().
ACE_Recursive_Thread_Mutex TAO_DDS_DCPSInfo_i::lock_ [private] |
Definition at line 425 of file DCPSInfo_i.h.
Referenced by add_domain_participant(), add_publication(), add_subscription(), add_topic(), assert_topic(), association_complete(), attach_participant(), changeOwnership(), disassociate_participant(), disassociate_publication(), disassociate_subscription(), find_topic(), handle_timeout(), ignore_domain_participant(), ignore_publication(), ignore_subscription(), ignore_topic(), remove_by_owner(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().
CORBA::ORB_var TAO_DDS_DCPSInfo_i::orb_ [private] |
Definition at line 413 of file DCPSInfo_i.h.
Referenced by add_publication(), add_subscription(), finalize(), init_dispatchChecking(), init_reassociation(), and orb().
Definition at line 417 of file DCPSInfo_i.h.
Referenced by domain(), and receive_image().
long TAO_DDS_DCPSInfo_i::reassociate_timer_id_ [private] |
Definition at line 427 of file DCPSInfo_i.h.
Referenced by finalize(), and init_reassociation().
bool TAO_DDS_DCPSInfo_i::reincarnate_ [private] |
Definition at line 420 of file DCPSInfo_i.h.
Referenced by domain(), and init_persistence().
ShutdownInterface* TAO_DDS_DCPSInfo_i::shutdown_ [private] |
Interface to effect shutdown of the process.
Definition at line 423 of file DCPSInfo_i.h.
Referenced by shutdown().
Update::Manager* TAO_DDS_DCPSInfo_i::um_ [private] |
Definition at line 419 of file DCPSInfo_i.h.
Referenced by add(), add_domain_participant(), add_publication(), add_subscription(), assert_topic(), init_persistence(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().