00001
00002
00003
00004
00005
00006 #ifndef OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00007 #define OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00008
00009 #include "dds/DCPS/Discovery.h"
00010 #include "dds/DCPS/GuidUtils.h"
00011 #include "dds/DCPS/DCPS_Utils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/SubscriberImpl.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DCPS/DataReaderImpl_T.h"
00018 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00019 #include "dds/DdsSecurityCoreC.h"
00020
00021 #include "ace/Select_Reactor.h"
00022 #include "ace/Condition_Thread_Mutex.h"
00023
00024 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00025 #pragma once
00026 #endif
00027
00028 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00029
00030 namespace OpenDDS {
00031 namespace DCPS {
00032 typedef DataReaderImpl_T<DDS::ParticipantBuiltinTopicData> ParticipantBuiltinTopicDataDataReaderImpl;
00033 typedef DataReaderImpl_T<DDS::PublicationBuiltinTopicData> PublicationBuiltinTopicDataDataReaderImpl;
00034 typedef DataReaderImpl_T<DDS::SubscriptionBuiltinTopicData> SubscriptionBuiltinTopicDataDataReaderImpl;
00035 typedef DataReaderImpl_T<DDS::TopicBuiltinTopicData> TopicBuiltinTopicDataDataReaderImpl;
00036
00037 #if defined(OPENDDS_SECURITY)
00038 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatareaderCryptoHandle, DCPS::GUID_tKeyLessThan) DatareaderCryptoHandleMap;
00039 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatawriterCryptoHandle, DCPS::GUID_tKeyLessThan) DatawriterCryptoHandleMap;
00040 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatareaderCryptoTokenSeq, DCPS::GUID_tKeyLessThan) DatareaderCryptoTokenSeqMap;
00041 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatawriterCryptoTokenSeq, DCPS::GUID_tKeyLessThan) DatawriterCryptoTokenSeqMap;
00042 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::EndpointSecurityAttributes, DCPS::GUID_tKeyLessThan) EndpointSecurityAttributesMap;
00043
// Authentication state values.  Enumerators keep their implicit numbering
// (AS_UNKNOWN == 0 ... AS_UNAUTHENTICATED == 7).
// NOTE(review): presumably tracks the security handshake progress of a
// remote participant -- confirm against the users of AuthState.
enum AuthState {
  AS_UNKNOWN,
  AS_VALIDATING_REMOTE,
  AS_HANDSHAKE_REQUEST,
  AS_HANDSHAKE_REQUEST_SENT,
  AS_HANDSHAKE_REPLY,
  AS_HANDSHAKE_REPLY_SENT,
  AS_AUTHENTICATED,
  AS_UNAUTHENTICATED
};
00054 #endif
00055
00056 inline void assign(DCPS::EntityKey_t& lhs, unsigned int rhs)
00057 {
00058 lhs[0] = static_cast<CORBA::Octet>(rhs);
00059 lhs[1] = static_cast<CORBA::Octet>(rhs >> 8);
00060 lhs[2] = static_cast<CORBA::Octet>(rhs >> 16);
00061 }
00062
00063 struct DcpsUpcalls : ACE_Task_Base {
00064 DcpsUpcalls(DCPS::DataReaderCallbacks* drr,
00065 const RepoId& reader,
00066 const DCPS::WriterAssociation& wa,
00067 bool active,
00068 DCPS::DataWriterCallbacks* dwr)
00069 : drr_(drr), reader_(reader), wa_(wa), active_(active), dwr_(dwr)
00070 , reader_done_(false), writer_done_(false), cnd_(mtx_)
00071 {}
00072
00073 int svc()
00074 {
00075 drr_->add_association(reader_, wa_, active_);
00076 {
00077 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_, -1);
00078 reader_done_ = true;
00079 cnd_.signal();
00080 while (!writer_done_) {
00081 cnd_.wait();
00082 }
00083 }
00084 dwr_->association_complete(reader_);
00085 return 0;
00086 }
00087
00088 void writer_done()
00089 {
00090 {
00091 ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
00092 writer_done_ = true;
00093 cnd_.signal();
00094 }
00095 wait();
00096 }
00097
00098 DCPS::DataReaderCallbacks* const drr_;
00099 const RepoId& reader_;
00100 const DCPS::WriterAssociation& wa_;
00101 bool active_;
00102 DCPS::DataWriterCallbacks* const dwr_;
00103 bool reader_done_, writer_done_;
00104 ACE_Thread_Mutex mtx_;
00105 ACE_Condition_Thread_Mutex cnd_;
00106 };
00107
00108 template <typename DiscoveredParticipantData_>
00109 class EndpointManager {
00110 protected:
00111
00112 struct DiscoveredSubscription {
00113 DiscoveredSubscription()
00114 : bit_ih_(DDS::HANDLE_NIL)
00115 {
00116 }
00117
00118 explicit DiscoveredSubscription(const OpenDDS::DCPS::DiscoveredReaderData& r)
00119 : reader_data_(r)
00120 , bit_ih_(DDS::HANDLE_NIL)
00121 {
00122 }
00123
00124 OpenDDS::DCPS::DiscoveredReaderData reader_data_;
00125 DDS::InstanceHandle_t bit_ih_;
00126
00127 #if defined(OPENDDS_SECURITY)
00128 DDS::Security::EndpointSecurityAttributes security_attribs_;
00129 #endif
00130
00131 };
00132
00133 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredSubscription,
00134 DCPS::GUID_tKeyLessThan) DiscoveredSubscriptionMap;
00135
00136 typedef typename DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter;
00137
00138 struct DiscoveredPublication {
00139 DiscoveredPublication()
00140 : bit_ih_(DDS::HANDLE_NIL)
00141 {
00142 }
00143
00144 explicit DiscoveredPublication(const OpenDDS::DCPS::DiscoveredWriterData& w)
00145 : writer_data_(w)
00146 , bit_ih_(DDS::HANDLE_NIL)
00147 {
00148 }
00149
00150 OpenDDS::DCPS::DiscoveredWriterData writer_data_;
00151 DDS::InstanceHandle_t bit_ih_;
00152
00153 #if defined(OPENDDS_SECURITY)
00154 DDS::Security::EndpointSecurityAttributes security_attribs_;
00155 #endif
00156
00157 };
00158
00159 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredPublication,
00160 DCPS::GUID_tKeyLessThan) DiscoveredPublicationMap;
00161 typedef typename DiscoveredPublicationMap::iterator DiscoveredPublicationIter;
00162
00163 public:
00164 typedef DiscoveredParticipantData_ DiscoveredParticipantData;
00165
00166 struct TopicDetails {
00167 OPENDDS_STRING data_type_;
00168 DDS::TopicQos qos_;
00169 DCPS::RepoId repo_id_;
00170 bool has_dcps_key_;
00171 RepoIdSet endpoints_;
00172 };
00173
00174 EndpointManager(const RepoId& participant_id, ACE_Thread_Mutex& lock)
00175 : lock_(lock)
00176 , participant_id_(participant_id)
00177 , publication_counter_(0)
00178 , subscription_counter_(0)
00179 , topic_counter_(0)
00180
00181 #if defined(OPENDDS_SECURITY)
00182 , permissions_handle_(DDS::HANDLE_NIL)
00183 #endif
00184
00185 {
00186
00187 }
00188
00189 virtual ~EndpointManager() { }
00190
00191 RepoId bit_key_to_repo_id(const char* bit_topic_name,
00192 const DDS::BuiltinTopicKey_t& key)
00193 {
00194 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00195 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PUBLICATION_TOPIC)) {
00196 return pub_key_to_id_[key];
00197 }
00198 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_SUBSCRIPTION_TOPIC)) {
00199 return sub_key_to_id_[key];
00200 }
00201 return RepoId();
00202 }
00203
00204 void ignore(const DCPS::RepoId& to_ignore)
00205 {
00206
00207 ignored_guids_.insert(to_ignore);
00208 {
00209 const DiscoveredPublicationIter iter =
00210 discovered_publications_.find(to_ignore);
00211 if (iter != discovered_publications_.end()) {
00212
00213 topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00214 remove_from_bit(iter->second);
00215 OPENDDS_STRING topic_name = get_topic_name(iter->second);
00216 discovered_publications_.erase(iter);
00217
00218 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00219 topics_.find(topic_name);
00220 if (top_it != topics_.end()) {
00221 match_endpoints(to_ignore, top_it->second, true );
00222 }
00223 return;
00224 }
00225 }
00226 {
00227 const DiscoveredSubscriptionIter iter =
00228 discovered_subscriptions_.find(to_ignore);
00229 if (iter != discovered_subscriptions_.end()) {
00230
00231 topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00232 remove_from_bit(iter->second);
00233 OPENDDS_STRING topic_name = get_topic_name(iter->second);
00234 discovered_subscriptions_.erase(iter);
00235
00236 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00237 topics_.find(topic_name);
00238 if (top_it != topics_.end()) {
00239 match_endpoints(to_ignore, top_it->second, true );
00240 }
00241 return;
00242 }
00243 }
00244 {
00245 const OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator
00246 iter = topic_names_.find(to_ignore);
00247 if (iter != topic_names_.end()) {
00248 ignored_topics_.insert(iter->second);
00249
00250 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00251 topics_.find(iter->second);
00252 if (top_it != topics_.end()) {
00253 TopicDetails& td = top_it->second;
00254 RepoIdSet::iterator ep;
00255 for (ep = td.endpoints_.begin(); ep!= td.endpoints_.end(); ++ep) {
00256 match_endpoints(*ep, td, true );
00257 if (shutting_down()) { return; }
00258 }
00259 }
00260 }
00261 }
00262 }
00263
00264 bool ignoring(const DCPS::RepoId& guid) const {
00265 return ignored_guids_.count(guid);
00266 }
00267 bool ignoring(const char* topic_name) const {
00268 return ignored_topics_.count(topic_name);
00269 }
00270
00271 DCPS::TopicStatus assert_topic(DCPS::RepoId_out topicId, const char* topicName,
00272 const char* dataTypeName, const DDS::TopicQos& qos,
00273 bool hasDcpsKey)
00274 {
00275 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00276 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
00277 topics_.find(topicName);
00278 if (iter != topics_.end()) {
00279 iter->second.qos_ = qos;
00280 iter->second.has_dcps_key_ = hasDcpsKey;
00281 topicId = iter->second.repo_id_;
00282 topic_names_[iter->second.repo_id_] = topicName;
00283 return DCPS::FOUND;
00284 }
00285
00286 TopicDetails& td = topics_[topicName];
00287 td.data_type_ = dataTypeName;
00288 td.qos_ = qos;
00289 td.has_dcps_key_ = hasDcpsKey;
00290 td.repo_id_ = make_topic_guid();
00291 topicId = td.repo_id_;
00292 topic_names_[td.repo_id_] = topicName;
00293
00294 return DCPS::CREATED;
00295 }
00296
00297 DCPS::TopicStatus remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
00298 {
00299 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00300 name = topic_names_[topicId];
00301 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00302 topics_.find(name);
00303 if (top_it != topics_.end()) {
00304 TopicDetails& td = top_it->second;
00305 if (td.endpoints_.empty()) {
00306 topics_.erase(name);
00307 }
00308 }
00309
00310 topic_names_.erase(topicId);
00311 return DCPS::REMOVED;
00312 }
00313
00314 virtual bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00315 OPENDDS_STRING& name) = 0;
00316
00317 DCPS::RepoId add_publication(const DCPS::RepoId& topicId,
00318 DCPS::DataWriterCallbacks* publication,
00319 const DDS::DataWriterQos& qos,
00320 const DCPS::TransportLocatorSeq& transInfo,
00321 const DDS::PublisherQos& publisherQos)
00322 {
00323 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00324
00325 RepoId rid = participant_id_;
00326 assign_publication_key(rid, topicId, qos);
00327 LocalPublication& pb = local_publications_[rid];
00328 pb.topic_id_ = topicId;
00329 pb.publication_ = publication;
00330 pb.qos_ = qos;
00331 pb.trans_info_ = transInfo;
00332 pb.publisher_qos_ = publisherQos;
00333
00334 const std::string& topic_name = topic_names_[topicId];
00335
00336 #if defined(OPENDDS_SECURITY)
00337 if (is_security_enabled()) {
00338 DDS::Security::SecurityException ex;
00339
00340 DDS::Security::TopicSecurityAttributes topic_sec_attr;
00341 if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) {
00342 ACE_ERROR((LM_ERROR,
00343 ACE_TEXT("(%P|%t) ERROR: ")
00344 ACE_TEXT("DomainParticipant::add_publication, ")
00345 ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
00346 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
00347 return RepoId();
00348 }
00349
00350 if (topic_sec_attr.is_write_protected == true) {
00351 if (!get_access_control()->check_create_datawriter(get_permissions_handle(), get_domain_id(), topic_name.data(), qos,
00352 publisherQos.partition, DDS::Security::DataTagQosPolicy(), ex)) {
00353 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00354 ACE_TEXT("EndpointManager::add_publication() - ")
00355 ACE_TEXT("Permissions check failed for local datawriter on topic '%C'. Security Exception[%d.%d]: %C\n"), topic_name.data(),
00356 ex.code, ex.minor_code, ex.message.in()));
00357 return RepoId();
00358 }
00359 }
00360
00361 if (!get_access_control()->get_datawriter_sec_attributes(get_permissions_handle(), topic_name.data(),
00362 publisherQos.partition, DDS::Security::DataTagQosPolicy(), pb.security_attribs_, ex)) {
00363 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00364 ACE_TEXT("EndpointManager::add_publication() - ")
00365 ACE_TEXT("Unable to get security attributes for local datawriter. Security Exception[%d.%d]: %C\n"),
00366 ex.code, ex.minor_code, ex.message.in()));
00367 return RepoId();
00368 }
00369
00370 if (pb.security_attribs_.is_submessage_protected || pb.security_attribs_.is_payload_protected) {
00371 DDS::Security::DatawriterCryptoHandle handle = get_crypto_key_factory()->register_local_datawriter(crypto_handle_, DDS::PropertySeq(), pb.security_attribs_, ex);
00372 if (handle == DDS::HANDLE_NIL) {
00373 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00374 ACE_TEXT("EndpointManager::add_publication() - ")
00375 ACE_TEXT("Unable to get local datawriter crypto handle. Security Exception[%d.%d]: %C\n"),
00376 ex.code, ex.minor_code, ex.message.in()));
00377 }
00378
00379 local_writer_crypto_handles_[rid] = handle;
00380 local_writer_security_attribs_[rid] = pb.security_attribs_;
00381 }
00382 }
00383 #endif
00384
00385 TopicDetails& td = topics_[topic_name];
00386 td.endpoints_.insert(rid);
00387
00388 if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
00389 return RepoId();
00390 }
00391
00392 if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
00393 return RepoId();
00394 }
00395
00396 if (DCPS::DCPS_debug_level > 3) {
00397 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_publication - ")
00398 ACE_TEXT("calling match_endpoints\n")));
00399 }
00400 match_endpoints(rid, td);
00401
00402 return rid;
00403 }
00404
00405 void remove_publication(const DCPS::RepoId& publicationId)
00406 {
00407 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00408 LocalPublicationIter iter = local_publications_.find(publicationId);
00409 if (iter != local_publications_.end()) {
00410 if (DDS::RETCODE_OK == remove_publication_i(publicationId))
00411 {
00412 OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00413 local_publications_.erase(publicationId);
00414 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00415 topics_.find(topic_name);
00416 if (top_it != topics_.end()) {
00417 match_endpoints(publicationId, top_it->second, true );
00418 top_it->second.endpoints_.erase(publicationId);
00419 }
00420 } else {
00421 ACE_ERROR((LM_ERROR,
00422 ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_publication - ")
00423 ACE_TEXT("Failed to publish dispose msg\n")));
00424 }
00425 }
00426 }
00427
00428 virtual bool update_publication_qos(const DCPS::RepoId& publicationId,
00429 const DDS::DataWriterQos& qos,
00430 const DDS::PublisherQos& publisherQos) = 0;
00431
00432 DCPS::RepoId add_subscription(const DCPS::RepoId& topicId,
00433 DCPS::DataReaderCallbacks* subscription,
00434 const DDS::DataReaderQos& qos,
00435 const DCPS::TransportLocatorSeq& transInfo,
00436 const DDS::SubscriberQos& subscriberQos,
00437 const char* filterClassName,
00438 const char* filterExpr,
00439 const DDS::StringSeq& params)
00440 {
00441 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00442
00443 RepoId rid = participant_id_;
00444 assign_subscription_key(rid, topicId, qos);
00445 LocalSubscription& sb = local_subscriptions_[rid];
00446 sb.topic_id_ = topicId;
00447 sb.subscription_ = subscription;
00448 sb.qos_ = qos;
00449 sb.trans_info_ = transInfo;
00450 sb.subscriber_qos_ = subscriberQos;
00451 sb.filterProperties.filterClassName = filterClassName;
00452 sb.filterProperties.filterExpression = filterExpr;
00453 sb.filterProperties.expressionParameters = params;
00454
00455 const std::string& topic_name = topic_names_[topicId];
00456
00457 #if defined(OPENDDS_SECURITY)
00458 if (is_security_enabled()) {
00459 DDS::Security::SecurityException ex;
00460
00461 DDS::Security::TopicSecurityAttributes topic_sec_attr;
00462 if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) {
00463 ACE_ERROR((LM_ERROR,
00464 ACE_TEXT("(%P|%t) ERROR: ")
00465 ACE_TEXT("DomainParticipant::add_subscription, ")
00466 ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
00467 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
00468 return RepoId();
00469 }
00470
00471 if (topic_sec_attr.is_read_protected == true) {
00472 if (!get_access_control()->check_create_datareader(get_permissions_handle(), get_domain_id(), topic_name.data(), qos,
00473 subscriberQos.partition, DDS::Security::DataTagQosPolicy(), ex)) {
00474 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00475 ACE_TEXT("EndpointManager::add_subscription() - ")
00476 ACE_TEXT("Permissions check failed for local datareader on topic '%C'. Security Exception[%d.%d]: %C\n"), topic_name.data(),
00477 ex.code, ex.minor_code, ex.message.in()));
00478 return RepoId();
00479 }
00480 }
00481
00482 if (!get_access_control()->get_datareader_sec_attributes(get_permissions_handle(), topic_name.data(),
00483 subscriberQos.partition, DDS::Security::DataTagQosPolicy(), sb.security_attribs_, ex)) {
00484 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00485 ACE_TEXT("EndpointManager::add_subscription() - ")
00486 ACE_TEXT("Unable to get security attributes for local datareader. Security Exception[%d.%d]: %C\n"),
00487 ex.code, ex.minor_code, ex.message.in()));
00488 return RepoId();
00489 }
00490
00491 if (sb.security_attribs_.is_submessage_protected || sb.security_attribs_.is_payload_protected) {
00492 DDS::Security::DatareaderCryptoHandle handle = get_crypto_key_factory()->register_local_datareader(crypto_handle_, DDS::PropertySeq(), sb.security_attribs_, ex);
00493 if (handle == DDS::HANDLE_NIL) {
00494 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00495 ACE_TEXT("EndpointManager::add_subscription() - ")
00496 ACE_TEXT("Unable to get local datareader crypto handle. Security Exception[%d.%d]: %C\n"),
00497 ex.code, ex.minor_code, ex.message.in()));
00498 }
00499
00500 local_reader_crypto_handles_[rid] = handle;
00501 local_reader_security_attribs_[rid] = sb.security_attribs_;
00502 }
00503 }
00504 #endif
00505
00506 TopicDetails& td = topics_[topic_name];
00507 td.endpoints_.insert(rid);
00508
00509 if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
00510 return RepoId();
00511 }
00512
00513 if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
00514 return RepoId();
00515 }
00516
00517 if (DCPS::DCPS_debug_level > 3) {
00518 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_subscription - ")
00519 ACE_TEXT("calling match_endpoints\n")));
00520 }
00521 match_endpoints(rid, td);
00522
00523 return rid;
00524 }
00525
00526 void remove_subscription(const DCPS::RepoId& subscriptionId)
00527 {
00528 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00529 LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
00530 if (iter != local_subscriptions_.end()) {
00531 if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId)
00532 ) {
00533 OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00534 local_subscriptions_.erase(subscriptionId);
00535 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00536 topics_.find(topic_name);
00537 if (top_it != topics_.end()) {
00538 match_endpoints(subscriptionId, top_it->second, true );
00539 top_it->second.endpoints_.erase(subscriptionId);
00540 }
00541 } else {
00542 ACE_ERROR((LM_ERROR,
00543 ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_subscription - ")
00544 ACE_TEXT("Failed to publish dispose msg\n")));
00545 }
00546 }
00547 }
00548
00549 virtual bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00550 const DDS::DataReaderQos& qos,
00551 const DDS::SubscriberQos& subscriberQos) = 0;
00552
00553 virtual bool update_subscription_params(const DCPS::RepoId& subId,
00554 const DDS::StringSeq& params) = 0;
00555
00556 virtual void association_complete(const DCPS::RepoId& localId,
00557 const DCPS::RepoId& remoteId) = 0;
00558
00559 virtual bool disassociate(const DiscoveredParticipantData& pdata) = 0;
00560
00561 protected:
00562 struct LocalEndpoint {
00563 LocalEndpoint() : topic_id_(DCPS::GUID_UNKNOWN), sequence_(DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {}
00564 DCPS::RepoId topic_id_;
00565 DCPS::TransportLocatorSeq trans_info_;
00566 RepoIdSet matched_endpoints_;
00567 DCPS::SequenceNumber sequence_;
00568 RepoIdSet remote_opendds_associations_;
00569 };
00570
00571 struct LocalPublication : LocalEndpoint {
00572 DCPS::DataWriterCallbacks* publication_;
00573 DDS::DataWriterQos qos_;
00574 DDS::PublisherQos publisher_qos_;
00575
00576 #if defined(OPENDDS_SECURITY)
00577 DDS::Security::EndpointSecurityAttributes security_attribs_;
00578 #endif
00579
00580 };
00581
00582 struct LocalSubscription : LocalEndpoint {
00583 DCPS::DataReaderCallbacks* subscription_;
00584 DDS::DataReaderQos qos_;
00585 DDS::SubscriberQos subscriber_qos_;
00586 OpenDDS::DCPS::ContentFilterProperty_t filterProperties;
00587
00588 #if defined(OPENDDS_SECURITY)
00589 DDS::Security::EndpointSecurityAttributes security_attribs_;
00590 #endif
00591
00592 };
00593
00594 typedef OPENDDS_MAP_CMP(DDS::BuiltinTopicKey_t, DCPS::RepoId,
00595 DCPS::BuiltinTopicKeyLess) BitKeyMap;
00596
00597 typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalPublication,
00598 DCPS::GUID_tKeyLessThan) LocalPublicationMap;
00599 typedef typename LocalPublicationMap::iterator LocalPublicationIter;
00600 typedef typename LocalPublicationMap::const_iterator LocalPublicationCIter;
00601
00602 typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalSubscription,
00603 DCPS::GUID_tKeyLessThan) LocalSubscriptionMap;
00604 typedef typename LocalSubscriptionMap::iterator LocalSubscriptionIter;
00605 typedef typename LocalSubscriptionMap::const_iterator LocalSubscriptionCIter;
00606
00607 typedef typename OPENDDS_MAP_CMP(DCPS::RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TopicNameMap;
00608
00609 static const char* get_topic_name(const DiscoveredPublication& pub) {
00610 return pub.writer_data_.ddsPublicationData.topic_name;
00611 }
00612 static const char* get_topic_name(const DiscoveredSubscription& sub) {
00613 return sub.reader_data_.ddsSubscriptionData.topic_name;
00614 }
00615 static DDS::BuiltinTopicKey_t get_key(const DiscoveredPublication& pub) {
00616 return pub.writer_data_.ddsPublicationData.key;
00617 }
00618 static DDS::BuiltinTopicKey_t get_key(const DiscoveredSubscription& sub) {
00619 return sub.reader_data_.ddsSubscriptionData.key;
00620 }
00621
00622 virtual void remove_from_bit_i(const DiscoveredPublication& ) { }
00623 virtual void remove_from_bit_i(const DiscoveredSubscription& ) { }
00624
00625 virtual void assign_publication_key(RepoId& rid,
00626 const RepoId& topicId,
00627 const DDS::DataWriterQos& ) {
00628 rid.entityId.entityKind =
00629 has_dcps_key(topicId)
00630 ? DCPS::ENTITYKIND_USER_WRITER_WITH_KEY
00631 : DCPS::ENTITYKIND_USER_WRITER_NO_KEY;
00632 assign(rid.entityId.entityKey, publication_counter_++);
00633 }
00634 virtual void assign_subscription_key(RepoId& rid,
00635 const RepoId& topicId,
00636 const DDS::DataReaderQos& ) {
00637 rid.entityId.entityKind =
00638 has_dcps_key(topicId)
00639 ? DCPS::ENTITYKIND_USER_READER_WITH_KEY
00640 : DCPS::ENTITYKIND_USER_READER_NO_KEY;
00641 assign(rid.entityId.entityKey, subscription_counter_++);
00642 }
00643 virtual void assign_topic_key(RepoId& guid) {
00644 assign(guid.entityId.entityKey, topic_counter_++);
00645
00646 if (topic_counter_ == 0x1000000) {
00647 ACE_ERROR((LM_ERROR,
00648 ACE_TEXT("(%P|%t) ERROR: EndpointManager::make_topic_guid: ")
00649 ACE_TEXT("Exceeded Maximum number of topic entity keys!")
00650 ACE_TEXT("Next key will be a duplicate!\n")));
00651 topic_counter_ = 0;
00652 }
00653 }
00654
00655 virtual DDS::ReturnCode_t add_publication_i(const DCPS::RepoId& ,
00656 LocalPublication& ) { return DDS::RETCODE_OK; }
00657 virtual DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& ,
00658 LocalPublication& ,
00659 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00660 virtual DDS::ReturnCode_t remove_publication_i(const RepoId& publicationId) = 0;
00661
00662 virtual DDS::ReturnCode_t add_subscription_i(const DCPS::RepoId& ,
00663 LocalSubscription& ) { return DDS::RETCODE_OK; };
00664 virtual DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& ,
00665 LocalSubscription& ,
00666 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00667 virtual DDS::ReturnCode_t remove_subscription_i(const RepoId& subscriptionId) = 0;
00668
00669 void match_endpoints(DCPS::RepoId repoId, const TopicDetails& td,
00670 bool remove = false)
00671 {
00672 const bool reader = repoId.entityId.entityKind & 4;
00673
00674 RepoIdSet endpoints_copy = td.endpoints_;
00675
00676 for (RepoIdSet::const_iterator iter = endpoints_copy.begin();
00677 iter != endpoints_copy.end(); ++iter) {
00678
00679 if (bool(iter->entityId.entityKind & 4) != reader) {
00680 if (remove) {
00681 remove_assoc(*iter, repoId);
00682 } else {
00683 match(reader ? *iter : repoId, reader ? repoId : *iter);
00684 }
00685 }
00686 }
00687 }
00688
00689 void
00690 remove_assoc(const RepoId& remove_from,
00691 const RepoId& removing)
00692 {
00693 const bool reader = remove_from.entityId.entityKind & 4;
00694 if (reader) {
00695 const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
00696 if (lsi != local_subscriptions_.end()) {
00697 lsi->second.matched_endpoints_.erase(removing);
00698 DCPS::WriterIdSeq writer_seq(1);
00699 writer_seq.length(1);
00700 writer_seq[0] = removing;
00701 lsi->second.remote_opendds_associations_.erase(removing);
00702 lsi->second.subscription_->remove_associations(writer_seq,
00703 false );
00704
00705 write_subscription_data(remove_from, lsi->second);
00706 }
00707
00708 } else {
00709 const LocalPublicationIter lpi = local_publications_.find(remove_from);
00710 if (lpi != local_publications_.end()) {
00711 lpi->second.matched_endpoints_.erase(removing);
00712 DCPS::ReaderIdSeq reader_seq(1);
00713 reader_seq.length(1);
00714 reader_seq[0] = removing;
00715 lpi->second.remote_opendds_associations_.erase(removing);
00716 lpi->second.publication_->remove_associations(reader_seq,
00717 false );
00718 }
00719 }
00720 }
00721
00722 #if defined(OPENDDS_SECURITY)
00723 virtual DDS::Security::DatawriterCryptoHandle
00724 generate_remote_matched_writer_crypto_handle(const RepoId&, const DDS::Security::DatareaderCryptoHandle&)
00725 {
00726 return DDS::HANDLE_NIL;
00727 }
00728
00729 virtual DDS::Security::DatareaderCryptoHandle
00730 generate_remote_matched_reader_crypto_handle(const RepoId&, const DDS::Security::DatawriterCryptoHandle&, bool)
00731 {
00732 return DDS::HANDLE_NIL;
00733 }
00734
00735 virtual void
00736 create_and_send_datareader_crypto_tokens(const DDS::Security::DatareaderCryptoHandle&, const DCPS::RepoId&, const DDS::Security::DatawriterCryptoHandle&, const DCPS::RepoId&)
00737 {
00738 return;
00739 }
00740
00741 virtual void
00742 create_and_send_datawriter_crypto_tokens(const DDS::Security::DatawriterCryptoHandle&, const DCPS::RepoId&, const DDS::Security::DatareaderCryptoHandle&, const DCPS::RepoId&)
00743 {
00744 return;
00745 }
00746 #endif
00747
00748 virtual DDS::DomainId_t
00749 get_domain_id() const
00750 {
00751 return -1;
00752 }
00753
00754 void
00755 match(const RepoId& writer, const RepoId& reader)
00756 {
00757
00758
00759
00760
00761 DDS::DataWriterQos tempDwQos;
00762 DDS::PublisherQos tempPubQos;
00763 DDS::DataReaderQos tempDrQos;
00764 DDS::SubscriberQos tempSubQos;
00765 ContentFilterProperty_t tempCfp;
00766
00767
00768 const DDS::DataWriterQos* dwQos = 0;
00769 const DDS::PublisherQos* pubQos = 0;
00770 DCPS::TransportLocatorSeq* wTls = 0;
00771
00772 const LocalPublicationIter lpi = local_publications_.find(writer);
00773 DiscoveredPublicationIter dpi;
00774 bool writer_local = false, already_matched = false;
00775 if (lpi != local_publications_.end()) {
00776 writer_local = true;
00777 dwQos = &lpi->second.qos_;
00778 pubQos = &lpi->second.publisher_qos_;
00779 wTls = &lpi->second.trans_info_;
00780 already_matched = lpi->second.matched_endpoints_.count(reader);
00781 } else if ((dpi = discovered_publications_.find(writer))
00782 != discovered_publications_.end()) {
00783 wTls = &dpi->second.writer_data_.writerProxy.allLocators;
00784 } else {
00785 return;
00786 }
00787
00788
00789 const DDS::DataReaderQos* drQos = 0;
00790 const DDS::SubscriberQos* subQos = 0;
00791 DCPS::TransportLocatorSeq* rTls = 0;
00792 const ContentFilterProperty_t* cfProp = 0;
00793
00794 const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
00795 DiscoveredSubscriptionIter dsi;
00796 bool reader_local = false;
00797 if (lsi != local_subscriptions_.end()) {
00798 reader_local = true;
00799 drQos = &lsi->second.qos_;
00800 subQos = &lsi->second.subscriber_qos_;
00801 rTls = &lsi->second.trans_info_;
00802 if (lsi->second.filterProperties.filterExpression[0] != 0) {
00803 tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
00804 tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
00805 }
00806 cfProp = &tempCfp;
00807 if (!already_matched) {
00808 already_matched = lsi->second.matched_endpoints_.count(writer);
00809 }
00810 } else if ((dsi = discovered_subscriptions_.find(reader))
00811 != discovered_subscriptions_.end()) {
00812 if (!writer_local) {
00813
00814 return;
00815 }
00816 rTls = &dsi->second.reader_data_.readerProxy.allLocators;
00817
00818 populate_transport_locator_sequence(rTls, dsi, reader);
00819
00820 const DDS::SubscriptionBuiltinTopicData& bit =
00821 dsi->second.reader_data_.ddsSubscriptionData;
00822 tempDrQos.durability = bit.durability;
00823 tempDrQos.deadline = bit.deadline;
00824 tempDrQos.latency_budget = bit.latency_budget;
00825 tempDrQos.liveliness = bit.liveliness;
00826 tempDrQos.reliability = bit.reliability;
00827 tempDrQos.destination_order = bit.destination_order;
00828 tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00829 tempDrQos.resource_limits =
00830 TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00831 tempDrQos.user_data = bit.user_data;
00832 tempDrQos.ownership = bit.ownership;
00833 tempDrQos.time_based_filter = bit.time_based_filter;
00834 tempDrQos.reader_data_lifecycle =
00835 TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
00836 drQos = &tempDrQos;
00837 tempSubQos.presentation = bit.presentation;
00838 tempSubQos.partition = bit.partition;
00839 tempSubQos.group_data = bit.group_data;
00840 tempSubQos.entity_factory =
00841 TheServiceParticipant->initial_EntityFactoryQosPolicy();
00842 subQos = &tempSubQos;
00843 cfProp = &dsi->second.reader_data_.contentFilterProperty;
00844 } else {
00845 return;
00846 }
00847
00848
00849
00850 if (!writer_local) {
00851 const DDS::PublicationBuiltinTopicData& bit =
00852 dpi->second.writer_data_.ddsPublicationData;
00853 tempDwQos.durability = bit.durability;
00854 tempDwQos.durability_service = bit.durability_service;
00855 tempDwQos.deadline = bit.deadline;
00856 tempDwQos.latency_budget = bit.latency_budget;
00857 tempDwQos.liveliness = bit.liveliness;
00858 tempDwQos.reliability = bit.reliability;
00859 tempDwQos.destination_order = bit.destination_order;
00860 tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00861 tempDwQos.resource_limits =
00862 TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00863 tempDwQos.transport_priority =
00864 TheServiceParticipant->initial_TransportPriorityQosPolicy();
00865 tempDwQos.lifespan = bit.lifespan;
00866 tempDwQos.user_data = bit.user_data;
00867 tempDwQos.ownership = bit.ownership;
00868 tempDwQos.ownership_strength = bit.ownership_strength;
00869 tempDwQos.writer_data_lifecycle =
00870 TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
00871 dwQos = &tempDwQos;
00872 tempPubQos.presentation = bit.presentation;
00873 tempPubQos.partition = bit.partition;
00874 tempPubQos.group_data = bit.group_data;
00875 tempPubQos.entity_factory =
00876 TheServiceParticipant->initial_EntityFactoryQosPolicy();
00877 pubQos = &tempPubQos;
00878
00879 populate_transport_locator_sequence(wTls, dpi, writer);
00880 }
00881
00882
00883
00884
00885 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00886
00887
00888
00889
00890
00891 DCPS::DataWriterCallbacks* dwr = 0;
00892 DCPS::DataReaderCallbacks* drr = 0;
00893 if (writer_local) {
00894 dwr = lpi->second.publication_;
00895 }
00896 if (reader_local) {
00897 drr = lsi->second.subscription_;
00898 }
00899
00900 DCPS::IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00901 DCPS::IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00902
00903 if (DCPS::compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
00904 dwQos, drQos, pubQos, subQos)) {
00905 if (!writer_local) {
00906 RepoId writer_participant = writer;
00907 writer_participant.entityId = ENTITYID_PARTICIPANT;
00908 if (defer_writer(writer, writer_participant)) {
00909 return;
00910 }
00911 }
00912 if (!reader_local) {
00913 RepoId reader_participant = reader;
00914 reader_participant.entityId = ENTITYID_PARTICIPANT;
00915 if (defer_reader(reader, reader_participant)) {
00916 return;
00917 }
00918 }
00919
00920 bool call_writer = false, call_reader = false;
00921 if (writer_local) {
00922 call_writer = lpi->second.matched_endpoints_.insert(reader).second;
00923 }
00924 if (reader_local) {
00925 call_reader = lsi->second.matched_endpoints_.insert(writer).second;
00926 }
00927 if (!call_writer && !call_reader) {
00928 return;
00929 }
00930
00931 #if defined(OPENDDS_SECURITY)
00932 if (is_security_enabled()) {
00933 if (call_reader) {
00934 RepoId writer_participant = writer;
00935 writer_participant.entityId = ENTITYID_PARTICIPANT;
00936 DatareaderCryptoHandleMap::const_iterator iter = local_reader_crypto_handles_.find(reader);
00937 if (iter != local_reader_crypto_handles_.end()) {
00938 DDS::Security::DatareaderCryptoHandle drch = iter->second;
00939 remote_writer_crypto_handles_[writer] = generate_remote_matched_writer_crypto_handle(writer_participant, drch);
00940 DatawriterCryptoTokenSeqMap::iterator t_iter = pending_remote_writer_crypto_tokens_.find(writer);
00941 if (t_iter != pending_remote_writer_crypto_tokens_.end()) {
00942 DDS::Security::SecurityException se;
00943 if (get_crypto_key_exchange()->set_remote_datawriter_crypto_tokens(iter->second, remote_writer_crypto_handles_[writer], t_iter->second, se) == false) {
00944 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00945 ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ")
00946 ACE_TEXT("Unable to set pending remote datawriter crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00947 se.code, se.minor_code, se.message.in()));
00948 }
00949 pending_remote_writer_crypto_tokens_.erase(t_iter);
00950 }
00951 EndpointSecurityAttributesMap::const_iterator s_iter = local_reader_security_attribs_.find(reader);
00952 if (s_iter != local_reader_security_attribs_.end() && s_iter->second.is_submessage_protected) {
00953 create_and_send_datareader_crypto_tokens(drch, reader, remote_writer_crypto_handles_[writer], writer);
00954 }
00955 }
00956 }
00957 if (call_writer) {
00958 RepoId reader_participant = reader;
00959 reader_participant.entityId = ENTITYID_PARTICIPANT;
00960 DatawriterCryptoHandleMap::const_iterator iter = local_writer_crypto_handles_.find(writer);
00961 if (iter != local_writer_crypto_handles_.end()) {
00962 DDS::Security::DatawriterCryptoHandle dwch = iter->second;
00963 remote_reader_crypto_handles_[reader] = generate_remote_matched_reader_crypto_handle(reader_participant, dwch, (relay_only_readers_.count(reader) != 0));
00964 DatareaderCryptoTokenSeqMap::iterator t_iter = pending_remote_reader_crypto_tokens_.find(reader);
00965 if (t_iter != pending_remote_reader_crypto_tokens_.end()) {
00966 DDS::Security::SecurityException se;
00967 if (get_crypto_key_exchange()->set_remote_datareader_crypto_tokens(iter->second, remote_reader_crypto_handles_[reader], t_iter->second, se) == false) {
00968 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00969 ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ")
00970 ACE_TEXT("Unable to set pending remote datareader crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00971 se.code, se.minor_code, se.message.in()));
00972 }
00973 pending_remote_reader_crypto_tokens_.erase(t_iter);
00974 }
00975 EndpointSecurityAttributesMap::const_iterator s_iter = local_writer_security_attribs_.find(writer);
00976 if (s_iter != local_writer_security_attribs_.end() && (s_iter->second.is_submessage_protected || s_iter->second.is_payload_protected)) {
00977 create_and_send_datawriter_crypto_tokens(dwch, writer, remote_reader_crypto_handles_[reader], reader);
00978 }
00979 }
00980 }
00981 }
00982 #endif
00983
00984
00985 #ifdef __SUNPRO_CC
00986 DCPS::ReaderAssociation ra;
00987 ra.readerTransInfo = *rTls;
00988 ra.readerId = reader;
00989 ra.subQos = *subQos;
00990 ra.readerQos = *drQos;
00991 ra.filterClassName = cfProp->filterClassName;
00992 ra.filterExpression = cfProp->filterExpression;
00993 ra.exprParams = cfProp->expressionParameters;
00994 DCPS::WriterAssociation wa;
00995 wa.writerTransInfo = *wTls;
00996 wa.writerId = writer;
00997 wa.pubQos = *pubQos;
00998 wa.writerQos = *dwQos;
00999 #else
01000 const DCPS::ReaderAssociation ra =
01001 {add_security_info(*rTls, writer, reader), reader, *subQos, *drQos,
01002 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01003 cfProp->filterClassName, cfProp->filterExpression,
01004 #else
01005 "", "",
01006 #endif
01007 cfProp->expressionParameters};
01008
01009 const DCPS::WriterAssociation wa =
01010 {add_security_info(*wTls, writer, reader), writer, *pubQos, *dwQos};
01011 #endif
01012
01013 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
01014 static const bool writer_active = true;
01015
01016 if (call_writer) {
01017 if (DCPS::DCPS_debug_level > 3) {
01018 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01019 ACE_TEXT("adding writer association\n")));
01020 }
01021 DcpsUpcalls thr(drr, reader, wa, !writer_active, dwr);
01022 if (call_reader) {
01023 thr.activate();
01024 }
01025 dwr->add_association(writer, ra, writer_active);
01026 if (call_reader) {
01027 thr.writer_done();
01028 }
01029
01030 } else if (call_reader) {
01031 if (DCPS::DCPS_debug_level > 3) {
01032 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01033 ACE_TEXT("adding reader association\n")));
01034 }
01035 drr->add_association(reader, wa, !writer_active);
01036 }
01037
01038
01039 if (call_writer && !call_reader && !is_opendds(reader)) {
01040 if (DCPS::DCPS_debug_level > 3) {
01041 ACE_DEBUG((LM_DEBUG,
01042 ACE_TEXT("(%P|%t) EndpointManager::match - ")
01043 ACE_TEXT("calling writer association_complete\n")));
01044 }
01045 dwr->association_complete(reader);
01046 }
01047
01048 } else if (already_matched) {
01049 if (writer_local) {
01050 lpi->second.matched_endpoints_.erase(reader);
01051 lpi->second.remote_opendds_associations_.erase(reader);
01052 }
01053 if (reader_local) {
01054 lsi->second.matched_endpoints_.erase(writer);
01055 lsi->second.remote_opendds_associations_.erase(writer);
01056 }
01057 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
01058 if (writer_local) {
01059 DCPS::ReaderIdSeq reader_seq(1);
01060 reader_seq.length(1);
01061 reader_seq[0] = reader;
01062 dwr->remove_associations(reader_seq, false );
01063 }
01064 if (reader_local) {
01065 DCPS::WriterIdSeq writer_seq(1);
01066 writer_seq.length(1);
01067 writer_seq[0] = writer;
01068 drr->remove_associations(writer_seq, false );
01069 }
01070
01071 } else {
01072 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01073 if (writer_local && writerStatus.count_since_last_send) {
01074 if (DCPS::DCPS_debug_level > 3) {
01075 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01076 ACE_TEXT("writer incompatible\n")));
01077 }
01078 dwr->update_incompatible_qos(writerStatus);
01079 }
01080 if (reader_local && readerStatus.count_since_last_send) {
01081 if (DCPS::DCPS_debug_level > 3) {
01082 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01083 ACE_TEXT("reader incompatible\n")));
01084 }
01085 drr->update_incompatible_qos(readerStatus);
01086 }
01087 }
01088 }
01089
01090 virtual bool is_opendds(const GUID_t& endpoint) const
01091 {
01092 return !std::memcmp(endpoint.guidPrefix, DCPS::VENDORID_OCI,
01093 sizeof(DCPS::VENDORID_OCI));
01094 }
01095
/// Implemented by the concrete endpoint manager.
/// NOTE(review): presumably reports whether discovery is being torn down;
/// the contract is inferred from the name only -- confirm against implementers.
virtual bool shutting_down() const = 0;
01097
/// Provide the transport locators of a discovered (remote) subscription.
/// @param tls    pointer, passed by reference, that the implementation sets
///               (NOTE(review): presumably to point at the locator sequence
///               held in the discovered-subscription entry -- confirm).
/// @param iter   iterator to the discovered subscription entry.
/// @param reader GUID of the remote reader.
virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
                                                 DiscoveredSubscriptionIter& iter,
                                                 const RepoId& reader) = 0;
