#include <DCPSInfo_i.h>
Collaboration diagram for TAO_DDS_DCPSInfo_i:
Public Member Functions | |
TAO_DDS_DCPSInfo_i (CORBA::ORB_ptr orb, bool reincarnate, ShutdownInterface *shutdown, const TAO_DDS_DCPSFederationId &federation) | |
virtual | ~TAO_DDS_DCPSInfo_i () |
virtual int | handle_timeout (const ACE_Time_Value &now, const void *arg) |
virtual CORBA::Boolean | attach_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId) |
virtual OpenDDS::DCPS::TopicStatus | assert_topic (OpenDDS::DCPS::RepoId_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey) |
bool | add_topic (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos) |
Add a previously existing topic to the repository. | |
virtual OpenDDS::DCPS::TopicStatus | find_topic (DDS::DomainId_t domainId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::RepoId_out topicId) |
virtual OpenDDS::DCPS::TopicStatus | remove_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId) |
virtual OpenDDS::DCPS::RepoId | add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos) |
bool | add_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &pubId, const char *pub_str, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, bool associate=false) |
Add a previously existing publication to the repository. | |
virtual void | remove_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &publicationId) |
virtual OpenDDS::DCPS::RepoId | add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams) |
bool | add_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &topicId, const OpenDDS::DCPS::RepoId &subId, const char *sub_str, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, bool associate=false) |
Add a previously existing subscription to the repository. | |
virtual void | remove_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId) |
virtual OpenDDS::DCPS::AddDomainStatus | add_domain_participant (DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos) |
bool | add_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos) |
Add a previously existing participant to the repository. | |
virtual void | remove_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId) |
virtual void | association_complete (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &localId, const OpenDDS::DCPS::RepoId &remoteId) |
bool | remove_by_owner (DDS::DomainId_t domain, long owner) |
virtual void | disassociate_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id) |
virtual void | disassociate_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id) |
virtual void | disassociate_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &local_id, const OpenDDS::DCPS::RepoId &remote_id) |
virtual void | ignore_domain_participant (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual void | ignore_topic (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual void | ignore_subscription (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual void | ignore_publication (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &myParticipantId, const OpenDDS::DCPS::RepoId &ignoreId) |
virtual CORBA::Boolean | update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos) |
void | update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::DataWriterQos &qos) |
Entry for federation updates of DataWriterQos values. | |
void | update_publication_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &dwId, const DDS::PublisherQos &qos) |
Entry for federation updates of PublisherQos values. | |
virtual CORBA::Boolean | update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos) |
void | update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::DataReaderQos &qos) |
Entry for federation updates of DataReaderQos values. | |
void | update_subscription_qos (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &partId, const OpenDDS::DCPS::RepoId &drId, const DDS::SubscriberQos &qos) |
Entry for federation updates of SubscriberQos values. | |
virtual ::CORBA::Boolean | update_subscription_params (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const OpenDDS::DCPS::RepoId &subscriptionId, const DDS::StringSeq &params) |
virtual CORBA::Boolean | update_topic_qos (const OpenDDS::DCPS::RepoId &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, const DDS::TopicQos &qos) |
virtual CORBA::Boolean | update_domain_participant_qos (DDS::DomainId_t domain, const OpenDDS::DCPS::RepoId &participantId, const DDS::DomainParticipantQos &qos) |
virtual void | shutdown () |
Cause the entire repository to exit. | |
virtual char * | dump_to_string () |
Dump the repository's state to a string. | |
bool | changeOwnership (DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId &participantId, long sender, long owner) |
Assert new ownership for a participant and its contained entities. | |
int | init_transport (int listen_address_given, const char *listen_str) |
bool | receive_image (const Update::UImage &image) |
void | add (Update::Updater *updater) |
Add an additional Updater interface. | |
DCPS_IR_Domain * | domain (DDS::DomainId_t domain) |
Convert a domain Id into a reference to a DCPS_IR_Domain object. | |
const DCPS_IR_Domain_Map & | domains () const |
Expose a readable reference of the domain map. | |
CORBA::ORB_ptr | orb () |
Expose the ORB. | |
bool | init_persistence () |
bool | init_reassociation (const ACE_Time_Value &delay) |
bool | init_dispatchChecking (const ACE_Time_Value &delay) |
void | finalize () |
Cleanup state for shutdown. | |
Private Attributes | |
DCPS_IR_Domain_Map | domains_ |
CORBA::ORB_var | orb_ |
CORBA::ORB_var | dispatchingOrb_ |
const TAO_DDS_DCPSFederationId & | federation_ |
RepoIdGenerator | participantIdGenerator_ |
Update::Manager * | um_ |
bool | reincarnate_ |
ShutdownInterface * | shutdown_ |
Interface to effect shutdown of the process. | |
ACE_Recursive_Thread_Mutex | lock_ |
long | reassociate_timer_id_ |
long | dispatch_check_timer_id_ |
This is the Information Repository object. Clients of the system will use the CORBA reference of this object.
Definition at line 52 of file DCPSInfo_i.h.
TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i | ( | CORBA::ORB_ptr | orb, | |
bool | reincarnate, | |||
ShutdownInterface * | shutdown, | |||
const TAO_DDS_DCPSFederationId & | federation | |||
) |
Definition at line 34 of file DCPSInfo_i.cpp.
References dispatchingOrb_.
00038 : orb_(CORBA::ORB::_duplicate(orb)) 00039 , federation_(federation) 00040 , participantIdGenerator_(federation.id()) 00041 , um_(0) 00042 , reincarnate_(reincarnate) 00043 , shutdown_(shutdown) 00044 , reassociate_timer_id_(-1) 00045 , dispatch_check_timer_id_(-1) 00046 { 00047 int argc = 0; 00048 char** no_argv = 0; 00049 dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly"); 00050 }
TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i | ( | ) | [virtual] |
void TAO_DDS_DCPSInfo_i::add | ( | Update::Updater * | updater | ) |
Add an additional Updater interface.
Definition at line 2349 of file DCPSInfo_i.cpp.
References Update::Manager::add(), and um_.
bool TAO_DDS_DCPSInfo_i::add_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const DDS::DomainParticipantQos & | qos | |||
) |
Add a previously existing participant to the repository.
domainId | the Domain in which the Participant is contained. | |
participantId | the GUID Id value to use for the Participant. | |
qos | the QoS value of the Participant. |
Definition at line 1071 of file DCPSInfo_i.cpp.
References DCPS_IR_Domain::add_participant(), OpenDDS::DCPS::DCPS_debug_level, domain(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Domain::last_participant_key(), DCPS_IR_Domain::participant(), OpenDDS::DCPS::RepoIdConverter::participantId(), and um_.
01074 { 01075 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 01076 01077 // Grab the domain. 01078 DCPS_IR_Domain* domainPtr = this->domain(domainId); 01079 01080 if (0 == domainPtr) { 01081 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01082 ACE_DEBUG((LM_WARNING, 01083 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01084 ACE_TEXT("invalid domain Id: %d\n"), 01085 domainId)); 01086 } 01087 01088 return false; 01089 } 01090 01091 // Prepare to manipulate the participant's Id value. 01092 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01093 01094 // Grab the participant. 01095 DCPS_IR_Participant* partPtr 01096 = domainPtr->participant(participantId); 01097 01098 if (0 != partPtr) { 01099 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01100 ACE_ERROR((LM_ERROR, 01101 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01102 ACE_TEXT("participant id %C already exists.\n"), 01103 std::string(converter).c_str())); 01104 } 01105 01106 return false; 01107 } 01108 01109 DCPS_IR_Participant* participant; 01110 ACE_NEW_RETURN(participant, 01111 DCPS_IR_Participant(this->federation_, 01112 participantId, 01113 domainPtr, 01114 qos, um_), 0); 01115 01116 switch (domainPtr->add_participant(participant)) { 01117 case -1: { 01118 ACE_ERROR((LM_ERROR, 01119 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01120 ACE_TEXT("failed to load participant %C in domain %d.\n"), 01121 std::string(converter).c_str(), 01122 domainId)); 01123 } 01124 delete participant; 01125 return false; 01126 01127 case 1: 01128 01129 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01130 ACE_DEBUG((LM_WARNING, 01131 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01132 ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"), 01133 std::string(converter).c_str(), 01134 domainId)); 01135 } 01136 01137 delete participant; 01138 return false; 01139 01140 case 0: 
01141 default: 01142 break; 01143 } 01144 01145 // See if we are adding a participant that was created within this 01146 // repository or a different repository. 01147 if (converter.federationId() == this->federation_.id()) { 01148 // Ensure the participant GUID values do not conflict. 01149 domainPtr->last_participant_key(converter.participantId()); 01150 01151 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01152 ACE_DEBUG((LM_DEBUG, 01153 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01154 ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"), 01155 converter.participantId())); 01156 } 01157 } 01158 01159 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01160 ACE_DEBUG((LM_DEBUG, 01161 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01162 ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"), 01163 std::string(converter).c_str(), 01164 participant, 01165 domainId)); 01166 } 01167 01168 return true; 01169 }
OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant | ( | DDS::DomainId_t | domain, | |
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Definition at line 979 of file DCPSInfo_i.cpp.
References DCPS_IR_Domain::add_participant(), Update::Manager::create(), OpenDDS::DCPS::DCPS_debug_level, domain(), OpenDDS::DCPS::AddDomainStatus::federated, federation_, DCPS_IR_Domain::get_next_participant_id(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::AddDomainStatus::id, DCPS_IR_Participant::isBitPublisher(), TAO_DDS_DCPSFederationId::overridden(), DCPS_IR_Participant::owner(), DCPS_IR_Domain::participants(), DCPS_IR_Participant::takeOwnership(), TheServiceParticipant, and um_.
Referenced by Update::Manager::add(), and OpenDDS::Federator::ManagerImpl::processCreate().
00982 { 00983 // A value to return. 00984 OpenDDS::DCPS::AddDomainStatus value; 00985 value.id = OpenDDS::DCPS::GUID_UNKNOWN; 00986 value.federated = this->federation_.overridden(); 00987 00988 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value); 00989 00990 // Grab the domain. 00991 DCPS_IR_Domain* domainPtr = this->domain(domain); 00992 00993 if (0 == domainPtr) { 00994 throw OpenDDS::DCPS::Invalid_Domain(); 00995 } 00996 00997 // Obtain a shiny new GUID value. 00998 OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id(); 00999 01000 DCPS_IR_Participant* participant; 01001 ACE_NEW_RETURN(participant, 01002 DCPS_IR_Participant( 01003 this->federation_, 01004 participantId, 01005 domainPtr, 01006 qos, um_), 01007 value); 01008 01009 // We created the participant, now we can return the Id value (eventually). 01010 value.id = participantId; 01011 01012 // Determine if this is the 'special' repository internal participant 01013 // that publishes the built-in topics for a domain. 01014 if (domainPtr->participants().empty() && TheServiceParticipant->get_BIT()) { 01015 participant->isBitPublisher() = true; 01016 01017 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01018 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01019 ACE_DEBUG((LM_DEBUG, 01020 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01021 ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"), 01022 std::string(converter).c_str(), 01023 domain)); 01024 } 01025 } 01026 01027 // Assume responsibilty for writing back to the participant. 01028 participant->takeOwnership(); 01029 01030 int status = domainPtr->add_participant(participant); 01031 01032 if (0 != status) { 01033 // Adding the participant failed return the invalid 01034 // pariticipant Id number. 
01035 participantId = OpenDDS::DCPS::GUID_UNKNOWN; 01036 delete participant; 01037 participant = 0; 01038 01039 } else if (this->um_ && (participant->isBitPublisher() == false)) { 01040 // Push this participant to interested observers. 01041 Update::UParticipant updateParticipant( 01042 domain, 01043 participant->owner(), 01044 participantId, 01045 const_cast<DDS::DomainParticipantQos &>(qos)); 01046 this->um_->create(updateParticipant); 01047 01048 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01049 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01050 ACE_DEBUG((LM_DEBUG, 01051 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01052 ACE_TEXT("pushing creation of participant %C in domain %d.\n"), 01053 std::string(converter).c_str(), 01054 domain)); 01055 } 01056 } 01057 01058 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01059 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01060 ACE_DEBUG((LM_DEBUG, 01061 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ") 01062 ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"), 01063 domain, 01064 std::string(converter).c_str(), 01065 participant)); 01066 } 01067 return value; 01068 }
bool TAO_DDS_DCPSInfo_i::add_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
const OpenDDS::DCPS::RepoId & | pubId, | |||
const char * | pub_str, | |||
const DDS::DataWriterQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::PublisherQos & | publisherQos, | |||
bool | associate = false | |||
) |
Add a previously existing publication to the repository.
domainId | the Domain in which the Publication is contained. | |
participantId | the Participant in which the Publication is contained. | |
topicId | the Topic of the Publication. | |
pubId | the GUID Id value to use for the Publication. | |
pub_str | stringified publication callback to DataWriter. | |
qos | the QoS value of the DataWriter. | |
transInfo | the transport information for the Publication. | |
publisherQos | the QoS value of the Publisher. | |
associate | indicate whether to create new associations. |
Todo: Check if this is already stored. If so, just clear the callback IOR.
Definition at line 473 of file DCPSInfo_i.cpp.
References DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Participant::last_publication_key(), and DCPS_IR_Participant::remove_publication().
00482 { 00483 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00484 00485 // Grab the domain. 00486 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00487 00488 if (where == this->domains_.end()) { 00489 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00490 ACE_DEBUG((LM_WARNING, 00491 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00492 ACE_TEXT("invalid domain %d.\n"), 00493 domainId)); 00494 } 00495 00496 return false; 00497 } 00498 00499 // Grab the participant. 00500 DCPS_IR_Participant* partPtr 00501 = where->second->participant(participantId); 00502 00503 if (0 == partPtr) { 00504 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00505 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00506 ACE_DEBUG((LM_WARNING, 00507 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00508 ACE_TEXT("invalid participant %C in domain %d.\n"), 00509 std::string(converter).c_str(), 00510 domainId)); 00511 } 00512 00513 return false; 00514 } 00515 00516 DCPS_IR_Topic* topic = where->second->find_topic(topicId); 00517 00518 if (topic == 0) { 00519 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00520 ACE_DEBUG((LM_WARNING, 00521 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00522 ACE_TEXT("invalid topic %C in domain %d.\n"), 00523 std::string(converter).c_str(), 00524 domainId)); 00525 return false; 00526 } 00527 00528 /// @TODO: Check if this is already stored. If so, just clear the callback IOR. 
00529 00530 CORBA::Object_var obj = dispatchingOrb_->string_to_object(pub_str); 00531 if (CORBA::is_nil(obj.in())) { 00532 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00533 ACE_DEBUG((LM_WARNING, 00534 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00535 ACE_TEXT("failure marshalling publication %s on dispatching orb.\n"), 00536 pub_str)); 00537 } 00538 return false; 00539 } 00540 OpenDDS::DCPS::DataWriterRemote_var publication 00541 = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in()); 00542 00543 DCPS_IR_Publication* pubPtr; 00544 ACE_NEW_RETURN(pubPtr, 00545 DCPS_IR_Publication( 00546 pubId, 00547 partPtr, 00548 topic, 00549 publication.in(), 00550 qos, 00551 transInfo, 00552 publisherQos), 00553 0); 00554 00555 switch (partPtr->add_publication(pubPtr)) { 00556 case -1: { 00557 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00558 ACE_ERROR((LM_ERROR, 00559 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ") 00560 ACE_TEXT("failed to add publication to participant %C.\n"), 00561 std::string(converter).c_str())); 00562 } 00563 // Deliberate fall through to next case. 00564 00565 case 1: 00566 delete pubPtr; 00567 return false; 00568 00569 case 0: 00570 default: 00571 break; 00572 } 00573 00574 switch (topic->add_publication_reference(pubPtr, associate)) { 00575 case -1: { 00576 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00577 ACE_ERROR((LM_ERROR, 00578 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ") 00579 ACE_TEXT("failed to add publication to participant %C topic list.\n"), 00580 std::string(converter).c_str())); 00581 00582 // Remove the publication. 00583 partPtr->remove_publication(pubId); 00584 00585 } 00586 return false; 00587 00588 case 1: // This is actually a really really bad place to jump to. 
00589 // This means that we successfully added the new publication 00590 // to the participant (it had not been inserted before) but 00591 // that we are adding a duplicate publication to the topic 00592 // list - which should never ever be able to happen. 00593 return false; 00594 00595 case 0: 00596 default: 00597 break; 00598 } 00599 00600 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00601 00602 // See if we are adding a publication that was created within this 00603 // repository or a different repository. 00604 if (converter.federationId() == federation_.id()) { 00605 // Ensure the publication RepoId values do not conflict. 00606 partPtr->last_publication_key(converter.entityKey()); 00607 } 00608 00609 return true; 00610 }
OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
OpenDDS::DCPS::DataWriterRemote_ptr | publication, | |||
const DDS::DataWriterQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [virtual] |
Definition at line 364 of file DCPSInfo_i.cpp.
References DCPS_IR_Participant::add_publication(), DCPS_IR_Topic::add_publication_reference(), Update::Manager::create(), Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, DCPS_IR_Participant::get_next_publication_id(), OpenDDS::DCPS::GUID_UNKNOWN, DCPS_IR_Participant::isBitPublisher(), orb_, DCPS_IR_Participant::remove_publication(), and um_.
Referenced by Update::Manager::add().
00372 { 00373 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN); 00374 00375 // Grab the domain. 00376 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00377 00378 if (where == this->domains_.end()) { 00379 throw OpenDDS::DCPS::Invalid_Domain(); 00380 } 00381 00382 // Grab the participant. 00383 DCPS_IR_Participant* partPtr 00384 = where->second->participant(participantId); 00385 00386 if (0 == partPtr) { 00387 throw OpenDDS::DCPS::Invalid_Participant(); 00388 } 00389 00390 DCPS_IR_Topic* topic = where->second->find_topic(topicId); 00391 00392 if (topic == 0) { 00393 throw OpenDDS::DCPS::Invalid_Topic(); 00394 } 00395 00396 OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id(); 00397 00398 // Remarshall the remote reference onto the dispatching orb. 00399 OpenDDS::DCPS::DataWriterRemote_var marshalledPub(OpenDDS::DCPS::DataWriterRemote::_duplicate(publication)); 00400 if (CORBA::is_nil(marshalledPub.in())) { 00401 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00402 ACE_DEBUG((LM_WARNING, 00403 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00404 ACE_TEXT("invalid publication reference.\n"))); 00405 } 00406 return OpenDDS::DCPS::GUID_UNKNOWN; 00407 } 00408 CORBA::String_var pubStr = orb_->object_to_string(marshalledPub.in()); 00409 CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr.in()); 00410 if (CORBA::is_nil(pubObj.in())) { 00411 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00412 ACE_DEBUG((LM_WARNING, 00413 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ") 00414 ACE_TEXT("failure marshalling publication on dispatching orb.\n"))); 00415 } 00416 return OpenDDS::DCPS::GUID_UNKNOWN; 00417 } 00418 OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication 00419 = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj.in()); 00420 00421 DCPS_IR_Publication* pubPtr; 00422 ACE_NEW_RETURN(pubPtr, 00423 DCPS_IR_Publication( 00424 pubId, 00425 
partPtr, 00426 topic, 00427 dispatchingPublication.in(), 00428 qos, 00429 transInfo, 00430 publisherQos), 00431 OpenDDS::DCPS::GUID_UNKNOWN); 00432 00433 if (partPtr->add_publication(pubPtr) != 0) { 00434 // failed to add. we are responsible for the memory. 00435 pubId = OpenDDS::DCPS::GUID_UNKNOWN; 00436 delete pubPtr; 00437 pubPtr = 0; 00438 00439 } else if (topic->add_publication_reference(pubPtr) != 0) { 00440 // Failed to add to the topic 00441 // so remove from participant and fail. 00442 partPtr->remove_publication(pubId); 00443 pubId = OpenDDS::DCPS::GUID_UNKNOWN; 00444 } 00445 00446 if (this->um_ && (partPtr->isBitPublisher() == false)) { 00447 CORBA::String_var callback = orb_->object_to_string(publication); 00448 Update::ContentSubscriptionInfo csi; 00449 00450 Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter 00451 , callback.in() 00452 , const_cast<DDS::PublisherQos &>(publisherQos) 00453 , const_cast<DDS::DataWriterQos &>(qos) 00454 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &> 00455 (transInfo), csi); 00456 this->um_->create(actor); 00457 00458 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00459 OpenDDS::DCPS::RepoIdConverter converter(pubId); 00460 ACE_DEBUG((LM_DEBUG, 00461 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ") 00462 ACE_TEXT("pushing creation of publication %C in domain %d.\n"), 00463 std::string(converter).c_str(), 00464 domainId)); 00465 } 00466 } 00467 00468 where->second->remove_dead_participants(); 00469 return pubId; 00470 }
bool TAO_DDS_DCPSInfo_i::add_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
const OpenDDS::DCPS::RepoId & | subId, | |||
const char * | sub_str, | |||
const DDS::DataReaderQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::SubscriberQos & | subscriberQos, | |||
const char * | filterClassName, | |||
const char * | filterExpression, | |||
const DDS::StringSeq & | exprParams, | |||
bool | associate = false | |||
) |
Add a previously existing subscription to the repository.
domainId | the Domain in which the Subscription is contained. | |
participantId | the Participant in which the Subscription is contained. | |
topicId | the Topic of the Subscription. | |
subId | the GUID Id value to use for the Subscription. | |
sub_str | stringified subscription callback to DataReader. | |
qos | the QoS value of the DataReader. | |
transInfo | the transport information for the Subscription. | |
subscriberQos | the QoS value of the Subscriber. | |
associate | indicate whether to create new associations. |
Definition at line 786 of file DCPSInfo_i.cpp.
References DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), DCPS_IR_Participant::last_subscription_key(), and DCPS_IR_Participant::remove_subscription().
00799 { 00800 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00801 00802 // Grab the domain. 00803 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00804 00805 if (where == this->domains_.end()) { 00806 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00807 ACE_DEBUG((LM_WARNING, 00808 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00809 ACE_TEXT("invalid domain %d.\n"), 00810 domainId)); 00811 } 00812 00813 return false; 00814 } 00815 00816 // Grab the participant. 00817 DCPS_IR_Participant* partPtr 00818 = where->second->participant(participantId); 00819 00820 if (0 == partPtr) { 00821 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00822 OpenDDS::DCPS::RepoIdConverter converter(participantId); 00823 ACE_DEBUG((LM_WARNING, 00824 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00825 ACE_TEXT("invalid participant %C in domain %d.\n"), 00826 std::string(converter).c_str(), 00827 domainId)); 00828 } 00829 00830 return false; 00831 } 00832 00833 DCPS_IR_Topic* topic = where->second->find_topic(topicId); 00834 00835 if (topic == 0) { 00836 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00837 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00838 ACE_DEBUG((LM_WARNING, 00839 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00840 ACE_TEXT("invalid topic %C in domain %d.\n"), 00841 std::string(converter).c_str(), 00842 domainId)); 00843 } 00844 00845 return false; 00846 } 00847 00848 CORBA::Object_var obj = dispatchingOrb_->string_to_object(sub_str); 00849 if (CORBA::is_nil(obj.in())) { 00850 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00851 ACE_DEBUG((LM_WARNING, 00852 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00853 ACE_TEXT("failure marshalling subscription %s on dispatching orb.\n"), 00854 sub_str)); 00855 } 00856 return false; 00857 } 00858 OpenDDS::DCPS::DataReaderRemote_var subscription 00859 = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in()); 
00860 00861 DCPS_IR_Subscription* subPtr; 00862 ACE_NEW_RETURN(subPtr, 00863 DCPS_IR_Subscription( 00864 subId, 00865 partPtr, 00866 topic, 00867 subscription.in(), 00868 qos, 00869 transInfo, 00870 subscriberQos, 00871 filterClassName, 00872 filterExpression, 00873 exprParams), 00874 0); 00875 00876 switch (partPtr->add_subscription(subPtr)) { 00877 case -1: { 00878 OpenDDS::DCPS::RepoIdConverter converter(subId); 00879 ACE_ERROR((LM_ERROR, 00880 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ") 00881 ACE_TEXT("failed to add subscription to participant %C.\n"), 00882 std::string(converter).c_str())); 00883 } 00884 // Deliberate fall through to next case. 00885 00886 case 1: 00887 delete subPtr; 00888 return false; 00889 00890 case 0: 00891 default: 00892 break; 00893 } 00894 00895 switch (topic->add_subscription_reference(subPtr, associate)) { 00896 case -1: { 00897 OpenDDS::DCPS::RepoIdConverter converter(subId); 00898 ACE_ERROR((LM_ERROR, 00899 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ") 00900 ACE_TEXT("failed to add subscription to participant %C topic list.\n"), 00901 std::string(converter).c_str())); 00902 00903 // Remove the subscription. 00904 partPtr->remove_subscription(subId); 00905 00906 } 00907 return false; 00908 00909 case 1: // This is actually a really really bad place to jump to. 00910 // This means that we successfully added the new subscription 00911 // to the participant (it had not been inserted before) but 00912 // that we are adding a duplicate subscription to the topic 00913 // list - which should never ever be able to happen. 00914 return false; 00915 00916 case 0: 00917 default: 00918 break; 00919 } 00920 00921 OpenDDS::DCPS::RepoIdConverter converter(subId); 00922 00923 // See if we are adding a subscription that was created within this 00924 // repository or a different repository. 
00925 if (converter.federationId() == federation_.id()) { 00926 // Ensure the subscription RepoId values do not conflict. 00927 partPtr->last_subscription_key(converter.entityKey()); 00928 } 00929 00930 return true; 00931 }
OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
OpenDDS::DCPS::DataReaderRemote_ptr | subscription, | |||
const DDS::DataReaderQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::SubscriberQos & | subscriberQos, | |||
const char * | filterClassName, | |||
const char * | filterExpression, | |||
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
Definition at line 660 of file DCPSInfo_i.cpp.
References DCPS_IR_Participant::add_subscription(), DCPS_IR_Topic::add_subscription_reference(), Update::Manager::create(), Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, dispatchingOrb_, domains_, DCPS_IR_Participant::get_next_subscription_id(), OpenDDS::DCPS::GUID_UNKNOWN, DCPS_IR_Participant::isBitPublisher(), orb_, DCPS_IR_Domain::participant(), DCPS_IR_Domain::remove_dead_participants(), DCPS_IR_Participant::remove_subscription(), and um_.
Referenced by Update::Manager::add().
00671 { 00672 DCPS_IR_Domain* domainPtr; 00673 DCPS_IR_Participant* partPtr; 00674 DCPS_IR_Subscription* subPtr; 00675 DCPS_IR_Topic* topic; 00676 OpenDDS::DCPS::RepoId subId; 00677 { 00678 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN); 00679 00680 // Grab the domain. 00681 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00682 00683 if (where == this->domains_.end()) { 00684 throw OpenDDS::DCPS::Invalid_Domain(); 00685 } 00686 00687 // Grab the domain and participant. 00688 domainPtr = where->second; 00689 partPtr = domainPtr->participant(participantId); 00690 00691 if (0 == partPtr) { 00692 throw OpenDDS::DCPS::Invalid_Participant(); 00693 } 00694 00695 topic = where->second->find_topic(topicId); 00696 00697 if (topic == 0) { 00698 throw OpenDDS::DCPS::Invalid_Topic(); 00699 } 00700 00701 subId = partPtr->get_next_subscription_id(); 00702 00703 // Remarshall the remote reference onto the dispatching orb. 00704 OpenDDS::DCPS::DataReaderRemote_var marshalledSub ( 00705 OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription)); 00706 if (CORBA::is_nil(marshalledSub.in())) { 00707 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00708 ACE_DEBUG((LM_WARNING, 00709 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00710 ACE_TEXT("invalid subscription reference.\n"))); 00711 } 00712 return OpenDDS::DCPS::GUID_UNKNOWN; 00713 } 00714 00715 CORBA::String_var subStr = orb_->object_to_string(marshalledSub.in()); 00716 CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr.in()); 00717 if (CORBA::is_nil(subObj.in())) { 00718 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00719 ACE_DEBUG((LM_WARNING, 00720 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ") 00721 ACE_TEXT("failure marshalling subscription on dispatching orb.\n"))); 00722 } 00723 return OpenDDS::DCPS::GUID_UNKNOWN; 00724 } 00725 OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription 00726 = 
OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj.in()); 00727 00728 ACE_NEW_RETURN(subPtr, 00729 DCPS_IR_Subscription( 00730 subId, 00731 partPtr, 00732 topic, 00733 dispatchingSubscription.in(), 00734 qos, 00735 transInfo, 00736 subscriberQos, 00737 filterClassName, 00738 filterExpression, 00739 exprParams), 00740 OpenDDS::DCPS::GUID_UNKNOWN); 00741 00742 // Release lock 00743 } 00744 if (partPtr->add_subscription(subPtr) != 0) { 00745 // failed to add. we are responsible for the memory. 00746 subId = OpenDDS::DCPS::GUID_UNKNOWN; 00747 delete subPtr; 00748 subPtr = 0; 00749 00750 } else if (topic->add_subscription_reference(subPtr) != 0) { 00751 ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n"))); 00752 // No associations were made so remove and fail. 00753 partPtr->remove_subscription(subId); 00754 subId = OpenDDS::DCPS::GUID_UNKNOWN; 00755 } 00756 00757 if (this->um_ && (partPtr->isBitPublisher() == false)) { 00758 CORBA::String_var callback = orb_->object_to_string(subscription); 00759 Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams); 00760 00761 Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader 00762 , callback.in() 00763 , const_cast<DDS::SubscriberQos &>(subscriberQos) 00764 , const_cast<DDS::DataReaderQos &>(qos) 00765 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &> 00766 (transInfo), csi); 00767 00768 this->um_->create(actor); 00769 00770 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00771 OpenDDS::DCPS::RepoIdConverter converter(subId); 00772 ACE_DEBUG((LM_DEBUG, 00773 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ") 00774 ACE_TEXT("pushing creation of subscription %C in domain %d.\n"), 00775 std::string(converter).c_str(), 00776 domainId)); 00777 } 00778 } 00779 00780 domainPtr->remove_dead_participants(); 00781 00782 return subId; 00783 }
bool TAO_DDS_DCPSInfo_i::add_topic | ( | const OpenDDS::DCPS::RepoId & | topicId, | |
DDS::DomainId_t | domainId, | |||
const OpenDDS::DCPS::RepoId & | participantId, | |||
const char * | topicName, | |||
const char * | dataTypeName, | |||
const DDS::TopicQos & | qos | |||
) |
Add a previously existing topic to the repository.
topicId | the Topic Entity GUID Id to use. | |
domainId | the Domain in which the Topic is contained. | |
participantId | the Participant in which the Topic is contained. | |
topicName | the name of the Topic. | |
dataTypeName | the name of the data type. | |
qos | the QoS value to use for the Topic. |
Definition at line 222 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::CREATED, OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::GuidConverter::entityKey(), federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), and DCPS_IR_Participant::last_topic_key().
Referenced by Update::Manager::add().
00228 { 00229 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00230 00231 // Grab the domain. 00232 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00233 00234 if (where == this->domains_.end()) { 00235 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00236 ACE_DEBUG((LM_WARNING, 00237 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ") 00238 ACE_TEXT("invalid domain %d.\n"), 00239 domainId)); 00240 } 00241 00242 return false; 00243 } 00244 00245 // Grab the participant. 00246 DCPS_IR_Participant* participantPtr 00247 = where->second->participant(participantId); 00248 00249 if (0 == participantPtr) { 00250 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00251 OpenDDS::DCPS::RepoIdConverter converter(participantId); 00252 ACE_DEBUG((LM_WARNING, 00253 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ") 00254 ACE_TEXT("invalid participant %C.\n"), 00255 std::string(converter).c_str())); 00256 } 00257 00258 return false; 00259 } 00260 00261 OpenDDS::DCPS::TopicStatus topicStatus 00262 = where->second->force_add_topic(topicId, topicName, dataTypeName, 00263 qos, participantPtr); 00264 00265 if (topicStatus != OpenDDS::DCPS::CREATED) { 00266 return false; 00267 } 00268 00269 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00270 00271 // See if we are adding a topic that was created within this 00272 // repository or a different repository. 00273 if (converter.federationId() == federation_.id()) { 00274 // Ensure the topic RepoId values do not conflict. 00275 participantPtr->last_topic_key(converter.entityKey()); 00276 } 00277 00278 return true; 00279 }
OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic | ( | OpenDDS::DCPS::RepoId_out | topicId, | |
DDS::DomainId_t | domainId, | |||
const OpenDDS::DCPS::RepoId & | participantId, | |||
const char * | topicName, | |||
const char * | dataTypeName, | |||
const DDS::TopicQos & | qos, | |||
bool | hasDcpsKey | |||
) | [virtual] |
Definition at line 170 of file DCPSInfo_i.cpp.
References Update::Manager::create(), OpenDDS::DCPS::DCPS_debug_level, domains_, OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), and um_.
00178 { 00179 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR); 00180 // Grab the domain. 00181 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00182 00183 if (where == this->domains_.end()) { 00184 throw OpenDDS::DCPS::Invalid_Domain(); 00185 } 00186 00187 // Grab the participant. 00188 DCPS_IR_Participant* participantPtr 00189 = where->second->participant(participantId); 00190 00191 if (0 == participantPtr) { 00192 throw OpenDDS::DCPS::Invalid_Participant(); 00193 } 00194 00195 OpenDDS::DCPS::TopicStatus topicStatus 00196 = where->second->add_topic( 00197 topicId, 00198 topicName, 00199 dataTypeName, 00200 qos, 00201 participantPtr); 00202 00203 if (this->um_ && (participantPtr->isBitPublisher() == false)) { 00204 Update::UTopic topic(domainId, topicId, participantId 00205 , topicName, dataTypeName 00206 , const_cast<DDS::TopicQos &>(qos)); 00207 this->um_->create(topic); 00208 00209 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00210 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00211 ACE_DEBUG((LM_DEBUG, 00212 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ") 00213 ACE_TEXT("pushing creation of topic %C in domain %d.\n"), 00214 std::string(converter).c_str(), 00215 domainId)); 00216 } 00217 } 00218 return topicStatus; 00219 }
void TAO_DDS_DCPSInfo_i::association_complete | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | localId, | |||
const OpenDDS::DCPS::RepoId & | remoteId | |||
) | [virtual] |
Definition at line 1469 of file DCPSInfo_i.cpp.
References DCPS_IR_Publication::association_complete(), DCPS_IR_Subscription::association_complete(), OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), and DCPS_IR_Participant::find_subscription_reference().
01473 { 01474 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01475 01476 DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId); 01477 if (dom_iter == this->domains_.end()) { 01478 return; 01479 } 01480 01481 DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId); 01482 if (0 == partPtr) { 01483 return; 01484 } 01485 01486 // localId could be pub or sub (initial implementation will only use sub 01487 // since the DataReader is the passive peer) 01488 DCPS_IR_Subscription* sub = 0; 01489 DCPS_IR_Publication* pub = 0; 01490 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01491 ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n")); 01492 } 01493 if (0 == partPtr->find_subscription_reference(localId, sub)) { 01494 sub->association_complete(remoteId); 01495 } else if (0 == partPtr->find_publication_reference(localId, pub)) { 01496 pub->association_complete(remoteId); 01497 } else { 01498 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01499 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 01500 OpenDDS::DCPS::RepoIdConverter local_converter(localId); 01501 OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId); 01502 ACE_DEBUG((LM_WARNING, 01503 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ") 01504 ACE_TEXT("participant %C could not find subscription or publication %C ") 01505 ACE_TEXT("to complete association with remote %C.\n"), 01506 std::string(part_converter).c_str(), 01507 std::string(local_converter).c_str(), 01508 std::string(remote_converter).c_str())); 01509 } 01510 } 01511 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId | |||
) | [virtual] |
Definition at line 114 of file DCPSInfo_i.cpp.
References domains_, and DCPS_IR_Participant::takeOwnership().
00117 { 00118 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 00119 00120 // Grab the domain. 00121 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00122 00123 if (where == this->domains_.end()) { 00124 throw OpenDDS::DCPS::Invalid_Domain(); 00125 } 00126 00127 // Grab the participant. 00128 DCPS_IR_Participant* participant 00129 = where->second->participant(participantId); 00130 00131 if (0 == participant) { 00132 throw OpenDDS::DCPS::Invalid_Participant(); 00133 } 00134 00135 // Establish ownership within the local repository. 00136 participant->takeOwnership(); 00137 00138 return false; 00139 }
bool TAO_DDS_DCPSInfo_i::changeOwnership | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
long | sender, | |||
long | owner | |||
) |
Assert new ownership for a participant and its contained entities.
domainId | the domain in which the participant resides. | |
participantId | the participant to be owned. | |
sender | the repository sending the update data. | |
owner | the repository which is to make callbacks for entities within the participant. |
Asserts owner as the new owner of the participant. Ownership consists of calling back to the reader and writer remote interfaces when associations are established and removed from a publication or subscription. Owner may be the special value of OWNER_NONE to indicate that the previous owner is no longer available to make callbacks and the application has not indicated which repository is to replace it in this capacity.
The sender of the update is included so that the participant can check that transitions to OWNER_NONE are only honored when initiated by the current owner of the participant.
A return value of false indicates that the ownership was specified for a domain or participant which could not be found.
Definition at line 142 of file DCPSInfo_i.cpp.
References DCPS_IR_Participant::changeOwner(), and domains_.
Referenced by OpenDDS::Federator::ManagerImpl::processCreate().
00147 { 00148 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 00149 00150 // Grab the domain. 00151 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00152 00153 if (where == this->domains_.end()) { 00154 return false; 00155 } 00156 00157 // Grab the participant. 00158 DCPS_IR_Participant* participant 00159 = where->second->participant(participantId); 00160 00161 if (0 == participant) { 00162 return false; 00163 } 00164 00165 // Establish the ownership. 00166 participant->changeOwner(sender, owner); 00167 return true; 00168 }
void TAO_DDS_DCPSInfo_i::disassociate_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | local_id, | |||
const OpenDDS::DCPS::RepoId & | remote_id | |||
) | [virtual] |
Definition at line 1295 of file DCPSInfo_i.cpp.
References DCPS_IR_Participant::publications(), and DCPS_IR_Participant::subscriptions().
01299 { 01300 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01301 01302 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId)); 01303 if (it == this->domains_.end()) { 01304 throw OpenDDS::DCPS::Invalid_Domain(); 01305 } 01306 01307 DCPS_IR_Participant* participant = it->second->participant(local_id); 01308 if (participant == 0) { 01309 throw OpenDDS::DCPS::Invalid_Participant(); 01310 } 01311 01312 // Disassociate from participant temporarily: 01313 const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions(); 01314 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin()); 01315 sub != subscriptions.end(); ++sub) { 01316 sub->second->disassociate_participant(remote_id, true); 01317 } 01318 01319 const DCPS_IR_Publication_Map& publications = participant->publications(); 01320 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin()); 01321 pub != publications.end(); ++pub) { 01322 pub->second->disassociate_participant(remote_id, true); 01323 } 01324 01325 it->second->remove_dead_participants(); 01326 }
void TAO_DDS_DCPSInfo_i::disassociate_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | local_id, | |||
const OpenDDS::DCPS::RepoId & | remote_id | |||
) | [virtual] |
Definition at line 1371 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Publication::disassociate_subscription(), and DCPS_IR_Participant::find_publication_reference().
01376 { 01377 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01378 01379 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId)); 01380 if (it == this->domains_.end()) { 01381 throw OpenDDS::DCPS::Invalid_Domain(); 01382 } 01383 01384 DCPS_IR_Participant* participant = it->second->participant(participantId); 01385 if (participant == 0) { 01386 throw OpenDDS::DCPS::Invalid_Participant(); 01387 } 01388 01389 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01390 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n")); 01391 } 01392 01393 DCPS_IR_Publication* publication; 01394 if (participant->find_publication_reference(local_id, publication) 01395 != 0 || publication == 0) { 01396 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 01397 OpenDDS::DCPS::RepoIdConverter pub_converter(local_id); 01398 ACE_ERROR((LM_ERROR, 01399 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ") 01400 ACE_TEXT("participant %C could not find publication %C.\n"), 01401 std::string(part_converter).c_str(), 01402 std::string(pub_converter).c_str())); 01403 throw OpenDDS::DCPS::Invalid_Publication(); 01404 } 01405 01406 // Disassociate from subscription temporarily: 01407 publication->disassociate_subscription(remote_id, true); 01408 01409 it->second->remove_dead_participants(); 01410 }
void TAO_DDS_DCPSInfo_i::disassociate_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | local_id, | |||
const OpenDDS::DCPS::RepoId & | remote_id | |||
) | [virtual] |
Definition at line 1329 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, DCPS_IR_Subscription::disassociate_publication(), and DCPS_IR_Participant::find_subscription_reference().
01334 { 01335 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01336 01337 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId)); 01338 if (it == this->domains_.end()) { 01339 throw OpenDDS::DCPS::Invalid_Domain(); 01340 } 01341 01342 DCPS_IR_Participant* participant = it->second->participant(participantId); 01343 if (participant == 0) { 01344 throw OpenDDS::DCPS::Invalid_Participant(); 01345 } 01346 01347 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01348 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n")); 01349 } 01350 01351 DCPS_IR_Subscription* subscription; 01352 if (participant->find_subscription_reference(local_id, subscription) 01353 != 0 || subscription == 0) { 01354 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 01355 OpenDDS::DCPS::RepoIdConverter sub_converter(local_id); 01356 ACE_ERROR((LM_ERROR, 01357 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ") 01358 ACE_TEXT("participant %C could not find subscription %C.\n"), 01359 std::string(part_converter).c_str(), 01360 std::string(sub_converter).c_str())); 01361 throw OpenDDS::DCPS::Invalid_Subscription(); 01362 } 01363 01364 // Disassociate from publication temporarily: 01365 subscription->disassociate_publication(remote_id, true); 01366 01367 it->second->remove_dead_participants(); 01368 }
DCPS_IR_Domain * TAO_DDS_DCPSInfo_i::domain | ( | DDS::DomainId_t | domain | ) |
Convert a domain Id into a reference to a DCPS_IR_Domain object.
Definition at line 2090 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::Service_Participant::ANY_DOMAIN, OpenDDS::DCPS::DCPS_debug_level, domains_, and TheServiceParticipant.
Referenced by add_domain_participant(), and update_subscription_params().
02091 { 02092 if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) { 02093 ACE_ERROR((LM_ERROR, 02094 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ") 02095 ACE_TEXT("ANY_DOMAIN not supported for operations.\n"))); 02096 return 0; 02097 } 02098 02099 // Check if the domain is already in the map. 02100 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain); 02101 02102 if (where == this->domains_.end()) { 02103 // We will attempt to insert a new domain, go ahead and allocate it. 02104 DCPS_IR_Domain* domainPtr; 02105 ACE_NEW_RETURN(domainPtr, 02106 DCPS_IR_Domain(domain, this->participantIdGenerator_), 02107 0); 02108 02109 // We need to insert the domain into the map at this time since it 02110 // might be looked up during the init_built_in_topics() call. 02111 this->domains_.insert( 02112 where, 02113 DCPS_IR_Domain_Map::value_type(domain, domainPtr)); 02114 02115 int bit_status = 0; 02116 02117 if (TheServiceParticipant->get_BIT()) { 02118 #if !defined (DDS_HAS_MINIMUM_BIT) 02119 bit_status = domainPtr->init_built_in_topics(this->federation_.overridden()); 02120 #endif // !defined (DDS_HAS_MINIMUM_BIT) 02121 } 02122 02123 if (0 != bit_status) { 02124 ACE_ERROR((LM_ERROR, 02125 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ") 02126 ACE_TEXT("failed to initialize the Built-In Topics ") 02127 ACE_TEXT("when loading domain %d.\n"), 02128 domain)); 02129 this->domains_.erase(domain); 02130 delete domainPtr; 02131 return 0; 02132 } 02133 02134 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02135 ACE_DEBUG((LM_DEBUG, 02136 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ") 02137 ACE_TEXT("successfully loaded domain %d at %x.\n"), 02138 domain, 02139 domainPtr)); 02140 } 02141 return domainPtr; 02142 02143 } else { 02144 return where->second; 02145 } 02146 }
const DCPS_IR_Domain_Map & TAO_DDS_DCPSInfo_i::domains | ( | ) | const |
Expose a readable reference of the domain map.
Definition at line 2419 of file DCPSInfo_i.cpp.
References domains_.
Referenced by OpenDDS::Federator::ManagerImpl::pushState().
02420 { 02421 return this->domains_; 02422 }
char * TAO_DDS_DCPSInfo_i::dump_to_string | ( | ) | [virtual] |
Dump the repository's state to a string.
Definition at line 2426 of file DCPSInfo_i.cpp.
References domains_, and CORBA::string_dup().
02427 { 02428 std::string dump; 02429 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 02430 std::string indent (" "); 02431 02432 for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin(); 02433 dm != domains_.end(); 02434 dm++) 02435 { 02436 dump += dm->second->dump_to_string(indent, 0); 02437 } 02438 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 02439 return CORBA::string_dup(dump.c_str()); 02440 02441 }
void TAO_DDS_DCPSInfo_i::finalize | ( | ) |
Cleanup state for shutdown.
Definition at line 2401 of file DCPSInfo_i.cpp.
References dispatch_check_timer_id_, orb_, and reassociate_timer_id_.
02402 { 02403 if (reassociate_timer_id_ != -1) { 02404 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02405 02406 reactor->cancel_timer(this->reassociate_timer_id_); 02407 this->reassociate_timer_id_ = -1; 02408 } 02409 02410 if (dispatch_check_timer_id_ != -1) { 02411 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02412 02413 reactor->cancel_timer(this->dispatch_check_timer_id_); 02414 this->dispatch_check_timer_id_ = -1; 02415 } 02416 }
OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic | ( | DDS::DomainId_t | domainId, | |
const char * | topicName, | |||
CORBA::String_out | dataTypeName, | |||
DDS::TopicQos_out | qos, | |||
OpenDDS::DCPS::RepoId_out | topicId | |||
) | [virtual] |
Definition at line 281 of file DCPSInfo_i.cpp.
References domains_, OpenDDS::DCPS::FOUND, DCPS_IR_Topic_Description::get_dataTypeName(), DCPS_IR_Topic::get_id(), DCPS_IR_Topic::get_topic_description(), DCPS_IR_Topic::get_topic_qos(), OpenDDS::DCPS::INTERNAL_ERROR, and OpenDDS::DCPS::NOT_FOUND.
00287 { 00288 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR); 00289 00290 // Grab the domain. 00291 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00292 00293 if (where == this->domains_.end()) { 00294 throw OpenDDS::DCPS::Invalid_Domain(); 00295 } 00296 00297 OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND; 00298 00299 DCPS_IR_Topic* topic = 0; 00300 qos = new DDS::TopicQos; 00301 00302 status = where->second->find_topic(topicName, topic); 00303 00304 if (0 != topic) { 00305 status = OpenDDS::DCPS::FOUND; 00306 const DCPS_IR_Topic_Description* desc = topic->get_topic_description(); 00307 dataTypeName = desc->get_dataTypeName(); 00308 *qos = *(topic->get_topic_qos()); 00309 topicId = topic->get_id(); 00310 } 00311 00312 return status; 00313 }
int TAO_DDS_DCPSInfo_i::handle_timeout | ( | const ACE_Time_Value & | now, | |
const void * | arg | |||
) | [virtual] |
Definition at line 58 of file DCPSInfo_i.cpp.
References dispatchingOrb_, and domains_.
00060 { 00061 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 00062 00063 if (arg == this) { 00064 if ( !CORBA::is_nil(this->dispatchingOrb_.in())){ 00065 if (this->dispatchingOrb_->work_pending()) 00066 { 00067 // Ten microseconds 00068 ACE_Time_Value small(0,10); 00069 this->dispatchingOrb_->perform_work(small); 00070 } 00071 } 00072 } 00073 else { 00074 // NOTE: This is a purposefully naive approach to addressing defunct 00075 // associations. In the future, it may be worthwhile to introduce a 00076 // callback model to fix the heinous runtime cost below: 00077 for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin()); 00078 dom != this->domains_.end(); ++dom) { 00079 00080 const DCPS_IR_Participant_Map& participants(dom->second->participants()); 00081 for (DCPS_IR_Participant_Map::const_iterator part(participants.begin()); 00082 part != participants.end(); ++part) { 00083 00084 const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions()); 00085 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin()); 00086 sub != subscriptions.end(); ++sub) { 00087 sub->second->reevaluate_defunct_associations(); 00088 } 00089 00090 const DCPS_IR_Publication_Map& publications(part->second->publications()); 00091 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin()); 00092 pub != publications.end(); ++pub) { 00093 pub->second->reevaluate_defunct_associations(); 00094 } 00095 } 00096 } 00097 } 00098 00099 return 0; 00100 }
void TAO_DDS_DCPSInfo_i::ignore_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1513 of file DCPSInfo_i.cpp.
References domains_, and DCPS_IR_Participant::ignore_participant().
01517 { 01518 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01519 01520 // Grab the domain. 01521 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01522 01523 if (where == this->domains_.end()) { 01524 throw OpenDDS::DCPS::Invalid_Domain(); 01525 } 01526 01527 // Grab the participant. 01528 DCPS_IR_Participant* partPtr 01529 = where->second->participant(myParticipantId); 01530 01531 if (0 == partPtr) { 01532 throw OpenDDS::DCPS::Invalid_Participant(); 01533 } 01534 01535 partPtr->ignore_participant(ignoreId); 01536 01537 where->second->remove_dead_participants(); 01538 }
void TAO_DDS_DCPSInfo_i::ignore_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1594 of file DCPSInfo_i.cpp.
References domains_, and DCPS_IR_Participant::ignore_publication().
01598 { 01599 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01600 01601 // Grab the domain. 01602 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01603 01604 if (where == this->domains_.end()) { 01605 throw OpenDDS::DCPS::Invalid_Domain(); 01606 } 01607 01608 // Grab the participant. 01609 DCPS_IR_Participant* partPtr 01610 = where->second->participant(myParticipantId); 01611 01612 if (0 == partPtr) { 01613 throw OpenDDS::DCPS::Invalid_Participant(); 01614 } 01615 01616 partPtr->ignore_publication(ignoreId); 01617 01618 where->second->remove_dead_participants(); 01619 }
void TAO_DDS_DCPSInfo_i::ignore_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1567 of file DCPSInfo_i.cpp.
References domains_, and DCPS_IR_Participant::ignore_subscription().
01571 { 01572 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01573 01574 // Grab the domain. 01575 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01576 01577 if (where == this->domains_.end()) { 01578 throw OpenDDS::DCPS::Invalid_Domain(); 01579 } 01580 01581 // Grab the participant. 01582 DCPS_IR_Participant* partPtr 01583 = where->second->participant(myParticipantId); 01584 01585 if (0 == partPtr) { 01586 throw OpenDDS::DCPS::Invalid_Participant(); 01587 } 01588 01589 partPtr->ignore_subscription(ignoreId); 01590 01591 where->second->remove_dead_participants(); 01592 }
void TAO_DDS_DCPSInfo_i::ignore_topic | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Definition at line 1540 of file DCPSInfo_i.cpp.
References domains_, and DCPS_IR_Participant::ignore_topic().
01544 { 01545 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01546 01547 // Grab the domain. 01548 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01549 01550 if (where == this->domains_.end()) { 01551 throw OpenDDS::DCPS::Invalid_Domain(); 01552 } 01553 01554 // Grab the participant. 01555 DCPS_IR_Participant* partPtr 01556 = where->second->participant(myParticipantId); 01557 01558 if (0 == partPtr) { 01559 throw OpenDDS::DCPS::Invalid_Participant(); 01560 } 01561 01562 partPtr->ignore_topic(ignoreId); 01563 01564 where->second->remove_dead_participants(); 01565 }
bool TAO_DDS_DCPSInfo_i::init_dispatchChecking | ( | const ACE_Time_Value & | delay | ) |
Definition at line 2390 of file DCPSInfo_i.cpp.
References dispatch_check_timer_id_, and orb_.
02391 { 02392 if (this->dispatch_check_timer_id_ != -1) return false; // already scheduled 02393 02394 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02395 02396 this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay); 02397 return this->dispatch_check_timer_id_ != -1; 02398 }
bool TAO_DDS_DCPSInfo_i::init_persistence | ( | ) |
Definition at line 2357 of file DCPSInfo_i.cpp.
References Update::Manager::add(), reincarnate_, Update::Manager::requestImage(), and um_.
02358 { 02359 um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance 02360 ("UpdateManagerSvc"); 02361 02362 if (um_ != 0) { 02363 um_->add(this); 02364 02365 // Request persistent image. 02366 if (reincarnate_) { 02367 um_->requestImage(); 02368 } 02369 02370 } else { 02371 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ") 02372 ACE_TEXT("UpdateManagerSvc.\n")), false); 02373 } 02374 02375 return true; 02376 }
bool TAO_DDS_DCPSInfo_i::init_reassociation | ( | const ACE_Time_Value & | delay | ) |
Definition at line 2379 of file DCPSInfo_i.cpp.
References orb_, and reassociate_timer_id_.
02380 { 02381 if (this->reassociate_timer_id_ != -1) return false; // already scheduled 02382 02383 ACE_Reactor* reactor = this->orb_->orb_core()->reactor(); 02384 02385 this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay); 02386 return this->reassociate_timer_id_ != -1; 02387 }
int TAO_DDS_DCPSInfo_i::init_transport | ( | int | listen_address_given, | |
const char * | listen_str | |||
) |
Initialize the transport for the Built-In Topics. Returns 0 (zero) on success.
Definition at line 2148 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, Util::find(), and OpenDDS::DCPS::TransportRegistry::instance().
02150 { 02151 int status = 0; 02152 02153 try { 02154 02155 #ifndef ACE_AS_STATIC_LIBS 02156 if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp")) 02157 < 0 /* not found (-1) or suspended (-2) */) { 02158 static const ACE_TCHAR directive[] = 02159 ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ") 02160 ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()"); 02161 ACE_Service_Config::process_directive(directive); 02162 } 02163 #endif 02164 02165 std::string config_name = 02166 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 02167 + std::string("InfoRepoBITTransportConfig"); 02168 OpenDDS::DCPS::TransportConfig_rch config = 02169 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name); 02170 02171 std::string inst_name = 02172 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX 02173 + std::string("InfoRepoBITTCPTransportInst"); 02174 OpenDDS::DCPS::TransportInst_rch inst = 02175 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name, 02176 "tcp"); 02177 config->instances_.push_back(inst); 02178 02179 OpenDDS::DCPS::TcpInst_rch tcp_inst = 02180 OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst); 02181 inst->datalink_release_delay_ = 0; 02182 02183 tcp_inst->conn_retry_attempts_ = 0; 02184 02185 if (listen_address_given) { 02186 tcp_inst->local_address(listen_str); 02187 } 02188 02189 } catch (...) { 02190 // TransportRegistry is extremely varied in the exceptions that 02191 // it throws on failure; do not allow exceptions to bubble up 02192 // beyond this point. 02193 status = 1; 02194 } 02195 return status; 02196 }
CORBA::ORB_ptr TAO_DDS_DCPSInfo_i::orb | ( | ) |
Expose the ORB.
Definition at line 109 of file DCPSInfo_i.cpp.
Referenced by OpenDDS::Federator::ManagerImpl::pushState().
bool TAO_DDS_DCPSInfo_i::receive_image | ( | const Update::UImage & | image | ) |
Definition at line 2199 of file DCPSInfo_i.cpp.
References Update::ImageData< T, P, A, W >::actors, OpenDDS::DCPS::DCPS_debug_level, domains_, federation_, OpenDDS::DCPS::RepoIdConverter::federationId(), TAO_DDS_DCPSFederationId::id(), RepoIdGenerator::last(), OpenDDS::DCPS::RepoIdConverter::participantId(), participantIdGenerator_, Update::ImageData< T, P, A, W >::participants, Update::ImageData< T, P, A, W >::topics, and Update::ImageData< T, P, A, W >::wActors.
Referenced by Update::Manager::pushImage().
02200 { 02201 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02202 ACE_DEBUG((LM_DEBUG, 02203 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02204 ACE_TEXT("processing persistent data.\n"))); 02205 } 02206 02207 // Ensure that new BIT participants do not reuse an id 02208 for (Update::UImage::ParticipantSeq::const_iterator 02209 iter = image.participants.begin(); 02210 iter != image.participants.end(); iter++) { 02211 const Update::UParticipant* part = *iter; 02212 OpenDDS::DCPS::RepoIdConverter converter(part->participantId); 02213 if (converter.federationId() == this->federation_.id()) { 02214 participantIdGenerator_.last(converter.participantId()); 02215 } 02216 } 02217 02218 for (Update::UImage::ParticipantSeq::const_iterator 02219 iter = image.participants.begin(); 02220 iter != image.participants.end(); iter++) { 02221 const Update::UParticipant* part = *iter; 02222 02223 if (!this->add_domain_participant(part->domainId, part->participantId 02224 , part->participantQos)) { 02225 OpenDDS::DCPS::RepoIdConverter converter(part->participantId); 02226 ACE_ERROR((LM_ERROR, 02227 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02228 ACE_TEXT("failed to add participant %C to domain %d.\n"), 02229 std::string(converter).c_str(), 02230 part->domainId)); 02231 return false; 02232 02233 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02234 OpenDDS::DCPS::RepoIdConverter converter(part->participantId); 02235 ACE_DEBUG((LM_DEBUG, 02236 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02237 ACE_TEXT("added participant %C to domain %d.\n"), 02238 std::string(converter).c_str(), 02239 part->domainId)); 02240 } 02241 } 02242 02243 for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin(); 02244 iter != image.topics.end(); iter++) { 02245 const Update::UTopic* topic = *iter; 02246 02247 if (!this->add_topic(topic->topicId, topic->domainId 02248 , topic->participantId, topic->name.c_str() 02249 , topic->dataType.c_str(), 
topic->topicQos)) { 02250 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId); 02251 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId); 02252 ACE_ERROR((LM_ERROR, 02253 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02254 ACE_TEXT("failed to add topic %C to participant %C.\n"), 02255 std::string(topic_converter).c_str(), 02256 std::string(part_converter).c_str())); 02257 return false; 02258 02259 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02260 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId); 02261 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId); 02262 ACE_DEBUG((LM_DEBUG, 02263 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02264 ACE_TEXT("added topic %C to participant %C.\n"), 02265 std::string(topic_converter).c_str(), 02266 std::string(part_converter).c_str())); 02267 } 02268 } 02269 02270 for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin(); 02271 iter != image.actors.end(); iter++) { 02272 const Update::URActor* sub = *iter; 02273 02274 // no reason to associate, there are no publishers yet to associate with 02275 if (!this->add_subscription(sub->domainId, sub->participantId 02276 , sub->topicId, sub->actorId 02277 , sub->callback.c_str(), sub->drdwQos 02278 , sub->transportInterfaceInfo 02279 , sub->pubsubQos 02280 , sub->contentSubscriptionProfile.filterClassName 02281 , sub->contentSubscriptionProfile.filterExpr 02282 , sub->contentSubscriptionProfile.exprParams)) { 02283 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId); 02284 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId); 02285 ACE_ERROR((LM_ERROR, 02286 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02287 ACE_TEXT("failed to add subscription %C to participant %C.\n"), 02288 std::string(sub_converter).c_str(), 02289 std::string(part_converter).c_str())); 02290 return false; 02291 02292 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02293 
OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId); 02294 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId); 02295 ACE_DEBUG((LM_DEBUG, 02296 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02297 ACE_TEXT("added subscription %C to participant %C.\n"), 02298 std::string(sub_converter).c_str(), 02299 std::string(part_converter).c_str())); 02300 } 02301 } 02302 02303 for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin(); 02304 iter != image.wActors.end(); iter++) { 02305 const Update::UWActor* pub = *iter; 02306 02307 // try to associate with any persisted subscriptions to track any expected 02308 // existing associations 02309 if (!this->add_publication(pub->domainId, pub->participantId 02310 , pub->topicId, pub->actorId 02311 , pub->callback.c_str() , pub->drdwQos 02312 , pub->transportInterfaceInfo 02313 , pub->pubsubQos 02314 , true)) { 02315 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId); 02316 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId); 02317 ACE_ERROR((LM_ERROR, 02318 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ") 02319 ACE_TEXT("failed to add publication %C to participant %C.\n"), 02320 std::string(pub_converter).c_str(), 02321 std::string(part_converter).c_str())); 02322 return false; 02323 02324 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) { 02325 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId); 02326 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId); 02327 ACE_DEBUG((LM_DEBUG, 02328 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ") 02329 ACE_TEXT("added publication %C to participant %C.\n"), 02330 std::string(pub_converter).c_str(), 02331 std::string(part_converter).c_str())); 02332 } 02333 } 02334 02335 #if !defined (DDS_HAS_MINIMUM_BIT) 02336 // reassociate the bit publisher and subscribers 02337 for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin(); 02338 currentDomain != domains_.end(); 02339 
++currentDomain) { 02340 02341 currentDomain->second->reassociate_built_in_topic_pubs(); 02342 } 02343 #endif // !defined (DDS_HAS_MINIMUM_BIT) 02344 02345 return true; 02346 }
bool TAO_DDS_DCPSInfo_i::remove_by_owner | ( | DDS::DomainId_t | domain, | |
long | owner | |||
) |
Definition at line 1172 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, domains_, and remove_domain_participant().
01175 { 01176 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false); 01177 01178 // Grab the domain. 01179 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain); 01180 01181 if (where == this->domains_.end()) { 01182 return false; 01183 } 01184 01185 std::vector<OpenDDS::DCPS::RepoId> candidates; 01186 01187 for (DCPS_IR_Participant_Map::const_iterator 01188 current = where->second->participants().begin(); 01189 current != where->second->participants().end(); 01190 ++current) { 01191 if (current->second->owner() == owner) { 01192 candidates.push_back(current->second->get_id()); 01193 } 01194 } 01195 01196 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01197 ACE_DEBUG((LM_DEBUG, 01198 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01199 ACE_TEXT("%d participants to remove from domain %d.\n"), 01200 candidates.size(), 01201 domain)); 01202 } 01203 01204 bool status = true; 01205 01206 for (unsigned int index = 0; index < candidates.size(); ++index) { 01207 DCPS_IR_Participant* participant 01208 = where->second->participant(candidates[index]); 01209 01210 std::vector<OpenDDS::DCPS::RepoId> keylist; 01211 01212 // Remove Subscriptions 01213 for (DCPS_IR_Subscription_Map::const_iterator 01214 current = participant->subscriptions().begin(); 01215 current != participant->subscriptions().end(); 01216 ++current) { 01217 keylist.push_back(current->second->get_id()); 01218 } 01219 01220 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01221 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]); 01222 ACE_DEBUG((LM_DEBUG, 01223 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01224 ACE_TEXT("%d subscriptions to remove from participant %C.\n"), 01225 keylist.size(), 01226 std::string(converter).c_str())); 01227 } 01228 01229 for (unsigned int key = 0; key < keylist.size(); ++key) { 01230 if (participant->remove_subscription(keylist[ key]) != 0) { 01231 status = false; 01232 } 01233 } 01234 01235 // Remove Publications 
01236 keylist.clear(); 01237 01238 for (DCPS_IR_Publication_Map::const_iterator 01239 current = participant->publications().begin(); 01240 current != participant->publications().end(); 01241 ++current) { 01242 keylist.push_back(current->second->get_id()); 01243 } 01244 01245 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01246 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]); 01247 ACE_DEBUG((LM_DEBUG, 01248 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01249 ACE_TEXT("%d publications to remove from participant %C.\n"), 01250 keylist.size(), 01251 std::string(converter).c_str())); 01252 } 01253 01254 for (unsigned int key = 0; key < keylist.size(); ++key) { 01255 if (participant->remove_publication(keylist[ key]) != 0) { 01256 status = false; 01257 } 01258 } 01259 01260 // Remove Topics 01261 keylist.clear(); 01262 01263 for (DCPS_IR_Topic_Map::const_iterator 01264 current = participant->topics().begin(); 01265 current != participant->topics().end(); 01266 ++current) { 01267 keylist.push_back(current->second->get_id()); 01268 } 01269 01270 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 01271 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]); 01272 ACE_DEBUG((LM_DEBUG, 01273 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ") 01274 ACE_TEXT("%d topics to remove from participant %C.\n"), 01275 keylist.size(), 01276 std::string(converter).c_str())); 01277 } 01278 01279 for (unsigned int key = 0; key < keylist.size(); ++key) { 01280 DCPS_IR_Topic* discard; 01281 01282 if (participant->remove_topic_reference(keylist[ key], discard) != 0) { 01283 status = false; 01284 } 01285 } 01286 01287 // Remove Participant 01288 this->remove_domain_participant(domain, candidates[ index]); 01289 } 01290 01291 return status; 01292 }
void TAO_DDS_DCPSInfo_i::remove_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId | |||
) | [virtual] |
Definition at line 1412 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), Update::Participant, and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete(), and remove_by_owner().
01415 { 01416 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01417 01418 // Grab the domain. 01419 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01420 01421 if (where == this->domains_.end()) { 01422 throw OpenDDS::DCPS::Invalid_Domain(); 01423 } 01424 01425 DCPS_IR_Participant* participant = where->second->participant(participantId); 01426 01427 if (participant == 0) { 01428 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01429 ACE_ERROR((LM_ERROR, 01430 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ") 01431 ACE_TEXT("failed to locate participant %C in domain %d.\n"), 01432 std::string(converter).c_str(), 01433 domainId)); 01434 throw OpenDDS::DCPS::Invalid_Participant(); 01435 } 01436 01437 // Determine if we should propagate this event; we need to cache this 01438 // result as the participant will be gone by the time we use the result. 01439 bool sendUpdate = (participant->isOwner() == true) 01440 && (participant->isBitPublisher() == false); 01441 01442 CORBA::Boolean dont_notify_lost = 0; 01443 int status = where->second->remove_participant(participantId, dont_notify_lost); 01444 01445 if (0 != status) { 01446 // Removing the participant failed 01447 throw OpenDDS::DCPS::Invalid_Participant(); 01448 } 01449 01450 // Update any concerned observers that the participant was destroyed. 01451 if (this->um_ && sendUpdate) { 01452 Update::IdPath path( 01453 where->second->get_id(), 01454 participantId, 01455 participantId); 01456 this->um_->destroy(path, Update::Participant); 01457 01458 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01459 OpenDDS::DCPS::RepoIdConverter converter(participantId); 01460 ACE_DEBUG((LM_DEBUG, 01461 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ") 01462 ACE_TEXT("pushing deletion of participant %C in domain %d.\n"), 01463 std::string(converter).c_str(), 01464 domainId)); 01465 } 01466 } 01467 }
void TAO_DDS_DCPSInfo_i::remove_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | publicationId | |||
) | [virtual] |
Definition at line 612 of file DCPSInfo_i.cpp.
References Update::Actor, Update::DataWriter, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::remove_publication(), and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete().
00616 { 00617 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00618 00619 // Grab the domain. 00620 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00621 00622 if (where == this->domains_.end()) { 00623 throw OpenDDS::DCPS::Invalid_Domain(); 00624 } 00625 00626 // Grab the participant. 00627 DCPS_IR_Participant* partPtr 00628 = where->second->participant(participantId); 00629 00630 if (0 == partPtr) { 00631 throw OpenDDS::DCPS::Invalid_Participant(); 00632 } 00633 00634 if (partPtr->remove_publication(publicationId) != 0) { 00635 where->second->remove_dead_participants(); 00636 00637 // throw exception because the publication was not removed! 00638 throw OpenDDS::DCPS::Invalid_Publication(); 00639 } 00640 00641 where->second->remove_dead_participants(); 00642 00643 if (this->um_ 00644 && (partPtr->isOwner() == true) 00645 && (partPtr->isBitPublisher() == false)) { 00646 Update::IdPath path(domainId, participantId, publicationId); 00647 this->um_->destroy(path, Update::Actor, Update::DataWriter); 00648 00649 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00650 OpenDDS::DCPS::RepoIdConverter converter(publicationId); 00651 ACE_DEBUG((LM_DEBUG, 00652 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ") 00653 ACE_TEXT("pushing deletion of publication %C in domain %d.\n"), 00654 std::string(converter).c_str(), 00655 domainId)); 00656 } 00657 } 00658 }
void TAO_DDS_DCPSInfo_i::remove_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | subscriptionId | |||
) | [virtual] |
Definition at line 933 of file DCPSInfo_i.cpp.
References Update::Actor, Update::DataReader, OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::remove_subscription(), and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete().
00937 { 00938 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 00939 00940 // Grab the domain. 00941 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00942 00943 if (where == this->domains_.end()) { 00944 throw OpenDDS::DCPS::Invalid_Domain(); 00945 } 00946 00947 // Grab the participant. 00948 DCPS_IR_Participant* partPtr 00949 = where->second->participant(participantId); 00950 00951 if (0 == partPtr) { 00952 throw OpenDDS::DCPS::Invalid_Participant(); 00953 } 00954 00955 if (partPtr->remove_subscription(subscriptionId) != 0) { 00956 // throw exception because the subscription was not removed! 00957 throw OpenDDS::DCPS::Invalid_Subscription(); 00958 } 00959 00960 where->second->remove_dead_participants(); 00961 00962 if (this->um_ 00963 && (partPtr->isOwner() == true) 00964 && (partPtr->isBitPublisher() == false)) { 00965 Update::IdPath path(domainId, participantId, subscriptionId); 00966 this->um_->destroy(path, Update::Actor, Update::DataReader); 00967 00968 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00969 OpenDDS::DCPS::RepoIdConverter converter(subscriptionId); 00970 ACE_DEBUG((LM_DEBUG, 00971 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ") 00972 ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"), 00973 std::string(converter).c_str(), 00974 domainId)); 00975 } 00976 } 00977 }
OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId | |||
) | [virtual] |
Definition at line 315 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, Update::Manager::destroy(), domains_, DCPS_IR_Participant::find_topic_reference(), OpenDDS::DCPS::INTERNAL_ERROR, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), Update::Topic, and um_.
Referenced by OpenDDS::Federator::ManagerImpl::processDelete().
00319 { 00320 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR); 00321 00322 // Grab the domain. 00323 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 00324 00325 if (where == this->domains_.end()) { 00326 throw OpenDDS::DCPS::Invalid_Domain(); 00327 } 00328 00329 // Grab the participant. 00330 DCPS_IR_Participant* partPtr 00331 = where->second->participant(participantId); 00332 00333 if (0 == partPtr) { 00334 throw OpenDDS::DCPS::Invalid_Participant(); 00335 } 00336 00337 DCPS_IR_Topic* topic; 00338 00339 if (partPtr->find_topic_reference(topicId, topic) != 0) { 00340 throw OpenDDS::DCPS::Invalid_Topic(); 00341 } 00342 00343 OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic); 00344 00345 if (this->um_ 00346 && (partPtr->isOwner() == true) 00347 && (partPtr->isBitPublisher() == false)) { 00348 Update::IdPath path(domainId, participantId, topicId); 00349 this->um_->destroy(path, Update::Topic); 00350 00351 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 00352 OpenDDS::DCPS::RepoIdConverter converter(topicId); 00353 ACE_DEBUG((LM_DEBUG, 00354 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ") 00355 ACE_TEXT("pushing deletion of topic %C in domain %d.\n"), 00356 std::string(converter).c_str(), 00357 domainId)); 00358 } 00359 } 00360 00361 return removedStatus; 00362 }
void TAO_DDS_DCPSInfo_i::shutdown | ( | ) | [virtual] |
Cause the entire repository to exit.
Definition at line 103 of file DCPSInfo_i.cpp.
References ShutdownInterface::shutdown(), and shutdown_.
Referenced by OpenDDS::Federator::ManagerImpl::leave_and_shutdown(), and OpenDDS::Federator::ManagerImpl::shutdown().
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos | ( | DDS::DomainId_t | domain, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Definition at line 2045 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(), DCPS_IR_Participant::set_qos(), um_, and Update::Manager::update().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().
02049 { 02050 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 02051 02052 // Grab the domain. 02053 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 02054 02055 if (where == this->domains_.end()) { 02056 throw OpenDDS::DCPS::Invalid_Domain(); 02057 } 02058 02059 // Grab the participant. 02060 DCPS_IR_Participant* partPtr 02061 = where->second->participant(participantId); 02062 02063 if (0 == partPtr) { 02064 throw OpenDDS::DCPS::Invalid_Participant(); 02065 } 02066 02067 if (partPtr->set_qos(qos) == false) 02068 return 0; 02069 02070 if (this->um_ 02071 && (partPtr->isOwner() == true) 02072 && (partPtr->isBitPublisher() == false)) { 02073 Update::IdPath path(domainId, participantId, participantId); 02074 this->um_->update(path, qos); 02075 02076 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 02077 OpenDDS::DCPS::RepoIdConverter converter(participantId); 02078 ACE_DEBUG((LM_DEBUG, 02079 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ") 02080 ACE_TEXT("pushing update of participant %C in domain %d.\n"), 02081 std::string(converter).c_str(), 02082 domainId)); 02083 } 02084 } 02085 02086 return 1; 02087 }
void TAO_DDS_DCPSInfo_i::update_publication_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | dwId, | |||
const DDS::PublisherQos & | qos | |||
) |
Entry for federation updates of PublisherQos values.
Definition at line 1742 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), and DCPS_IR_Publication::set_qos().
01747 { 01748 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01749 01750 // Grab the domain. 01751 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01752 01753 if (where == this->domains_.end()) { 01754 throw OpenDDS::DCPS::Invalid_Domain(); 01755 } 01756 01757 // Grab the participant. 01758 DCPS_IR_Participant* partPtr 01759 = where->second->participant(partId); 01760 01761 if (0 == partPtr) { 01762 throw OpenDDS::DCPS::Invalid_Participant(); 01763 } 01764 01765 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01766 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 3\n")); 01767 } 01768 01769 DCPS_IR_Publication* pub; 01770 01771 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) { 01772 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01773 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId); 01774 ACE_ERROR((LM_ERROR, 01775 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01776 ACE_TEXT("participant %C could not find publication %C.\n"), 01777 std::string(part_converter).c_str(), 01778 std::string(pub_converter).c_str())); 01779 throw OpenDDS::DCPS::Invalid_Publication(); 01780 } 01781 01782 pub->set_qos(qos); 01783 }
void TAO_DDS_DCPSInfo_i::update_publication_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | dwId, | |||
const DDS::DataWriterQos & | qos | |||
) |
Entry for federation updates of DataWriterQos values.
Definition at line 1698 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), and DCPS_IR_Publication::set_qos().
01703 { 01704 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01705 01706 // Grab the domain. 01707 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01708 01709 if (where == this->domains_.end()) { 01710 throw OpenDDS::DCPS::Invalid_Domain(); 01711 } 01712 01713 // Grab the participant. 01714 DCPS_IR_Participant* partPtr 01715 = where->second->participant(partId); 01716 01717 if (0 == partPtr) { 01718 throw OpenDDS::DCPS::Invalid_Participant(); 01719 } 01720 01721 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01722 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 2\n")); 01723 } 01724 01725 DCPS_IR_Publication* pub; 01726 01727 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) { 01728 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01729 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId); 01730 ACE_ERROR((LM_ERROR, 01731 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01732 ACE_TEXT("participant %C could not find publication %C.\n"), 01733 std::string(part_converter).c_str(), 01734 std::string(pub_converter).c_str())); 01735 throw OpenDDS::DCPS::Invalid_Publication(); 01736 } 01737 01738 pub->set_qos(qos); 01739 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | dwId, | |||
const DDS::DataWriterQos & | qos, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [virtual] |
Definition at line 1621 of file DCPSInfo_i.cpp.
References Update::DataWriterQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_publication_reference(), DCPS_IR_Participant::isBitPublisher(), Update::NoQos, Update::PublisherQos, DCPS_IR_Publication::set_qos(), um_, and Update::Manager::update().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().
01627 { 01628 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 01629 01630 // Grab the domain. 01631 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01632 01633 if (where == this->domains_.end()) { 01634 throw OpenDDS::DCPS::Invalid_Domain(); 01635 } 01636 01637 // Grab the participant. 01638 DCPS_IR_Participant* partPtr 01639 = where->second->participant(partId); 01640 01641 if (0 == partPtr) { 01642 throw OpenDDS::DCPS::Invalid_Participant(); 01643 } 01644 01645 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01646 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 1\n")); 01647 } 01648 01649 DCPS_IR_Publication* pub; 01650 01651 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) { 01652 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01653 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId); 01654 ACE_ERROR((LM_ERROR, 01655 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01656 ACE_TEXT("participant %C could not find publication %C.\n"), 01657 std::string(part_converter).c_str(), 01658 std::string(pub_converter).c_str())); 01659 throw OpenDDS::DCPS::Invalid_Publication(); 01660 } 01661 01662 Update::SpecificQos qosType; 01663 01664 if (pub->set_qos(qos, publisherQos, qosType) == false) // failed 01665 return 0; 01666 01667 if (this->um_ && (partPtr->isBitPublisher() == false)) { 01668 Update::IdPath path(domainId, partId, dwId); 01669 01670 switch (qosType) { 01671 case Update::DataWriterQos: 01672 this->um_->update(path, qos); 01673 break; 01674 01675 case Update::PublisherQos: 01676 this->um_->update(path, publisherQos); 01677 break; 01678 01679 case Update::NoQos: 01680 default: 01681 break; 01682 } 01683 01684 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01685 OpenDDS::DCPS::RepoIdConverter converter(dwId); 01686 ACE_DEBUG((LM_DEBUG, 01687 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ") 01688 ACE_TEXT("pushing update of publication %C in domain %d.\n"), 01689 
std::string(converter).c_str(), 01690 domainId)); 01691 } 01692 } 01693 01694 return 1; 01695 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_params | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | subscriptionId, | |||
const DDS::StringSeq & | params | |||
) |
Definition at line 1950 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, domain(), domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), um_, Update::Manager::update(), and DCPS_IR_Subscription::update_expr_params().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateFilterExpressionParams().
01955 { 01956 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 01957 01958 DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId); 01959 if (domain == this->domains_.end()) { 01960 throw OpenDDS::DCPS::Invalid_Domain(); 01961 } 01962 01963 DCPS_IR_Participant* partPtr = domain->second->participant(participantId); 01964 if (0 == partPtr) { 01965 throw OpenDDS::DCPS::Invalid_Participant(); 01966 } 01967 01968 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01969 ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n")); 01970 } 01971 01972 DCPS_IR_Subscription* sub; 01973 if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) { 01974 OpenDDS::DCPS::RepoIdConverter part_converter(participantId); 01975 OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId); 01976 ACE_ERROR((LM_ERROR, 01977 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ") 01978 ACE_TEXT("participant %C could not find subscription %C.\n"), 01979 std::string(part_converter).c_str(), 01980 std::string(sub_converter).c_str())); 01981 throw OpenDDS::DCPS::Invalid_Subscription(); 01982 } 01983 01984 sub->update_expr_params(params); // calls writers via DataWriterRemote 01985 01986 if (this->um_ && !partPtr->isBitPublisher()) { 01987 Update::IdPath path(domainId, participantId, subscriptionId); 01988 this->um_->update(path, params); 01989 } 01990 01991 return true; 01992 }
void TAO_DDS_DCPSInfo_i::update_subscription_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | drId, | |||
const DDS::SubscriberQos & | qos | |||
) |
Entry for federation updates of SubscriberQos values.
Definition at line 1906 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), and DCPS_IR_Subscription::set_qos().
01911 { 01912 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01913 01914 // Grab the domain. 01915 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01916 01917 if (where == this->domains_.end()) { 01918 throw OpenDDS::DCPS::Invalid_Domain(); 01919 } 01920 01921 // Grab the participant. 01922 DCPS_IR_Participant* partPtr 01923 = where->second->participant(partId); 01924 01925 if (0 == partPtr) { 01926 throw OpenDDS::DCPS::Invalid_Participant(); 01927 } 01928 01929 DCPS_IR_Subscription* sub; 01930 01931 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01932 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n")); 01933 } 01934 01935 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) { 01936 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01937 OpenDDS::DCPS::RepoIdConverter sub_converter(drId); 01938 ACE_ERROR((LM_ERROR, 01939 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01940 ACE_TEXT("participant %C could not find subscription %C.\n"), 01941 std::string(part_converter).c_str(), 01942 std::string(sub_converter).c_str())); 01943 throw OpenDDS::DCPS::Invalid_Subscription(); 01944 } 01945 01946 sub->set_qos(qos); 01947 }
void TAO_DDS_DCPSInfo_i::update_subscription_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | drId, | |||
const DDS::DataReaderQos & | qos | |||
) |
Entry for federation updates of DataReaderQos values.
Definition at line 1862 of file DCPSInfo_i.cpp.
References OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), and DCPS_IR_Subscription::set_qos().
01867 { 01868 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01869 01870 // Grab the domain. 01871 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01872 01873 if (where == this->domains_.end()) { 01874 throw OpenDDS::DCPS::Invalid_Domain(); 01875 } 01876 01877 // Grab the participant. 01878 DCPS_IR_Participant* partPtr 01879 = where->second->participant(partId); 01880 01881 if (0 == partPtr) { 01882 throw OpenDDS::DCPS::Invalid_Participant(); 01883 } 01884 01885 DCPS_IR_Subscription* sub; 01886 01887 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01888 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n")); 01889 } 01890 01891 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) { 01892 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01893 OpenDDS::DCPS::RepoIdConverter sub_converter(drId); 01894 ACE_ERROR((LM_ERROR, 01895 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01896 ACE_TEXT("participant %C could not find subscription %C.\n"), 01897 std::string(part_converter).c_str(), 01898 std::string(sub_converter).c_str())); 01899 throw OpenDDS::DCPS::Invalid_Subscription(); 01900 } 01901 01902 sub->set_qos(qos); 01903 }
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | drId, | |||
const DDS::DataReaderQos & | qos, | |||
const DDS::SubscriberQos & | subscriberQos | |||
) | [virtual] |
Definition at line 1785 of file DCPSInfo_i.cpp.
References Update::DataReaderQos, OpenDDS::DCPS::DCPS_debug_level, domains_, DCPS_IR_Participant::find_subscription_reference(), DCPS_IR_Participant::isBitPublisher(), Update::NoQos, DCPS_IR_Subscription::set_qos(), Update::SubscriberQos, um_, and Update::Manager::update().
Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1(), and OpenDDS::Federator::ManagerImpl::processUpdateQos2().
01791 { 01792 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0); 01793 01794 // Grab the domain. 01795 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId); 01796 01797 if (where == this->domains_.end()) { 01798 throw OpenDDS::DCPS::Invalid_Domain(); 01799 } 01800 01801 // Grab the participant. 01802 DCPS_IR_Participant* partPtr 01803 = where->second->participant(partId); 01804 01805 if (0 == partPtr) { 01806 throw OpenDDS::DCPS::Invalid_Participant(); 01807 } 01808 01809 DCPS_IR_Subscription* sub; 01810 01811 if (OpenDDS::DCPS::DCPS_debug_level > 3) { 01812 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n")); 01813 } 01814 01815 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) { 01816 OpenDDS::DCPS::RepoIdConverter part_converter(partId); 01817 OpenDDS::DCPS::RepoIdConverter sub_converter(drId); 01818 ACE_ERROR((LM_ERROR, 01819 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01820 ACE_TEXT("participant %C could not find subscription %C.\n"), 01821 std::string(part_converter).c_str(), 01822 std::string(sub_converter).c_str())); 01823 throw OpenDDS::DCPS::Invalid_Subscription(); 01824 } 01825 01826 Update::SpecificQos qosType; 01827 01828 if (sub->set_qos(qos, subscriberQos, qosType) == false) // failed 01829 return 0; 01830 01831 if (this->um_ && (partPtr->isBitPublisher() == false)) { 01832 Update::IdPath path(domainId, partId, drId); 01833 01834 switch (qosType) { 01835 case Update::DataReaderQos: 01836 this->um_->update(path, qos); 01837 break; 01838 01839 case Update::SubscriberQos: 01840 this->um_->update(path, subscriberQos); 01841 break; 01842 01843 case Update::NoQos: 01844 default: 01845 break; 01846 } 01847 01848 if (OpenDDS::DCPS::DCPS_debug_level > 4) { 01849 OpenDDS::DCPS::RepoIdConverter converter(drId); 01850 ACE_DEBUG((LM_DEBUG, 01851 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ") 01852 ACE_TEXT("pushing update of subscription %C in domain 
%d.\n"), 01853 std::string(converter).c_str(), 01854 domainId)); 01855 } 01856 } 01857 01858 return 1; 01859 }
/**
 * Apply a TopicQos value to a topic and, when appropriate, push the change
 * through the update manager.
 *
 * Looks up domain @a domainId in domains_, then participant
 * @a participantId, then topic @a topicId, and calls
 * DCPS_IR_Topic::set_topic_qos(). The new QoS is forwarded via
 * Update::Manager::update() only when um_ is non-null, the participant
 * reports isOwner(), and it does not report isBitPublisher().
 *
 * @param topicId       topic to look up inside the participant.
 * @param domainId      key used to find the domain in domains_.
 * @param participantId participant to look up inside the domain.
 * @param qos           TopicQos value passed to the topic.
 *
 * @return 0 if lock_ could not be acquired or set_topic_qos() returned
 *         false, 1 otherwise.
 *
 * @throws OpenDDS::DCPS::Invalid_Domain      @a domainId is not in domains_.
 * @throws OpenDDS::DCPS::Invalid_Participant the domain returned a null participant.
 * @throws OpenDDS::DCPS::Invalid_Topic       the topic lookup failed or produced
 *                                            a null pointer.
 *
 * Definition at line 1994 of file DCPSInfo_i.cpp.
 *
 * References OpenDDS::DCPS::DCPS_debug_level, domains_,
 * DCPS_IR_Participant::find_topic_reference(),
 * DCPS_IR_Participant::isBitPublisher(), DCPS_IR_Participant::isOwner(),
 * DCPS_IR_Topic::set_topic_qos(), um_, and Update::Manager::update().
 *
 * Referenced by OpenDDS::Federator::ManagerImpl::processUpdateQos1().
 */
CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos(
  const OpenDDS::DCPS::RepoId& topicId,
  DDS::DomainId_t domainId,
  const OpenDDS::DCPS::RepoId& participantId,
  const DDS::TopicQos& qos)
{
  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);

  // Grab the domain.
  DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);

  if (where == this->domains_.end()) {
    throw OpenDDS::DCPS::Invalid_Domain();
  }

  // Grab the participant.
  DCPS_IR_Participant* partPtr
    = where->second->participant(participantId);

  if (0 == partPtr) {
    throw OpenDDS::DCPS::Invalid_Participant();
  }

  // Start from null and reject a null result, mirroring the `sub == 0`
  // guard used by the subscription lookups, so the dereference below can
  // never act on an indeterminate or null pointer.
  DCPS_IR_Topic* topic = 0;

  if (partPtr->find_topic_reference(topicId, topic) != 0 || topic == 0) {
    throw OpenDDS::DCPS::Invalid_Topic();
  }

  if (topic->set_topic_qos(qos) == false)
    return 0;

  if (this->um_
      && (partPtr->isOwner() == true)
      && (partPtr->isBitPublisher() == false)) {
    Update::IdPath path(domainId, participantId, topicId);
    this->um_->update(path, qos);

    if (OpenDDS::DCPS::DCPS_debug_level > 4) {
      OpenDDS::DCPS::RepoIdConverter converter(topicId);
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
                 ACE_TEXT("pushing update of topic %C in domain %d.\n"),
                 std::string(converter).c_str(),
                 domainId));
    }
  }

  return 1;
}
long TAO_DDS_DCPSInfo_i::dispatch_check_timer_id_ [private] |
CORBA::ORB_var TAO_DDS_DCPSInfo_i::dispatchingOrb_ [private] |
Definition at line 414 of file DCPSInfo_i.h.
Referenced by add_publication(), add_subscription(), handle_timeout(), and TAO_DDS_DCPSInfo_i().
DCPS_IR_Domain_Map TAO_DDS_DCPSInfo_i::domains_ [private] |
Definition at line 412 of file DCPSInfo_i.h.
Referenced by add_publication(), add_subscription(), add_topic(), assert_topic(), association_complete(), attach_participant(), changeOwnership(), domain(), domains(), dump_to_string(), find_topic(), handle_timeout(), ignore_domain_participant(), ignore_publication(), ignore_subscription(), ignore_topic(), receive_image(), remove_by_owner(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().
const TAO_DDS_DCPSFederationId& TAO_DDS_DCPSInfo_i::federation_ [private] |
Definition at line 416 of file DCPSInfo_i.h.
Referenced by add_domain_participant(), add_publication(), add_subscription(), add_topic(), and receive_image().
ACE_Recursive_Thread_Mutex TAO_DDS_DCPSInfo_i::lock_ [private] |
Definition at line 425 of file DCPSInfo_i.h.
CORBA::ORB_var TAO_DDS_DCPSInfo_i::orb_ [private] |
Definition at line 413 of file DCPSInfo_i.h.
Referenced by add_publication(), add_subscription(), finalize(), init_dispatchChecking(), and init_reassociation().
long TAO_DDS_DCPSInfo_i::reassociate_timer_id_ [private] |
bool TAO_DDS_DCPSInfo_i::reincarnate_ [private] |
ShutdownInterface* TAO_DDS_DCPSInfo_i::shutdown_ [private] |
Interface to effect shutdown of the process.
Definition at line 423 of file DCPSInfo_i.h.
Referenced by shutdown().
Update::Manager* TAO_DDS_DCPSInfo_i::um_ [private] |
Definition at line 419 of file DCPSInfo_i.h.
Referenced by add(), add_domain_participant(), add_publication(), add_subscription(), assert_topic(), init_persistence(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().