00001
00002
00003
00004
00005
00006
00007
00008 #include "Spdp.h"
00009
00010 #include "BaseMessageTypes.h"
00011 #include "MessageTypes.h"
00012 #include "ParameterListConverter.h"
00013 #include "RtpsCoreTypeSupportImpl.h"
00014 #include "RtpsDiscovery.h"
00015
00016 #include "dds/DdsDcpsGuidC.h"
00017
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/BuiltInTopicUtils.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DCPS/Qos_Helper.h"
00022
00023 #if defined(OPENDDS_SECURITY)
00024 #include "SecurityHelpers.h"
00025 #include "dds/DCPS/security/framework/SecurityRegistry.h"
00026 #endif
00027
00028 #include "ace/Reactor.h"
00029 #include "ace/OS_NS_sys_socket.h"
00030
00031 #include <cstring>
00032 #include <stdexcept>
00033
00034 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00035
00036 namespace OpenDDS {
00037 namespace RTPS {
00038 using DCPS::RepoId;
00039
00040 namespace {
00041
00042
00043
00044 const int LEASE_MULT = 10;
00045 const CORBA::UShort encap_LE = 0x0300;
00046 const CORBA::UShort encap_BE = 0x0200;
00047
00048 const ACE_Time_Value MAX_SPDP_TIMER_PERIOD(0, 10000);
00049 const ACE_Time_Value MAX_AUTH_TIME(3, 0);
00050 const ACE_Time_Value AUTH_RESEND_PERIOD(0, 25000);
00051
00052 bool disposed(const ParameterList& inlineQos)
00053 {
00054 for (CORBA::ULong i = 0; i < inlineQos.length(); ++i) {
00055 if (inlineQos[i]._d() == PID_STATUS_INFO) {
00056 return inlineQos[i].status_info().value[3] & 1;
00057 }
00058 }
00059 return false;
00060 }
00061
00062 #if defined(OPENDDS_SECURITY)
00063 bool operator==(const DDS::Security::Property_t& rhs, const DDS::Security::Property_t& lhs) {
00064 return rhs.name == lhs.name && rhs.value == lhs.value && rhs.propagate == lhs.propagate;
00065 }
00066
00067 bool operator==(const DDS::Security::BinaryProperty_t& rhs, const DDS::Security::BinaryProperty_t& lhs) {
00068 return rhs.name == lhs.name && rhs.value == lhs.value && rhs.propagate == lhs.propagate;
00069 }
00070
00071 bool operator==(const DDS::Security::PropertySeq& rhs, const DDS::Security::PropertySeq& lhs) {
00072 bool result = (rhs.length() == lhs.length());
00073 for (size_t i = 0; result && i < rhs.length(); ++i) {
00074 result = (rhs[i] == lhs[i]);
00075 }
00076 return result;
00077 }
00078
00079 bool operator==(const DDS::Security::BinaryPropertySeq& rhs, const DDS::Security::BinaryPropertySeq& lhs) {
00080 bool result = (rhs.length() == lhs.length());
00081 for (size_t i = 0; result && i < rhs.length(); ++i) {
00082 result = (rhs[i] == lhs[i]);
00083 }
00084 return result;
00085 }
00086
00087 bool operator==(const DDS::Security::DataHolder& rhs, const DDS::Security::DataHolder& lhs) {
00088 return rhs.class_id == lhs.class_id && rhs.properties == lhs.properties && rhs.binary_properties == lhs.binary_properties;
00089 }
00090
00091 void init_participant_sec_attributes(DDS::Security::ParticipantSecurityAttributes& attr)
00092 {
00093 attr.allow_unauthenticated_participants = false;
00094 attr.is_access_protected = false;
00095 attr.is_rtps_protected = false;
00096 attr.is_discovery_protected = false;
00097 attr.is_liveliness_protected = false;
00098 attr.plugin_participant_attributes = 0;
00099 attr.ac_endpoint_properties.length(0);
00100 }
00101 #endif
00102
00103 GUID_t make_guid(const DCPS::GuidPrefix_t prefix, const DCPS::EntityId_t entity)
00104 {
00105 GUID_t result;
00106 std::memcpy(result.guidPrefix, prefix, sizeof(GuidPrefix_t));
00107 std::memcpy(&result.entityId, &entity, sizeof(EntityId_t));
00108 return result;
00109 }
00110 }
00111
00112 void Spdp::init(DDS::DomainId_t ,
00113 DCPS::RepoId& guid,
00114 const DDS::DomainParticipantQos& ,
00115 RtpsDiscovery* disco)
00116 {
00117 guid = guid_;
00118 sedp_.ignore(guid);
00119 sedp_.init(guid_, *disco, domain_);
00120
00121
00122 sedp_.unicast_locators(sedp_unicast_);
00123
00124 if (disco->sedp_multicast()) {
00125 const ACE_INET_Addr& mc_addr = sedp_.multicast_group();
00126 DCPS::Locator_t mc_locator;
00127 mc_locator.kind = address_to_kind(mc_addr);
00128 mc_locator.port = mc_addr.get_port_number();
00129 address_to_bytes(mc_locator.address, mc_addr);
00130 sedp_multicast_.length(1);
00131 sedp_multicast_[0] = mc_locator;
00132 }
00133 }
00134
00135
00136 Spdp::Spdp(DDS::DomainId_t domain,
00137 RepoId& guid,
00138 const DDS::DomainParticipantQos& qos,
00139 RtpsDiscovery* disco)
00140
00141 : DCPS::LocalParticipant<Sedp>(qos)
00142 , disco_(disco)
00143 , domain_(domain)
00144 , guid_(guid)
00145 , tport_(new SpdpTransport(this, false))
00146 , eh_(tport_)
00147 , eh_shutdown_(false)
00148 , shutdown_cond_(lock_)
00149 , shutdown_flag_(false)
00150 , sedp_(guid_, *this, lock_)
00151 #if defined(OPENDDS_SECURITY)
00152 , security_config_()
00153 , security_enabled_(false)
00154 #endif
00155 {
00156 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00157
00158 init(domain, guid, qos, disco);
00159
00160 #if defined(OPENDDS_SECURITY)
00161 init_participant_sec_attributes(participant_sec_attr_);
00162 #endif
00163
00164 }
00165
00166 #if defined(OPENDDS_SECURITY)
00167 Spdp::Spdp(DDS::DomainId_t domain,
00168 const DCPS::RepoId& guid,
00169 const DDS::DomainParticipantQos& qos,
00170 RtpsDiscovery* disco,
00171 DDS::Security::IdentityHandle identity_handle,
00172 DDS::Security::PermissionsHandle perm_handle,
00173 DDS::Security::ParticipantCryptoHandle crypto_handle)
00174
00175 : DCPS::LocalParticipant<Sedp>(qos)
00176 , disco_(disco)
00177 , domain_(domain)
00178 , guid_(guid)
00179 , tport_(new SpdpTransport(this, true))
00180 , eh_(tport_)
00181 , eh_shutdown_(false)
00182 , shutdown_cond_(lock_)
00183 , shutdown_flag_(false)
00184 , sedp_(guid_, *this, lock_)
00185 , security_config_(Security::SecurityRegistry::instance()->default_config())
00186 , security_enabled_(security_config_->get_authentication() && security_config_->get_access_control() && security_config_->get_crypto_key_factory() && security_config_->get_crypto_key_exchange())
00187 , identity_handle_(identity_handle)
00188 , permissions_handle_(perm_handle)
00189 , crypto_handle_(crypto_handle)
00190 {
00191 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00192
00193 init(domain, guid_, qos, disco);
00194
00195 DDS::Security::Authentication_var auth = security_config_->get_authentication();
00196 DDS::Security::AccessControl_var access = security_config_->get_access_control();
00197
00198 DDS::Security::SecurityException se = {"", 0, 0};
00199
00200 if (auth->get_identity_token(identity_token_, identity_handle_, se) == false) {
00201 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00202 ACE_TEXT("Spdp::Spdp() - ")
00203 ACE_TEXT("unable to get identity token. Security Exception[%d.%d]: %C\n"),
00204 se.code, se.minor_code, se.message.in()));
00205 throw std::runtime_error("unable to get identity token");
00206 }
00207 if (auth->get_identity_status_token(identity_status_token_, identity_handle_, se) == false) {
00208 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00209 ACE_TEXT("Spdp::Spdp() - ")
00210 ACE_TEXT("unable to get identity status token. Security Exception[%d.%d]: %C\n"),
00211 se.code, se.minor_code, se.message.in()));
00212 throw std::runtime_error("unable to get identity status token");
00213 }
00214 if (access->get_permissions_token(permissions_token_, permissions_handle_, se) == false) {
00215 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00216 ACE_TEXT("Spdp::Spdp() - ")
00217 ACE_TEXT("unable to get permissions handle. Security Exception[%d.%d]: %C\n"),
00218 se.code, se.minor_code, se.message.in()));
00219 throw std::runtime_error("unable to get permissions token");
00220 }
00221 if (access->get_permissions_credential_token(permissions_credential_token_, permissions_handle_, se) == false) {
00222 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00223 ACE_TEXT("Spdp::Spdp() - ")
00224 ACE_TEXT("unable to get permissions credential handle. Security Exception[%d.%d]: %C\n"),
00225 se.code, se.minor_code, se.message.in()));
00226 throw std::runtime_error("unable to get permissions credential token");
00227 }
00228
00229 if (auth->set_permissions_credential_and_token(identity_handle_, permissions_credential_token_, permissions_token_, se) == false) {
00230 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00231 ACE_TEXT("Spdp::Spdp() - ")
00232 ACE_TEXT("unable to set permissions credential and token. Security Exception[%d.%d]: %C\n"),
00233 se.code, se.minor_code, se.message.in()));
00234 throw std::runtime_error("unable to set permissions credential and token");
00235 }
00236
00237 init_participant_sec_attributes(participant_sec_attr_);
00238
00239 if (access->get_participant_sec_attributes(permissions_handle_, participant_sec_attr_, se) == false) {
00240 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00241 ACE_TEXT("Spdp::Spdp() - ")
00242 ACE_TEXT("failed to retrieve participant security attributes. Security Exception[%d.%d]: %C\n"),
00243 se.code, se.minor_code, se.message.in()));
00244 throw std::runtime_error("unable to retrieve participant security attributes");
00245 }
00246
00247 sedp_.init_security(identity_handle, perm_handle, crypto_handle);
00248 }
00249 #endif
00250
00251 Spdp::~Spdp()
00252 {
00253 shutdown_flag_ = true;
00254 {
00255 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00256 if (DCPS::DCPS_debug_level > 3) {
00257 ACE_DEBUG((LM_INFO,
00258 ACE_TEXT("(%P|%t) Spdp::~Spdp ")
00259 ACE_TEXT("remove discovered participants\n")));
00260 }
00261
00262 #if defined(OPENDDS_SECURITY)
00263 write_secure_disposes();
00264 #endif
00265
00266
00267
00268 DCPS::RepoIdSet participant_ids;
00269 get_discovered_participant_ids(participant_ids);
00270 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00271 participant_id != participant_ids.end();
00272 ++participant_id)
00273 {
00274 DiscoveredParticipantIter part = participants_.find(*participant_id);
00275 if (part != participants_.end()) {
00276 remove_discovered_participant(part);
00277 }
00278 }
00279 }
00280
00281
00282
00283 sedp_.shutdown();
00284
00285
00286 tport_->close();
00287 eh_.reset();
00288 {
00289 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00290 while (!eh_shutdown_) {
00291 shutdown_cond_.wait();
00292 }
00293 }
00294 }
00295
00296 #if defined(OPENDDS_SECURITY)
00297 void
00298 Spdp::write_secure_updates()
00299 {
00300 if (shutdown_flag_.value()) { return; }
00301
00302 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00303
00304 const Security::SPDPdiscoveredParticipantData& pdata =
00305 build_local_pdata(Security::DPDK_SECURE);
00306
00307 for (DiscoveredParticipantIter pi = participants_.begin(); pi != participants_.end(); ++pi) {
00308 if (pi->second.auth_state_ == DCPS::AS_AUTHENTICATED) {
00309 sedp_.write_dcps_participant_secure(pdata, pi->first);
00310 }
00311 }
00312 }
00313
00314 void
00315 Spdp::write_secure_disposes()
00316 {
00317 sedp_.write_dcps_participant_dispose(guid_);
00318 }
00319 #endif
00320
00321 void
00322 Spdp::handle_participant_data(DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData& cpdata)
00323 {
00324 const ACE_Time_Value now = ACE_OS::gettimeofday();
00325
00326
00327 Security::SPDPdiscoveredParticipantData pdata(cpdata);
00328
00329 const DCPS::RepoId guid = make_guid(pdata.participantProxy.guidPrefix, DCPS::ENTITYID_PARTICIPANT);
00330
00331 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00332 if (sedp_.ignoring(guid)) {
00333
00334
00335 return;
00336 }
00337
00338
00339 DiscoveredParticipantIter iter = participants_.find(guid);
00340
00341 if (iter == participants_.end()) {
00342
00343
00344 if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
00345 return;
00346 }
00347
00348
00349 std::memcpy(pdata.ddsParticipantDataSecure.base.base.key.value,
00350 pdata.participantProxy.guidPrefix,
00351 sizeof(pdata.ddsParticipantDataSecure.base.base.key.value));
00352
00353 if (DCPS::DCPS_debug_level) {
00354 DCPS::GuidConverter local(guid_), remote(guid);
00355 ACE_DEBUG((LM_DEBUG,
00356 ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"),
00357 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(),
00358 pdata.leaseDuration.seconds));
00359 }
00360
00361
00362 participants_[guid] = DiscoveredParticipant(pdata, now);
00363 DiscoveredParticipant& dp = participants_[guid];
00364
00365 #if defined(OPENDDS_SECURITY)
00366 if (is_security_enabled()) {
00367
00368 sedp_.associate_preauth(dp.pdata_);
00369
00370
00371 PendingRemoteAuthTokenMap::iterator token_iter = pending_remote_auth_tokens_.find(guid);
00372 if (token_iter != pending_remote_auth_tokens_.end()) {
00373 dp.remote_auth_request_token_ = token_iter->second;
00374 pending_remote_auth_tokens_.erase(token_iter);
00375 }
00376 }
00377 #endif
00378
00379
00380
00381 this->tport_->write_i();
00382
00383 #if defined(OPENDDS_SECURITY)
00384 if (is_security_enabled()) {
00385 bool has_security_data = dp.pdata_.dataKind == Security::DPDK_ENHANCED ||
00386 dp.pdata_.dataKind == Security::DPDK_SECURE;
00387
00388 if (has_security_data == false) {
00389 if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00390 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ")
00391 ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
00392 std::string(DCPS::GuidConverter(guid)).c_str()));
00393 participants_.erase(guid);
00394 } else {
00395 dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00396 match_unauthenticated(guid, dp);
00397 }
00398 } else {
00399 dp.identity_token_ = pdata.ddsParticipantDataSecure.base.identity_token;
00400 dp.permissions_token_ = pdata.ddsParticipantDataSecure.base.permissions_token;
00401 dp.property_qos_ = pdata.ddsParticipantDataSecure.base.property;
00402 dp.security_info_ = pdata.ddsParticipantDataSecure.base.security_info;
00403
00404 attempt_authentication(guid, dp);
00405 if (dp.auth_state_ == DCPS::AS_UNAUTHENTICATED) {
00406 if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00407 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ")
00408 ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
00409 std::string(DCPS::GuidConverter(guid)).c_str()));
00410 participants_.erase(guid);
00411 } else {
00412 dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00413 match_unauthenticated(guid, dp);
00414 }
00415 } else if (dp.auth_state_ == DCPS::AS_AUTHENTICATED) {
00416 if (match_authenticated(guid, dp) == false) {
00417 participants_.erase(guid);
00418 }
00419 }
00420
00421 }
00422 } else {
00423
00424 dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00425 match_unauthenticated(guid, dp);
00426
00427 }
00428 #else
00429 match_unauthenticated(guid, dp);
00430 #endif
00431
00432 } else {
00433
00434 #if defined(OPENDDS_SECURITY)
00435
00436
00437 if (iter->second.auth_state_ == DCPS::AS_AUTHENTICATED &&
00438 pdata.dataKind != Security::DPDK_SECURE &&
00439 id != DCPS::DISPOSE_INSTANCE &&
00440 id != DCPS::DISPOSE_UNREGISTER_INSTANCE)
00441 {
00442 iter->second.last_seen_ = now;
00443 return;
00444 }
00445 #endif
00446
00447 if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
00448 remove_discovered_participant(iter);
00449 return;
00450 }
00451
00452
00453 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00454
00455
00456 pdata.ddsParticipantDataSecure.base.base.key = iter->second.pdata_.ddsParticipantDataSecure.base.base.key;
00457 #ifndef OPENDDS_SAFETY_PROFILE
00458 using DCPS::operator!=;
00459 #endif
00460 if (iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data !=
00461 pdata.ddsParticipantDataSecure.base.base.user_data) {
00462 iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data =
00463 pdata.ddsParticipantDataSecure.base.base.user_data;
00464 #ifndef DDS_HAS_MINIMUM_BIT
00465 DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00466 if (bit) {
00467 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00468 bit->store_synthetic_data(pdata.ddsParticipantDataSecure.base.base,
00469 DDS::NOT_NEW_VIEW_STATE);
00470 }
00471 #endif
00472
00473 iter = participants_.find(guid);
00474 }
00475
00476 if (iter != participants_.end()) {
00477 iter->second.pdata_ = pdata;
00478 iter->second.last_seen_ = now;
00479 }
00480 }
00481 }
00482
00483 void
00484 Spdp::data_received(const DataSubmessage& data, const ParameterList& plist)
00485 {
00486 if (shutdown_flag_.value()) { return; }
00487
00488 Security::SPDPdiscoveredParticipantData pdata;
00489 if (ParameterListConverter::from_param_list(plist, pdata) < 0) {
00490 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ")
00491 ACE_TEXT("failed to convert from ParameterList to ")
00492 ACE_TEXT("SPDPdiscoveredParticipantData\n")));
00493 return;
00494 }
00495
00496 DCPS::MessageId msg_id = (data.inlineQos.length() && disposed(data.inlineQos)) ? DCPS::DISPOSE_INSTANCE : DCPS::SAMPLE_DATA;
00497
00498 handle_participant_data(msg_id, pdata);
00499 }
00500
00501 void
00502 Spdp::match_unauthenticated(const DCPS::RepoId& guid, DiscoveredParticipant& dp)
00503 {
00504
00505 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00506
00507 DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00508 #ifndef DDS_HAS_MINIMUM_BIT
00509 DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00510 if (bit) {
00511 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00512 bit_instance_handle =
00513 bit->store_synthetic_data(dp.pdata_.ddsParticipantDataSecure.base.base,
00514 DDS::NEW_VIEW_STATE);
00515 }
00516 #endif
00517
00518
00519
00520 sedp_.associate(dp.pdata_);
00521
00522
00523 DiscoveredParticipantIter iter = participants_.find(guid);
00524 if (iter != participants_.end()) {
00525 iter->second.bit_ih_ = bit_instance_handle;
00526 }
00527 }
00528
00529 #if defined(OPENDDS_SECURITY)
00530 void
00531 Spdp::handle_auth_request(const DDS::Security::ParticipantStatelessMessage& msg)
00532 {
00533
00534 if (msg.destination_participant_guid != guid_ || msg.message_data.length() == 0) {
00535 return;
00536 }
00537
00538 const ACE_Time_Value time = ACE_OS::gettimeofday();
00539
00540 RepoId guid = msg.message_identity.source_guid;
00541 guid.entityId = DCPS::ENTITYID_PARTICIPANT;
00542
00543 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00544
00545 if (sedp_.ignoring(guid)) {
00546
00547
00548 return;
00549 }
00550
00551 DiscoveredParticipantMap::iterator iter = participants_.find(guid);
00552
00553 if (iter == participants_.end()) {
00554
00555 pending_remote_auth_tokens_[guid] = msg.message_data[0];
00556 } else {
00557 iter->second.remote_auth_request_token_ = msg.message_data[0];
00558 }
00559 }
00560
00561 namespace {
00562 void set_participant_guid(const GUID_t& guid, ParameterList& param_list)
00563 {
00564 Parameter gp_param;
00565 gp_param.guid(guid);
00566 gp_param._d(PID_PARTICIPANT_GUID);
00567 param_list.length(param_list.length() + 1);
00568 param_list[param_list.length() - 1] = gp_param;
00569 }
00570 }
00571
00572 void
00573 Spdp::handle_handshake_message(const DDS::Security::ParticipantStatelessMessage& msg)
00574 {
00575 DDS::Security::SecurityException se = {"", 0, 0};
00576 Security::Authentication_var auth = security_config_->get_authentication();
00577
00578
00579 if (msg.destination_participant_guid != guid_ || !msg.message_data.length()) {
00580 return;
00581 }
00582
00583 RepoId src_participant = msg.message_identity.source_guid;
00584 src_participant.entityId = DCPS::ENTITYID_PARTICIPANT;
00585
00586 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00587
00588
00589 DiscoveredParticipantIter iter = participants_.find(src_participant);
00590 if (iter == participants_.end()) {
00591 ACE_DEBUG((LM_WARNING,
00592 ACE_TEXT("(%P|%t) Spdp::handle_handshake_message() - ")
00593 ACE_TEXT("received handshake for undiscovered participant %C. Ignoring.\n"),
00594 std::string(DCPS::GuidConverter(src_participant)).c_str()));
00595 return;
00596 }
00597
00598 DiscoveredParticipant& dp = iter->second;
00599
00600 DCPS::RepoId writer = guid_;
00601 writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
00602
00603 DCPS::RepoId reader = src_participant;
00604 reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00605
00606 if (dp.auth_state_ == DCPS::AS_HANDSHAKE_REPLY && msg.related_message_identity.source_guid == GUID_UNKNOWN) {
00607 DDS::Security::ParticipantBuiltinTopicDataSecure pbtds = {
00608 {
00609 {
00610 DDS::BuiltinTopicKey_t() ,
00611 qos_.user_data
00612 },
00613 identity_token_,
00614 permissions_token_,
00615 qos_.property,
00616 {0, 0}
00617 },
00618 identity_status_token_
00619 };
00620
00621 pbtds.base.security_info.plugin_participant_security_attributes = participant_sec_attr_.plugin_participant_attributes;
00622 pbtds.base.security_info.participant_security_attributes = security_attributes_to_bitmask(participant_sec_attr_);
00623
00624 ParameterList plist;
00625 set_participant_guid(guid_, plist);
00626 if (ParameterListConverter::to_param_list(pbtds.base, plist) < 0) {
00627 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00628 ACE_TEXT("Failed to convert from ParticipantBuiltinTopicData to ParameterList\n")));
00629 return;
00630 }
00631
00632 ACE_Message_Block temp_buff(64 * 1024);
00633 DCPS::Serializer ser(&temp_buff, DCPS::Serializer::SWAP_BE, DCPS::Serializer::ALIGN_INITIALIZE);
00634 if (!(ser << plist)) {
00635 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00636 ACE_TEXT("Failed to serialize parameter list.\n")));
00637 return;
00638 }
00639
00640 DDS::Security::ParticipantStatelessMessage reply;
00641 reply.message_identity.source_guid = guid_;
00642 reply.message_identity.sequence_number = 0;
00643 reply.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE;
00644 reply.related_message_identity = msg.message_identity;
00645 reply.destination_participant_guid = src_participant;
00646 reply.destination_endpoint_guid = reader;
00647 reply.source_endpoint_guid = GUID_UNKNOWN;
00648 reply.message_data.length(1);
00649 reply.message_data[0] = msg.message_data[0];
00650
00651 DDS::Security::ValidationResult_t vr = auth->begin_handshake_reply(dp.handshake_handle_, reply.message_data[0], dp.identity_handle_, identity_handle_, DDS::OctetSeq(temp_buff.length(), &temp_buff), se);
00652 if (vr == DDS::Security::VALIDATION_FAILED) {
00653 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00654 ACE_TEXT("Failed to reply to incoming handshake message. Security Exception[%d.%d]: %C\n"),
00655 se.code, se.minor_code, se.message.in()));
00656 return;
00657 } else if (vr == DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE) {
00658 if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00659 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00660 ACE_TEXT("Unable to write stateless message for handshake reply.\n")));
00661 return;
00662 }
00663 dp.has_last_stateless_msg_ = true;
00664 dp.last_stateless_msg_time_ = ACE_OS::gettimeofday();
00665 dp.last_stateless_msg_ = reply;
00666 dp.auth_state_ = DCPS::AS_HANDSHAKE_REPLY_SENT;
00667 return;
00668 } else if (vr == DDS::Security::VALIDATION_OK_FINAL_MESSAGE) {
00669
00670 if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00671 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00672 ACE_TEXT("Unable to write stateless message for final message.\n")));
00673 return;
00674 }
00675 dp.has_last_stateless_msg_ = false;
00676 dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00677 match_authenticated(src_participant, dp);
00678 } else if (vr == DDS::Security::VALIDATION_OK) {
00679
00680 dp.has_last_stateless_msg_ = false;
00681 dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00682 match_authenticated(src_participant, dp);
00683 }
00684 }
00685
00686 if ((dp.auth_state_ == DCPS::AS_HANDSHAKE_REQUEST_SENT || dp.auth_state_ == DCPS::AS_HANDSHAKE_REPLY_SENT) && msg.related_message_identity.source_guid == guid_) {
00687 DDS::Security::ParticipantStatelessMessage reply;
00688 reply.message_identity.source_guid = guid_;
00689 reply.message_identity.sequence_number = 0;
00690 reply.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE;
00691 reply.related_message_identity = msg.message_identity;
00692 reply.destination_participant_guid = src_participant;
00693 reply.destination_endpoint_guid = reader;
00694 reply.source_endpoint_guid = GUID_UNKNOWN;
00695 reply.message_data.length(1);
00696
00697 DDS::Security::ValidationResult_t vr = auth->process_handshake(reply.message_data[0], msg.message_data[0], dp.handshake_handle_, se);
00698 if (vr == DDS::Security::VALIDATION_FAILED) {
00699 if (dp.auth_state_ == DCPS::AS_HANDSHAKE_REQUEST_SENT) {
00700 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00701 ACE_TEXT("Failed to process incoming handshake message when expecting reply from %C. Security Exception[%d.%d]: %C\n"),
00702 std::string(DCPS::GuidConverter(src_participant)).c_str(), se.code, se.minor_code, se.message.in()));
00703 } else {
00704 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00705 ACE_TEXT("Failed to process incoming handshake message when expecting final message from %C. Security Exception[%d.%d]: %C\n"),
00706 std::string(DCPS::GuidConverter(src_participant)).c_str(), se.code, se.minor_code, se.message.in()));
00707 }
00708 return;
00709 } else if (vr == DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE) {
00710
00711 if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00712 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00713 ACE_TEXT("Unable to write stateless message for handshake reply.\n")));
00714 return;
00715 }
00716 dp.has_last_stateless_msg_ = true;
00717 dp.last_stateless_msg_time_ = ACE_OS::gettimeofday();
00718 dp.last_stateless_msg_ = reply;
00719
00720 } else if (vr == DDS::Security::VALIDATION_OK_FINAL_MESSAGE) {
00721 if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00722 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00723 ACE_TEXT("Unable to write stateless message for final message.\n")));
00724 return;
00725 }
00726 dp.has_last_stateless_msg_ = false;
00727 dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00728 match_authenticated(src_participant, dp);
00729 } else if (vr == DDS::Security::VALIDATION_OK) {
00730 dp.has_last_stateless_msg_ = false;
00731 dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00732 match_authenticated(src_participant, dp);
00733 }
00734 }
00735
00736 return;
00737 }
00738
00739 void
00740 Spdp::check_auth_states(const ACE_Time_Value& tv) {
00741 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00742 OPENDDS_SET_CMP(RepoId, DCPS::GUID_tKeyLessThan) to_erase;
00743 for (DiscoveredParticipantIter pi = participants_.begin(); pi != participants_.end(); ++pi) {
00744 switch (pi->second.auth_state_) {
00745 case DCPS::AS_HANDSHAKE_REQUEST_SENT:
00746 case DCPS::AS_HANDSHAKE_REPLY_SENT:
00747 if (tv > pi->second.auth_started_time_ + MAX_AUTH_TIME) {
00748 to_erase.insert(pi->first);
00749 } else if (pi->second.has_last_stateless_msg_ && (tv > (pi->second.last_stateless_msg_time_ + AUTH_RESEND_PERIOD))) {
00750 RepoId reader = pi->first;
00751 reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00752 pi->second.last_stateless_msg_time_ = tv;
00753 if (sedp_.write_stateless_message(pi->second.last_stateless_msg_, reader) != DDS::RETCODE_OK) {
00754 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::check_auth_states() - ")
00755 ACE_TEXT("Unable to write stateless message retry.\n")));
00756 }
00757 }
00758 break;
00759 case DCPS::AS_UNKNOWN:
00760 case DCPS::AS_VALIDATING_REMOTE:
00761 case DCPS::AS_HANDSHAKE_REQUEST:
00762 case DCPS::AS_HANDSHAKE_REPLY:
00763 case DCPS::AS_AUTHENTICATED:
00764 case DCPS::AS_UNAUTHENTICATED:
00765 default:
00766 break;
00767 }
00768 }
00769 for (OPENDDS_SET_CMP(RepoId, DCPS::GUID_tKeyLessThan)::const_iterator it = to_erase.begin(); it != to_erase.end(); ++it) {
00770 DiscoveredParticipantIter pit = participants_.find(*it);
00771 if (pit != participants_.end()) {
00772 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::check_auth_states() - Removing discovered participant due to authentication timeout: %C\n"), std::string(DCPS::GuidConverter(*it)).c_str()));
00773 if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00774 remove_discovered_participant(pit);
00775 } else {
00776 pit->second.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00777 match_unauthenticated(*it, pit->second);
00778 }
00779 }
00780 }
00781 }
00782
00783
00784 void
00785 Spdp::handle_participant_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg) {
00786 DDS::Security::SecurityException se = {"", 0, 0};
00787 Security::CryptoKeyExchange_var key_exchange = security_config_->get_crypto_key_exchange();
00788
00789
00790 if (msg.destination_participant_guid != guid_ || !msg.message_data.length()) {
00791 return;
00792 }
00793
00794 RepoId src_participant = msg.message_identity.source_guid;
00795 src_participant.entityId = DCPS::ENTITYID_PARTICIPANT;
00796
00797 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00798
00799
00800 DiscoveredParticipantIter iter = participants_.find(src_participant);
00801 if (iter == participants_.end()) {
00802 ACE_DEBUG((LM_WARNING,
00803 ACE_TEXT("(%P|%t) Spdp::handle_participant_crypto_tokens() - ")
00804 ACE_TEXT("received tokens for undiscovered participant %C. Ignoring.\n"),
00805 std::string(DCPS::GuidConverter(src_participant)).c_str()));
00806 return;
00807 }
00808 DiscoveredParticipant& dp = iter->second;
00809
00810 dp.crypto_tokens_ = reinterpret_cast<const DDS::Security::ParticipantCryptoTokenSeq&>(msg.message_data);
00811
00812 if (key_exchange->set_remote_participant_crypto_tokens(crypto_handle_, dp.crypto_handle_, dp.crypto_tokens_, se) == false) {
00813 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00814 ACE_TEXT("(%P|%t) ERROR: Spdp::handle_participant_crypto_tokens() - ")
00815 ACE_TEXT("Unable to set remote participant crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00816 se.code, se.minor_code, se.message.in()));
00817 return;
00818 }
00819 }
00820
00821 bool
00822 Spdp::match_authenticated(const DCPS::RepoId& guid, DiscoveredParticipant& dp)
00823 {
00824 DDS::Security::SecurityException se = {"", 0, 0};
00825
00826 Security::Authentication_var auth = security_config_->get_authentication();
00827 Security::AccessControl_var access = security_config_->get_access_control();
00828 Security::CryptoKeyFactory_var key_factory = security_config_->get_crypto_key_factory();
00829 Security::CryptoKeyExchange_var key_exchange = security_config_->get_crypto_key_exchange();
00830
00831 dp.shared_secret_handle_ = auth->get_shared_secret(dp.handshake_handle_, se);
00832 if (dp.shared_secret_handle_ == 0) {
00833 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00834 ACE_TEXT("Spdp::match_authenticated() - ")
00835 ACE_TEXT("Unable to get shared secret handle. Security Exception[%d.%d]: %C\n"),
00836 se.code, se.minor_code, se.message.in()));
00837 return false;
00838 }
00839
00840 if (auth->get_authenticated_peer_credential_token(dp.authenticated_peer_credential_token_, dp.handshake_handle_, se) == false) {
00841 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00842 ACE_TEXT("Spdp::match_authenticated() - ")
00843 ACE_TEXT("Unable to get authenticated peer credential token. Security Exception[%d.%d]: %C\n"),
00844 se.code, se.minor_code, se.message.in()));
00845 return false;
00846 }
00847
00848 dp.permissions_handle_ = access->validate_remote_permissions(auth, identity_handle_, dp.identity_handle_, dp.permissions_token_, dp.authenticated_peer_credential_token_, se);
00849 if (participant_sec_attr_.is_access_protected == true && dp.permissions_handle_ == DDS::HANDLE_NIL) {
00850 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00851 ACE_TEXT("Spdp::match_authenticated() - ")
00852 ACE_TEXT("Unable to validate remote participant with access control plugin. Security Exception[%d.%d]: %C\n"),
00853 se.code, se.minor_code, se.message.in()));
00854 return false;
00855 }
00856
00857 if (participant_sec_attr_.is_access_protected == true) {
00858 if (access->check_remote_participant(dp.permissions_handle_, domain_, dp.pdata_.ddsParticipantDataSecure, se) == false) {
00859 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00860 ACE_TEXT("Spdp::match_authenticated() - ")
00861 ACE_TEXT("Remote participant check failed. Security Exception[%d.%d]: %C\n"),
00862 se.code, se.minor_code, se.message.in()));
00863 return false;
00864 }
00865 }
00866
00867 if (DCPS::DCPS_debug_level > 3) {
00868 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Spdp::match_authenticated - ")
00869 ACE_TEXT("auth and access control complete for peer %C\n"),
00870 std::string(DCPS::GuidConverter(guid)).c_str()));
00871 }
00872
00873 dp.crypto_handle_ = key_factory->register_matched_remote_participant(crypto_handle_, dp.identity_handle_, dp.permissions_handle_, dp.shared_secret_handle_, se);
00874 if (dp.crypto_handle_ == DDS::HANDLE_NIL) {
00875 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00876 ACE_TEXT("Spdp::match_authenticated() - ")
00877 ACE_TEXT("Unable to register remote participant with crypto key factory plugin. Security Exception[%d.%d]: %C\n"),
00878 se.code, se.minor_code, se.message.in()));
00879 return false;
00880 }
00881
00882 if (key_exchange->create_local_participant_crypto_tokens(crypto_tokens_, crypto_handle_, dp.crypto_handle_, se) == false) {
00883 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00884 ACE_TEXT("Spdp::match_authenticated() - ")
00885 ACE_TEXT("Unable to create local participant crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00886 se.code, se.minor_code, se.message.in()));
00887 return false;
00888 }
00889
00890
00891 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00892
00893 DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00894 #ifndef DDS_HAS_MINIMUM_BIT
00895 DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00896 if (bit) {
00897 ACE_GUARD_REACTION(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock, return false);
00898 bit_instance_handle =
00899 bit->store_synthetic_data(dp.pdata_.ddsParticipantDataSecure.base.base,
00900 DDS::NEW_VIEW_STATE);
00901 }
00902 #endif
00903
00904
00905
00906 sedp_.associate(dp.pdata_);
00907 sedp_.associate_volatile(dp.pdata_);
00908 sedp_.associate_secure_writers_to_readers(dp.pdata_);
00909 sedp_.associate_secure_readers_to_writers(dp.pdata_);
00910
00911
00912 DiscoveredParticipantIter iter = participants_.find(guid);
00913 if (iter != participants_.end()) {
00914 iter->second.bit_ih_ = bit_instance_handle;
00915 }
00916 return true;
00917 }
00918
00919 void
00920 Spdp::attempt_authentication(const DCPS::RepoId& guid, DiscoveredParticipant& dp)
00921 {
00922 DDS::Security::Authentication_var auth = security_config_->get_authentication();
00923 DDS::Security::SecurityException se = {"", 0, 0};
00924
00925 if (dp.auth_state_ == DCPS::AS_UNKNOWN) {
00926 dp.auth_started_time_ = ACE_OS::gettimeofday();
00927 dp.auth_state_ = DCPS::AS_VALIDATING_REMOTE;
00928 }
00929
00930 if (dp.auth_state_ == DCPS::AS_VALIDATING_REMOTE) {
00931 DDS::Security::ValidationResult_t vr = auth->validate_remote_identity(dp.identity_handle_, dp.local_auth_request_token_, dp.remote_auth_request_token_, identity_handle_, dp.identity_token_, guid, se);
00932
00933
00934 if (!(dp.local_auth_request_token_ == DDS::Security::Token())) {
00935 DDS::Security::ParticipantStatelessMessage msg;
00936 msg.message_identity.source_guid = guid_;
00937 msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_REQUEST;
00938 msg.destination_participant_guid = guid;
00939 msg.destination_endpoint_guid = GUID_UNKNOWN;
00940 msg.source_endpoint_guid = GUID_UNKNOWN;
00941 msg.related_message_identity.source_guid = GUID_UNKNOWN;
00942 msg.related_message_identity.sequence_number = 0;
00943 msg.message_data.length(1);
00944 msg.message_data[0] = dp.local_auth_request_token_;
00945
00946 DCPS::RepoId reader = guid;
00947 reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00948
00949 if (sedp_.write_stateless_message(msg, reader) != DDS::RETCODE_OK) {
00950 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
00951 ACE_TEXT("Unable to write stateless message (auth request).\n")));
00952 }
00953 }
00954 switch (vr) {
00955 case DDS::Security::VALIDATION_OK: {
00956 dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00957 return;
00958 }
00959 case DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE: {
00960 dp.auth_state_ = DCPS::AS_HANDSHAKE_REPLY;
00961 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - Attempting authentication (expecting reply) for participant: %C\n"), std::string(DCPS::GuidConverter(guid)).c_str()));
00962 return;
00963 }
00964 case DDS::Security::VALIDATION_PENDING_HANDSHAKE_REQUEST: {
00965 dp.auth_state_ = DCPS::AS_HANDSHAKE_REQUEST;
00966 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - Attempting authentication (sending request) for participant: %C\n"), std::string(DCPS::GuidConverter(guid)).c_str()));
00967 break;
00968 }
00969 case DDS::Security::VALIDATION_FAILED: {
00970 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - ")
00971 ACE_TEXT("Remote participant identity is invalid. Security Exception[%d.%d]: %C\n"),
00972 se.code, se.minor_code, se.message.in()));
00973 dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00974 return;
00975 }
00976 default: {
00977 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - ")
00978 ACE_TEXT("Unexpected return value while validating remote identity. Security Exception[%d.%d]: %C\n"),
00979 se.code, se.minor_code, se.message.in()));
00980 dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00981 return;
00982 }
00983 }
00984 }
00985
00986 if (dp.auth_state_ == DCPS::AS_HANDSHAKE_REQUEST) {
00987 DDS::Security::ParticipantBuiltinTopicDataSecure pbtds = {
00988 {
00989 {
00990 DDS::BuiltinTopicKey_t() ,
00991 qos_.user_data
00992 },
00993 identity_token_,
00994 permissions_token_,
00995 qos_.property,
00996 {0, 0}
00997 },
00998 identity_status_token_
00999 };
01000
01001 pbtds.base.security_info.plugin_participant_security_attributes = participant_sec_attr_.plugin_participant_attributes;
01002 pbtds.base.security_info.participant_security_attributes = DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_VALID;
01003 if (participant_sec_attr_.is_rtps_protected) {
01004 pbtds.base.security_info.participant_security_attributes |= DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_RTPS_PROTECTED;
01005 }
01006 if (participant_sec_attr_.is_discovery_protected) {
01007 pbtds.base.security_info.participant_security_attributes |= DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_DISCOVERY_PROTECTED;
01008 }
01009 if (participant_sec_attr_.is_liveliness_protected) {
01010 pbtds.base.security_info.participant_security_attributes |= DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_LIVELINESS_PROTECTED;
01011 }
01012
01013 ParameterList plist;
01014 set_participant_guid(guid_, plist);
01015 if (ParameterListConverter::to_param_list(pbtds.base, plist) < 0) {
01016 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01017 ACE_TEXT("Failed to convert from ParticipantBuiltinTopicData to ParameterList\n")));
01018 return;
01019 }
01020
01021 ACE_Message_Block temp_buff(64 * 1024);
01022 DCPS::Serializer ser(&temp_buff, DCPS::Serializer::SWAP_BE, DCPS::Serializer::ALIGN_INITIALIZE);
01023 if (!(ser << plist)) {
01024 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01025 ACE_TEXT("Failed to serialize parameter list.\n")));
01026 return;
01027 }
01028
01029 DDS::Security::HandshakeMessageToken hs_mt;
01030
01031 if (auth->begin_handshake_request(dp.handshake_handle_, hs_mt, identity_handle_, dp.identity_handle_, DDS::OctetSeq(temp_buff.length(), &temp_buff), se) != DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE) {
01032 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01033 ACE_TEXT("Failed to begin handshake_request. Security Exception[%d.%d]: %C\n"),
01034 se.code, se.minor_code, se.message.in()));
01035 return;
01036 }
01037
01038 DCPS::RepoId writer = guid_;
01039 writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
01040
01041 DCPS::RepoId reader = guid;
01042 reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
01043
01044 DDS::Security::ParticipantStatelessMessage msg;
01045 msg.message_identity.source_guid = guid_;
01046 msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE;
01047 msg.destination_participant_guid = guid;
01048 msg.destination_endpoint_guid = reader;
01049 msg.source_endpoint_guid = GUID_UNKNOWN;
01050 msg.related_message_identity.source_guid = GUID_UNKNOWN;
01051 msg.related_message_identity.sequence_number = 0;
01052 msg.message_data.length(1);
01053 msg.message_data[0] = hs_mt;
01054
01055 if (sedp_.write_stateless_message(msg, reader) != DDS::RETCODE_OK) {
01056 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01057 ACE_TEXT("Unable to write stateless message (handshake).\n")));
01058 return;
01059 }
01060 dp.has_last_stateless_msg_ = true;
01061 dp.last_stateless_msg_time_ = ACE_OS::gettimeofday();
01062 dp.last_stateless_msg_ = msg;
01063 dp.auth_state_ = DCPS::AS_HANDSHAKE_REQUEST_SENT;
01064 }
01065
01066 return;
01067 }
01068 #endif
01069
01070 void
01071 Spdp::remove_expired_participants()
01072 {
01073
01074 ACE_GUARD (ACE_Thread_Mutex, g, lock_);
01075
01076
01077 DCPS::RepoIdSet participant_ids;
01078 get_discovered_participant_ids(participant_ids);
01079 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
01080 participant_id != participant_ids.end();
01081 ++participant_id)
01082 {
01083 DiscoveredParticipantIter part = participants_.find(*participant_id);
01084 if (part != participants_.end()) {
01085 if (part->second.last_seen_ <
01086 ACE_OS::gettimeofday() -
01087 ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) {
01088 if (DCPS::DCPS_debug_level > 1) {
01089 DCPS::GuidConverter conv(part->first);
01090 ACE_DEBUG((LM_WARNING,
01091 ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ")
01092 ACE_TEXT("participant %C exceeded lease duration, removing\n"),
01093 OPENDDS_STRING(conv).c_str()));
01094 }
01095 remove_discovered_participant(part);
01096 }
01097 }
01098 }
01099 }
01100
01101 void
01102 Spdp::init_bit(const DDS::Subscriber_var& bit_subscriber)
01103 {
01104 bit_subscriber_ = bit_subscriber;
01105 tport_->open();
01106 }
01107
01108 void
01109 Spdp::fini_bit()
01110 {
01111 bit_subscriber_ = 0;
01112 wait_for_acks_.reset();
01113
01114
01115 tport_->acknowledge();
01116 sedp_.acknowledge();
01117
01118 wait_for_acks_.wait_for_acks(2);
01119 }
01120
01121 #ifndef DDS_HAS_MINIMUM_BIT
01122 DCPS::ParticipantBuiltinTopicDataDataReaderImpl*
01123 Spdp::part_bit()
01124 {
01125 if (!bit_subscriber_.in())
01126 return 0;
01127
01128 DDS::DataReader_var d =
01129 bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
01130 return dynamic_cast<DCPS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
01131 }
01132 #endif
01133
01134 ACE_Reactor*
01135 Spdp::reactor() const
01136 {
01137 return disco_->reactor();
01138 }
01139
01140 WaitForAcks&
01141 Spdp::wait_for_acks()
01142 {
01143 return wait_for_acks_;
01144 }
01145
01146 bool
01147 Spdp::is_opendds(const GUID_t& participant) const
01148 {
01149 const DiscoveredParticipantConstIter iter = participants_.find(participant);
01150 if (iter == participants_.end()) {
01151 return false;
01152 }
01153 return 0 == std::memcmp(&iter->second.pdata_.participantProxy.vendorId,
01154 DCPS::VENDORID_OCI, sizeof(VendorId_t));
01155 }
01156
01157 Security::SPDPdiscoveredParticipantData
01158 Spdp::build_local_pdata(Security::DiscoveredParticipantDataKind kind)
01159 {
01160 BuiltinEndpointSet_t availableBuiltinEndpoints =
01161 DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER |
01162 DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR |
01163 DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER |
01164 DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR |
01165 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER |
01166 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR |
01167 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER |
01168 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER
01169 ;
01170
01171 #if defined(OPENDDS_SECURITY)
01172 if (is_security_enabled()) {
01173 availableBuiltinEndpoints |=
01174 DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER |
01175 DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_READER |
01176 DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER |
01177 DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER |
01178 DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER |
01179 DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER |
01180 DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER |
01181 DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER |
01182 DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER |
01183 DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER |
01184 DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER |
01185 DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_READER
01186 ;
01187 }
01188 #endif
01189
01190
01191
01192
01193
01194 DCPS::LocatorSeq nonEmptyList(1);
01195 nonEmptyList.length(1);
01196 nonEmptyList[0].kind = LOCATOR_KIND_UDPv4;
01197 nonEmptyList[0].port = 12345;
01198 std::memset(nonEmptyList[0].address, 0, 12);
01199 nonEmptyList[0].address[12] = 127;
01200 nonEmptyList[0].address[13] = 0;
01201 nonEmptyList[0].address[14] = 0;
01202 nonEmptyList[0].address[15] = 1;
01203
01204 const GuidPrefix_t& gp = guid_.guidPrefix;
01205
01206 const Security::SPDPdiscoveredParticipantData pdata = {
01207 kind,
01208 {
01209 {
01210 {
01211 DDS::BuiltinTopicKey_t() ,
01212 qos_.user_data
01213 },
01214
01215 #if defined(OPENDDS_SECURITY)
01216 identity_token_,
01217 permissions_token_,
01218 #else
01219 DDS::Security::Token(),
01220 DDS::Security::Token(),
01221 #endif
01222
01223 qos_.property,
01224
01225 #if defined(OPENDDS_SECURITY)
01226 {
01227 security_attributes_to_bitmask(participant_sec_attr_),
01228 participant_sec_attr_.plugin_participant_attributes
01229 }
01230 #else
01231 DDS::Security::ParticipantSecurityInfo()
01232 #endif
01233
01234 },
01235
01236 #if defined(OPENDDS_SECURITY)
01237 identity_status_token_
01238 #else
01239 DDS::Security::Token()
01240 #endif
01241
01242 },
01243 {
01244 PROTOCOLVERSION,
01245 {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5],
01246 gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]},
01247 VENDORID_OPENDDS,
01248 false ,
01249 availableBuiltinEndpoints,
01250 sedp_unicast_,
01251 sedp_multicast_,
01252 nonEmptyList ,
01253 nonEmptyList ,
01254 {0 }
01255 },
01256 {
01257 static_cast<CORBA::Long>((disco_->resend_period() * LEASE_MULT).sec()),
01258 0
01259 }
01260 };
01261
01262 return pdata;
01263 }
01264
01265 bool Spdp::announce_domain_participant_qos()
01266 {
01267
01268 #if defined(OPENDDS_SECURITY)
01269 if (is_security_enabled())
01270 write_secure_updates();
01271 #endif
01272
01273 return true;
01274 }
01275
01276 Spdp::SpdpTransport::SpdpTransport(Spdp* outer, bool securityGuids)
01277 : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT)
01278 , buff_(64 * 1024)
01279 , wbuff_(64 * 1024)
01280 {
01281 hdr_.prefix[0] = 'R';
01282 hdr_.prefix[1] = 'T';
01283 hdr_.prefix[2] = 'P';
01284 hdr_.prefix[3] = 'S';
01285 hdr_.version = PROTOCOLVERSION;
01286 hdr_.vendorId = VENDORID_OPENDDS;
01287 std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t));
01288 data_.smHeader.submessageId = DATA;
01289 data_.smHeader.flags = FLAG_E | FLAG_D;
01290 data_.smHeader.submessageLength = 0;
01291 data_.extraFlags = 0;
01292 data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS;
01293 data_.readerId = ENTITYID_UNKNOWN;
01294 data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER;
01295 data_.writerSN.high = 0;
01296 data_.writerSN.low = 0;
01297
01298
01299 const u_short port_common = outer_->disco_->pb() +
01300 (outer_->disco_->dg() * outer_->domain_),
01301 mc_port = port_common + outer_->disco_->d0();
01302
01303
01304 u_short participantId = securityGuids ? 0
01305 : (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11];
01306
01307 #ifdef OPENDDS_SAFETY_PROFILE
01308 const u_short startingParticipantId = participantId;
01309 #endif
01310
01311 while (!open_unicast_socket(port_common, participantId)) {
01312 ++participantId;
01313 }
01314
01315 #ifdef OPENDDS_SAFETY_PROFILE
01316 if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
01317
01318
01319
01320
01321 hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
01322 hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
01323 outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
01324 outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
01325 }
01326 #endif
01327
01328 OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group();
01329 ACE_INET_Addr default_multicast;
01330 if (0 != default_multicast.set(mc_port, mc_addr.c_str())) {
01331 ACE_DEBUG((
01332 LM_ERROR,
01333 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
01334 ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"),
01335 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set")));
01336 throw std::runtime_error("failed to set default_multicast address");
01337 }
01338
01339 const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface();
01340
01341 if (DCPS::DCPS_debug_level > 3) {
01342 ACE_DEBUG((LM_INFO,
01343 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ")
01344 ACE_TEXT("joining group %C %C:%hd\n"),
01345 net_if.c_str (),
01346 mc_addr.c_str (),
01347 mc_port));
01348 }
01349
01350 #ifdef ACE_HAS_MAC_OSX
01351 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
01352 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
01353 #endif
01354
01355 if (0 != multicast_socket_.join(default_multicast, 1,
01356 net_if.empty() ? 0 :
01357 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) {
01358 ACE_ERROR((LM_ERROR,
01359 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
01360 ACE_TEXT("failed to join multicast group %C:%hd %p\n"),
01361 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join")));
01362 throw std::runtime_error("failed to join multicast group");
01363 }
01364
01365 send_addrs_.insert(default_multicast);
01366
01367 typedef RtpsDiscovery::AddrVec::iterator iter;
01368 for (iter it = outer_->disco_->spdp_send_addrs().begin(),
01369 end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) {
01370 send_addrs_.insert(ACE_INET_Addr(it->c_str()));
01371 }
01372 }
01373
01374 void
01375 Spdp::SpdpTransport::open()
01376 {
01377 ACE_Reactor* reactor = outer_->reactor();
01378 if (reactor->register_handler(unicast_socket_.get_handle(),
01379 this, ACE_Event_Handler::READ_MASK) != 0) {
01380 throw std::runtime_error("failed to register unicast input handler");
01381 }
01382
01383 if (reactor->register_handler(multicast_socket_.get_handle(),
01384 this, ACE_Event_Handler::READ_MASK) != 0) {
01385 throw std::runtime_error("failed to register multicast input handler");
01386 }
01387
01388 disco_resend_period_ = outer_->disco_->resend_period();
01389 last_disco_resend_ = 0;
01390
01391 ACE_Time_Value timer_period = disco_resend_period_ < MAX_SPDP_TIMER_PERIOD ? disco_resend_period_ : MAX_SPDP_TIMER_PERIOD;
01392
01393 if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), timer_period)) {
01394 throw std::runtime_error("failed to schedule timer with reactor");
01395 }
01396 }
01397
01398 Spdp::SpdpTransport::~SpdpTransport()
01399 {
01400 if (DCPS::DCPS_debug_level > 3) {
01401 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
01402 }
01403 try {
01404 dispose_unregister();
01405 }
01406 catch (const CORBA::Exception& ex) {
01407 if (DCPS::DCPS_debug_level > 0) {
01408 ACE_DEBUG((LM_WARNING,
01409 ACE_TEXT("(%P|%t) WARNING: Exception caught in ")
01410 ACE_TEXT("SpdpTransport::~SpdpTransport: %C\n"),
01411 ex._info().c_str()));
01412 }
01413 }
01414 {
01415
01416 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
01417 outer_->eh_shutdown_ = true;
01418 }
01419 outer_->shutdown_cond_.signal();
01420 unicast_socket_.close();
01421 multicast_socket_.close();
01422 }
01423
01424 void
01425 Spdp::SpdpTransport::dispose_unregister()
01426 {
01427
01428 data_.writerSN.high = seq_.getHigh();
01429 data_.writerSN.low = seq_.getLow();
01430 data_.smHeader.flags = FLAG_E | FLAG_Q | FLAG_K_IN_DATA;
01431 data_.inlineQos.length(1);
01432 static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
01433 data_.inlineQos[0].status_info(dispose_unregister);
01434
01435 ParameterList plist(1);
01436 plist.length(1);
01437 plist[0].guid(outer_->guid_);
01438 plist[0]._d(PID_PARTICIPANT_GUID);
01439
01440 wbuff_.reset();
01441 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
01442 CORBA::UShort options = 0;
01443 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
01444 || !(ser << plist)) {
01445 ACE_ERROR((LM_ERROR,
01446 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
01447 ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
01448 return;
01449 }
01450
01451 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
01452 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
01453 const ssize_t res =
01454 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
01455 if (res < 0) {
01456 ACE_TCHAR addr_buff[256] = {};
01457 iter->addr_to_string(addr_buff, 256, 0);
01458 ACE_ERROR((LM_ERROR,
01459 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
01460 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
01461 }
01462 }
01463 }
01464
01465 void
01466 Spdp::SpdpTransport::close()
01467 {
01468 if (DCPS::DCPS_debug_level > 3) {
01469 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
01470 }
01471 ACE_Reactor* reactor = outer_->reactor();
01472 reactor->cancel_timer(this);
01473 const ACE_Reactor_Mask mask =
01474 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
01475 reactor->remove_handler(multicast_socket_.get_handle(), mask);
01476 reactor->remove_handler(unicast_socket_.get_handle(), mask);
01477 }
01478
01479 void
01480 Spdp::SpdpTransport::write()
01481 {
01482 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
01483 write_i();
01484 }
01485
01486 void
01487 Spdp::SpdpTransport::write_i()
01488 {
01489 #if defined(OPENDDS_SECURITY)
01490 const Security::SPDPdiscoveredParticipantData& pdata =
01491 outer_->build_local_pdata(outer_->is_security_enabled() ?
01492 Security::DPDK_ENHANCED :
01493 Security::DPDK_ORIGINAL);
01494 #else
01495 const Security::SPDPdiscoveredParticipantData& pdata =
01496 outer_->build_local_pdata(Security::DPDK_ORIGINAL);
01497 #endif
01498
01499 data_.writerSN.high = seq_.getHigh();
01500 data_.writerSN.low = seq_.getLow();
01501 ++seq_;
01502
01503 ParameterList plist;
01504 if (ParameterListConverter::to_param_list(pdata, plist) < 0) {
01505 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
01506 ACE_TEXT("Spdp::SpdpTransport::write() - ")
01507 ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
01508 ACE_TEXT("to ParameterList\n")));
01509 return;
01510 }
01511
01512 wbuff_.reset();
01513 CORBA::UShort options = 0;
01514 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
01515 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
01516 || !(ser << plist)) {
01517 ACE_ERROR((LM_ERROR,
01518 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
01519 ACE_TEXT("failed to serialize headers for SPDP\n")));
01520 return;
01521 }
01522
01523 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
01524 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
01525 const ssize_t res =
01526 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
01527 if (res < 0) {
01528 ACE_TCHAR addr_buff[256] = {};
01529 iter->addr_to_string(addr_buff, 256, 0);
01530 ACE_ERROR((LM_ERROR,
01531 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
01532 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
01533 }
01534 }
01535 }
01536
01537 int
01538 Spdp::SpdpTransport::handle_timeout(const ACE_Time_Value& tv, const void*)
01539 {
01540 if (tv > last_disco_resend_ + disco_resend_period_) {
01541 write();
01542 outer_->remove_expired_participants();
01543 last_disco_resend_ = tv;
01544 }
01545
01546 #if defined(OPENDDS_SECURITY)
01547 outer_->check_auth_states(tv);
01548 #endif
01549
01550 return 0;
01551 }
01552
01553 int
01554 Spdp::SpdpTransport::handle_input(ACE_HANDLE h)
01555 {
01556 const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle())
01557 ? unicast_socket_ : multicast_socket_;
01558 ACE_INET_Addr remote;
01559 buff_.reset();
01560 const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
01561
01562 if (bytes > 0) {
01563 buff_.wr_ptr(bytes);
01564 } else if (bytes == 0) {
01565 return -1;
01566 } else {
01567 ACE_DEBUG((
01568 LM_ERROR,
01569 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01570 ACE_TEXT("error reading from %C socket %p\n")
01571 , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
01572 ACE_TEXT("ACE_SOCK_Dgram::recv")));
01573 return -1;
01574 }
01575
01576
01577 if ((buff_.size() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) {
01578 return 0;
01579 }
01580
01581 DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR);
01582 Header header;
01583 if (!(ser >> header)) {
01584 ACE_ERROR((LM_ERROR,
01585 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01586 ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
01587 return 0;
01588 }
01589
01590 while (buff_.length() > 3) {
01591 const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
01592 ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER);
01593 const size_t start = buff_.length();
01594 CORBA::UShort submessageLength = 0;
01595 switch (subm) {
01596 case DATA: {
01597 DataSubmessage data;
01598 if (!(ser >> data)) {
01599 ACE_ERROR((
01600 LM_ERROR,
01601 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01602 ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
01603 return 0;
01604 }
01605 submessageLength = data.smHeader.submessageLength;
01606
01607 if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
01608
01609
01610 break;
01611 }
01612
01613 ParameterList plist;
01614 if (data.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
01615 ser.swap_bytes(!ACE_CDR_BYTE_ORDER);
01616 CORBA::UShort encap, options;
01617 if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE)) {
01618 ACE_ERROR((LM_ERROR,
01619 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01620 ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
01621 return 0;
01622 }
01623 ser >> options;
01624
01625 ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER);
01626 if (!(ser >> plist)) {
01627 ACE_ERROR((LM_ERROR,
01628 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01629 ACE_TEXT("failed to deserialize data payload for SPDP\n")));
01630 return 0;
01631 }
01632 } else {
01633 plist.length(1);
01634 RepoId guid;
01635 std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t));
01636 guid.entityId = ENTITYID_PARTICIPANT;
01637 plist[0].guid(guid);
01638 plist[0]._d(PID_PARTICIPANT_GUID);
01639 }
01640
01641 outer_->data_received(data, plist);
01642 break;
01643 }
01644 default:
01645 SubmessageHeader smHeader;
01646 if (!(ser >> smHeader)) {
01647 ACE_ERROR((
01648 LM_ERROR,
01649 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01650 ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
01651 return 0;
01652 }
01653 submessageLength = smHeader.submessageLength;
01654 break;
01655 }
01656 if (submessageLength && buff_.length()) {
01657 const size_t read = start - buff_.length();
01658 if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
01659 if (!ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ
01660 - read))) {
01661 ACE_ERROR((LM_ERROR,
01662 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01663 ACE_TEXT("failed to skip sub message length\n")));
01664 return 0;
01665 }
01666 }
01667 } else if (!submessageLength) {
01668 break;
01669 }
01670 }
01671
01672 return 0;
01673 }
01674
01675 int
01676 Spdp::SpdpTransport::handle_exception(ACE_HANDLE)
01677 {
01678 outer_->wait_for_acks().ack();
01679 return 0;
01680 }
01681
01682 void
01683 Spdp::SpdpTransport::acknowledge()
01684 {
01685 ACE_Reactor* reactor = outer_->reactor();
01686 reactor->notify(this);
01687 }
01688
01689 void
01690 Spdp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
01691 {
01692 sedp_.signal_liveliness(kind);
01693 }
01694
01695 bool
01696 Spdp::SpdpTransport::open_unicast_socket(u_short port_common,
01697 u_short participant_id)
01698 {
01699 const u_short uni_port = port_common + outer_->disco_->d1() +
01700 (outer_->disco_->pg() * participant_id);
01701
01702 ACE_INET_Addr local_addr;
01703 OPENDDS_STRING spdpaddr = outer_->disco_->spdp_local_address().c_str();
01704
01705 if (spdpaddr.empty()) {
01706 spdpaddr = "0.0.0.0";
01707 }
01708
01709 if (0 != local_addr.set(uni_port, spdpaddr.c_str())) {
01710 ACE_DEBUG((
01711 LM_ERROR,
01712 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01713 ACE_TEXT("failed setting unicast local_addr to port %d %p\n"),
01714 uni_port, ACE_TEXT("ACE_INET_Addr::set")));
01715 throw std::runtime_error("failed to set unicast local address");
01716 }
01717
01718 if (!DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) {
01719 if (DCPS::DCPS_debug_level > 3) {
01720 ACE_DEBUG((
01721 LM_WARNING,
01722 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01723 ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p. ")
01724 ACE_TEXT("Trying next participantId...\n"),
01725 uni_port, ACE_TEXT("ACE_SOCK_Dgram::open")));
01726 }
01727 return false;
01728
01729 } else if (DCPS::DCPS_debug_level > 3) {
01730 ACE_DEBUG((
01731 LM_INFO,
01732 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01733 ACE_TEXT("opened unicast socket on port %d\n"),
01734 uni_port));
01735 }
01736
01737 if (!DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) {
01738 ACE_ERROR((LM_ERROR,
01739 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
01740 ACE_TEXT("failed to set TTL value to %d ")
01741 ACE_TEXT("for port:%hd %p\n"),
01742 outer_->disco_->ttl(), uni_port, ACE_TEXT("DCPS::set_socket_multicast_ttl:")));
01743 throw std::runtime_error("failed to set TTL");
01744 }
01745 return true;
01746 }
01747
01748 bool
01749 Spdp::get_default_locators(const RepoId& part_id, DCPS::LocatorSeq& target,
01750 bool& inlineQos)
01751 {
01752 DiscoveredParticipantIter part_iter = participants_.find(part_id);
01753 if (part_iter == participants_.end()) {
01754 return false;
01755 } else {
01756 inlineQos = part_iter->second.pdata_.participantProxy.expectsInlineQos;
01757 DCPS::LocatorSeq& mc_source =
01758 part_iter->second.pdata_.participantProxy.defaultMulticastLocatorList;
01759 DCPS::LocatorSeq& uc_source =
01760 part_iter->second.pdata_.participantProxy.defaultUnicastLocatorList;
01761 CORBA::ULong mc_source_len = mc_source.length();
01762 CORBA::ULong uc_source_len = uc_source.length();
01763 CORBA::ULong target_len = target.length();
01764 target.length(mc_source_len + uc_source_len + target_len);
01765
01766 for (CORBA::ULong mci = 0; mci < mc_source.length(); ++mci) {
01767 target[target_len + mci] = mc_source[mci];
01768 }
01769
01770 for (CORBA::ULong uci = 0; uci < uc_source.length(); ++uci) {
01771 target[target_len + mc_source_len + uci] = uc_source[uci];
01772 }
01773 }
01774 return true;
01775 }
01776
01777 bool
01778 Spdp::associated() const
01779 {
01780 return !participants_.empty();
01781 }
01782
01783 bool
01784 Spdp::has_discovered_participant(const DCPS::RepoId& guid)
01785 {
01786 return participants_.find(guid) != participants_.end();
01787 }
01788
01789
01790 void
01791 Spdp::get_discovered_participant_ids(DCPS::RepoIdSet& results) const
01792 {
01793 DiscoveredParticipantMap::const_iterator idx;
01794 for (idx = participants_.begin(); idx != participants_.end(); ++idx)
01795 {
01796 results.insert(idx->first);
01797 }
01798 }
01799
01800 #if defined(OPENDDS_SECURITY)
01801 Spdp::ParticipantCryptoInfoPair
01802 Spdp::lookup_participant_crypto_info(const DCPS::RepoId& id) const
01803 {
01804 ParticipantCryptoInfoPair result = ParticipantCryptoInfoPair(DDS::HANDLE_NIL, DDS::Security::SharedSecretHandle_var());
01805
01806 ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
01807 DiscoveredParticipantConstIter pi = participants_.find(id);
01808 if (pi != participants_.end()) {
01809 result.first = pi->second.crypto_handle_;
01810 result.second = pi->second.shared_secret_handle_;
01811 }
01812 return result;
01813 }
01814
01815 void
01816 Spdp::send_participant_crypto_tokens(const DCPS::RepoId& id)
01817 {
01818 if (crypto_tokens_.length() != 0) {
01819 DCPS::RepoId writer = guid_;
01820 writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
01821
01822 DCPS::RepoId reader = id;
01823 reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
01824
01825 DDS::Security::ParticipantVolatileMessageSecure msg;
01826 msg.message_identity.source_guid = writer;
01827 msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS;
01828 msg.destination_participant_guid = id;
01829 msg.destination_endpoint_guid = GUID_UNKNOWN;
01830 msg.source_endpoint_guid = GUID_UNKNOWN;
01831 msg.message_data = reinterpret_cast<const DDS::Security::DataHolderSeq&>(crypto_tokens_);
01832
01833 if (sedp_.write_volatile_message(msg, reader) != DDS::RETCODE_OK) {
01834 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::send_participant_crypto_tokens() - ")
01835 ACE_TEXT("Unable to write volatile message.\n")));
01836 }
01837 }
01838 return;
01839 }
01840
01841 DDS::Security::PermissionsHandle
01842 Spdp::lookup_participant_permissions(const DCPS::RepoId& id) const
01843 {
01844 DDS::Security::PermissionsHandle result = DDS::HANDLE_NIL;
01845
01846 ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
01847 DiscoveredParticipantConstIter pi = participants_.find(id);
01848 if (pi != participants_.end()) {
01849 result = pi->second.permissions_handle_;
01850 }
01851 return result;
01852 }
01853
01854 DCPS::AuthState
01855 Spdp::lookup_participant_auth_state(const DCPS::RepoId& id) const
01856 {
01857 DCPS::AuthState result = DCPS::AS_UNKNOWN;
01858
01859 ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
01860 DiscoveredParticipantConstIter pi = participants_.find(id);
01861 if (pi != participants_.end()) {
01862 result = pi->second.auth_state_;
01863 }
01864 return result;
01865 }
01866 #endif
01867
01868 }
01869 }
01870
01871 OPENDDS_END_VERSIONED_NAMESPACE_DECL