01101
01102 virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
01103 DiscoveredPublicationIter& iter,
01104 const RepoId& reader) = 0;
01105
/// Hook that lets a derived class decorate the locators placed into a
/// reader/writer association. match() calls it as
/// add_security_info(locators, writer, reader).
/// This default implementation ignores both GUIDs and returns a copy of
/// @a locators unchanged.
virtual DCPS::TransportLocatorSeq
add_security_info(const DCPS::TransportLocatorSeq& locators,
                  const RepoId& /*writer*/, const RepoId& /*reader*/)
{ return locators; }
01110
/// Asked by match() for a non-local writer once QoS has been found compatible.
/// A true result makes match() return immediately without associating.
/// @param writer             GUID of the remote writer.
/// @param writer_participant the writer's GUID with entityId replaced by
///                           ENTITYID_PARTICIPANT.
virtual bool defer_writer(const RepoId& writer,
                          const RepoId& writer_participant) = 0;
01113
01114 virtual bool defer_reader(const RepoId& writer,
01115 const RepoId& writer_participant) = 0;
01116
/// Drop a discovered publication from the built-in-topic bookkeeping:
/// erase its key from pub_key_to_id_, then delegate to remove_from_bit_i().
void remove_from_bit(const DiscoveredPublication& pub)
{
  pub_key_to_id_.erase(get_key(pub));
  remove_from_bit_i(pub);
}
01122
/// Drop a discovered subscription from the built-in-topic bookkeeping:
/// erase its key from sub_key_to_id_, then delegate to remove_from_bit_i().
void remove_from_bit(const DiscoveredSubscription& sub)
{
  sub_key_to_id_.erase(get_key(sub));
  remove_from_bit_i(sub);
}
01128
01129 RepoId make_topic_guid()
01130 {
01131 RepoId guid;
01132 guid = participant_id_;
01133 guid.entityId.entityKind = DCPS::ENTITYKIND_OPENDDS_TOPIC;
01134 assign_topic_key(guid);
01135 return guid;
01136 }
01137
01138 bool has_dcps_key(const DCPS::RepoId& topicId) const
01139 {
01140 typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TNMap;
01141 TNMap::const_iterator tn = topic_names_.find(topicId);
01142 if (tn == topic_names_.end()) return false;
01143
01144 typedef OPENDDS_MAP(OPENDDS_STRING, TopicDetails) TDMap;
01145 typename TDMap::const_iterator td = topics_.find(tn->second);
01146 if (td == topics_.end()) return false;
01147
01148 return td->second.has_dcps_key_;
01149 }
01150
01151 void
01152 increment_key(DDS::BuiltinTopicKey_t& key)
01153 {
01154 for (int idx = 0; idx < 3; ++idx) {
01155 CORBA::ULong ukey = static_cast<CORBA::ULong>(key.value[idx]);
01156 if (ukey == 0xFFFFFFFF) {
01157 key.value[idx] = 0;
01158 } else {
01159 ++ukey;
01160 key.value[idx] = ukey;
01161 return;
01162 }
01163 }
01164 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) EndpointManager::increment_key - ")
01165 ACE_TEXT("ran out of builtin topic keys\n")));
01166 }
01167
#if defined(OPENDDS_SECURITY)
/// Security is considered enabled once both a non-nil permissions handle and
/// an access-control plugin have been set. Read-only query, hence const.
inline bool is_security_enabled() const
{
  return (permissions_handle_ != DDS::HANDLE_NIL) && (access_control_ != 0);
}

