00001
00002
00003
00004
00005
00006 #ifndef OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00007 #define OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00008
00009 #include "dds/DCPS/Discovery.h"
00010 #include "dds/DCPS/GuidUtils.h"
00011 #include "dds/DCPS/DCPS_Utils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/SubscriberImpl.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00018 #include "ace/Select_Reactor.h"
00019
00020 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00021 #pragma once
00022 #endif
00023
00024 namespace OpenDDS {
00025 namespace DCPS {
00026
00027 inline void assign(DCPS::EntityKey_t& lhs, unsigned int rhs)
00028 {
00029 lhs[0] = static_cast<CORBA::Octet>(rhs);
00030 lhs[1] = static_cast<CORBA::Octet>(rhs >> 8);
00031 lhs[2] = static_cast<CORBA::Octet>(rhs >> 16);
00032 }
00033
00034 struct DcpsUpcalls : ACE_Task_Base {
00035 DcpsUpcalls(DCPS::DataReaderCallbacks* drr,
00036 const RepoId& reader,
00037 const DCPS::WriterAssociation& wa,
00038 bool active,
00039 DCPS::DataWriterCallbacks* dwr)
00040 : drr_(drr), reader_(reader), wa_(wa), active_(active), dwr_(dwr)
00041 , reader_done_(false), writer_done_(false), cnd_(mtx_)
00042 {}
00043
00044 int svc()
00045 {
00046 drr_->add_association(reader_, wa_, active_);
00047 {
00048 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_, -1);
00049 reader_done_ = true;
00050 cnd_.signal();
00051 while (!writer_done_) {
00052 cnd_.wait();
00053 }
00054 }
00055 dwr_->association_complete(reader_);
00056 return 0;
00057 }
00058
00059 void writer_done()
00060 {
00061 {
00062 ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
00063 writer_done_ = true;
00064 cnd_.signal();
00065 }
00066 wait();
00067 }
00068
00069 DCPS::DataReaderCallbacks* const drr_;
00070 const RepoId& reader_;
00071 const DCPS::WriterAssociation& wa_;
00072 bool active_;
00073 DCPS::DataWriterCallbacks* const dwr_;
00074 bool reader_done_, writer_done_;
00075 ACE_Thread_Mutex mtx_;
00076 ACE_Condition_Thread_Mutex cnd_;
00077 };
00078
00079 template <typename DiscoveredParticipantData_>
00080 class EndpointManager {
00081 protected:
00082 struct DiscoveredSubscription {
00083 DiscoveredSubscription() : bit_ih_(DDS::HANDLE_NIL) {}
00084 explicit DiscoveredSubscription(const OpenDDS::DCPS::DiscoveredReaderData& r)
00085 : reader_data_(r), bit_ih_(DDS::HANDLE_NIL) {}
00086 OpenDDS::DCPS::DiscoveredReaderData reader_data_;
00087 DDS::InstanceHandle_t bit_ih_;
00088 };
00089 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredSubscription,
00090 DCPS::GUID_tKeyLessThan) DiscoveredSubscriptionMap;
00091 typedef typename DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter;
00092
00093 struct DiscoveredPublication {
00094 DiscoveredPublication() : bit_ih_(DDS::HANDLE_NIL) {}
00095 explicit DiscoveredPublication(const OpenDDS::DCPS::DiscoveredWriterData& w)
00096 : writer_data_(w), bit_ih_(DDS::HANDLE_NIL) {}
00097 OpenDDS::DCPS::DiscoveredWriterData writer_data_;
00098 DDS::InstanceHandle_t bit_ih_;
00099 };
00100
00101 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredPublication,
00102 DCPS::GUID_tKeyLessThan) DiscoveredPublicationMap;
00103 typedef typename DiscoveredPublicationMap::iterator DiscoveredPublicationIter;
00104
00105 public:
00106 typedef DiscoveredParticipantData_ DiscoveredParticipantData;
00107
00108 struct TopicDetails {
00109 OPENDDS_STRING data_type_;
00110 DDS::TopicQos qos_;
00111 DCPS::RepoId repo_id_;
00112 bool has_dcps_key_;
00113 RepoIdSet endpoints_;
00114 };
00115
00116 EndpointManager(const RepoId& participant_id, ACE_Thread_Mutex& lock)
00117 : lock_(lock)
00118 , participant_id_(participant_id)
00119 , publication_counter_(0)
00120 , subscription_counter_(0)
00121 , topic_counter_(0)
00122 { }
00123
00124 virtual ~EndpointManager() { }
00125
00126 RepoId bit_key_to_repo_id(const char* bit_topic_name,
00127 const DDS::BuiltinTopicKey_t& key)
00128 {
00129 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00130 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PUBLICATION_TOPIC)) {
00131 return pub_key_to_id_[key];
00132 }
00133 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_SUBSCRIPTION_TOPIC)) {
00134 return sub_key_to_id_[key];
00135 }
00136 return RepoId();
00137 }
00138
00139 void ignore(const DCPS::RepoId& to_ignore)
00140 {
00141
00142 ignored_guids_.insert(to_ignore);
00143 {
00144 const DiscoveredPublicationIter iter =
00145 discovered_publications_.find(to_ignore);
00146 if (iter != discovered_publications_.end()) {
00147
00148 topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00149 remove_from_bit(iter->second);
00150 OPENDDS_STRING topic_name = get_topic_name(iter->second);
00151 discovered_publications_.erase(iter);
00152
00153 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00154 topics_.find(topic_name);
00155 if (top_it != topics_.end()) {
00156 match_endpoints(to_ignore, top_it->second, true );
00157 }
00158 return;
00159 }
00160 }
00161 {
00162 const DiscoveredSubscriptionIter iter =
00163 discovered_subscriptions_.find(to_ignore);
00164 if (iter != discovered_subscriptions_.end()) {
00165
00166 topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00167 remove_from_bit(iter->second);
00168 OPENDDS_STRING topic_name = get_topic_name(iter->second);
00169 discovered_subscriptions_.erase(iter);
00170
00171 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00172 topics_.find(topic_name);
00173 if (top_it != topics_.end()) {
00174 match_endpoints(to_ignore, top_it->second, true );
00175 }
00176 return;
00177 }
00178 }
00179 {
00180 const OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator
00181 iter = topic_names_.find(to_ignore);
00182 if (iter != topic_names_.end()) {
00183 ignored_topics_.insert(iter->second);
00184
00185 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00186 topics_.find(iter->second);
00187 if (top_it != topics_.end()) {
00188 TopicDetails& td = top_it->second;
00189 RepoIdSet::iterator ep;
00190 for (ep = td.endpoints_.begin(); ep!= td.endpoints_.end(); ++ep) {
00191 match_endpoints(*ep, td, true );
00192 if (shutting_down()) { return; }
00193 }
00194 }
00195 }
00196 }
00197 }
00198
00199 bool ignoring(const DCPS::RepoId& guid) const {
00200 return ignored_guids_.count(guid);
00201 }
00202 bool ignoring(const char* topic_name) const {
00203 return ignored_topics_.count(topic_name);
00204 }
00205
00206 DCPS::TopicStatus assert_topic(DCPS::RepoId_out topicId, const char* topicName,
00207 const char* dataTypeName, const DDS::TopicQos& qos,
00208 bool hasDcpsKey)
00209 {
00210 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00211 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
00212 topics_.find(topicName);
00213 if (iter != topics_.end()) {
00214 iter->second.qos_ = qos;
00215 iter->second.has_dcps_key_ = hasDcpsKey;
00216 topicId = iter->second.repo_id_;
00217 topic_names_[iter->second.repo_id_] = topicName;
00218 return DCPS::FOUND;
00219 }
00220
00221 TopicDetails& td = topics_[topicName];
00222 td.data_type_ = dataTypeName;
00223 td.qos_ = qos;
00224 td.has_dcps_key_ = hasDcpsKey;
00225 td.repo_id_ = make_topic_guid();
00226 topicId = td.repo_id_;
00227 topic_names_[td.repo_id_] = topicName;
00228
00229 return DCPS::CREATED;
00230 }
00231
00232 DCPS::TopicStatus remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
00233 {
00234 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00235 name = topic_names_[topicId];
00236 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00237 topics_.find(name);
00238 if (top_it != topics_.end()) {
00239 TopicDetails& td = top_it->second;
00240 if (td.endpoints_.empty()) {
00241 topics_.erase(name);
00242 }
00243 }
00244
00245 topic_names_.erase(topicId);
00246 return DCPS::REMOVED;
00247 }
00248
00249 virtual bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00250 OPENDDS_STRING& name) = 0;
00251
00252 DCPS::RepoId add_publication(const DCPS::RepoId& topicId,
00253 DCPS::DataWriterCallbacks* publication,
00254 const DDS::DataWriterQos& qos,
00255 const DCPS::TransportLocatorSeq& transInfo,
00256 const DDS::PublisherQos& publisherQos)
00257 {
00258 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00259 RepoId rid = participant_id_;
00260 assign_publication_key(rid, topicId, qos);
00261 LocalPublication& pb = local_publications_[rid];
00262 pb.topic_id_ = topicId;
00263 pb.publication_ = publication;
00264 pb.qos_ = qos;
00265 pb.trans_info_ = transInfo;
00266 pb.publisher_qos_ = publisherQos;
00267 TopicDetails& td = topics_[topic_names_[topicId]];
00268 td.endpoints_.insert(rid);
00269
00270 if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
00271 return RepoId();
00272 }
00273
00274 if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
00275 return RepoId();
00276 }
00277
00278 if (DCPS::DCPS_debug_level > 3) {
00279 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_publication - ")
00280 ACE_TEXT("calling match_endpoints\n")));
00281 }
00282 match_endpoints(rid, td);
00283
00284 return rid;
00285 }
00286
00287 void remove_publication(const DCPS::RepoId& publicationId)
00288 {
00289 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00290 LocalPublicationIter iter = local_publications_.find(publicationId);
00291 if (iter != local_publications_.end()) {
00292 if (DDS::RETCODE_OK == remove_publication_i(publicationId))
00293 {
00294 OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00295 local_publications_.erase(publicationId);
00296 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00297 topics_.find(topic_name);
00298 if (top_it != topics_.end()) {
00299 match_endpoints(publicationId, top_it->second, true );
00300 top_it->second.endpoints_.erase(publicationId);
00301 }
00302 } else {
00303 ACE_DEBUG((LM_ERROR,
00304 ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_publication - ")
00305 ACE_TEXT("Failed to publish dispose msg\n")));
00306 }
00307 }
00308 }
00309
00310 virtual bool update_publication_qos(const DCPS::RepoId& publicationId,
00311 const DDS::DataWriterQos& qos,
00312 const DDS::PublisherQos& publisherQos) = 0;
00313
00314 DCPS::RepoId add_subscription(const DCPS::RepoId& topicId,
00315 DCPS::DataReaderCallbacks* subscription,
00316 const DDS::DataReaderQos& qos,
00317 const DCPS::TransportLocatorSeq& transInfo,
00318 const DDS::SubscriberQos& subscriberQos,
00319 const char* filterClassName,
00320 const char* filterExpr,
00321 const DDS::StringSeq& params)
00322 {
00323 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00324 RepoId rid = participant_id_;
00325 assign_subscription_key(rid, topicId, qos);
00326 LocalSubscription& sb = local_subscriptions_[rid];
00327 sb.topic_id_ = topicId;
00328 sb.subscription_ = subscription;
00329 sb.qos_ = qos;
00330 sb.trans_info_ = transInfo;
00331 sb.subscriber_qos_ = subscriberQos;
00332 sb.filterProperties.filterClassName = filterClassName;
00333 sb.filterProperties.filterExpression = filterExpr;
00334 sb.filterProperties.expressionParameters = params;
00335
00336 TopicDetails& td = topics_[topic_names_[topicId]];
00337 td.endpoints_.insert(rid);
00338
00339 if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
00340 return RepoId();
00341 }
00342
00343 if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
00344 return RepoId();
00345 }
00346
00347 if (DCPS::DCPS_debug_level > 3) {
00348 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_subscription - ")
00349 ACE_TEXT("calling match_endpoints\n")));
00350 }
00351 match_endpoints(rid, td);
00352
00353 return rid;
00354 }
00355
00356 void remove_subscription(const DCPS::RepoId& subscriptionId)
00357 {
00358 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00359 LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
00360 if (iter != local_subscriptions_.end()) {
00361 if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId)
00362 ) {
00363 OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00364 local_subscriptions_.erase(subscriptionId);
00365 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00366 topics_.find(topic_name);
00367 if (top_it != topics_.end()) {
00368 match_endpoints(subscriptionId, top_it->second, true );
00369 top_it->second.endpoints_.erase(subscriptionId);
00370 }
00371 } else {
00372 ACE_DEBUG((LM_ERROR,
00373 ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_subscription - ")
00374 ACE_TEXT("Failed to publish dispose msg\n")));
00375 }
00376 }
00377 }
00378
00379 virtual bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00380 const DDS::DataReaderQos& qos,
00381 const DDS::SubscriberQos& subscriberQos) = 0;
00382
00383 virtual bool update_subscription_params(const DCPS::RepoId& subId,
00384 const DDS::StringSeq& params) = 0;
00385
00386 virtual void association_complete(const DCPS::RepoId& localId,
00387 const DCPS::RepoId& remoteId) = 0;
00388
00389 virtual bool disassociate(const DiscoveredParticipantData& pdata) = 0;
00390
00391 protected:
00392 struct LocalEndpoint {
00393 LocalEndpoint() : topic_id_(DCPS::GUID_UNKNOWN), sequence_(DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {}
00394 DCPS::RepoId topic_id_;
00395 DCPS::TransportLocatorSeq trans_info_;
00396 RepoIdSet matched_endpoints_;
00397 DCPS::SequenceNumber sequence_;
00398 RepoIdSet remote_opendds_associations_;
00399 };
00400
00401 struct LocalPublication : LocalEndpoint {
00402 DCPS::DataWriterCallbacks* publication_;
00403 DDS::DataWriterQos qos_;
00404 DDS::PublisherQos publisher_qos_;
00405 };
00406
00407 struct LocalSubscription : LocalEndpoint {
00408 DCPS::DataReaderCallbacks* subscription_;
00409 DDS::DataReaderQos qos_;
00410 DDS::SubscriberQos subscriber_qos_;
00411 OpenDDS::DCPS::ContentFilterProperty_t filterProperties;
00412 };
00413
00414 typedef OPENDDS_MAP_CMP(DDS::BuiltinTopicKey_t, DCPS::RepoId,
00415 DCPS::BuiltinTopicKeyLess) BitKeyMap;
00416
00417 typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalPublication,
00418 DCPS::GUID_tKeyLessThan) LocalPublicationMap;
00419 typedef typename LocalPublicationMap::iterator LocalPublicationIter;
00420 typedef typename LocalPublicationMap::const_iterator LocalPublicationCIter;
00421
00422 typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalSubscription,
00423 DCPS::GUID_tKeyLessThan) LocalSubscriptionMap;
00424 typedef typename LocalSubscriptionMap::iterator LocalSubscriptionIter;
00425 typedef typename LocalSubscriptionMap::const_iterator LocalSubscriptionCIter;
00426
00427 typedef typename OPENDDS_MAP_CMP(DCPS::RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TopicNameMap;
00428
00429 static const char* get_topic_name(const DiscoveredPublication& pub) {
00430 return pub.writer_data_.ddsPublicationData.topic_name;
00431 }
00432 static const char* get_topic_name(const DiscoveredSubscription& sub) {
00433 return sub.reader_data_.ddsSubscriptionData.topic_name;
00434 }
00435 static DDS::BuiltinTopicKey_t get_key(const DiscoveredPublication& pub) {
00436 return pub.writer_data_.ddsPublicationData.key;
00437 }
00438 static DDS::BuiltinTopicKey_t get_key(const DiscoveredSubscription& sub) {
00439 return sub.reader_data_.ddsSubscriptionData.key;
00440 }
00441
00442 virtual void remove_from_bit_i(const DiscoveredPublication& ) { }
00443 virtual void remove_from_bit_i(const DiscoveredSubscription& ) { }
00444
00445 virtual void assign_publication_key(RepoId& rid,
00446 const RepoId& topicId,
00447 const DDS::DataWriterQos& ) {
00448 rid.entityId.entityKind =
00449 has_dcps_key(topicId)
00450 ? DCPS::ENTITYKIND_USER_WRITER_WITH_KEY
00451 : DCPS::ENTITYKIND_USER_WRITER_NO_KEY;
00452 assign(rid.entityId.entityKey, publication_counter_++);
00453 }
00454 virtual void assign_subscription_key(RepoId& rid,
00455 const RepoId& topicId,
00456 const DDS::DataReaderQos& ) {
00457 rid.entityId.entityKind =
00458 has_dcps_key(topicId)
00459 ? DCPS::ENTITYKIND_USER_READER_WITH_KEY
00460 : DCPS::ENTITYKIND_USER_READER_NO_KEY;
00461 assign(rid.entityId.entityKey, subscription_counter_++);
00462 }
00463 virtual void assign_topic_key(RepoId& guid) {
00464 assign(guid.entityId.entityKey, topic_counter_++);
00465
00466 if (topic_counter_ == 0x1000000) {
00467 ACE_DEBUG((LM_ERROR,
00468 ACE_TEXT("(%P|%t) ERROR: EndpointManager::make_topic_guid: ")
00469 ACE_TEXT("Exceeded Maximum number of topic entity keys!")
00470 ACE_TEXT("Next key will be a duplicate!\n")));
00471 topic_counter_ = 0;
00472 }
00473 }
00474
00475 virtual DDS::ReturnCode_t add_publication_i(const DCPS::RepoId& ,
00476 LocalPublication& ) { return DDS::RETCODE_OK; }
00477 virtual DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& ,
00478 LocalPublication& ,
00479 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00480 virtual DDS::ReturnCode_t remove_publication_i(const RepoId& publicationId) = 0;
00481
00482 virtual DDS::ReturnCode_t add_subscription_i(const DCPS::RepoId& ,
00483 LocalSubscription& ) { return DDS::RETCODE_OK; };
00484 virtual DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& ,
00485 LocalSubscription& ,
00486 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00487 virtual DDS::ReturnCode_t remove_subscription_i(const RepoId& subscriptionId) = 0;
00488
00489 void match_endpoints(DCPS::RepoId repoId, const TopicDetails& td,
00490 bool remove = false)
00491 {
00492 const bool reader = repoId.entityId.entityKind & 4;
00493
00494 RepoIdSet endpoints_copy = td.endpoints_;
00495
00496 for (RepoIdSet::const_iterator iter = endpoints_copy.begin();
00497 iter != endpoints_copy.end(); ++iter) {
00498
00499 if (bool(iter->entityId.entityKind & 4) != reader) {
00500 if (remove) {
00501 remove_assoc(*iter, repoId);
00502 } else {
00503 match(reader ? *iter : repoId, reader ? repoId : *iter);
00504 }
00505 }
00506 }
00507 }
00508
00509 void
00510 remove_assoc(const RepoId& remove_from,
00511 const RepoId& removing)
00512 {
00513 const bool reader = remove_from.entityId.entityKind & 4;
00514 if (reader) {
00515 const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
00516 if (lsi != local_subscriptions_.end()) {
00517 lsi->second.matched_endpoints_.erase(removing);
00518 DCPS::WriterIdSeq writer_seq(1);
00519 writer_seq.length(1);
00520 writer_seq[0] = removing;
00521 lsi->second.remote_opendds_associations_.erase(removing);
00522 lsi->second.subscription_->remove_associations(writer_seq,
00523 false );
00524
00525 write_subscription_data(remove_from, lsi->second);
00526 }
00527
00528 } else {
00529 const LocalPublicationIter lpi = local_publications_.find(remove_from);
00530 if (lpi != local_publications_.end()) {
00531 lpi->second.matched_endpoints_.erase(removing);
00532 DCPS::ReaderIdSeq reader_seq(1);
00533 reader_seq.length(1);
00534 reader_seq[0] = removing;
00535 lpi->second.remote_opendds_associations_.erase(removing);
00536 lpi->second.publication_->remove_associations(reader_seq,
00537 false );
00538 }
00539 }
00540 }
00541
00542 void
00543 match(const RepoId& writer, const RepoId& reader)
00544 {
00545 // Evaluate the (writer, reader) pair. If compatibleQOS() accepts it, record
00546 // the match and make the association upcalls on whichever side is local.
00547 // If it is rejected but the pair was matched before, undo the association.
00548 // Otherwise report the incompatible-QoS status to the local side(s).
00549 DDS::DataWriterQos tempDwQos;
00550 DDS::PublisherQos tempPubQos;
00551 DDS::DataReaderQos tempDrQos;
00552 DDS::SubscriberQos tempSubQos;
00553 ContentFilterProperty_t tempCfp;
00554 // The temp* locals hold values rebuilt for discovered endpoints; the pointers
00555 // below refer either to these locals or to the stored local endpoint entry.
00556 const DDS::DataWriterQos* dwQos = 0;
00557 const DDS::PublisherQos* pubQos = 0;
00558 DCPS::TransportLocatorSeq* wTls = 0;
00559 // Writer side: local publications first, then discovered ones; else give up.
00560 const LocalPublicationIter lpi = local_publications_.find(writer);
00561 DiscoveredPublicationIter dpi;
00562 bool writer_local = false, already_matched = false;
00563 if (lpi != local_publications_.end()) {
00564 writer_local = true;
00565 dwQos = &lpi->second.qos_;
00566 pubQos = &lpi->second.publisher_qos_;
00567 wTls = &lpi->second.trans_info_;
00568 already_matched = lpi->second.matched_endpoints_.count(reader);
00569 } else if ((dpi = discovered_publications_.find(writer))
00570 != discovered_publications_.end()) {
00571 wTls = &dpi->second.writer_data_.writerProxy.allLocators;
00572 } else {
00573 return;
00574 }
00575
00576 // Reader side: same lookup order (local subscriptions, then discovered).
00577 const DDS::DataReaderQos* drQos = 0;
00578 const DDS::SubscriberQos* subQos = 0;
00579 DCPS::TransportLocatorSeq* rTls = 0;
00580 const ContentFilterProperty_t* cfProp = 0;
00581
00582 const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
00583 DiscoveredSubscriptionIter dsi;
00584 bool reader_local = false;
00585 if (lsi != local_subscriptions_.end()) {
00586 reader_local = true;
00587 drQos = &lsi->second.qos_;
00588 subQos = &lsi->second.subscriber_qos_;
00589 rTls = &lsi->second.trans_info_;
00590 if (lsi->second.filterProperties.filterExpression[0] != 0) {
00591 tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
00592 tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
00593 }
00594 cfProp = &tempCfp;
00595 if (!already_matched) {
00596 already_matched = lsi->second.matched_endpoints_.count(writer);
00597 }
00598 } else if ((dsi = discovered_subscriptions_.find(reader))
00599 != discovered_subscriptions_.end()) {
00600 if (!writer_local) {
00601 // neither endpoint is local, so there is nothing to associate here
00602 return;
00603 }
00604 rTls = &dsi->second.reader_data_.readerProxy.allLocators;
00605
00606 populate_transport_locator_sequence(rTls, dsi, reader);
00607 // Rebuild reader/subscriber QoS from the discovered built-in topic data.
00608 const DDS::SubscriptionBuiltinTopicData& bit =
00609 dsi->second.reader_data_.ddsSubscriptionData;
00610 tempDrQos.durability = bit.durability;
00611 tempDrQos.deadline = bit.deadline;
00612 tempDrQos.latency_budget = bit.latency_budget;
00613 tempDrQos.liveliness = bit.liveliness;
00614 tempDrQos.reliability = bit.reliability;
00615 tempDrQos.destination_order = bit.destination_order;
00616 tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00617 tempDrQos.resource_limits =
00618 TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00619 tempDrQos.user_data = bit.user_data;
00620 tempDrQos.ownership = bit.ownership;
00621 tempDrQos.time_based_filter = bit.time_based_filter;
00622 tempDrQos.reader_data_lifecycle =
00623 TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
00624 drQos = &tempDrQos;
00625 tempSubQos.presentation = bit.presentation;
00626 tempSubQos.partition = bit.partition;
00627 tempSubQos.group_data = bit.group_data;
00628 tempSubQos.entity_factory =
00629 TheServiceParticipant->initial_EntityFactoryQosPolicy();
00630 subQos = &tempSubQos;
00631 cfProp = &dsi->second.reader_data_.contentFilterProperty;
00632 } else {
00633 return;
00634 }
00635 // For a discovered writer, rebuild writer and publisher QoS from its
00636 // built-in topic data; history, resource_limits, transport_priority,
00637 // writer_data_lifecycle and entity_factory take the initial_* values.
00638 if (!writer_local) {
00639 const DDS::PublicationBuiltinTopicData& bit =
00640 dpi->second.writer_data_.ddsPublicationData;
00641 tempDwQos.durability = bit.durability;
00642 tempDwQos.durability_service = bit.durability_service;
00643 tempDwQos.deadline = bit.deadline;
00644 tempDwQos.latency_budget = bit.latency_budget;
00645 tempDwQos.liveliness = bit.liveliness;
00646 tempDwQos.reliability = bit.reliability;
00647 tempDwQos.destination_order = bit.destination_order;
00648 tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00649 tempDwQos.resource_limits =
00650 TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00651 tempDwQos.transport_priority =
00652 TheServiceParticipant->initial_TransportPriorityQosPolicy();
00653 tempDwQos.lifespan = bit.lifespan;
00654 tempDwQos.user_data = bit.user_data;
00655 tempDwQos.ownership = bit.ownership;
00656 tempDwQos.ownership_strength = bit.ownership_strength;
00657 tempDwQos.writer_data_lifecycle =
00658 TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
00659 dwQos = &tempDwQos;
00660 tempPubQos.presentation = bit.presentation;
00661 tempPubQos.partition = bit.partition;
00662 tempPubQos.group_data = bit.group_data;
00663 tempPubQos.entity_factory =
00664 TheServiceParticipant->initial_EntityFactoryQosPolicy();
00665 pubQos = &tempPubQos;
00666
00667 populate_transport_locator_sequence(wTls, dpi, writer);
00668 }
00669
00670 // rev_lock inverts lock_: a guard on rev_lock releases lock_ for the guard's
00671 // scope and re-acquires it on exit (ACE_Reverse_Lock), so the callbacks
00672 // made under such a guard below run without lock_ held.
00673 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00674
00675
00676 // Copy the callback pointers while lock_ is still held; lpi/lsi are not
00677 // dereferenced once lock_ has been released.
00678
00679 DCPS::DataWriterCallbacks* dwr = 0;
00680 DCPS::DataReaderCallbacks* drr = 0;
00681 if (writer_local) {
00682 dwr = lpi->second.publication_;
00683 }
00684 if (reader_local) {
00685 drr = lsi->second.subscription_;
00686 }
00687
00688 DCPS::IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00689 DCPS::IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00690
00691 if (DCPS::compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
00692 dwQos, drQos, pubQos, subQos)) {
00693 if (!writer_local) {
00694 RepoId writer_participant = writer;
00695 writer_participant.entityId = ENTITYID_PARTICIPANT;
00696 if (defer_writer(writer, writer_participant)) {
00697 return;
00698 }
00699 }
00700 if (!reader_local) {
00701 RepoId reader_participant = reader;
00702 reader_participant.entityId = ENTITYID_PARTICIPANT;
00703 if (defer_reader(reader, reader_participant)) {
00704 return;
00705 }
00706 }
00707 // An upcall is only needed on a local side that did not list the peer yet.
00708 bool call_writer = false, call_reader = false;
00709 if (writer_local) {
00710 call_writer = lpi->second.matched_endpoints_.insert(reader).second;
00711 }
00712 if (reader_local) {
00713 call_reader = lsi->second.matched_endpoints_.insert(writer).second;
00714 }
00715 if (!call_writer && !call_reader) {
00716 return;
00717 }
00718 // Build the association records by value before lock_ is released.
00719 #ifdef __SUNPRO_CC
00720 DCPS::ReaderAssociation ra;
00721 ra.readerTransInfo = *rTls;
00722 ra.readerId = reader;
00723 ra.subQos = *subQos;
00724 ra.readerQos = *drQos;
00725 ra.filterClassName = cfProp->filterClassName;
00726 ra.filterExpression = cfProp->filterExpression;
00727 ra.exprParams = cfProp->expressionParameters;
00728 DCPS::WriterAssociation wa;
00729 wa.writerTransInfo = *wTls;
00730 wa.writerId = writer;
00731 wa.pubQos = *pubQos;
00732 wa.writerQos = *dwQos;
00733 #else
00734 const DCPS::ReaderAssociation ra =
00735 {*rTls, reader, *subQos, *drQos,
00736 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00737 cfProp->filterClassName, cfProp->filterExpression,
00738 #else
00739 "", "",
00740 #endif
00741 cfProp->expressionParameters};
00742
00743 const DCPS::WriterAssociation wa = {*wTls, writer, *pubQos, *dwQos};
00744 #endif
00745 // From here to the end of this branch lock_ is released.
00746 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00747 static const bool writer_active = true;
00748 // When both upcalls are needed, the reader upcall runs on the DcpsUpcalls thread.
00749 if (call_writer) {
00750 if (DCPS::DCPS_debug_level > 3) {
00751 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00752 ACE_TEXT("adding writer association\n")));
00753 }
00754 DcpsUpcalls thr(drr, reader, wa, !writer_active, dwr);
00755 if (call_reader) {
00756 thr.activate();
00757 }
00758 dwr->add_association(writer, ra, writer_active);
00759 if (call_reader) {
00760 thr.writer_done();
00761 }
00762
00763 } else if (call_reader) {
00764 if (DCPS::DCPS_debug_level > 3) {
00765 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00766 ACE_TEXT("adding reader association\n")));
00767 }
00768 drr->add_association(reader, wa, !writer_active);
00769 }
00770 // When only the writer upcall was made and the reader is not an OpenDDS
00771 // endpoint, association_complete() is invoked on the writer right here.
00772 if (call_writer && !call_reader && !is_opendds(reader)) {
00773 if (DCPS::DCPS_debug_level > 3) {
00774 ACE_DEBUG((LM_DEBUG,
00775 ACE_TEXT("(%P|%t) EndpointManager::match - ")
00776 ACE_TEXT("calling writer association_complete\n")));
00777 }
00778 dwr->association_complete(reader);
00779 }
00780 // Next branch: rejected now but matched earlier, so the association is undone.
00781 } else if (already_matched) {
00782 if (writer_local) {
00783 lpi->second.matched_endpoints_.erase(reader);
00784 lpi->second.remote_opendds_associations_.erase(reader);
00785 }
00786 if (reader_local) {
00787 lsi->second.matched_endpoints_.erase(writer);
00788 lsi->second.remote_opendds_associations_.erase(writer);
00789 }
00790 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00791 if (writer_local) {
00792 DCPS::ReaderIdSeq reader_seq(1);
00793 reader_seq.length(1);
00794 reader_seq[0] = reader;
00795 dwr->remove_associations(reader_seq, false );
00796 }
00797 if (reader_local) {
00798 DCPS::WriterIdSeq writer_seq(1);
00799 writer_seq.length(1);
00800 writer_seq[0] = writer;
00801 drr->remove_associations(writer_seq, false );
00802 }
00803 // Next branch: rejected and never matched; report any counted incompatibility.
00804 } else {
00805 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
00806 if (writer_local && writerStatus.count_since_last_send) {
00807 if (DCPS::DCPS_debug_level > 3) {
00808 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00809 ACE_TEXT("writer incompatible\n")));
00810 }
00811 dwr->update_incompatible_qos(writerStatus);
00812 }
00813 if (reader_local && readerStatus.count_since_last_send) {
00814 if (DCPS::DCPS_debug_level > 3) {
00815 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00816 ACE_TEXT("reader incompatible\n")));
00817 }
00818 drr->update_incompatible_qos(readerStatus);
00819 }
00820 }
00821 }
00822
00823 static bool is_opendds(const GUID_t& endpoint)
00824 {
00825 return !std::memcmp(endpoint.guidPrefix, DCPS::VENDORID_OCI,
00826 sizeof(DCPS::VENDORID_OCI));
00827 }
00828
00829 virtual bool shutting_down() const = 0;
00830
00831 virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00832 DiscoveredSubscriptionIter& iter,
00833 const RepoId& reader) = 0;
00834
00835 virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00836 DiscoveredPublicationIter& iter,
00837 const RepoId& reader) = 0;
00838
00839 virtual bool defer_writer(const RepoId& writer,
00840 const RepoId& writer_participant) = 0;
00841
00842 virtual bool defer_reader(const RepoId& writer,
00843 const RepoId& writer_participant) = 0;
00844
00845 void remove_from_bit(const DiscoveredPublication& pub)
00846 {
00847 pub_key_to_id_.erase(get_key(pub));
00848 remove_from_bit_i(pub);
00849 }
00850
00851 void remove_from_bit(const DiscoveredSubscription& sub)
00852 {
00853 sub_key_to_id_.erase(get_key(sub));
00854 remove_from_bit_i(sub);
00855 }
00856
00857 RepoId make_topic_guid()
00858 {
00859 RepoId guid;
00860 guid = participant_id_;
00861 guid.entityId.entityKind = DCPS::ENTITYKIND_OPENDDS_TOPIC;
00862 assign_topic_key(guid);
00863 return guid;
00864 }
00865
00866 bool has_dcps_key(const DCPS::RepoId& topicId) const
00867 {
00868 typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TNMap;
00869 TNMap::const_iterator tn = topic_names_.find(topicId);
00870 if (tn == topic_names_.end()) return false;
00871
00872 typedef OPENDDS_MAP(OPENDDS_STRING, TopicDetails) TDMap;
00873 typename TDMap::const_iterator td = topics_.find(tn->second);
00874 if (td == topics_.end()) return false;
00875
00876 return td->second.has_dcps_key_;
00877 }
00878
00879 void
00880 increment_key(DDS::BuiltinTopicKey_t& key)
00881 {
00882 for (int idx = 0; idx < 3; ++idx) {
00883 CORBA::ULong ukey = static_cast<CORBA::ULong>(key.value[idx]);
00884 if (ukey == 0xFFFFFFFF) {
00885 key.value[idx] = 0;
00886 } else {
00887 ++ukey;
00888 key.value[idx] = ukey;
00889 return;
00890 }
00891 }
00892 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) EndpointManager::increment_key - ")
00893 ACE_TEXT("ran out of builtin topic keys\n")));
00894 }
00895
00896 ACE_Thread_Mutex& lock_;
00897 DCPS::RepoId participant_id_;
00898 BitKeyMap pub_key_to_id_, sub_key_to_id_;
00899 RepoIdSet ignored_guids_;
00900 unsigned int publication_counter_, subscription_counter_, topic_counter_;
00901 LocalPublicationMap local_publications_;
00902 LocalSubscriptionMap local_subscriptions_;
00903 DiscoveredPublicationMap discovered_publications_;
00904 DiscoveredSubscriptionMap discovered_subscriptions_;
00905 OPENDDS_MAP(OPENDDS_STRING, TopicDetails) topics_;
00906 TopicNameMap topic_names_;
00907 OPENDDS_SET(OPENDDS_STRING) ignored_topics_;
00908 DDS::BuiltinTopicKey_t pub_bit_key_, sub_bit_key_;
00909 };
00910
00911 template <typename EndpointManagerType>
00912 class LocalParticipant : public DCPS::RcObject<ACE_SYNCH_MUTEX> {
00913 public:
00914 typedef typename EndpointManagerType::DiscoveredParticipantData DiscoveredParticipantData;
00915 typedef typename EndpointManagerType::TopicDetails TopicDetails;
00916
00917 LocalParticipant (const DDS::DomainParticipantQos& qos)
00918 : qos_(qos)
00919 { }
00920
00921 virtual ~LocalParticipant() { }
00922
00923 DCPS::RepoId bit_key_to_repo_id(const char* bit_topic_name,
00924 const DDS::BuiltinTopicKey_t& key)
00925 {
00926 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PARTICIPANT_TOPIC)) {
00927 RepoId guid;
00928 std::memcpy(guid.guidPrefix, key.value, sizeof(DDS::BuiltinTopicKeyValue));
00929 guid.entityId = ENTITYID_PARTICIPANT;
00930 return guid;
00931
00932 } else {
00933 return endpoint_manager().bit_key_to_repo_id(bit_topic_name, key);
00934 }
00935 }
00936
00937 void ignore_domain_participant(const RepoId& ignoreId)
00938 {
00939 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00940 endpoint_manager().ignore(ignoreId);
00941
00942 const DiscoveredParticipantIter iter = participants_.find(ignoreId);
00943 if (iter != participants_.end()) {
00944 remove_discovered_participant(iter);
00945 }
00946 }
00947
00948 bool
00949 update_domain_participant_qos(const DDS::DomainParticipantQos& qos)
00950 {
00951 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00952 qos_ = qos;
00953 return true;
00954 }
00955
00956 DCPS::TopicStatus
00957 assert_topic(DCPS::RepoId_out topicId, const char* topicName,
00958 const char* dataTypeName, const DDS::TopicQos& qos,
00959 bool hasDcpsKey)
00960 {
00961 if (std::strlen(topicName) > 256 || std::strlen(dataTypeName) > 256) {
00962 if (DCPS::DCPS_debug_level) {
00963 ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR LocalParticipant::assert_topic() - ")
00964 ACE_TEXT("topic or type name length limit (256) exceeded\n")));
00965 }
00966 return DCPS::PRECONDITION_NOT_MET;
00967 }
00968
00969 return endpoint_manager().assert_topic(topicId, topicName, dataTypeName, qos, hasDcpsKey);
00970 }
00971
00972 DCPS::TopicStatus
00973 remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
00974 {
00975 return endpoint_manager().remove_topic(topicId, name);
00976 }
00977
00978 void
00979 ignore_topic(const RepoId& ignoreId)
00980 {
00981 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00982 endpoint_manager().ignore(ignoreId);
00983 }
00984
00985 bool
00986 update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
00987 OPENDDS_STRING& name)
00988 {
00989 return endpoint_manager().update_topic_qos(topicId, qos, name);
00990 }
00991
00992 RepoId
00993 add_publication(const RepoId& topicId,
00994 DCPS::DataWriterCallbacks* publication,
00995 const DDS::DataWriterQos& qos,
00996 const DCPS::TransportLocatorSeq& transInfo,
00997 const DDS::PublisherQos& publisherQos)
00998 {
00999 return endpoint_manager().add_publication(topicId, publication, qos,
01000 transInfo, publisherQos);
01001 }
01002
01003 void
01004 remove_publication(const RepoId& publicationId)
01005 {
01006 endpoint_manager().remove_publication(publicationId);
01007 }
01008
01009 void
01010 ignore_publication(const RepoId& ignoreId)
01011 {
01012 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01013 return endpoint_manager().ignore(ignoreId);
01014 }
01015
01016 bool
01017 update_publication_qos(const RepoId& publicationId,
01018 const DDS::DataWriterQos& qos,
01019 const DDS::PublisherQos& publisherQos)
01020 {
01021 return endpoint_manager().update_publication_qos(publicationId, qos, publisherQos);
01022 }
01023
01024 RepoId
01025 add_subscription(const RepoId& topicId,
01026 DCPS::DataReaderCallbacks* subscription,
01027 const DDS::DataReaderQos& qos,
01028 const DCPS::TransportLocatorSeq& transInfo,
01029 const DDS::SubscriberQos& subscriberQos,
01030 const char* filterClassName,
01031 const char* filterExpr,
01032 const DDS::StringSeq& params)
01033 {
01034 return endpoint_manager().add_subscription(topicId, subscription, qos, transInfo,
01035 subscriberQos, filterClassName, filterExpr, params);
01036 }
01037
01038 void
01039 remove_subscription(const RepoId& subscriptionId)
01040 {
01041 endpoint_manager().remove_subscription(subscriptionId);
01042 }
01043
01044 void
01045 ignore_subscription(const RepoId& ignoreId)
01046 {
01047 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01048 return endpoint_manager().ignore(ignoreId);
01049 }
01050
01051 bool
01052 update_subscription_qos(const RepoId& subscriptionId,
01053 const DDS::DataReaderQos& qos,
01054 const DDS::SubscriberQos& subscriberQos)
01055 {
01056 return endpoint_manager().update_subscription_qos(subscriptionId, qos, subscriberQos);
01057 }
01058
01059 bool
01060 update_subscription_params(const RepoId& subId,
01061 const DDS::StringSeq& params)
01062 {
01063 return endpoint_manager().update_subscription_params(subId, params);
01064 }
01065
01066 void
01067 association_complete(const RepoId& localId, const RepoId& remoteId)
01068 {
01069 endpoint_manager().association_complete(localId, remoteId);
01070 }
01071
01072 DDS::Subscriber_var bit_subscriber() const { return bit_subscriber_; }
01073
01074 protected:
01075
01076 struct DiscoveredParticipant {
01077 DiscoveredParticipant() : bit_ih_(0) {}
01078 DiscoveredParticipant(const DiscoveredParticipantData& p,
01079 const ACE_Time_Value& t)
01080 : pdata_(p), last_seen_(t), bit_ih_(DDS::HANDLE_NIL) {}
01081
01082 DiscoveredParticipantData pdata_;
01083 ACE_Time_Value last_seen_;
01084 DDS::InstanceHandle_t bit_ih_;
01085 };
01086 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredParticipant,
01087 DCPS::GUID_tKeyLessThan) DiscoveredParticipantMap;
01088 typedef typename DiscoveredParticipantMap::iterator DiscoveredParticipantIter;
01089
01090 virtual EndpointManagerType& endpoint_manager() = 0;
01091
01092 void remove_discovered_participant(DiscoveredParticipantIter iter)
01093 {
01094 bool removed = endpoint_manager().disassociate(iter->second.pdata_);
01095 if (removed) {
01096 #ifndef DDS_HAS_MINIMUM_BIT
01097 DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
01098
01099 if (bit && iter->second.bit_ih_ != DDS::HANDLE_NIL) {
01100 bit->set_instance_state(iter->second.bit_ih_,
01101 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01102 }
01103 #endif
01104 if (DCPS::DCPS_debug_level > 3) {
01105 DCPS::GuidConverter conv(iter->first);
01106 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) LocalParticipant::remove_discovered_participant")
01107 ACE_TEXT(" - erasing %C\n"), OPENDDS_STRING(conv).c_str()));
01108 }
01109 participants_.erase(iter);
01110 }
01111 }
01112
01113 #ifndef DDS_HAS_MINIMUM_BIT
01114 DDS::ParticipantBuiltinTopicDataDataReaderImpl* part_bit()
01115 {
01116 if (!bit_subscriber_.in())
01117 return 0;
01118
01119 DDS::DataReader_var d =
01120 bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
01121 return dynamic_cast<DDS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
01122 }
01123 #endif
01124
01125 ACE_Thread_Mutex lock_;
01126 DDS::Subscriber_var bit_subscriber_;
01127 DDS::DomainParticipantQos qos_;
01128 DiscoveredParticipantMap participants_;
01129 };
01130
01131 template<typename Participant>
01132 class PeerDiscovery : public Discovery {
01133 public:
01134 typedef typename Participant::TopicDetails TopicDetails;
01135
01136 explicit PeerDiscovery(const RepoKey& key) : Discovery(key) { }
01137
01138 ~PeerDiscovery() {
01139 reactor_runner_.end();
01140 }
01141
01142 virtual DDS::Subscriber_ptr init_bit(DomainParticipantImpl* participant) {
01143 using namespace DCPS;
01144 if (create_bit_topics(participant) != DDS::RETCODE_OK) {
01145 return 0;
01146 }
01147
01148 DDS::Subscriber_var bit_subscriber =
01149 participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
01150 DDS::SubscriberListener::_nil(),
01151 DEFAULT_STATUS_MASK);
01152 SubscriberImpl* sub = dynamic_cast<SubscriberImpl*>(bit_subscriber.in());
01153
01154 DDS::DataReaderQos dr_qos;
01155 sub->get_default_datareader_qos(dr_qos);
01156 dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01157
01158 #ifndef DDS_HAS_MINIMUM_BIT
01159 DDS::TopicDescription_var bit_part_topic =
01160 participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
01161 create_bit_dr(bit_part_topic, BUILT_IN_PARTICIPANT_TOPIC_TYPE,
01162 sub, dr_qos);
01163
01164 DDS::TopicDescription_var bit_topic_topic =
01165 participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
01166 create_bit_dr(bit_topic_topic, BUILT_IN_TOPIC_TOPIC_TYPE,
01167 sub, dr_qos);
01168
01169 DDS::TopicDescription_var bit_pub_topic =
01170 participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
01171 create_bit_dr(bit_pub_topic, BUILT_IN_PUBLICATION_TOPIC_TYPE,
01172 sub, dr_qos);
01173
01174 DDS::TopicDescription_var bit_sub_topic =
01175 participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
01176 create_bit_dr(bit_sub_topic, BUILT_IN_SUBSCRIPTION_TOPIC_TYPE,
01177 sub, dr_qos);
01178 #endif
01179
01180 get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
01181
01182 return bit_subscriber._retn();
01183 }
01184
01185 virtual void fini_bit(DCPS::DomainParticipantImpl* participant)
01186 {
01187 get_part(participant->get_domain_id(), participant->get_id())->fini_bit();
01188 }
01189
01190 virtual OpenDDS::DCPS::RepoId bit_key_to_repo_id(DCPS::DomainParticipantImpl* participant,
01191 const char* bit_topic_name,
01192 const DDS::BuiltinTopicKey_t& key) const
01193 {
01194 return get_part(participant->get_domain_id(), participant->get_id())
01195 ->bit_key_to_repo_id(bit_topic_name, key);
01196 }
01197
01198 virtual bool attach_participant(DDS::DomainId_t ,
01199 const OpenDDS::DCPS::RepoId& )
01200 {
01201 return false;
01202 }
01203
01204 virtual bool remove_domain_participant(DDS::DomainId_t domain_id,
01205 const OpenDDS::DCPS::RepoId& participantId)
01206 {
01207
01208
01209 ParticipantHandle participant;
01210 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01211 typename DomainParticipantMap::iterator domain = participants_.find(domain_id);
01212 if (domain == participants_.end()) {
01213 return false;
01214 }
01215 typename ParticipantMap::iterator part = domain->second.find(participantId);
01216 if (part == domain->second.end()) {
01217 return false;
01218 }
01219 participant = part->second;
01220 domain->second.erase(part);
01221 if (domain->second.empty()) {
01222 participants_.erase(domain);
01223 }
01224
01225 return true;
01226 }
01227
01228 virtual bool ignore_domain_participant(DDS::DomainId_t domain,
01229 const OpenDDS::DCPS::RepoId& myParticipantId,
01230 const OpenDDS::DCPS::RepoId& ignoreId)
01231 {
01232 get_part(domain, myParticipantId)->ignore_domain_participant(ignoreId);
01233 return true;
01234 }
01235
01236 virtual bool update_domain_participant_qos(DDS::DomainId_t domain,
01237 const OpenDDS::DCPS::RepoId& participant,
01238 const DDS::DomainParticipantQos& qos)
01239 {
01240 return get_part(domain, participant)->update_domain_participant_qos(qos);
01241 }
01242
01243 virtual DCPS::TopicStatus assert_topic(OpenDDS::DCPS::RepoId_out topicId,
01244 DDS::DomainId_t domainId,
01245 const OpenDDS::DCPS::RepoId& participantId,
01246 const char* topicName,
01247 const char* dataTypeName,
01248 const DDS::TopicQos& qos,
01249 bool hasDcpsKey)
01250 {
01251 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01252 typename OPENDDS_MAP(DDS::DomainId_t,
01253 OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01254 topics_.find(domainId);
01255 if (topic_it != topics_.end()) {
01256 const typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator it =
01257 topic_it->second.find(topicName);
01258 if (it != topic_it->second.end()
01259 && it->second.data_type_ != dataTypeName) {
01260 topicId = GUID_UNKNOWN;
01261 return DCPS::CONFLICTING_TYPENAME;
01262 }
01263 }
01264
01265
01266 const DCPS::TopicStatus stat =
01267 participants_[domainId][participantId]->assert_topic(topicId, topicName,
01268 dataTypeName, qos,
01269 hasDcpsKey);
01270 if (stat == DCPS::CREATED || stat == DCPS::FOUND) {
01271 TopicDetails& td = topics_[domainId][topicName];
01272 td.data_type_ = dataTypeName;
01273 td.qos_ = qos;
01274 td.repo_id_ = topicId;
01275 ++topic_use_[domainId][topicName];
01276 }
01277 return stat;
01278 }
01279
01280 virtual DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const char* topicName,
01281 CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
01282 OpenDDS::DCPS::RepoId_out topicId)
01283 {
01284 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01285 typename OPENDDS_MAP(DDS::DomainId_t,
01286 OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01287 topics_.find(domainId);
01288 if (topic_it == topics_.end()) {
01289 return DCPS::NOT_FOUND;
01290 }
01291 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
01292 topic_it->second.find(topicName);
01293 if (iter == topic_it->second.end()) {
01294 return DCPS::NOT_FOUND;
01295 }
01296 TopicDetails& td = iter->second;
01297 dataTypeName = td.data_type_.c_str();
01298 qos = new DDS::TopicQos(td.qos_);
01299 topicId = td.repo_id_;
01300 ++topic_use_[domainId][topicName];
01301 return DCPS::FOUND;
01302 }
01303
01304 virtual DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId,
01305 const OpenDDS::DCPS::RepoId& participantId,
01306 const OpenDDS::DCPS::RepoId& topicId)
01307 {
01308 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01309 typename OPENDDS_MAP(DDS::DomainId_t,
01310 OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01311 topics_.find(domainId);
01312 if (topic_it == topics_.end()) {
01313 return DCPS::NOT_FOUND;
01314 }
01315
01316 OPENDDS_STRING name;
01317
01318 const DCPS::TopicStatus stat =
01319 participants_[domainId][participantId]->remove_topic(topicId, name);
01320
01321 if (stat == DCPS::REMOVED) {
01322 if (0 == --topic_use_[domainId][name]) {
01323 topic_use_[domainId].erase(name);
01324 if (topic_it->second.empty()) {
01325 topic_use_.erase(domainId);
01326 }
01327 topic_it->second.erase(name);
01328 if (topic_it->second.empty()) {
01329 topics_.erase(topic_it);
01330 }
01331 }
01332 }
01333 return stat;
01334 }
01335
01336 virtual bool ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId& myParticipantId,
01337 const OpenDDS::DCPS::RepoId& ignoreId)
01338 {
01339 get_part(domainId, myParticipantId)->ignore_topic(ignoreId);
01340 return true;
01341 }
01342
01343 virtual bool update_topic_qos(const OpenDDS::DCPS::RepoId& topicId, DDS::DomainId_t domainId,
01344 const OpenDDS::DCPS::RepoId& participantId, const DDS::TopicQos& qos)
01345 {
01346 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01347 OPENDDS_STRING name;
01348
01349 if (participants_[domainId][participantId]->update_topic_qos(topicId,
01350 qos, name)) {
01351 topics_[domainId][name].qos_ = qos;
01352 return true;
01353 }
01354 return false;
01355 }
01356
01357 virtual OpenDDS::DCPS::RepoId add_publication(DDS::DomainId_t domainId,
01358 const OpenDDS::DCPS::RepoId& participantId,
01359 const OpenDDS::DCPS::RepoId& topicId,
01360 DCPS::DataWriterCallbacks* publication,
01361 const DDS::DataWriterQos& qos,
01362 const DCPS::TransportLocatorSeq& transInfo,
01363 const DDS::PublisherQos& publisherQos)
01364 {
01365 return get_part(domainId, participantId)->add_publication(
01366 topicId, publication, qos, transInfo, publisherQos);
01367 }
01368
01369 virtual bool remove_publication(DDS::DomainId_t domainId,
01370 const OpenDDS::DCPS::RepoId& participantId,
01371 const OpenDDS::DCPS::RepoId& publicationId)
01372 {
01373 get_part(domainId, participantId)->remove_publication(publicationId);
01374 return true;
01375 }
01376
01377 virtual bool ignore_publication(DDS::DomainId_t domainId,
01378 const OpenDDS::DCPS::RepoId& participantId,
01379 const OpenDDS::DCPS::RepoId& ignoreId)
01380 {
01381 get_part(domainId, participantId)->ignore_publication(ignoreId);
01382 return true;
01383 }
01384
01385 virtual bool update_publication_qos(DDS::DomainId_t domainId,
01386 const OpenDDS::DCPS::RepoId& partId,
01387 const OpenDDS::DCPS::RepoId& dwId,
01388 const DDS::DataWriterQos& qos,
01389 const DDS::PublisherQos& publisherQos)
01390 {
01391 return get_part(domainId, partId)->update_publication_qos(dwId, qos,
01392 publisherQos);
01393 }
01394
01395 virtual OpenDDS::DCPS::RepoId add_subscription(DDS::DomainId_t domainId,
01396 const OpenDDS::DCPS::RepoId& participantId,
01397 const OpenDDS::DCPS::RepoId& topicId,
01398 DCPS::DataReaderCallbacks* subscription,
01399 const DDS::DataReaderQos& qos,
01400 const DCPS::TransportLocatorSeq& transInfo,
01401 const DDS::SubscriberQos& subscriberQos,
01402 const char* filterClassName,
01403 const char* filterExpr,
01404 const DDS::StringSeq& params)
01405 {
01406 return get_part(domainId, participantId)->add_subscription(topicId, subscription, qos, transInfo, subscriberQos, filterClassName, filterExpr, params);
01407 }
01408
01409 virtual bool remove_subscription(DDS::DomainId_t domainId,
01410 const OpenDDS::DCPS::RepoId& participantId,
01411 const OpenDDS::DCPS::RepoId& subscriptionId)
01412 {
01413 get_part(domainId, participantId)->remove_subscription(subscriptionId);
01414 return true;
01415 }
01416
01417 virtual bool ignore_subscription(DDS::DomainId_t domainId,
01418 const OpenDDS::DCPS::RepoId& participantId,
01419 const OpenDDS::DCPS::RepoId& ignoreId)
01420 {
01421 get_part(domainId, participantId)->ignore_subscription(ignoreId);
01422 return true;
01423 }
01424
01425 virtual bool update_subscription_qos(DDS::DomainId_t domainId,
01426 const OpenDDS::DCPS::RepoId& partId,
01427 const OpenDDS::DCPS::RepoId& drId,
01428 const DDS::DataReaderQos& qos,
01429 const DDS::SubscriberQos& subQos)
01430 {
01431 return get_part(domainId, partId)->update_subscription_qos(drId, qos, subQos);
01432 }
01433
01434 virtual bool update_subscription_params(DDS::DomainId_t domainId,
01435 const OpenDDS::DCPS::RepoId& partId,
01436 const OpenDDS::DCPS::RepoId& subId,
01437 const DDS::StringSeq& params)
01438 {
01439 return get_part(domainId, partId)->update_subscription_params(subId, params);
01440 }
01441
01442 virtual void association_complete(DDS::DomainId_t domainId,
01443 const OpenDDS::DCPS::RepoId& participantId,
01444 const OpenDDS::DCPS::RepoId& localId,
01445 const OpenDDS::DCPS::RepoId& remoteId)
01446 {
01447 get_part(domainId, participantId)->association_complete(localId, remoteId);
01448 }
01449
01450 ACE_Reactor*
01451 reactor()
01452 {
01453 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, reactor_runner_.mtx_, 0);
01454 if (!reactor_runner_.reactor_) {
01455 reactor_runner_.reactor_ = new ACE_Reactor(new ACE_Select_Reactor, true);
01456 reactor_runner_.activate();
01457 }
01458 return reactor_runner_.reactor_;
01459 }
01460
01461 protected:
01462
01463 typedef DCPS::RcHandle<Participant> ParticipantHandle;
01464 typedef OPENDDS_MAP_CMP(DCPS::RepoId, ParticipantHandle, DCPS::GUID_tKeyLessThan) ParticipantMap;
01465 typedef OPENDDS_MAP(DDS::DomainId_t, ParticipantMap) DomainParticipantMap;
01466
01467 ParticipantHandle
01468 get_part(const DDS::DomainId_t domain_id,
01469 const OpenDDS::DCPS::RepoId& part_id) const
01470 {
01471 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ParticipantHandle());
01472 typename DomainParticipantMap::const_iterator domain = participants_.find(domain_id);
01473 if (domain == participants_.end()) {
01474 return ParticipantHandle();
01475 }
01476 typename ParticipantMap::const_iterator part = domain->second.find(part_id);
01477 if (part == domain->second.end()) {
01478 return ParticipantHandle();
01479 }
01480 return part->second;
01481 }
01482
01483 void create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
01484 DCPS::SubscriberImpl* sub,
01485 const DDS::DataReaderQos& qos)
01486 {
01487 using namespace DCPS;
01488 TopicDescriptionImpl* bit_topic_i =
01489 dynamic_cast<TopicDescriptionImpl*>(topic);
01490
01491 DDS::DomainParticipant_var participant = sub->get_participant();
01492 DomainParticipantImpl* participant_i =
01493 dynamic_cast<DomainParticipantImpl*>(participant.in());
01494
01495 TypeSupport_var type_support =
01496 Registered_Data_Types->lookup(participant, type);
01497
01498 DDS::DataReader_var dr = type_support->create_datareader();
01499 OpenDDS::DCPS::DataReaderImpl* dri = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dr.in());
01500
01501 dri->init(bit_topic_i, qos, 0 , 0 ,
01502 participant_i, sub, dr);
01503 dri->disable_transport();
01504 dri->enable();
01505 }
01506
01507 mutable ACE_Thread_Mutex lock_;
01508
01509
01510 struct ReactorRunner : ACE_Task_Base {
01511 ReactorRunner() : reactor_(0) {}
01512 ~ReactorRunner()
01513 {
01514 delete reactor_;
01515 }
01516
01517 int svc()
01518 {
01519 reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
01520 reactor_->run_reactor_event_loop();
01521 return 0;
01522 }
01523
01524 void end()
01525 {
01526 ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
01527 if (reactor_) {
01528 reactor_->end_reactor_event_loop();
01529 wait();
01530 }
01531 }
01532
01533 ACE_Reactor* reactor_;
01534 ACE_Thread_Mutex mtx_;
01535 } reactor_runner_;
01536
01537 DomainParticipantMap participants_;
01538 OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, TopicDetails) ) topics_;
01539 OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, unsigned int) ) topic_use_;
01540 };
01541
01542 }
01543 }
01544
01545 #endif