Discovery Strategy class that implements InfoRepo discovery. More...
#include <InfoRepoDiscovery.h>
Discovery Strategy class that implements InfoRepo discovery.
This class implements the Discovery interface for InfoRepo-based discovery.
Definition at line 44 of file InfoRepoDiscovery.h.
OpenDDS::DCPS::InfoRepoDiscovery::InfoRepoDiscovery | ( | const RepoKey & | key, | |
const std::string & | ior | |||
) |
Definition at line 133 of file InfoRepoDiscovery.cpp.
00135 : Discovery(key), 00136 ior_(ior), 00137 bit_transport_port_(0), 00138 use_local_bit_config_(false), 00139 orb_from_user_(false) 00140 { 00141 }
OpenDDS::DCPS::InfoRepoDiscovery::InfoRepoDiscovery | ( | const RepoKey & | key, | |
const DCPSInfo_var & | info | |||
) |
Definition at line 143 of file InfoRepoDiscovery.cpp.
00145 : Discovery(key), 00146 info_(info), 00147 bit_transport_port_(0), 00148 use_local_bit_config_(false), 00149 orb_from_user_(false) 00150 { 00151 }
OpenDDS::DCPS::InfoRepoDiscovery::~InfoRepoDiscovery | ( | ) | [virtual] |
Definition at line 153 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_info(), ACE_TEXT(), ACE_String_Base< ACE_CHAR_T >::c_str(), LM_ERROR, orb_from_user_, orb_runner_, OpenDDS::DCPS::InfoRepoDiscovery::OrbRunner::shutdown(), and OpenDDS::DCPS::InfoRepoDiscovery::OrbRunner::use_count_.
00154 { 00155 if (!orb_from_user_ && orb_runner_) { 00156 if (0 == --orb_runner_->use_count_) { 00157 try { 00158 orb_runner_->shutdown(); 00159 } 00160 catch (const CORBA::Exception& ex) { 00161 ACE_ERROR((LM_ERROR, 00162 ACE_TEXT("ERROR: InfoRepoDiscovery::~InfoRepoDiscovery - ") 00163 ACE_TEXT("Exception caught during ORB shutdown: %C.\n"), 00164 ex._info().c_str())); 00165 } 00166 00167 delete orb_runner_; 00168 orb_runner_ = 0; 00169 } 00170 } 00171 }
bool OpenDDS::DCPS::InfoRepoDiscovery::active | ( | void | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::Discovery.
Definition at line 421 of file InfoRepoDiscovery.cpp.
References get_dcps_info().
00422 { 00423 try { 00424 // invoke a CORBA call, if we are active then there will be no exception 00425 get_dcps_info()->_is_a("Not_An_IDL_Type"); 00426 return true; 00427 } catch (const CORBA::Exception&) { 00428 return false; 00429 } 00430 }
DCPS::AddDomainStatus OpenDDS::DCPS::InfoRepoDiscovery::add_domain_participant | ( | DDS::DomainId_t | domain, | |
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 453 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), get_dcps_info(), OpenDDS::DCPS::GUID_UNKNOWN, and CORBA::is_nil().
00455 { 00456 try { 00457 const DCPSInfo_var info = get_dcps_info(); 00458 if (!CORBA::is_nil(info)) { 00459 return info->add_domain_participant(domainId, qos); 00460 } 00461 } catch (const CORBA::Exception& ex) { 00462 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_domain_participant: "); 00463 } 00464 const DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_UNKNOWN, false /*federated*/}; 00465 return ads; 00466 }
RepoId OpenDDS::DCPS::InfoRepoDiscovery::add_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
OpenDDS::DCPS::DataWriterCallbacks * | publication, | |||
const DDS::DataWriterQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [virtual] |
add the passed in publication into discovery. Discovery does not participate in memory management for the publication pointer, so it requires that the publication pointer remain valid until remove_publication is called.
Implements OpenDDS::DCPS::Discovery.
Definition at line 594 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), ACE_NEW_RETURN(), dataWriterMap_, get_dcps_info(), OpenDDS::DCPS::GUID_UNKNOWN, lock_, and orb_.
00601 { 00602 RepoId pubId; 00603 00604 try { 00605 DCPS::DataWriterRemoteImpl* writer_remote_impl = 0; 00606 ACE_NEW_RETURN(writer_remote_impl, 00607 DataWriterRemoteImpl(*publication), 00608 DCPS::GUID_UNKNOWN); 00609 00610 //this is taking ownership of the DataWriterRemoteImpl (server side) allocated above 00611 PortableServer::ServantBase_var writer_remote(writer_remote_impl); 00612 00613 //this is the client reference to the DataWriterRemoteImpl 00614 OpenDDS::DCPS::DataWriterRemote_var dr_remote_obj = 00615 servant_to_remote_reference(writer_remote_impl, orb_); 00616 00617 pubId = get_dcps_info()->add_publication(domainId, participantId, topicId, 00618 dr_remote_obj, qos, transInfo, publisherQos); 00619 00620 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN); 00621 // take ownership of the client allocated above 00622 dataWriterMap_[pubId] = dr_remote_obj; 00623 00624 } catch (const CORBA::Exception& ex) { 00625 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_publication: "); 00626 pubId = DCPS::GUID_UNKNOWN; 00627 } 00628 00629 return pubId; 00630 }
RepoId OpenDDS::DCPS::InfoRepoDiscovery::add_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId, | |||
OpenDDS::DCPS::DataReaderCallbacks * | subscription, | |||
const DDS::DataReaderQos & | qos, | |||
const OpenDDS::DCPS::TransportLocatorSeq & | transInfo, | |||
const DDS::SubscriberQos & | subscriberQos, | |||
const char * | filterClassName, | |||
const char * | filterExpression, | |||
const DDS::StringSeq & | exprParams | |||
) | [virtual] |
add the passed in subscription into discovery. Discovery does not participate in memory management for the subscription pointer, so it requires that the subscription pointer remain valid until remove_subscription is called.
Implements OpenDDS::DCPS::Discovery.
Definition at line 686 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), ACE_NEW_RETURN(), dataReaderMap_, get_dcps_info(), OpenDDS::DCPS::GUID_UNKNOWN, lock_, and orb_.
00696 { 00697 RepoId subId; 00698 00699 try { 00700 DCPS::DataReaderRemoteImpl* reader_remote_impl = 0; 00701 ACE_NEW_RETURN(reader_remote_impl, 00702 DataReaderRemoteImpl(*subscription), 00703 DCPS::GUID_UNKNOWN); 00704 00705 //this is taking ownership of the DataReaderRemoteImpl (server side) allocated above 00706 PortableServer::ServantBase_var reader_remote(reader_remote_impl); 00707 00708 //this is the client reference to the DataReaderRemoteImpl 00709 OpenDDS::DCPS::DataReaderRemote_var dr_remote_obj = 00710 servant_to_remote_reference(reader_remote_impl, orb_); 00711 00712 subId = get_dcps_info()->add_subscription(domainId, participantId, topicId, 00713 dr_remote_obj, qos, transInfo, subscriberQos, 00714 filterClassName, filterExpr, params); 00715 00716 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, DCPS::GUID_UNKNOWN); 00717 // take ownership of the client allocated above 00718 dataReaderMap_[subId] = dr_remote_obj; 00719 00720 } catch (const CORBA::Exception& ex) { 00721 ex._tao_print_exception("ERROR: InfoRepoDiscovery::add_subscription: "); 00722 subId = DCPS::GUID_UNKNOWN; 00723 } 00724 return subId; 00725 }
virtual OpenDDS::DCPS::TopicStatus OpenDDS::DCPS::InfoRepoDiscovery::assert_topic | ( | OpenDDS::DCPS::RepoId_out | topicId, | |
DDS::DomainId_t | domainId, | |||
const OpenDDS::DCPS::RepoId & | participantId, | |||
const char * | topicName, | |||
const char * | dataTypeName, | |||
const DDS::TopicQos & | qos, | |||
bool | hasDcpsKey | |||
) | [virtual] |
void OpenDDS::DCPS::InfoRepoDiscovery::association_complete | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | localId, | |||
const OpenDDS::DCPS::RepoId & | remoteId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 797 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00800 { 00801 try { 00802 get_dcps_info()->association_complete(domainId, participantId, localId, remoteId); 00803 } catch (const CORBA::Exception& ex) { 00804 ex._tao_print_exception("ERROR: InfoRepoDiscovery::association_complete: "); 00805 } 00806 }
bool OpenDDS::DCPS::InfoRepoDiscovery::attach_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 435 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00437 { 00438 try { 00439 return get_dcps_info()->attach_participant(domainId, participantId); 00440 } catch (const CORBA::Exception& ex) { 00441 ex._tao_print_exception("ERROR: InfoRepoDiscovery::attach_participant: "); 00442 return false; 00443 } 00444 }
TransportConfig_rch OpenDDS::DCPS::InfoRepoDiscovery::bit_config | ( | ) | [private] |
Definition at line 261 of file InfoRepoDiscovery.cpp.
References bit_config_, bit_transport_ip_, bit_transport_port_, OpenDDS::DCPS::TransportRegistry::create_config(), OpenDDS::DCPS::TransportRegistry::create_inst(), OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX, OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::Discovery::key(), OpenDDS::DCPS::static_rchandle_cast(), TheServiceParticipant, and use_local_bit_config_.
Referenced by init_bit().
00262 { 00263 #if !defined (DDS_HAS_MINIMUM_BIT) 00264 if (bit_config_.is_nil()) { 00265 const std::string cfg_name = TransportRegistry::DEFAULT_INST_PREFIX + 00266 std::string("_BITTransportConfig_") + key(); 00267 bit_config_ = TransportRegistry::instance()->create_config(cfg_name); 00268 00269 const std::string inst_name = TransportRegistry::DEFAULT_INST_PREFIX + 00270 std::string("_BITTCPTransportInst_") + key(); 00271 TransportInst_rch inst = 00272 TransportRegistry::instance()->create_inst(inst_name, "tcp"); 00273 bit_config_->instances_.push_back(inst); 00274 00275 if (!use_local_bit_config_) { 00276 bit_transport_ip_ = TheServiceParticipant->bit_transport_ip(); 00277 bit_transport_port_ = TheServiceParticipant->bit_transport_port(); 00278 } 00279 00280 // Use a static cast to avoid dependency on the Tcp library 00281 TcpInst_rch tcp_inst = static_rchandle_cast<TcpInst>(inst); 00282 00283 tcp_inst->datalink_release_delay_ = 0; 00284 if (!bit_transport_ip_.empty()) { 00285 tcp_inst->local_address(bit_transport_port_, 00286 bit_transport_ip_.c_str()); 00287 } else { 00288 tcp_inst->local_address_set_port(bit_transport_port_); 00289 } 00290 } 00291 return bit_config_; 00292 #else 00293 return TransportConfig_rch(); 00294 #endif 00295 }
RepoId OpenDDS::DCPS::InfoRepoDiscovery::bit_key_to_repo_id | ( | DomainParticipantImpl * | participant, | |
const char * | bit_topic_name, | |||
const DDS::BuiltinTopicKey_t & | key | |||
) | const [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 408 of file InfoRepoDiscovery.cpp.
References OpenDDS::DCPS::GuidBuilder::create(), OpenDDS::DCPS::GuidBuilder::entityId(), OpenDDS::DCPS::RepoIdBuilder::federationId(), OpenDDS::DCPS::RepoIdBuilder::participantId(), and DDS::BuiltinTopicKey_t::value.
00411 { 00412 RepoId id = RepoIdBuilder::create(); 00413 RepoIdBuilder builder(id); 00414 builder.federationId(key.value[0]); 00415 builder.participantId(key.value[1]); 00416 builder.entityId(key.value[2]); 00417 return id; 00418 }
void OpenDDS::DCPS::InfoRepoDiscovery::bit_transport_ip | ( | const std::string & | ip | ) | [inline] |
Definition at line 63 of file InfoRepoDiscovery.h.
00063 { 00064 bit_transport_ip_ = ip; 00065 use_local_bit_config_ = true; 00066 }
std::string OpenDDS::DCPS::InfoRepoDiscovery::bit_transport_ip | ( | ) | const [inline] |
Definition at line 62 of file InfoRepoDiscovery.h.
00062 { return bit_transport_ip_; }
void OpenDDS::DCPS::InfoRepoDiscovery::bit_transport_port | ( | int | port | ) | [inline] |
Definition at line 57 of file InfoRepoDiscovery.h.
00057 { 00058 bit_transport_port_ = port; 00059 use_local_bit_config_ = true; 00060 }
int OpenDDS::DCPS::InfoRepoDiscovery::bit_transport_port | ( | ) | const [inline] |
Definition at line 56 of file InfoRepoDiscovery.h.
00056 { return bit_transport_port_; }
virtual OpenDDS::DCPS::TopicStatus OpenDDS::DCPS::InfoRepoDiscovery::find_topic | ( | DDS::DomainId_t | domainId, | |
const char * | topicName, | |||
CORBA::String_out | dataTypeName, | |||
DDS::TopicQos_out | qos, | |||
OpenDDS::DCPS::RepoId_out | topicId | |||
) | [virtual] |
void OpenDDS::DCPS::InfoRepoDiscovery::fini_bit | ( | DCPS::DomainParticipantImpl * | participant | ) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 402 of file InfoRepoDiscovery.cpp.
OpenDDS::DCPS::RepoId OpenDDS::DCPS::InfoRepoDiscovery::generate_participant_guid | ( | ) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 447 of file InfoRepoDiscovery.cpp.
References OpenDDS::DCPS::GUID_UNKNOWN.
00448 { 00449 return GUID_UNKNOWN; 00450 }
DCPSInfo_var OpenDDS::DCPS::InfoRepoDiscovery::get_dcps_info | ( | ) |
Definition at line 207 of file InfoRepoDiscovery.cpp.
References PortableServer::POA::_narrow(), CORBA::Exception::_tao_print_exception(), ACE_TEXT(), ACE_Task_Base::activate(), ACE_ARGV_T< CHAR_TYPE >::argc(), ACE_ARGV_T< CHAR_TYPE >::argv(), OpenDDS::DCPS::DEFAULT_ORB_NAME, info_, ior_, CORBA::is_nil(), OpenDDS::DCPS::Discovery::key(), LM_ERROR, mtx_orb_runner_, OpenDDS::DCPS::InfoRepoDiscovery::OrbRunner::orb_, orb_, CORBA::ORB_init(), orb_runner_, TheServiceParticipant, and OpenDDS::DCPS::InfoRepoDiscovery::OrbRunner::use_count_.
Referenced by active(), add_domain_participant(), add_publication(), add_subscription(), association_complete(), attach_participant(), ignore_domain_participant(), ignore_publication(), ignore_subscription(), ignore_topic(), remove_domain_participant(), remove_publication(), remove_subscription(), remove_topic(), update_domain_participant_qos(), update_publication_qos(), update_subscription_params(), update_subscription_qos(), and update_topic_qos().
00208 { 00209 if (CORBA::is_nil(this->info_.in())) { 00210 00211 if (!orb_) { 00212 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_orb_runner_, 0); 00213 if (!orb_runner_) { 00214 orb_runner_ = new OrbRunner; 00215 ACE_ARGV* argv = TheServiceParticipant->ORB_argv(); 00216 int argc = argv->argc(); 00217 orb_runner_->orb_ = 00218 CORBA::ORB_init(argc, argv->argv(), DEFAULT_ORB_NAME); 00219 orb_runner_->use_count_ = 1; 00220 orb_runner_->activate(); 00221 00222 CORBA::Object_var rp = 00223 orb_runner_->orb_->resolve_initial_references(ROOT_POA); 00224 PortableServer::POA_var poa = PortableServer::POA::_narrow(rp); 00225 PortableServer::POAManager_var poa_manager = poa->the_POAManager(); 00226 poa_manager->activate(); 00227 } else { 00228 ++orb_runner_->use_count_; 00229 } 00230 orb_ = orb_runner_->orb_; 00231 } 00232 00233 try { 00234 this->info_ = get_repo(this->ior_.c_str(), orb_); 00235 00236 if (CORBA::is_nil(this->info_.in())) { 00237 ACE_ERROR((LM_ERROR, 00238 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::get_dcps_info: ") 00239 ACE_TEXT("unable to narrow DCPSInfo (%C) for key %C.\n"), 00240 this->ior_.c_str(), 00241 this->key().c_str())); 00242 return DCPSInfo::_nil(); 00243 } 00244 00245 } catch (const CORBA::Exception& ex) { 00246 ex._tao_print_exception("ERROR: InfoRepoDiscovery::get_dcps_info: failed to resolve ior - "); 00247 return DCPSInfo::_nil(); 00248 } 00249 } 00250 00251 return this->info_; 00252 }
std::string OpenDDS::DCPS::InfoRepoDiscovery::get_stringified_dcps_info_ior | ( | ) |
Definition at line 255 of file InfoRepoDiscovery.cpp.
References ior_.
00256 { 00257 return this->ior_; 00258 }
bool OpenDDS::DCPS::InfoRepoDiscovery::ignore_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 497 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00500 { 00501 try { 00502 get_dcps_info()->ignore_domain_participant(domainId, myParticipantId, ignoreId); 00503 return true; 00504 } catch (const CORBA::Exception& ex) { 00505 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_domain_participant: "); 00506 return false; 00507 } 00508 }
bool OpenDDS::DCPS::InfoRepoDiscovery::ignore_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 653 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00656 { 00657 try { 00658 get_dcps_info()->ignore_publication(domainId, participantId, ignoreId); 00659 return true; 00660 } catch (const CORBA::Exception& ex) { 00661 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_publication: "); 00662 return false; 00663 } 00664 }
bool OpenDDS::DCPS::InfoRepoDiscovery::ignore_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 748 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00751 { 00752 try { 00753 get_dcps_info()->ignore_subscription(domainId, participantId, ignoreId); 00754 return true; 00755 } catch (const CORBA::Exception& ex) { 00756 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_subscription: "); 00757 return false; 00758 } 00759 }
bool OpenDDS::DCPS::InfoRepoDiscovery::ignore_topic | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | myParticipantId, | |||
const OpenDDS::DCPS::RepoId & | ignoreId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 566 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00568 { 00569 try { 00570 get_dcps_info()->ignore_topic(domainId, myParticipantId, ignoreId); 00571 return true; 00572 } catch (const CORBA::Exception& ex) { 00573 ex._tao_print_exception("ERROR: InfoRepoDiscovery::ignore_topic: "); 00574 return false; 00575 } 00576 }
DDS::Subscriber_ptr OpenDDS::DCPS::InfoRepoDiscovery::init_bit | ( | DomainParticipantImpl * | participant | ) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 298 of file InfoRepoDiscovery.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportRegistry::bind_config(), bit_config(), OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC, OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, OpenDDS::DCPS::Discovery::create_bit_topics(), OpenDDS::DCPS::DomainParticipantImpl::create_subscriber(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DEFAULT_STATUS_MASK, DDS::DataReaderQos::durability, OpenDDS::DCPS::DomainParticipantImpl::federated(), OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::Discovery::key(), DDS::DataReaderQos::liveliness, LM_ERROR, LM_INFO, OpenDDS::DCPS::DomainParticipantImpl::lookup_topicdescription(), DDS::RETCODE_OK, SUBSCRIBER_QOS_DEFAULT, TheServiceParticipant, and DDS::TRANSIENT_LOCAL_DURABILITY_QOS.
00299 { 00300 #if defined (DDS_HAS_MINIMUM_BIT) 00301 ACE_UNUSED_ARG(participant); 00302 return 0; 00303 #else 00304 if (!TheServiceParticipant->get_BIT()) { 00305 return 0; 00306 } 00307 00308 if (create_bit_topics(participant) != DDS::RETCODE_OK) { 00309 return 0; 00310 } 00311 00312 DDS::Subscriber_var bit_subscriber = 00313 participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT, 00314 DDS::SubscriberListener::_nil(), 00315 DEFAULT_STATUS_MASK); 00316 try { 00317 TransportConfig_rch config = bit_config(); 00318 TransportRegistry::instance()->bind_config(config, bit_subscriber); 00319 00320 } catch (const Transport::Exception&) { 00321 ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, " 00322 "exception during transport initialization\n")); 00323 return 0; 00324 } 00325 00326 // DataReaders 00327 try { 00328 DDS::DataReaderQos participantReaderQos; 00329 bit_subscriber->get_default_datareader_qos(participantReaderQos); 00330 participantReaderQos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; 00331 00332 if (participant->federated()) { 00333 participantReaderQos.liveliness.lease_duration.nanosec = 0; 00334 participantReaderQos.liveliness.lease_duration.sec = 00335 TheServiceParticipant->federation_liveliness(); 00336 } 00337 00338 DDS::TopicDescription_var bit_part_topic = 00339 participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC); 00340 00341 DDS::DataReader_var dr = 00342 bit_subscriber->create_datareader(bit_part_topic, 00343 participantReaderQos, 00344 DDS::DataReaderListener::_nil(), 00345 DEFAULT_STATUS_MASK); 00346 00347 if (participant->federated()) { 00348 DDS::ParticipantBuiltinTopicDataDataReader_var pbit_dr = 00349 DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr.in()); 00350 00351 DataReaderListener_var failover = new FailoverListener(key()); 00352 pbit_dr->set_listener(failover, DEFAULT_STATUS_MASK); 00353 } 00354 00355 DDS::DataReaderQos dr_qos; 00356 bit_subscriber->get_default_datareader_qos(dr_qos); 00357 dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; 00358 00359 DDS::TopicDescription_var bit_topic_topic = 00360 participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC); 00361 00362 dr = bit_subscriber->create_datareader(bit_topic_topic, 00363 dr_qos, 00364 DDS::DataReaderListener::_nil(), 00365 DEFAULT_STATUS_MASK); 00366 00367 DDS::TopicDescription_var bit_pub_topic = 00368 participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC); 00369 00370 dr = bit_subscriber->create_datareader(bit_pub_topic, 00371 dr_qos, 00372 DDS::DataReaderListener::_nil(), 00373 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00374 00375 DDS::TopicDescription_var bit_sub_topic = 00376 participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC); 00377 00378 dr = bit_subscriber->create_datareader(bit_sub_topic, 00379 dr_qos, 00380 DDS::DataReaderListener::_nil(), 00381 OpenDDS::DCPS::DEFAULT_STATUS_MASK); 00382 00383 const DDS::ReturnCode_t ret = bit_subscriber->enable(); 00384 if (ret != DDS::RETCODE_OK) { 00385 if (DCPS_debug_level) { 00386 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) InfoRepoDiscovery::init_bit") 00387 ACE_TEXT(" - Error %d enabling subscriber\n"), ret)); 00388 } 00389 return 0; 00390 } 00391 00392 } catch (const CORBA::Exception&) { 00393 ACE_ERROR((LM_ERROR, "(%P|%t) InfoRepoDiscovery::init_bit, " 00394 "exception during DataReader initialization\n")); 00395 return 0; 00396 } 00397 return bit_subscriber._retn(); 00398 #endif 00399 }
typedef OpenDDS::DCPS::InfoRepoDiscovery::OPENDDS_MAP_CMP | ( | RepoId | , | |
DataWriterRemote_var | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::InfoRepoDiscovery::OPENDDS_MAP_CMP | ( | RepoId | , | |
DataReaderRemote_var | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [private] |
bool OpenDDS::DCPS::InfoRepoDiscovery::remove_domain_participant | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 484 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00486 { 00487 try { 00488 get_dcps_info()->remove_domain_participant(domainId, participantId); 00489 return true; 00490 } catch (const CORBA::Exception& ex) { 00491 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_domain_participant: "); 00492 return false; 00493 } 00494 }
bool OpenDDS::DCPS::InfoRepoDiscovery::remove_publication | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | publicationId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 633 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), get_dcps_info(), lock_, and removeDataWriterRemote().
00636 { 00637 { 00638 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false); 00639 removeDataWriterRemote(publicationId); 00640 } 00641 bool removed = false; 00642 try { 00643 get_dcps_info()->remove_publication(domainId, participantId, publicationId); 00644 removed = true; 00645 } catch (const CORBA::Exception& ex) { 00646 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_publication: "); 00647 } 00648 00649 return removed; 00650 }
bool OpenDDS::DCPS::InfoRepoDiscovery::remove_subscription | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | subscriptionId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 728 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), get_dcps_info(), lock_, and removeDataReaderRemote().
00731 { 00732 { 00733 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, this->lock_, false); 00734 removeDataReaderRemote(subscriptionId); 00735 } 00736 bool removed = false; 00737 try { 00738 get_dcps_info()->remove_subscription(domainId, participantId, subscriptionId); 00739 removed = true; 00740 } catch (const CORBA::Exception& ex) { 00741 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_subscription: "); 00742 } 00743 00744 return removed; 00745 }
DCPS::TopicStatus OpenDDS::DCPS::InfoRepoDiscovery::remove_topic | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | topicId | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 554 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), get_dcps_info(), and OpenDDS::DCPS::INTERNAL_ERROR.
00556 { 00557 try { 00558 return get_dcps_info()->remove_topic(domainId, participantId, topicId); 00559 } catch (const CORBA::Exception& ex) { 00560 ex._tao_print_exception("ERROR: InfoRepoDiscovery::remove_topic: "); 00561 return DCPS::INTERNAL_ERROR; 00562 } 00563 }
void OpenDDS::DCPS::InfoRepoDiscovery::removeDataReaderRemote | ( | const RepoId & | subscriptionId | ) | [private] |
Definition at line 809 of file InfoRepoDiscovery.cpp.
References ACE_TEXT(), dataReaderMap_, OpenDDS::DCPS::DataReaderRemoteImpl::detach_parent(), LM_ERROR, and orb_.
Referenced by remove_subscription().
00810 { 00811 DataReaderMap::iterator drr = dataReaderMap_.find(subscriptionId); 00812 if (drr == dataReaderMap_.end()) { 00813 ACE_ERROR((LM_ERROR, 00814 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataReaderRemote: ") 00815 ACE_TEXT(" could not find DataReader for subscriptionId.\n"))); 00816 return; 00817 } 00818 00819 try { 00820 DataReaderRemoteImpl* impl = 00821 remote_reference_to_servant<DataReaderRemoteImpl>(drr->second.in(), orb_); 00822 impl->detach_parent(); 00823 deactivate_remote_object(drr->second.in(), orb_); 00824 } 00825 catch (::CORBA::BAD_INV_ORDER&){ 00826 // The orb may throw ::CORBA::BAD_INV_ORDER when is has been shutdown. 00827 // Ignore it anyway. 00828 } 00829 00830 dataReaderMap_.erase(drr); 00831 }
void OpenDDS::DCPS::InfoRepoDiscovery::removeDataWriterRemote | ( | const RepoId & | publicationId | ) | [private] |
Definition at line 834 of file InfoRepoDiscovery.cpp.
References ACE_TEXT(), dataWriterMap_, OpenDDS::DCPS::DataWriterRemoteImpl::detach_parent(), LM_ERROR, and orb_.
Referenced by remove_publication().
00835 { 00836 DataWriterMap::iterator dwr = dataWriterMap_.find(publicationId); 00837 if (dwr == dataWriterMap_.end()) { 00838 ACE_ERROR((LM_ERROR, 00839 ACE_TEXT("(%P|%t) ERROR: InfoRepoDiscovery::removeDataWriterRemote: ") 00840 ACE_TEXT(" could not find DataWriter for publicationId.\n"))); 00841 return; 00842 } 00843 00844 try { 00845 DataWriterRemoteImpl* impl = 00846 remote_reference_to_servant<DataWriterRemoteImpl>(dwr->second.in(), orb_); 00847 impl->detach_parent(); 00848 deactivate_remote_object(dwr->second.in(), orb_); 00849 } 00850 catch (::CORBA::BAD_INV_ORDER&){ 00851 // The orb may throw ::CORBA::BAD_INV_ORDER when is has been shutdown. 00852 // Ignore it anyway. 00853 } 00854 00855 dataWriterMap_.erase(dwr); 00856 }
bool OpenDDS::DCPS::InfoRepoDiscovery::set_ORB | ( | CORBA::ORB_ptr | orb | ) |
User provides an ORB for OpenDDS to use.
Definition at line 174 of file InfoRepoDiscovery.cpp.
References CORBA::ORB::_duplicate(), TAO_Pseudo_Var_T< T >::in(), CORBA::is_nil(), orb_, and orb_from_user_.
00175 { 00176 if (!CORBA::is_nil (orb_.in()) || CORBA::is_nil (orb)) { 00177 return false; 00178 } 00179 00180 orb_ = CORBA::ORB::_duplicate(orb); 00181 orb_from_user_ = true; 00182 return true; 00183 }
bool OpenDDS::DCPS::InfoRepoDiscovery::update_domain_participant_qos | ( | DDS::DomainId_t | domain, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const DDS::DomainParticipantQos & | qos | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 511 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00514 { 00515 try { 00516 return get_dcps_info()->update_domain_participant_qos(domainId, participant, qos); 00517 } catch (const CORBA::Exception& ex) { 00518 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_domain_participant_qos: "); 00519 return false; 00520 } 00521 }
bool OpenDDS::DCPS::InfoRepoDiscovery::update_publication_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | dwId, | |||
const DDS::DataWriterQos & | qos, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 667 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00672 { 00673 try { 00674 return get_dcps_info()->update_publication_qos(domainId, participantId, dwId, 00675 qos, publisherQos); 00676 } catch (const CORBA::Exception& ex) { 00677 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_publication_qos: "); 00678 return false; 00679 } 00680 }
bool OpenDDS::DCPS::InfoRepoDiscovery::update_subscription_params | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | participantId, | |||
const OpenDDS::DCPS::RepoId & | subscriptionId, | |||
const DDS::StringSeq & | params | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 778 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00783 { 00784 try { 00785 return get_dcps_info()->update_subscription_params(domainId, participantId, 00786 subId, params); 00787 } catch (const CORBA::Exception& ex) { 00788 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_params: "); 00789 return false; 00790 } 00791 }
bool OpenDDS::DCPS::InfoRepoDiscovery::update_subscription_qos | ( | DDS::DomainId_t | domainId, | |
const OpenDDS::DCPS::RepoId & | partId, | |||
const OpenDDS::DCPS::RepoId & | drId, | |||
const DDS::DataReaderQos & | qos, | |||
const DDS::SubscriberQos & | subscriberQos | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 762 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00767 { 00768 try { 00769 return get_dcps_info()->update_subscription_qos(domainId, participantId, 00770 drId, qos, subQos); 00771 } catch (const CORBA::Exception& ex) { 00772 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_subscription_qos: "); 00773 return false; 00774 } 00775 }
bool OpenDDS::DCPS::InfoRepoDiscovery::update_topic_qos | ( | const OpenDDS::DCPS::RepoId & | topicId, | |
DDS::DomainId_t | domainId, | |||
const OpenDDS::DCPS::RepoId & | participantId, | |||
const DDS::TopicQos & | qos | |||
) | [virtual] |
Implements OpenDDS::DCPS::Discovery.
Definition at line 579 of file InfoRepoDiscovery.cpp.
References CORBA::Exception::_tao_print_exception(), and get_dcps_info().
00581 { 00582 try { 00583 return get_dcps_info()->update_topic_qos(topicId, domainId, participantId, qos); 00584 } catch (const CORBA::Exception& ex) { 00585 ex._tao_print_exception("ERROR: InfoRepoDiscovery::update_topic_qos: "); 00586 return false; 00587 } 00588 }
Definition at line 243 of file InfoRepoDiscovery.h.
Referenced by bit_config().
std::string OpenDDS::DCPS::InfoRepoDiscovery::bit_transport_ip_ [private] |
The builtin topic transport address.
Definition at line 237 of file InfoRepoDiscovery.h.
Referenced by bit_config().
int OpenDDS::DCPS::InfoRepoDiscovery::bit_transport_port_ [private] |
The builtin topic transport port number.
Definition at line 240 of file InfoRepoDiscovery.h.
Referenced by bit_config().
DataReaderMap OpenDDS::DCPS::InfoRepoDiscovery::dataReaderMap_ [private] |
Definition at line 265 of file InfoRepoDiscovery.h.
Referenced by add_subscription(), and removeDataReaderRemote().
DataWriterMap OpenDDS::DCPS::InfoRepoDiscovery::dataWriterMap_ [private] |
Definition at line 269 of file InfoRepoDiscovery.h.
Referenced by add_publication(), and removeDataWriterRemote().
DCPSInfo_var OpenDDS::DCPS::InfoRepoDiscovery::info_ [private] |
Definition at line 234 of file InfoRepoDiscovery.h.
Referenced by get_dcps_info().
std::string OpenDDS::DCPS::InfoRepoDiscovery::ior_ [private] |
Definition at line 233 of file InfoRepoDiscovery.h.
Referenced by get_dcps_info(), and get_stringified_dcps_info_ior().
ACE_Thread_Mutex OpenDDS::DCPS::InfoRepoDiscovery::lock_ [mutable, private] |
Definition at line 271 of file InfoRepoDiscovery.h.
Referenced by add_publication(), add_subscription(), remove_publication(), and remove_subscription().
ACE_Thread_Mutex OpenDDS::DCPS::InfoRepoDiscovery::mtx_orb_runner_ [static, private] |
Definition at line 261 of file InfoRepoDiscovery.h.
Referenced by get_dcps_info().
Definition at line 245 of file InfoRepoDiscovery.h.
Referenced by add_publication(), add_subscription(), get_dcps_info(), removeDataReaderRemote(), removeDataWriterRemote(), and set_ORB().
bool OpenDDS::DCPS::InfoRepoDiscovery::orb_from_user_ [private] |
Definition at line 246 of file InfoRepoDiscovery.h.
Referenced by set_ORB(), and ~InfoRepoDiscovery().
InfoRepoDiscovery::OrbRunner * OpenDDS::DCPS::InfoRepoDiscovery::orb_runner_ [static, private] |
Definition at line 260 of file InfoRepoDiscovery.h.
Referenced by get_dcps_info(), and ~InfoRepoDiscovery().
bool OpenDDS::DCPS::InfoRepoDiscovery::use_local_bit_config_ [private] |
Definition at line 242 of file InfoRepoDiscovery.h.
Referenced by bit_config().