/// Store the permissions handle consulted by is_security_enabled().
inline void set_permissions_handle(DDS::Security::PermissionsHandle h)
{
  permissions_handle_ = h;
}

/// @return the stored permissions handle.
inline DDS::Security::PermissionsHandle get_permissions_handle() const
{
  return permissions_handle_;
}

/// Store the access-control plugin reference.
inline void set_access_control(DDS::Security::AccessControl_var acl)
{
  access_control_ = acl;
}

/// @return a copy of the stored access-control plugin reference.
inline DDS::Security::AccessControl_var get_access_control() const
{
  return access_control_;
}

/// Store the crypto key factory plugin reference.
inline void set_crypto_key_factory(DDS::Security::CryptoKeyFactory_var ckf)
{
  crypto_key_factory_ = ckf;
}

/// @return a copy of the stored crypto key factory plugin reference.
inline DDS::Security::CryptoKeyFactory_var get_crypto_key_factory() const
{
  return crypto_key_factory_;
}

/// Store the crypto key exchange plugin reference (used by match() to apply
/// pending remote crypto tokens).
inline void set_crypto_key_exchange(DDS::Security::CryptoKeyExchange_var cke)
{
  crypto_key_exchange_ = cke;
}

/// @return a copy of the stored crypto key exchange plugin reference.
inline DDS::Security::CryptoKeyExchange_var get_crypto_key_exchange() const
{
  return crypto_key_exchange_;
}
#endif
01214
// Mutex held by reference (owned elsewhere). match() temporarily releases it
// through an ACE_Reverse_Lock while invoking reader/writer callbacks.
ACE_Thread_Mutex& lock_;
// GUID of the owning participant; make_topic_guid() derives topic GUIDs from it.
DCPS::RepoId participant_id_;
// Built-in-topic key lookups; entries are erased by remove_from_bit().
BitKeyMap pub_key_to_id_, sub_key_to_id_;
RepoIdSet ignored_guids_;
unsigned int publication_counter_, subscription_counter_, topic_counter_;
// Local and discovered endpoints; match() reads entries from these maps.
LocalPublicationMap local_publications_;
LocalSubscriptionMap local_subscriptions_;
DiscoveredPublicationMap discovered_publications_;
DiscoveredSubscriptionMap discovered_subscriptions_;
// Topic details by name, and topic name by topic id (see has_dcps_key()).
OPENDDS_MAP(OPENDDS_STRING, TopicDetails) topics_;
TopicNameMap topic_names_;
OPENDDS_SET(OPENDDS_STRING) ignored_topics_;
// Readers flagged as relay-only; match() passes membership in this set when
// generating the remote reader crypto handle.
OPENDDS_SET_CMP(DCPS::RepoId, DCPS::GUID_tKeyLessThan) relay_only_readers_;
DDS::BuiltinTopicKey_t pub_bit_key_, sub_bit_key_;

#if defined(OPENDDS_SECURITY)
// Security plugin references, set/read through the accessors above.
DDS::Security::AccessControl_var access_control_;
DDS::Security::CryptoKeyFactory_var crypto_key_factory_;
DDS::Security::CryptoKeyExchange_var crypto_key_exchange_;

// permissions_handle_ together with access_control_ decides
// is_security_enabled().
DDS::Security::PermissionsHandle permissions_handle_;
DDS::Security::ParticipantCryptoHandle crypto_handle_;

// Crypto handles of local endpoints, keyed by endpoint GUID.
DatareaderCryptoHandleMap local_reader_crypto_handles_;
DatawriterCryptoHandleMap local_writer_crypto_handles_;

// Security attributes of local endpoints; match() checks the protection
// flags here before sending crypto tokens.
EndpointSecurityAttributesMap local_reader_security_attribs_;
EndpointSecurityAttributesMap local_writer_security_attribs_;

// Crypto handles generated in match() for matched remote endpoints.
DatareaderCryptoHandleMap remote_reader_crypto_handles_;
DatawriterCryptoHandleMap remote_writer_crypto_handles_;

// Crypto tokens held per remote endpoint until match() applies them through
// the key exchange plugin and erases the entry.
DatareaderCryptoTokenSeqMap pending_remote_reader_crypto_tokens_;
DatawriterCryptoTokenSeqMap pending_remote_writer_crypto_tokens_;
#endif
01250
01251 };
01252
01253 template <typename EndpointManagerType>
01254 class LocalParticipant : public DCPS::RcObject {
01255 public:
01256 typedef typename EndpointManagerType::DiscoveredParticipantData DiscoveredParticipantData;
01257 typedef typename EndpointManagerType::TopicDetails TopicDetails;
01258
01259 LocalParticipant (const DDS::DomainParticipantQos& qos)
01260 : qos_(qos)
01261 { }
01262
01263 virtual ~LocalParticipant() { }
01264
01265 DCPS::RepoId bit_key_to_repo_id(const char* bit_topic_name,
01266 const DDS::BuiltinTopicKey_t& key)
01267 {
01268 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PARTICIPANT_TOPIC)) {
01269 RepoId guid;
01270 std::memcpy(guid.guidPrefix, key.value, sizeof(DDS::BuiltinTopicKeyValue));
01271 guid.entityId = ENTITYID_PARTICIPANT;
01272 return guid;
01273
01274 } else {
01275 return endpoint_manager().bit_key_to_repo_id(bit_topic_name, key);
01276 }
01277 }
01278
01279 void ignore_domain_participant(const RepoId& ignoreId)
01280 {
01281 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01282 endpoint_manager().ignore(ignoreId);
01283
01284 const DiscoveredParticipantIter iter = participants_.find(ignoreId);
01285 if (iter != participants_.end()) {
01286 remove_discovered_participant(iter);
01287 }
01288 }
01289
01290 virtual bool
01291 announce_domain_participant_qos()
01292 {
01293 return true;
01294 }
01295
01296 bool
01297 update_domain_participant_qos(const DDS::DomainParticipantQos& qos)
01298 {
01299 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01300 qos_ = qos;
01301 return announce_domain_participant_qos();
01302 }
01303
01304 DCPS::TopicStatus
01305 assert_topic(DCPS::RepoId_out topicId, const char* topicName,
01306 const char* dataTypeName, const DDS::TopicQos& qos,
01307 bool hasDcpsKey)
01308 {
01309 if (std::strlen(topicName) > 256 || std::strlen(dataTypeName) > 256) {
01310 if (DCPS::DCPS_debug_level) {
01311 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR LocalParticipant::assert_topic() - ")
01312 ACE_TEXT("topic or type name length limit (256) exceeded\n")));
01313 }
01314 return DCPS::PRECONDITION_NOT_MET;
01315 }
01316
01317 return endpoint_manager().assert_topic(topicId, topicName, dataTypeName, qos, hasDcpsKey);
01318 }
01319
01320 DCPS::TopicStatus
01321 remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
01322 {
01323 return endpoint_manager().remove_topic(topicId, name);
01324 }
01325
01326 void
01327 ignore_topic(const RepoId& ignoreId)
01328 {
01329 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01330 endpoint_manager().ignore(ignoreId);
01331 }
01332
01333 bool
01334 update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
01335 OPENDDS_STRING& name)
01336 {
01337 return endpoint_manager().update_topic_qos(topicId, qos, name);
01338 }
01339
01340 RepoId
01341 add_publication(const RepoId& topicId,
01342 DCPS::DataWriterCallbacks* publication,
01343 const DDS::DataWriterQos& qos,
01344 const DCPS::TransportLocatorSeq& transInfo,
01345 const DDS::PublisherQos& publisherQos)
01346 {
01347 return endpoint_manager().add_publication(topicId, publication, qos,
01348 transInfo, publisherQos);
01349 }
01350
01351 void
01352 remove_publication(const RepoId& publicationId)
01353 {
01354 endpoint_manager().remove_publication(publicationId);
01355 }
01356
01357 void
01358 ignore_publication(const RepoId& ignoreId)
01359 {
01360 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01361 return endpoint_manager().ignore(ignoreId);
01362 }
01363
01364 bool
01365 update_publication_qos(const RepoId& publicationId,
01366 const DDS::DataWriterQos& qos,
01367 const DDS::PublisherQos& publisherQos)
01368 {
01369 return endpoint_manager().update_publication_qos(publicationId, qos, publisherQos);
01370 }
01371
01372 RepoId
01373 add_subscription(const RepoId& topicId,
01374 DCPS::DataReaderCallbacks* subscription,
01375 const DDS::DataReaderQos& qos,
01376 const DCPS::TransportLocatorSeq& transInfo,
01377 const DDS::SubscriberQos& subscriberQos,
01378 const char* filterClassName,
01379 const char* filterExpr,
01380 const DDS::StringSeq& params)
01381 {
01382 return endpoint_manager().add_subscription(topicId, subscription, qos, transInfo,
01383 subscriberQos, filterClassName, filterExpr, params);
01384 }
01385
01386 void
01387 remove_subscription(const RepoId& subscriptionId)
01388 {
01389 endpoint_manager().remove_subscription(subscriptionId);
01390 }
01391
01392 void
01393 ignore_subscription(const RepoId& ignoreId)
01394 {
01395 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01396 return endpoint_manager().ignore(ignoreId);
01397 }
01398
01399 bool
01400 update_subscription_qos(const RepoId& subscriptionId,
01401 const DDS::DataReaderQos& qos,
01402 const DDS::SubscriberQos& subscriberQos)
01403 {
01404 return endpoint_manager().update_subscription_qos(subscriptionId, qos, subscriberQos);
01405 }
01406
01407 bool
01408 update_subscription_params(const RepoId& subId,
01409 const DDS::StringSeq& params)
01410 {
01411 return endpoint_manager().update_subscription_params(subId, params);
01412 }
01413
01414 void
01415 association_complete(const RepoId& localId, const RepoId& remoteId)
01416 {
01417 endpoint_manager().association_complete(localId, remoteId);
01418 }
01419
01420 DDS::Subscriber_var bit_subscriber() const { return bit_subscriber_; }
01421
01422 protected:
01423
01424 struct DiscoveredParticipant {
01425
01426 DiscoveredParticipant() :
01427 #if defined(OPENDDS_SECURITY)
01428 bit_ih_(0),
01429 has_last_stateless_msg_(false),
01430 last_stateless_msg_time_(0, 0),
01431 auth_started_time_(0, 0),
01432 auth_state_(AS_UNKNOWN)
01433 #else
01434 bit_ih_(0)
01435 #endif
01436 {
01437
01438 }
01439
01440 DiscoveredParticipant(const DiscoveredParticipantData& p, const ACE_Time_Value& t) :
01441 #if defined(OPENDDS_SECURITY)
01442 pdata_(p),
01443 last_seen_(t),
01444 bit_ih_(DDS::HANDLE_NIL),
01445 has_last_stateless_msg_(false),
01446 last_stateless_msg_time_(0, 0),
01447 auth_started_time_(0, 0),
01448 auth_state_(AS_UNKNOWN)
01449 #else
01450 pdata_(p),
01451 last_seen_(t),
01452 bit_ih_(DDS::HANDLE_NIL)
01453 #endif
01454 {
01455
01456 }
01457
01458 DiscoveredParticipantData pdata_;
01459 ACE_Time_Value last_seen_;
01460 DDS::InstanceHandle_t bit_ih_;
01461
01462 #if defined(OPENDDS_SECURITY)
01463 bool has_last_stateless_msg_;
01464 ACE_Time_Value last_stateless_msg_time_;
01465 DDS::Security::ParticipantStatelessMessage last_stateless_msg_;
01466
01467 ACE_Time_Value auth_started_time_;
01468 AuthState auth_state_;
01469
01470 DDS::Security::IdentityToken identity_token_;
01471 DDS::Security::PermissionsToken permissions_token_;
01472 DDS::Security::PropertyQosPolicy property_qos_;
01473 DDS::Security::ParticipantSecurityInfo security_info_;
01474 DDS::Security::IdentityStatusToken identity_status_token_;
01475 DDS::Security::IdentityHandle identity_handle_;
01476 DDS::Security::HandshakeHandle handshake_handle_;
01477 DDS::Security::AuthRequestMessageToken local_auth_request_token_;
01478 DDS::Security::AuthRequestMessageToken remote_auth_request_token_;
01479 DDS::Security::AuthenticatedPeerCredentialToken authenticated_peer_credential_token_;
01480 DDS::Security::SharedSecretHandle_var shared_secret_handle_;
01481 DDS::Security::PermissionsHandle permissions_handle_;
01482 DDS::Security::ParticipantCryptoHandle crypto_handle_;
01483 DDS::Security::ParticipantCryptoTokenSeq crypto_tokens_;
01484 #endif
01485
01486 };
01487
01488 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredParticipant,
01489 DCPS::GUID_tKeyLessThan) DiscoveredParticipantMap;
01490 typedef typename DiscoveredParticipantMap::iterator DiscoveredParticipantIter;
01491 typedef typename DiscoveredParticipantMap::const_iterator
01492 DiscoveredParticipantConstIter;
01493
01494 #if defined(OPENDDS_SECURITY)
01495 typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::AuthRequestMessageToken, DCPS::GUID_tKeyLessThan) PendingRemoteAuthTokenMap;
01496 #endif
01497
01498 virtual EndpointManagerType& endpoint_manager() = 0;
01499
01500 void remove_discovered_participant(DiscoveredParticipantIter iter)
01501 {
01502 bool removed = endpoint_manager().disassociate(iter->second.pdata_);
01503 if (removed) {
01504 #ifndef DDS_HAS_MINIMUM_BIT
01505 ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
01506
01507 if (bit && iter->second.bit_ih_ != DDS::HANDLE_NIL) {
01508 bit->set_instance_state(iter->second.bit_ih_,
01509 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01510 }
01511 #endif
01512 if (DCPS::DCPS_debug_level > 3) {
01513 DCPS::GuidConverter conv(iter->first);
01514 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) LocalParticipant::remove_discovered_participant")
01515 ACE_TEXT(" - erasing %C\n"), OPENDDS_STRING(conv).c_str()));
01516 }
01517 participants_.erase(iter);
01518 }
01519 }
01520
01521 #ifndef DDS_HAS_MINIMUM_BIT
01522 ParticipantBuiltinTopicDataDataReaderImpl* part_bit()
01523 {
01524 if (!bit_subscriber_.in())
01525 return 0;
01526
01527 DDS::DataReader_var d =
01528 bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
01529 return dynamic_cast<ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
01530 }
01531 #endif
01532
01533 mutable ACE_Thread_Mutex lock_;
01534 DDS::Subscriber_var bit_subscriber_;
01535 DDS::DomainParticipantQos qos_;
01536 DiscoveredParticipantMap participants_;
01537
01538 #if defined(OPENDDS_SECURITY)
01539 PendingRemoteAuthTokenMap pending_remote_auth_tokens_;
01540 #endif
01541
01542 };
01543
01544 template<typename Participant>
01545 class PeerDiscovery : public Discovery {
01546 public:
01547 typedef typename Participant::TopicDetails TopicDetails;
01548
/// Construct the discovery object, passing @a key to the Discovery base class.
explicit PeerDiscovery(const RepoKey& key) : Discovery(key) { }
01550
/// Calls end() on reactor_runner_ before the object is destroyed.
~PeerDiscovery() {
  reactor_runner_.end();
}
01554
01555 virtual DDS::Subscriber_ptr init_bit(DomainParticipantImpl* participant) {
01556 using namespace DCPS;
01557 if (create_bit_topics(participant) != DDS::RETCODE_OK) {
01558 return 0;
01559 }
01560
01561 DDS::Subscriber_var bit_subscriber =
01562 participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
01563 DDS::SubscriberListener::_nil(),
01564 DEFAULT_STATUS_MASK);
01565 SubscriberImpl* sub = dynamic_cast<SubscriberImpl*>(bit_subscriber.in());
01566
01567 DDS::DataReaderQos dr_qos;
01568 sub->get_default_datareader_qos(dr_qos);
01569 dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01570
01571 #ifndef DDS_HAS_MINIMUM_BIT
01572 DDS::TopicDescription_var bit_part_topic =
01573 participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
01574 create_bit_dr(bit_part_topic, BUILT_IN_PARTICIPANT_TOPIC_TYPE,
01575 sub, dr_qos);
01576
01577 DDS::TopicDescription_var bit_topic_topic =
01578 participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
01579 create_bit_dr(bit_topic_topic, BUILT_IN_TOPIC_TOPIC_TYPE,
01580 sub, dr_qos);
01581
01582 DDS::TopicDescription_var bit_pub_topic =
01583 participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
01584 create_bit_dr(bit_pub_topic, BUILT_IN_PUBLICATION_TOPIC_TYPE,
01585 sub, dr_qos);
01586
01587 DDS::TopicDescription_var bit_sub_topic =
01588 participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
01589 create_bit_dr(bit_sub_topic, BUILT_IN_SUBSCRIPTION_TOPIC_TYPE,
01590 sub, dr_qos);
01591
01592 const DDS::ReturnCode_t ret = bit_subscriber->enable();
01593 if (ret != DDS::RETCODE_OK) {
01594 if (DCPS_debug_level) {
01595 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
01596 ACE_TEXT(" - Error %d enabling subscriber\n"), ret));
01597 }
01598 return 0;
01599 }
01600 #endif
01601
01602 get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
01603
01604 return bit_subscriber._retn();
01605 }
01606
/// Forward built-in-topic teardown to the discovery-side participant object
/// found via get_part() for @a participant's domain id and GUID.
/// NOTE(review): the result of get_part() is used without a check here.
virtual void fini_bit(DCPS::DomainParticipantImpl* participant)
{
  get_part(participant->get_domain_id(), participant->get_id())->fini_bit();
}
01611
/// Translate a built-in topic key into a GUID by delegating to the
/// discovery-side participant object found via get_part().
/// NOTE(review): the result of get_part() is used without a check here.
virtual OpenDDS::DCPS::RepoId bit_key_to_repo_id(DCPS::DomainParticipantImpl* participant,
                                                 const char* bit_topic_name,
                                                 const DDS::BuiltinTopicKey_t& key) const
{
  return get_part(participant->get_domain_id(), participant->get_id())
    ->bit_key_to_repo_id(bit_topic_name, key);
}
01619
/// Both arguments are ignored; this implementation always returns false.
virtual bool attach_participant(DDS::DomainId_t ,
                                const OpenDDS::DCPS::RepoId& )
{
  return false;
}
01625
/// Remove the participant @a participantId from domain @a domain_id, and drop
/// the domain entry itself once it has no participants left.
/// @return false if the lock cannot be acquired or the domain/participant is
///         not registered; true after a successful removal.
virtual bool remove_domain_participant(DDS::DomainId_t domain_id,
                                       const OpenDDS::DCPS::RepoId& participantId)
{
  // The handle is declared before the guard on purpose: locals are destroyed
  // in reverse order, so this copy outlives the guard and is released only
  // after lock_ has been unlocked.
  // NOTE(review): presumably ParticipantHandle is reference-counted, so the
  // participant is not destroyed while lock_ is held -- confirm.
  ParticipantHandle participant;
  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
  typename DomainParticipantMap::iterator domain = participants_.find(domain_id);
  if (domain == participants_.end()) {
    return false;
  }
  typename ParticipantMap::iterator part = domain->second.find(participantId);
  if (part == domain->second.end()) {
    return false;
  }
  // Keep the participant alive past the erase below.
  participant = part->second;
  domain->second.erase(part);
  if (domain->second.empty()) {
    participants_.erase(domain);
  }

  return true;
}
01649
/// Tell the local participant (@a domain, @a myParticipantId) to ignore the
/// participant @a ignoreId. Always returns true.
/// NOTE(review): the result of get_part() is used without a check here.
virtual bool ignore_domain_participant(DDS::DomainId_t domain,
                                       const OpenDDS::DCPS::RepoId& myParticipantId,
                                       const OpenDDS::DCPS::RepoId& ignoreId)
{
  get_part(domain, myParticipantId)->ignore_domain_participant(ignoreId);
  return true;
}
01657
/// Forward a participant QoS change to the local participant found via
/// get_part() and return its result.
/// NOTE(review): the result of get_part() is used without a check here.
virtual bool update_domain_participant_qos(DDS::DomainId_t domain,
                                           const OpenDDS::DCPS::RepoId& participant,
                                           const DDS::DomainParticipantQos& qos)
{
  return get_part(domain, participant)->update_domain_participant_qos(qos);
}
01664
01665 virtual DCPS::TopicStatus assert_topic(OpenDDS::DCPS::RepoId_out topicId,
01666 DDS::DomainId_t domainId,
01667 const OpenDDS::DCPS::RepoId& participantId,
01668 const char* topicName,
01669 const char* dataTypeName,
01670 const DDS::TopicQos& qos,
01671 bool hasDcpsKey)
01672 {
01673 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01674 typename OPENDDS_MAP(DDS::DomainId_t,
01675 OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01676 topics_.find(domainId);
01677 if (topic_it != topics_.end()) {
01678 const typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator it =
01679 topic_it->second.find(topicName);
01680 if (it != topic_it->second.end()
01681 && it->second.data_type_ != dataTypeName) {
01682 topicId = GUID_UNKNOWN;
01683 return DCPS::CONFLICTING_TYPENAME;
01684 }
01685 }
01686
01687
01688 const DCPS::TopicStatus stat =
01689 participants_[domainId][participantId]->assert_topic(topicId, topicName,
01690 dataTypeName, qos,
01691 hasDcpsKey);
01692 if (stat == DCPS::CREATED || stat == DCPS::FOUND) {
01693 TopicDetails& td = topics_[domainId][topicName];
01694 td.data_type_ = dataTypeName;
01695 td.qos_ = qos;
01696 td.repo_id_ = topicId;
01697 ++topic_use_[domainId][topicName];
01698 }
01699 return stat;
01700 }
01701
01702 virtual DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const char* topicName,
01703 CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
01704 OpenDDS::DCPS::RepoId_out topicId)
01705 {
01706 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01707 typename OPENDDS_MAP(DDS::DomainId_t,
01708 OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01709 topics_.find(domainId);
01710 if (topic_it == topics_.end()) {
01711 return DCPS::NOT_FOUND;
01712 }
01713 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
01714 topic_it->second.find(topicName);
01715 if (iter == topic_it->second.end()) {
01716 return DCPS::NOT_FOUND;
01717 }
01718 TopicDetails& td = iter->second;
01719 dataTypeName = td.data_type_.c_str();
01720 qos = new DDS::TopicQos(td.qos_);
01721 topicId = td.repo_id_;
01722 ++topic_use_[domainId][topicName];
01723 return DCPS::FOUND;
01724 }
01725
01726 virtual DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId,
01727 const OpenDDS::DCPS::RepoId& participantId,
01728 const OpenDDS::DCPS::RepoId& topicId)
01729 {
01730 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01731 typename OPENDDS_MAP(DDS::DomainId_t,
01732 OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01733 topics_.find(domainId);
01734 if (topic_it == topics_.end()) {
01735 return DCPS::NOT_FOUND;
01736 }
01737
01738 OPENDDS_STRING name;
01739
01740 const DCPS::TopicStatus stat =
01741 participants_[domainId][participantId]->remove_topic(topicId, name);
01742
01743 if (stat == DCPS::REMOVED) {
01744 if (0 == --topic_use_[domainId][name]) {
01745 topic_use_[domainId].erase(name);
01746 if (topic_it->second.empty()) {
01747 topic_use_.erase(domainId);
01748 }
01749 topic_it->second.erase(name);
01750 if (topic_it->second.empty()) {
01751 topics_.erase(topic_it);
01752 }
01753 }
01754 }
01755 return stat;
01756 }
01757
/// Tell the local participant (@a domainId, @a myParticipantId) to ignore the
/// topic @a ignoreId. Always returns true.
/// NOTE(review): the result of get_part() is used without a check here.
virtual bool ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId& myParticipantId,
                          const OpenDDS::DCPS::RepoId& ignoreId)
{
  get_part(domainId, myParticipantId)->ignore_topic(ignoreId);
  return true;
}
01764
01765 virtual bool update_topic_qos(const OpenDDS::DCPS::RepoId& topicId, DDS::DomainId_t domainId,
01766 const OpenDDS::DCPS::RepoId& participantId, const DDS::TopicQos& qos)
01767 {
01768 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01769 OPENDDS_STRING name;
01770
01771 if (participants_[domainId][participantId]->update_topic_qos(topicId,
01772 qos, name)) {
01773 topics_[domainId][name].qos_ = qos;
01774 return true;
01775 }
01776 return false;
01777 }
01778
/// Register a local data writer with the local participant found via
/// get_part() and return the GUID it assigns.
/// NOTE(review): the result of get_part() is used without a check here.
virtual OpenDDS::DCPS::RepoId add_publication(DDS::DomainId_t domainId,
                                              const OpenDDS::DCPS::RepoId& participantId,
                                              const OpenDDS::DCPS::RepoId& topicId,
                                              DCPS::DataWriterCallbacks* publication,
                                              const DDS::DataWriterQos& qos,
                                              const DCPS::TransportLocatorSeq& transInfo,
                                              const DDS::PublisherQos& publisherQos)
{
  return get_part(domainId, participantId)->add_publication(topicId, publication, qos, transInfo, publisherQos);
}
01789
01790 virtual bool remove_publication(DDS::DomainId_t domainId,
01791 const OpenDDS::DCPS::RepoId& participantId,
01792 const OpenDDS::DCPS::RepoId& publicationId)
01793 {
01794 get_part(domainId, participantId)->remove_publication(publicationId);
01795 return true;
01796 }
01797
01798 virtual bool ignore_publication(DDS::DomainId_t domainId,
01799 const OpenDDS::DCPS::RepoId& participantId,
01800 const OpenDDS::DCPS::RepoId& ignoreId)
01801 {
01802 get_part(domainId, participantId)->ignore_publication(ignoreId);
01803 return true;
01804 }
01805
01806 virtual bool update_publication_qos(DDS::DomainId_t domainId,
01807 const OpenDDS::DCPS::RepoId& partId,
01808 const OpenDDS::DCPS::RepoId& dwId,
01809 const DDS::DataWriterQos& qos,
01810 const DDS::PublisherQos& publisherQos)
01811 {
01812 return get_part(domainId, partId)->update_publication_qos(dwId, qos,
01813 publisherQos);
01814 }
01815
01816 virtual OpenDDS::DCPS::RepoId add_subscription(DDS::DomainId_t domainId,
01817 const OpenDDS::DCPS::RepoId& participantId,
01818 const OpenDDS::DCPS::RepoId& topicId,
01819 DCPS::DataReaderCallbacks* subscription,
01820 const DDS::DataReaderQos& qos,
01821 const DCPS::TransportLocatorSeq& transInfo,
01822 const DDS::SubscriberQos& subscriberQos,
01823 const char* filterClassName,
01824 const char* filterExpr,
01825 const DDS::StringSeq& params)
01826 {
01827 return get_part(domainId, participantId)->add_subscription(topicId, subscription, qos, transInfo, subscriberQos, filterClassName, filterExpr, params);
01828 }
01829
01830 virtual bool remove_subscription(DDS::DomainId_t domainId,
01831 const OpenDDS::DCPS::RepoId& participantId,
01832 const OpenDDS::DCPS::RepoId& subscriptionId)
01833 {
01834 get_part(domainId, participantId)->remove_subscription(subscriptionId);
01835 return true;
01836 }
01837
01838 virtual bool ignore_subscription(DDS::DomainId_t domainId,
01839 const OpenDDS::DCPS::RepoId& participantId,
01840 const OpenDDS::DCPS::RepoId& ignoreId)
01841 {
01842 get_part(domainId, participantId)->ignore_subscription(ignoreId);
01843 return true;
01844 }
01845
01846 virtual bool update_subscription_qos(DDS::DomainId_t domainId,
01847 const OpenDDS::DCPS::RepoId& partId,
01848 const OpenDDS::DCPS::RepoId& drId,
01849 const DDS::DataReaderQos& qos,
01850 const DDS::SubscriberQos& subQos)
01851 {
01852 return get_part(domainId, partId)->update_subscription_qos(drId, qos, subQos);
01853 }
01854
01855 virtual bool update_subscription_params(DDS::DomainId_t domainId,
01856 const OpenDDS::DCPS::RepoId& partId,
01857 const OpenDDS::DCPS::RepoId& subId,
01858 const DDS::StringSeq& params)
01859 {
01860 return get_part(domainId, partId)->update_subscription_params(subId, params);
01861 }
01862
01863 virtual void association_complete(DDS::DomainId_t domainId,
01864 const OpenDDS::DCPS::RepoId& participantId,
01865 const OpenDDS::DCPS::RepoId& localId,
01866 const OpenDDS::DCPS::RepoId& remoteId)
01867 {
01868 get_part(domainId, participantId)->association_complete(localId, remoteId);
01869 }
01870
01871 ACE_Reactor*
01872 reactor()
01873 {
01874 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, reactor_runner_.mtx_, 0);
01875 if (!reactor_runner_.reactor_) {
01876 reactor_runner_.reactor_.reset(new ACE_Reactor(new ACE_Select_Reactor, true));
01877 reactor_runner_.activate();
01878 }
01879 return reactor_runner_.reactor_.get();
01880 }
01881
01882 protected:
01883
// DCPS::RcHandle to a Participant; this is what get_part() returns.
01884 typedef DCPS::RcHandle<Participant> ParticipantHandle;
// Participants keyed by their RepoId, ordered with DCPS::GUID_tKeyLessThan.
01885 typedef OPENDDS_MAP_CMP(DCPS::RepoId, ParticipantHandle, DCPS::GUID_tKeyLessThan) ParticipantMap;
// One ParticipantMap per domain id.
01886 typedef OPENDDS_MAP(DDS::DomainId_t, ParticipantMap) DomainParticipantMap;
01887
01888 ParticipantHandle
01889 get_part(const DDS::DomainId_t domain_id,
01890 const OpenDDS::DCPS::RepoId& part_id) const
01891 {
01892 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ParticipantHandle());
01893 typename DomainParticipantMap::const_iterator domain = participants_.find(domain_id);
01894 if (domain == participants_.end()) {
01895 return ParticipantHandle();
01896 }
01897 typename ParticipantMap::const_iterator part = domain->second.find(part_id);
01898 if (part == domain->second.end()) {
01899 return ParticipantHandle();
01900 }
01901 return part->second;
01902 }
01903
01904 void create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
01905 DCPS::SubscriberImpl* sub,
01906 const DDS::DataReaderQos& qos)
01907 {
01908 using namespace DCPS;
01909 TopicDescriptionImpl* bit_topic_i =
01910 dynamic_cast<TopicDescriptionImpl*>(topic);
01911
01912 DDS::DomainParticipant_var participant = sub->get_participant();
01913 DomainParticipantImpl* participant_i =
01914 dynamic_cast<DomainParticipantImpl*>(participant.in());
01915
01916 TypeSupport_var type_support =
01917 Registered_Data_Types->lookup(participant, type);
01918
01919 DDS::DataReader_var dr = type_support->create_datareader();
01920 OpenDDS::DCPS::DataReaderImpl* dri = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dr.in());
01921
01922 dri->init(bit_topic_i, qos, 0 , 0 , participant_i, sub);
01923 dri->disable_transport();
01924 dri->enable();
01925 }
01926
// Mutex taken by remove_topic(), update_topic_qos() and get_part().
// It is mutable so that the const member get_part() can acquire it.
01927 mutable ACE_Thread_Mutex lock_;
01928
01929
01930 struct ReactorRunner : ACE_Task_Base {
01931 ReactorRunner() {}
01932
01933 int svc()
01934 {
01935 reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
01936 reactor_->run_reactor_event_loop();
01937 return 0;
01938 }
01939
01940 void end()
01941 {
01942 ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
01943 if (reactor_) {
01944 reactor_->end_reactor_event_loop();
01945 wait();
01946 }
01947 }
01948
01949 unique_ptr<ACE_Reactor> reactor_;
01950 ACE_Thread_Mutex mtx_;
01951 } reactor_runner_;
01952
// Participant handles, keyed by domain id and then by participant RepoId.
01953 DomainParticipantMap participants_;
// TopicDetails keyed by domain id and then by topic name; update_topic_qos()
// stores the new QoS in the entry's qos_ member.
01954 OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, TopicDetails) ) topics_;
// Use counter keyed by domain id and then by topic name; remove_topic()
// decrements it and erases the entry when the last use is removed.
01955 OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, unsigned int) ) topic_use_;
01956 };
01957
01958 }
01959 }
01960
01961 OPENDDS_END_VERSIONED_NAMESPACE_DECL
01962
01963 #endif