Spdp.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Spdp.h"
00009 
00010 #include "BaseMessageTypes.h"
00011 #include "MessageTypes.h"
00012 #include "ParameterListConverter.h"
00013 #include "RtpsCoreTypeSupportImpl.h"
00014 #include "RtpsDiscovery.h"
00015 
00016 #include "dds/DdsDcpsGuidC.h"
00017 
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/BuiltInTopicUtils.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DCPS/Qos_Helper.h"
00022 
00023 #if defined(OPENDDS_SECURITY)
00024 #include "SecurityHelpers.h"
00025 #include "dds/DCPS/security/framework/SecurityRegistry.h"
00026 #endif
00027 
00028 #include "ace/Reactor.h"
00029 #include "ace/OS_NS_sys_socket.h" // For setsockopt()
00030 
00031 #include <cstring>
00032 #include <stdexcept>
00033 
00034 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00035 
00036 namespace OpenDDS {
00037 namespace RTPS {
00038 using DCPS::RepoId;
00039 
00040 namespace {
  // Multiplier for resend period -> lease duration conversion.
  // If a remote discovery misses this many resends from us, it will consider
  // us offline / unreachable.
  const int LEASE_MULT = 10;
  // Parameter-list encapsulation identifiers, written as the UShort value
  // seen when the two header octets are read on a little-endian host.
  const CORBA::UShort encap_LE = 0x0300; // {PL_CDR_LE} in LE
  const CORBA::UShort encap_BE = 0x0200; // {PL_CDR_BE} in LE

  // ACE_Time_Value arguments are (seconds, microseconds).
  const ACE_Time_Value MAX_SPDP_TIMER_PERIOD(0, 10000); // 10 ms
  const ACE_Time_Value MAX_AUTH_TIME(3, 0); // 3 s: handshakes older than this are dropped by check_auth_states()
  const ACE_Time_Value AUTH_RESEND_PERIOD(0, 25000); // 25 ms between stateless-message resends
00051 
00052   bool disposed(const ParameterList& inlineQos)
00053   {
00054     for (CORBA::ULong i = 0; i < inlineQos.length(); ++i) {
00055       if (inlineQos[i]._d() == PID_STATUS_INFO) {
00056         return inlineQos[i].status_info().value[3] & 1;
00057       }
00058     }
00059     return false;
00060   }
00061 
00062 #if defined(OPENDDS_SECURITY)
00063   bool operator==(const DDS::Security::Property_t& rhs, const DDS::Security::Property_t& lhs) {
00064     return rhs.name == lhs.name && rhs.value == lhs.value && rhs.propagate == lhs.propagate;
00065   }
00066 
00067   bool operator==(const DDS::Security::BinaryProperty_t& rhs, const DDS::Security::BinaryProperty_t& lhs) {
00068     return rhs.name == lhs.name && rhs.value == lhs.value && rhs.propagate == lhs.propagate;
00069   }
00070 
00071   bool operator==(const DDS::Security::PropertySeq& rhs, const DDS::Security::PropertySeq& lhs) {
00072     bool result = (rhs.length() == lhs.length());
00073     for (size_t i = 0; result && i < rhs.length(); ++i) {
00074       result = (rhs[i] == lhs[i]);
00075     }
00076     return result;
00077   }
00078 
00079   bool operator==(const DDS::Security::BinaryPropertySeq& rhs, const DDS::Security::BinaryPropertySeq& lhs) {
00080     bool result = (rhs.length() == lhs.length());
00081     for (size_t i = 0; result && i < rhs.length(); ++i) {
00082       result = (rhs[i] == lhs[i]);
00083     }
00084     return result;
00085   }
00086 
00087   bool operator==(const DDS::Security::DataHolder& rhs, const DDS::Security::DataHolder& lhs) {
00088     return rhs.class_id == lhs.class_id && rhs.properties == lhs.properties && rhs.binary_properties == lhs.binary_properties;
00089   }
00090 
00091   void init_participant_sec_attributes(DDS::Security::ParticipantSecurityAttributes& attr)
00092   {
00093     attr.allow_unauthenticated_participants = false;
00094     attr.is_access_protected = false;
00095     attr.is_rtps_protected = false;
00096     attr.is_discovery_protected = false;
00097     attr.is_liveliness_protected = false;
00098     attr.plugin_participant_attributes = 0;
00099     attr.ac_endpoint_properties.length(0);
00100   }
00101 #endif
00102 
00103   GUID_t make_guid(const DCPS::GuidPrefix_t prefix, const DCPS::EntityId_t entity)
00104   {
00105     GUID_t result;
00106     std::memcpy(result.guidPrefix, prefix, sizeof(GuidPrefix_t));
00107     std::memcpy(&result.entityId, &entity, sizeof(EntityId_t));
00108     return result;
00109   }
00110 }
00111 
00112 void Spdp::init(DDS::DomainId_t /*domain*/,
00113                        DCPS::RepoId& guid,
00114                        const DDS::DomainParticipantQos& /*qos*/,
00115                        RtpsDiscovery* disco)
00116 {
00117   guid = guid_; // may have changed in SpdpTransport constructor
00118   sedp_.ignore(guid);
00119   sedp_.init(guid_, *disco, domain_);
00120 
00121   // Append metatraffic unicast locator
00122   sedp_.unicast_locators(sedp_unicast_);
00123 
00124   if (disco->sedp_multicast()) { // Append metatraffic multicast locator
00125     const ACE_INET_Addr& mc_addr = sedp_.multicast_group();
00126     DCPS::Locator_t mc_locator;
00127     mc_locator.kind = address_to_kind(mc_addr);
00128     mc_locator.port = mc_addr.get_port_number();
00129     address_to_bytes(mc_locator.address, mc_addr);
00130     sedp_multicast_.length(1);
00131     sedp_multicast_[0] = mc_locator;
00132   }
00133 }
00134 
00135 
00136 Spdp::Spdp(DDS::DomainId_t domain,
00137            RepoId& guid,
00138            const DDS::DomainParticipantQos& qos,
00139            RtpsDiscovery* disco)
00140 
00141   : DCPS::LocalParticipant<Sedp>(qos)
00142   , disco_(disco)
00143   , domain_(domain)
00144   , guid_(guid)
00145   , tport_(new SpdpTransport(this, false))
00146   , eh_(tport_)
00147   , eh_shutdown_(false)
00148   , shutdown_cond_(lock_)
00149   , shutdown_flag_(false)
00150   , sedp_(guid_, *this, lock_)
00151 #if defined(OPENDDS_SECURITY)
00152   , security_config_()
00153   , security_enabled_(false)
00154 #endif
00155 {
00156   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00157 
00158   init(domain, guid, qos, disco);
00159 
00160 #if defined(OPENDDS_SECURITY)
00161   init_participant_sec_attributes(participant_sec_attr_);
00162 #endif
00163 
00164 }
00165 
00166 #if defined(OPENDDS_SECURITY)
00167 Spdp::Spdp(DDS::DomainId_t domain,
00168            const DCPS::RepoId& guid,
00169            const DDS::DomainParticipantQos& qos,
00170            RtpsDiscovery* disco,
00171            DDS::Security::IdentityHandle identity_handle,
00172            DDS::Security::PermissionsHandle perm_handle,
00173            DDS::Security::ParticipantCryptoHandle crypto_handle)
00174 
00175   : DCPS::LocalParticipant<Sedp>(qos)
00176   , disco_(disco)
00177   , domain_(domain)
00178   , guid_(guid)
00179   , tport_(new SpdpTransport(this, true))
00180   , eh_(tport_)
00181   , eh_shutdown_(false)
00182   , shutdown_cond_(lock_)
00183   , shutdown_flag_(false)
00184   , sedp_(guid_, *this, lock_)
00185   , security_config_(Security::SecurityRegistry::instance()->default_config())
00186   , security_enabled_(security_config_->get_authentication() && security_config_->get_access_control() && security_config_->get_crypto_key_factory() && security_config_->get_crypto_key_exchange())
00187   , identity_handle_(identity_handle)
00188   , permissions_handle_(perm_handle)
00189   , crypto_handle_(crypto_handle)
00190 {
00191   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00192 
00193   init(domain, guid_, qos, disco);
00194 
00195   DDS::Security::Authentication_var auth = security_config_->get_authentication();
00196   DDS::Security::AccessControl_var access = security_config_->get_access_control();
00197 
00198   DDS::Security::SecurityException se = {"", 0, 0};
00199 
00200   if (auth->get_identity_token(identity_token_, identity_handle_, se) == false) {
00201     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00202       ACE_TEXT("Spdp::Spdp() - ")
00203       ACE_TEXT("unable to get identity token. Security Exception[%d.%d]: %C\n"),
00204         se.code, se.minor_code, se.message.in()));
00205     throw std::runtime_error("unable to get identity token");
00206   }
00207   if (auth->get_identity_status_token(identity_status_token_, identity_handle_, se) == false) {
00208     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00209       ACE_TEXT("Spdp::Spdp() - ")
00210       ACE_TEXT("unable to get identity status token. Security Exception[%d.%d]: %C\n"),
00211         se.code, se.minor_code, se.message.in()));
00212     throw std::runtime_error("unable to get identity status token");
00213   }
00214   if (access->get_permissions_token(permissions_token_, permissions_handle_, se) == false) {
00215     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00216       ACE_TEXT("Spdp::Spdp() - ")
00217       ACE_TEXT("unable to get permissions handle. Security Exception[%d.%d]: %C\n"),
00218         se.code, se.minor_code, se.message.in()));
00219     throw std::runtime_error("unable to get permissions token");
00220   }
00221   if (access->get_permissions_credential_token(permissions_credential_token_, permissions_handle_, se) == false) {
00222     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00223       ACE_TEXT("Spdp::Spdp() - ")
00224       ACE_TEXT("unable to get permissions credential handle. Security Exception[%d.%d]: %C\n"),
00225         se.code, se.minor_code, se.message.in()));
00226     throw std::runtime_error("unable to get permissions credential token");
00227   }
00228 
00229   if (auth->set_permissions_credential_and_token(identity_handle_, permissions_credential_token_, permissions_token_, se) == false) {
00230     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00231       ACE_TEXT("Spdp::Spdp() - ")
00232       ACE_TEXT("unable to set permissions credential and token. Security Exception[%d.%d]: %C\n"),
00233         se.code, se.minor_code, se.message.in()));
00234     throw std::runtime_error("unable to set permissions credential and token");
00235   }
00236 
00237   init_participant_sec_attributes(participant_sec_attr_);
00238 
00239   if (access->get_participant_sec_attributes(permissions_handle_, participant_sec_attr_, se) == false) {
00240     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00241       ACE_TEXT("Spdp::Spdp() - ")
00242       ACE_TEXT("failed to retrieve participant security attributes. Security Exception[%d.%d]: %C\n"),
00243         se.code, se.minor_code, se.message.in()));
00244     throw std::runtime_error("unable to retrieve participant security attributes");
00245   }
00246 
00247   sedp_.init_security(identity_handle, perm_handle, crypto_handle);
00248 }
00249 #endif
00250 
00251 Spdp::~Spdp()
00252 {
00253   shutdown_flag_ = true;
00254   {
00255     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00256     if (DCPS::DCPS_debug_level > 3) {
00257       ACE_DEBUG((LM_INFO,
00258                  ACE_TEXT("(%P|%t) Spdp::~Spdp ")
00259                  ACE_TEXT("remove discovered participants\n")));
00260     }
00261 
00262 #if defined(OPENDDS_SECURITY)
00263     write_secure_disposes();
00264 #endif
00265 
00266     // Iterate through a copy of the repo Ids, rather than the map
00267     //   as it gets unlocked in remove_discovered_participant()
00268     DCPS::RepoIdSet participant_ids;
00269     get_discovered_participant_ids(participant_ids);
00270     for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00271          participant_id != participant_ids.end();
00272          ++participant_id)
00273     {
00274       DiscoveredParticipantIter part = participants_.find(*participant_id);
00275       if (part != participants_.end()) {
00276         remove_discovered_participant(part);
00277       }
00278     }
00279   }
00280 
00281   // ensure sedp's task queue is drained before data members are being
00282   // deleted
00283   sedp_.shutdown();
00284 
00285   // release lock for reset of event handler, which may delete transport
00286   tport_->close();
00287   eh_.reset();
00288   {
00289     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00290     while (!eh_shutdown_) {
00291       shutdown_cond_.wait();
00292     }
00293   }
00294 }
00295 
00296 #if defined(OPENDDS_SECURITY)
00297 void
00298 Spdp::write_secure_updates()
00299 {
00300   if (shutdown_flag_.value()) { return; }
00301 
00302   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00303 
00304   const Security::SPDPdiscoveredParticipantData& pdata =
00305     build_local_pdata(Security::DPDK_SECURE);
00306 
00307   for (DiscoveredParticipantIter pi = participants_.begin(); pi != participants_.end(); ++pi) {
00308     if (pi->second.auth_state_ == DCPS::AS_AUTHENTICATED) {
00309       sedp_.write_dcps_participant_secure(pdata, pi->first);
00310     }
00311   }
00312 }
00313 
00314 void
00315 Spdp::write_secure_disposes()
00316 {
00317   sedp_.write_dcps_participant_dispose(guid_);
00318 }
00319 #endif
00320 
00321 void
00322 Spdp::handle_participant_data(DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData& cpdata)
00323 {
00324   const ACE_Time_Value now = ACE_OS::gettimeofday();
00325 
00326   // Make a (non-const) copy so we can tweak values below
00327   Security::SPDPdiscoveredParticipantData pdata(cpdata);
00328 
00329   const DCPS::RepoId guid = make_guid(pdata.participantProxy.guidPrefix, DCPS::ENTITYID_PARTICIPANT);
00330 
00331   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00332   if (sedp_.ignoring(guid)) {
00333     // Ignore, this is our domain participant or one that the user has
00334     // asked us to ignore.
00335     return;
00336   }
00337 
00338   // Find the participant - iterator valid only as long as we hold the lock
00339   DiscoveredParticipantIter iter = participants_.find(guid);
00340 
00341   if (iter == participants_.end()) {
00342 
00343     // Trying to delete something that doesn't exist is a NOOP
00344     if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
00345       return;
00346     }
00347 
00348     // copy guid prefix (octet[12]) into BIT key (long[3])
00349     std::memcpy(pdata.ddsParticipantDataSecure.base.base.key.value,
00350                 pdata.participantProxy.guidPrefix,
00351                 sizeof(pdata.ddsParticipantDataSecure.base.base.key.value));
00352 
00353     if (DCPS::DCPS_debug_level) {
00354       DCPS::GuidConverter local(guid_), remote(guid);
00355       ACE_DEBUG((LM_DEBUG,
00356         ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"),
00357         OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(),
00358         pdata.leaseDuration.seconds));
00359     }
00360 
00361     // add a new participant
00362     participants_[guid] = DiscoveredParticipant(pdata, now);
00363     DiscoveredParticipant& dp = participants_[guid];
00364 
00365 #if defined(OPENDDS_SECURITY)
00366     if (is_security_enabled()) {
00367       // Associate the stateless reader / writer for handshakes & auth requests
00368       sedp_.associate_preauth(dp.pdata_);
00369 
00370       // If we've gotten auth requests for this (previously undiscovered) participant, pull in the tokens now
00371       PendingRemoteAuthTokenMap::iterator token_iter = pending_remote_auth_tokens_.find(guid);
00372       if (token_iter != pending_remote_auth_tokens_.end()) {
00373         dp.remote_auth_request_token_ = token_iter->second;
00374         pending_remote_auth_tokens_.erase(token_iter);
00375       }
00376     }
00377 #endif
00378 
00379     // Since we've just seen a new participant, let's send out our
00380     // own announcement, so they don't have to wait.
00381     this->tport_->write_i();
00382 
00383 #if defined(OPENDDS_SECURITY)
00384     if (is_security_enabled()) {
00385       bool has_security_data = dp.pdata_.dataKind == Security::DPDK_ENHANCED ||
00386                                dp.pdata_.dataKind == Security::DPDK_SECURE;
00387 
00388       if (has_security_data == false) {
00389         if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00390           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ")
00391             ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
00392             std::string(DCPS::GuidConverter(guid)).c_str()));
00393             participants_.erase(guid);
00394         } else { // allow_unauthenticated_participants == true
00395           dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00396           match_unauthenticated(guid, dp);
00397         }
00398       } else { // has_security_data == true
00399         dp.identity_token_ = pdata.ddsParticipantDataSecure.base.identity_token;
00400         dp.permissions_token_ = pdata.ddsParticipantDataSecure.base.permissions_token;
00401         dp.property_qos_ = pdata.ddsParticipantDataSecure.base.property;
00402         dp.security_info_ = pdata.ddsParticipantDataSecure.base.security_info;
00403 
00404         attempt_authentication(guid, dp);
00405         if (dp.auth_state_ == DCPS::AS_UNAUTHENTICATED) {
00406           if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00407             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ")
00408               ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
00409               std::string(DCPS::GuidConverter(guid)).c_str()));
00410             participants_.erase(guid);
00411           } else { // allow_unauthenticated_participants == true
00412             dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00413             match_unauthenticated(guid, dp);
00414           }
00415         } else if (dp.auth_state_ == DCPS::AS_AUTHENTICATED) {
00416           if (match_authenticated(guid, dp) == false) {
00417             participants_.erase(guid);
00418           }
00419         }
00420         // otherwise just return, since we're waiting for input to finish authentication
00421       }
00422     } else {
00423 
00424       dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00425       match_unauthenticated(guid, dp);
00426 
00427     }
00428 #else
00429     match_unauthenticated(guid, dp);
00430 #endif
00431 
00432   } else {
00433 
00434 #if defined(OPENDDS_SECURITY)
00435     // Non-secure updates for authenticated participants are used for liveliness but
00436     // are otherwise ignored. Non-secure dispose messages are ignored completely.
00437     if (iter->second.auth_state_ == DCPS::AS_AUTHENTICATED &&
00438         pdata.dataKind != Security::DPDK_SECURE &&
00439         id != DCPS::DISPOSE_INSTANCE &&
00440         id != DCPS::DISPOSE_UNREGISTER_INSTANCE)
00441     {
00442       iter->second.last_seen_ = now;
00443       return;
00444     }
00445 #endif
00446 
00447     if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
00448       remove_discovered_participant(iter);
00449       return;
00450     }
00451 
00452     // Must unlock when calling into part_bit() as it may call back into us
00453     ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00454 
00455     // update an existing participant
00456     pdata.ddsParticipantDataSecure.base.base.key = iter->second.pdata_.ddsParticipantDataSecure.base.base.key;
00457 #ifndef OPENDDS_SAFETY_PROFILE
00458     using DCPS::operator!=;
00459 #endif
00460     if (iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data !=
00461         pdata.ddsParticipantDataSecure.base.base.user_data) {
00462       iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data =
00463         pdata.ddsParticipantDataSecure.base.base.user_data;
00464 #ifndef DDS_HAS_MINIMUM_BIT
00465       DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00466       if (bit) {
00467         ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00468         bit->store_synthetic_data(pdata.ddsParticipantDataSecure.base.base,
00469                                   DDS::NOT_NEW_VIEW_STATE);
00470       }
00471 #endif /* DDS_HAS_MINIMUM_BIT */
00472       // Perform search again, so iterator becomes valid
00473       iter = participants_.find(guid);
00474     }
00475     // Participant may have been removed while lock released
00476     if (iter != participants_.end()) {
00477       iter->second.pdata_ = pdata;
00478       iter->second.last_seen_ = now;
00479     }
00480   }
00481 }
00482 
00483 void
00484 Spdp::data_received(const DataSubmessage& data, const ParameterList& plist)
00485 {
00486   if (shutdown_flag_.value()) { return; }
00487 
00488   Security::SPDPdiscoveredParticipantData pdata;
00489   if (ParameterListConverter::from_param_list(plist, pdata) < 0) {
00490     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ")
00491       ACE_TEXT("failed to convert from ParameterList to ")
00492       ACE_TEXT("SPDPdiscoveredParticipantData\n")));
00493     return;
00494   }
00495 
00496   DCPS::MessageId msg_id = (data.inlineQos.length() && disposed(data.inlineQos)) ? DCPS::DISPOSE_INSTANCE : DCPS::SAMPLE_DATA;
00497 
00498   handle_participant_data(msg_id, pdata);
00499 }
00500 
00501 void
00502 Spdp::match_unauthenticated(const DCPS::RepoId& guid, DiscoveredParticipant& dp)
00503 {
00504   // Must unlock when calling into part_bit() as it may call back into us
00505   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00506 
00507   DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00508 #ifndef DDS_HAS_MINIMUM_BIT
00509   DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00510   if (bit) {
00511     ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00512     bit_instance_handle =
00513       bit->store_synthetic_data(dp.pdata_.ddsParticipantDataSecure.base.base,
00514                                 DDS::NEW_VIEW_STATE);
00515   }
00516 #endif /* DDS_HAS_MINIMUM_BIT */
00517 
00518   // notify Sedp of association
00519   // Sedp may call has_discovered_participant, which is why the participant must be added before this call to associate.
00520   sedp_.associate(dp.pdata_);
00521 
00522   // Iterator is no longer valid
00523   DiscoveredParticipantIter iter = participants_.find(guid);
00524   if (iter != participants_.end()) {
00525     iter->second.bit_ih_ = bit_instance_handle;
00526   }
00527 }
00528 
00529 #if defined(OPENDDS_SECURITY)
00530 void
00531 Spdp::handle_auth_request(const DDS::Security::ParticipantStatelessMessage& msg)
00532 {
00533   // If this message wasn't intended for us, ignore handshake message
00534   if (msg.destination_participant_guid != guid_ || msg.message_data.length() == 0) {
00535     return;
00536   }
00537 
00538   const ACE_Time_Value time = ACE_OS::gettimeofday();
00539 
00540   RepoId guid = msg.message_identity.source_guid;
00541   guid.entityId = DCPS::ENTITYID_PARTICIPANT;
00542 
00543   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00544 
00545   if (sedp_.ignoring(guid)) {
00546     // Ignore, this is our domain participant or one that the user has
00547     // asked us to ignore.
00548     return;
00549   }
00550 
00551   DiscoveredParticipantMap::iterator iter = participants_.find(guid);
00552 
00553   if (iter == participants_.end()) {
00554     // We're simply caching this for later, since we can't actually do much without the SPDP announcement itself
00555     pending_remote_auth_tokens_[guid] = msg.message_data[0];
00556   } else {
00557     iter->second.remote_auth_request_token_ = msg.message_data[0];
00558   }
00559 }
00560 
00561 namespace {
00562   void set_participant_guid(const GUID_t& guid, ParameterList& param_list)
00563   {
00564     Parameter gp_param;
00565     gp_param.guid(guid);
00566     gp_param._d(PID_PARTICIPANT_GUID);
00567     param_list.length(param_list.length() + 1);
00568     param_list[param_list.length() - 1] = gp_param;
00569   }
00570 }
00571 
00572 void
00573 Spdp::handle_handshake_message(const DDS::Security::ParticipantStatelessMessage& msg)
00574 {
00575   DDS::Security::SecurityException se = {"", 0, 0};
00576   Security::Authentication_var auth = security_config_->get_authentication();
00577 
00578   // If this message wasn't intended for us, ignore handshake message
00579   if (msg.destination_participant_guid != guid_ || !msg.message_data.length()) {
00580     return;
00581   }
00582 
00583   RepoId src_participant = msg.message_identity.source_guid;
00584   src_participant.entityId = DCPS::ENTITYID_PARTICIPANT;
00585 
00586   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00587 
00588   // If discovery hasn't initialized / validated this participant yet, ignore handshake messages
00589   DiscoveredParticipantIter iter = participants_.find(src_participant);
00590   if (iter == participants_.end()) {
00591     ACE_DEBUG((LM_WARNING,
00592       ACE_TEXT("(%P|%t) Spdp::handle_handshake_message() - ")
00593       ACE_TEXT("received handshake for undiscovered participant %C. Ignoring.\n"),
00594                std::string(DCPS::GuidConverter(src_participant)).c_str()));
00595     return;
00596   }
00597 
00598   DiscoveredParticipant& dp = iter->second;
00599 
00600   DCPS::RepoId writer = guid_;
00601   writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
00602 
00603   DCPS::RepoId reader = src_participant;
00604   reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00605 
00606   if (dp.auth_state_ == DCPS::AS_HANDSHAKE_REPLY && msg.related_message_identity.source_guid == GUID_UNKNOWN) {
00607     DDS::Security::ParticipantBuiltinTopicDataSecure pbtds = {
00608       {
00609         {
00610           DDS::BuiltinTopicKey_t() /*ignored*/,
00611           qos_.user_data
00612         },
00613         identity_token_,
00614         permissions_token_,
00615         qos_.property,
00616         {0, 0}
00617       },
00618       identity_status_token_
00619     };
00620 
00621     pbtds.base.security_info.plugin_participant_security_attributes = participant_sec_attr_.plugin_participant_attributes;
00622     pbtds.base.security_info.participant_security_attributes = security_attributes_to_bitmask(participant_sec_attr_);
00623 
00624     ParameterList plist;
00625     set_participant_guid(guid_, plist);
00626     if (ParameterListConverter::to_param_list(pbtds.base, plist) < 0) {
00627       ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00628         ACE_TEXT("Failed to convert from ParticipantBuiltinTopicData to ParameterList\n")));
00629       return;
00630     }
00631 
00632     ACE_Message_Block temp_buff(64 * 1024);
00633     DCPS::Serializer ser(&temp_buff, DCPS::Serializer::SWAP_BE, DCPS::Serializer::ALIGN_INITIALIZE);
00634     if (!(ser << plist)) {
00635       ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00636         ACE_TEXT("Failed to serialize parameter list.\n")));
00637       return;
00638     }
00639 
00640     DDS::Security::ParticipantStatelessMessage reply;
00641     reply.message_identity.source_guid = guid_;
00642     reply.message_identity.sequence_number = 0;
00643     reply.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE;
00644     reply.related_message_identity = msg.message_identity;
00645     reply.destination_participant_guid = src_participant;
00646     reply.destination_endpoint_guid = reader;
00647     reply.source_endpoint_guid = GUID_UNKNOWN;
00648     reply.message_data.length(1);
00649     reply.message_data[0] = msg.message_data[0];
00650 
00651     DDS::Security::ValidationResult_t vr = auth->begin_handshake_reply(dp.handshake_handle_, reply.message_data[0], dp.identity_handle_, identity_handle_, DDS::OctetSeq(temp_buff.length(), &temp_buff), se);
00652     if (vr == DDS::Security::VALIDATION_FAILED) {
00653       ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00654         ACE_TEXT("Failed to reply to incoming handshake message. Security Exception[%d.%d]: %C\n"),
00655           se.code, se.minor_code, se.message.in()));
00656       return;
00657     } else if (vr == DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE) {
00658       if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00659         ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00660           ACE_TEXT("Unable to write stateless message for handshake reply.\n")));
00661         return;
00662       }
00663       dp.has_last_stateless_msg_ = true;
00664       dp.last_stateless_msg_time_ = ACE_OS::gettimeofday();
00665       dp.last_stateless_msg_ = reply;
00666       dp.auth_state_ = DCPS::AS_HANDSHAKE_REPLY_SENT;
00667       return;
00668     } else if (vr == DDS::Security::VALIDATION_OK_FINAL_MESSAGE) {
00669       // Theoretically, this shouldn't happen unless handshakes can involve fewer than 3 messages
00670       if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00671         ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00672           ACE_TEXT("Unable to write stateless message for final message.\n")));
00673         return;
00674       }
00675       dp.has_last_stateless_msg_ = false;
00676       dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00677       match_authenticated(src_participant, dp);
00678     } else if (vr == DDS::Security::VALIDATION_OK) {
00679       // Theoretically, this shouldn't happen unless handshakes can involve fewer than 3 messages
00680       dp.has_last_stateless_msg_ = false;
00681       dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00682       match_authenticated(src_participant, dp);
00683     }
00684   }
00685 
00686   if ((dp.auth_state_ == DCPS::AS_HANDSHAKE_REQUEST_SENT || dp.auth_state_ == DCPS::AS_HANDSHAKE_REPLY_SENT) && msg.related_message_identity.source_guid == guid_) {
00687     DDS::Security::ParticipantStatelessMessage reply;
00688     reply.message_identity.source_guid = guid_;
00689     reply.message_identity.sequence_number = 0;
00690     reply.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE;
00691     reply.related_message_identity = msg.message_identity;
00692     reply.destination_participant_guid = src_participant;
00693     reply.destination_endpoint_guid = reader;
00694     reply.source_endpoint_guid = GUID_UNKNOWN;
00695     reply.message_data.length(1);
00696 
00697     DDS::Security::ValidationResult_t vr = auth->process_handshake(reply.message_data[0], msg.message_data[0], dp.handshake_handle_, se);
00698     if (vr == DDS::Security::VALIDATION_FAILED) {
00699       if (dp.auth_state_ == DCPS::AS_HANDSHAKE_REQUEST_SENT) {
00700         ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00701           ACE_TEXT("Failed to process incoming handshake message when expecting reply from %C. Security Exception[%d.%d]: %C\n"),
00702           std::string(DCPS::GuidConverter(src_participant)).c_str(), se.code, se.minor_code, se.message.in()));
00703       } else {
00704         ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00705           ACE_TEXT("Failed to process incoming handshake message when expecting final message from %C. Security Exception[%d.%d]: %C\n"),
00706           std::string(DCPS::GuidConverter(src_participant)).c_str(), se.code, se.minor_code, se.message.in()));
00707       }
00708       return;
00709     } else if (vr == DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE) {
00710       // Theoretically, this shouldn't happen unless handshakes can involve more than 3 messages
00711       if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00712         ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00713           ACE_TEXT("Unable to write stateless message for handshake reply.\n")));
00714         return;
00715       }
00716       dp.has_last_stateless_msg_ = true;
00717       dp.last_stateless_msg_time_ = ACE_OS::gettimeofday();
00718       dp.last_stateless_msg_ = reply;
00719       // cache the outbound message, but don't change state, since roles shouldn't have changed?
00720     } else if (vr == DDS::Security::VALIDATION_OK_FINAL_MESSAGE) {
00721       if (sedp_.write_stateless_message(reply, reader) != DDS::RETCODE_OK) {
00722         ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Spdp::handle_handshake_message() - ")
00723           ACE_TEXT("Unable to write stateless message for final message.\n")));
00724         return;
00725       }
00726       dp.has_last_stateless_msg_ = false;
00727       dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00728       match_authenticated(src_participant, dp);
00729     } else if (vr == DDS::Security::VALIDATION_OK) {
00730       dp.has_last_stateless_msg_ = false;
00731       dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00732       match_authenticated(src_participant, dp);
00733     }
00734   }
00735 
00736   return;
00737 }
00738 
00739 void
00740 Spdp::check_auth_states(const ACE_Time_Value& tv) {
00741   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00742   OPENDDS_SET_CMP(RepoId, DCPS::GUID_tKeyLessThan) to_erase;
00743   for (DiscoveredParticipantIter pi = participants_.begin(); pi != participants_.end(); ++pi) {
00744     switch (pi->second.auth_state_) {
00745       case DCPS::AS_HANDSHAKE_REQUEST_SENT:
00746       case DCPS::AS_HANDSHAKE_REPLY_SENT:
00747         if (tv > pi->second.auth_started_time_ + MAX_AUTH_TIME) {
00748           to_erase.insert(pi->first);
00749         } else if (pi->second.has_last_stateless_msg_ && (tv > (pi->second.last_stateless_msg_time_ + AUTH_RESEND_PERIOD))) {
00750           RepoId reader = pi->first;
00751           reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00752           pi->second.last_stateless_msg_time_ = tv;
00753           if (sedp_.write_stateless_message(pi->second.last_stateless_msg_, reader) != DDS::RETCODE_OK) {
00754             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::check_auth_states() - ")
00755               ACE_TEXT("Unable to write stateless message retry.\n")));
00756           }
00757         }
00758         break;
00759       case DCPS::AS_UNKNOWN:
00760       case DCPS::AS_VALIDATING_REMOTE:
00761       case DCPS::AS_HANDSHAKE_REQUEST:
00762       case DCPS::AS_HANDSHAKE_REPLY:
00763       case DCPS::AS_AUTHENTICATED:
00764       case DCPS::AS_UNAUTHENTICATED:
00765       default:
00766         break;
00767     }
00768   }
00769   for (OPENDDS_SET_CMP(RepoId, DCPS::GUID_tKeyLessThan)::const_iterator it = to_erase.begin(); it != to_erase.end(); ++it) {
00770     DiscoveredParticipantIter pit = participants_.find(*it);
00771     if (pit != participants_.end()) {
00772       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::check_auth_states()      - Removing discovered participant due to authentication timeout: %C\n"), std::string(DCPS::GuidConverter(*it)).c_str()));
00773       if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00774         remove_discovered_participant(pit);
00775       } else {
00776         pit->second.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00777         match_unauthenticated(*it, pit->second);
00778       }
00779     }
00780   }
00781 }
00782 
00783 
00784 void
00785 Spdp::handle_participant_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg) {
00786   DDS::Security::SecurityException se = {"", 0, 0};
00787   Security::CryptoKeyExchange_var key_exchange = security_config_->get_crypto_key_exchange();
00788 
00789   // If this message wasn't intended for us, ignore volatile message
00790   if (msg.destination_participant_guid != guid_ || !msg.message_data.length()) {
00791     return;
00792   }
00793 
00794   RepoId src_participant = msg.message_identity.source_guid;
00795   src_participant.entityId = DCPS::ENTITYID_PARTICIPANT;
00796 
00797   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00798 
00799   // If discovery hasn't initialized / validated this participant yet, ignore volatile message
00800   DiscoveredParticipantIter iter = participants_.find(src_participant);
00801   if (iter == participants_.end()) {
00802     ACE_DEBUG((LM_WARNING,
00803       ACE_TEXT("(%P|%t) Spdp::handle_participant_crypto_tokens() - ")
00804       ACE_TEXT("received tokens for undiscovered participant %C. Ignoring.\n"),
00805                std::string(DCPS::GuidConverter(src_participant)).c_str()));
00806     return;
00807   }
00808   DiscoveredParticipant& dp = iter->second;
00809 
00810   dp.crypto_tokens_ = reinterpret_cast<const DDS::Security::ParticipantCryptoTokenSeq&>(msg.message_data);
00811 
00812   if (key_exchange->set_remote_participant_crypto_tokens(crypto_handle_, dp.crypto_handle_, dp.crypto_tokens_, se) == false) {
00813     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00814       ACE_TEXT("(%P|%t) ERROR: Spdp::handle_participant_crypto_tokens() - ")
00815       ACE_TEXT("Unable to set remote participant crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00816         se.code, se.minor_code, se.message.in()));
00817     return;
00818   }
00819 }
00820 
00821 bool
00822 Spdp::match_authenticated(const DCPS::RepoId& guid, DiscoveredParticipant& dp)
00823 {
00824   DDS::Security::SecurityException se = {"", 0, 0};
00825 
00826   Security::Authentication_var auth = security_config_->get_authentication();
00827   Security::AccessControl_var access = security_config_->get_access_control();
00828   Security::CryptoKeyFactory_var key_factory = security_config_->get_crypto_key_factory();
00829   Security::CryptoKeyExchange_var key_exchange = security_config_->get_crypto_key_exchange();
00830 
00831   dp.shared_secret_handle_ = auth->get_shared_secret(dp.handshake_handle_, se);
00832   if (dp.shared_secret_handle_ == 0) {
00833     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00834       ACE_TEXT("Spdp::match_authenticated() - ")
00835       ACE_TEXT("Unable to get shared secret handle. Security Exception[%d.%d]: %C\n"),
00836         se.code, se.minor_code, se.message.in()));
00837     return false;
00838   }
00839 
00840   if (auth->get_authenticated_peer_credential_token(dp.authenticated_peer_credential_token_, dp.handshake_handle_, se) == false) {
00841     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00842       ACE_TEXT("Spdp::match_authenticated() - ")
00843       ACE_TEXT("Unable to get authenticated peer credential token. Security Exception[%d.%d]: %C\n"),
00844         se.code, se.minor_code, se.message.in()));
00845     return false;
00846   }
00847 
00848   dp.permissions_handle_ = access->validate_remote_permissions(auth, identity_handle_, dp.identity_handle_, dp.permissions_token_, dp.authenticated_peer_credential_token_, se);
00849   if (participant_sec_attr_.is_access_protected == true && dp.permissions_handle_ == DDS::HANDLE_NIL) {
00850     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00851       ACE_TEXT("Spdp::match_authenticated() - ")
00852       ACE_TEXT("Unable to validate remote participant with access control plugin. Security Exception[%d.%d]: %C\n"),
00853         se.code, se.minor_code, se.message.in()));
00854     return false;
00855   }
00856 
00857   if (participant_sec_attr_.is_access_protected == true) {
00858     if (access->check_remote_participant(dp.permissions_handle_, domain_, dp.pdata_.ddsParticipantDataSecure, se) == false) {
00859       ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00860         ACE_TEXT("Spdp::match_authenticated() - ")
00861         ACE_TEXT("Remote participant check failed. Security Exception[%d.%d]: %C\n"),
00862           se.code, se.minor_code, se.message.in()));
00863       return false;
00864     }
00865   }
00866 
00867   if (DCPS::DCPS_debug_level > 3) {
00868     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Spdp::match_authenticated - ")
00869                ACE_TEXT("auth and access control complete for peer %C\n"),
00870                std::string(DCPS::GuidConverter(guid)).c_str()));
00871   }
00872 
00873   dp.crypto_handle_ = key_factory->register_matched_remote_participant(crypto_handle_, dp.identity_handle_, dp.permissions_handle_, dp.shared_secret_handle_, se);
00874   if (dp.crypto_handle_ == DDS::HANDLE_NIL) {
00875     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00876       ACE_TEXT("Spdp::match_authenticated() - ")
00877       ACE_TEXT("Unable to register remote participant with crypto key factory plugin. Security Exception[%d.%d]: %C\n"),
00878         se.code, se.minor_code, se.message.in()));
00879     return false;
00880   }
00881 
00882   if (key_exchange->create_local_participant_crypto_tokens(crypto_tokens_, crypto_handle_, dp.crypto_handle_, se) == false) {
00883     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00884       ACE_TEXT("Spdp::match_authenticated() - ")
00885       ACE_TEXT("Unable to create local participant crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00886         se.code, se.minor_code, se.message.in()));
00887     return false;
00888   }
00889 
00890   // Must unlock when calling into part_bit() as it may call back into us
00891   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00892 
00893   DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00894 #ifndef DDS_HAS_MINIMUM_BIT
00895   DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00896   if (bit) {
00897     ACE_GUARD_REACTION(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock, return false);
00898     bit_instance_handle =
00899       bit->store_synthetic_data(dp.pdata_.ddsParticipantDataSecure.base.base,
00900                                 DDS::NEW_VIEW_STATE);
00901   }
00902 #endif /* DDS_HAS_MINIMUM_BIT */
00903 
00904   // notify Sedp of association
00905   // Sedp may call has_discovered_participant, which is the participant must be added before these calls to associate.
00906   sedp_.associate(dp.pdata_);
00907   sedp_.associate_volatile(dp.pdata_);
00908   sedp_.associate_secure_writers_to_readers(dp.pdata_);
00909   sedp_.associate_secure_readers_to_writers(dp.pdata_);
00910 
00911   // Iterator is no longer valid
00912   DiscoveredParticipantIter iter = participants_.find(guid);
00913   if (iter != participants_.end()) {
00914     iter->second.bit_ih_ = bit_instance_handle;
00915   }
00916   return true;
00917 }
00918 
00919 void
00920 Spdp::attempt_authentication(const DCPS::RepoId& guid, DiscoveredParticipant& dp)
00921 {
00922   DDS::Security::Authentication_var auth = security_config_->get_authentication();
00923   DDS::Security::SecurityException se = {"", 0, 0};
00924 
00925   if (dp.auth_state_ == DCPS::AS_UNKNOWN) {
00926     dp.auth_started_time_ = ACE_OS::gettimeofday();
00927     dp.auth_state_ = DCPS::AS_VALIDATING_REMOTE;
00928   }
00929 
00930   if (dp.auth_state_ == DCPS::AS_VALIDATING_REMOTE) {
00931     DDS::Security::ValidationResult_t vr = auth->validate_remote_identity(dp.identity_handle_, dp.local_auth_request_token_, dp.remote_auth_request_token_, identity_handle_, dp.identity_token_, guid, se);
00932 
00933     // Take care of any auth tokens that need to be sent before handling return value
00934     if (!(dp.local_auth_request_token_ == DDS::Security::Token())) {
00935       DDS::Security::ParticipantStatelessMessage msg;
00936       msg.message_identity.source_guid = guid_;
00937       msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_REQUEST;
00938       msg.destination_participant_guid = guid;
00939       msg.destination_endpoint_guid = GUID_UNKNOWN;
00940       msg.source_endpoint_guid = GUID_UNKNOWN;
00941       msg.related_message_identity.source_guid = GUID_UNKNOWN;
00942       msg.related_message_identity.sequence_number = 0;
00943       msg.message_data.length(1);
00944       msg.message_data[0] = dp.local_auth_request_token_;
00945 
00946       DCPS::RepoId reader = guid;
00947       reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00948 
00949       if (sedp_.write_stateless_message(msg, reader) != DDS::RETCODE_OK) {
00950         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
00951           ACE_TEXT("Unable to write stateless message (auth request).\n")));
00952       }
00953     }
00954     switch (vr) {
00955       case DDS::Security::VALIDATION_OK: {
00956         dp.auth_state_ = DCPS::AS_AUTHENTICATED;
00957         return;
00958       }
00959       case DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE: {
00960         dp.auth_state_ = DCPS::AS_HANDSHAKE_REPLY;
00961         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - Attempting authentication (expecting reply) for participant:   %C\n"), std::string(DCPS::GuidConverter(guid)).c_str()));
00962         return; // We'll need to wait for an inbound handshake request from the remote participant
00963       }
00964       case DDS::Security::VALIDATION_PENDING_HANDSHAKE_REQUEST: {
00965         dp.auth_state_ = DCPS::AS_HANDSHAKE_REQUEST;
00966         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - Attempting authentication (sending request) for participant:   %C\n"), std::string(DCPS::GuidConverter(guid)).c_str()));
00967         break; // We've got more to do, move on to handshake request
00968       }
00969       case DDS::Security::VALIDATION_FAILED: {
00970         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - ")
00971           ACE_TEXT("Remote participant identity is invalid. Security Exception[%d.%d]: %C\n"),
00972             se.code, se.minor_code, se.message.in()));
00973         dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00974         return;
00975       }
00976       default: {
00977         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::attempt_authentication() - ")
00978           ACE_TEXT("Unexpected return value while validating remote identity. Security Exception[%d.%d]: %C\n"),
00979             se.code, se.minor_code, se.message.in()));
00980         dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00981         return;
00982       }
00983     }
00984   }
00985 
00986   if (dp.auth_state_ == DCPS::AS_HANDSHAKE_REQUEST) {
00987     DDS::Security::ParticipantBuiltinTopicDataSecure pbtds = {
00988       {
00989         {
00990           DDS::BuiltinTopicKey_t() /*ignored*/,
00991           qos_.user_data
00992         },
00993         identity_token_,
00994         permissions_token_,
00995         qos_.property,
00996         {0, 0}
00997       },
00998       identity_status_token_
00999     };
01000 
01001     pbtds.base.security_info.plugin_participant_security_attributes = participant_sec_attr_.plugin_participant_attributes;
01002     pbtds.base.security_info.participant_security_attributes = DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_VALID;
01003     if (participant_sec_attr_.is_rtps_protected) {
01004       pbtds.base.security_info.participant_security_attributes |= DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_RTPS_PROTECTED;
01005     }
01006     if (participant_sec_attr_.is_discovery_protected) {
01007       pbtds.base.security_info.participant_security_attributes |= DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_DISCOVERY_PROTECTED;
01008     }
01009     if (participant_sec_attr_.is_liveliness_protected) {
01010       pbtds.base.security_info.participant_security_attributes |= DDS::Security::PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_LIVELINESS_PROTECTED;
01011     }
01012 
01013     ParameterList plist;
01014     set_participant_guid(guid_, plist);
01015     if (ParameterListConverter::to_param_list(pbtds.base, plist) < 0) {
01016       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01017         ACE_TEXT("Failed to convert from ParticipantBuiltinTopicData to ParameterList\n")));
01018       return;
01019     }
01020 
01021     ACE_Message_Block temp_buff(64 * 1024);
01022     DCPS::Serializer ser(&temp_buff, DCPS::Serializer::SWAP_BE, DCPS::Serializer::ALIGN_INITIALIZE);
01023     if (!(ser << plist)) {
01024       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01025         ACE_TEXT("Failed to serialize parameter list.\n")));
01026       return;
01027     }
01028 
01029     DDS::Security::HandshakeMessageToken hs_mt;
01030 
01031     if (auth->begin_handshake_request(dp.handshake_handle_, hs_mt, identity_handle_, dp.identity_handle_, DDS::OctetSeq(temp_buff.length(), &temp_buff), se) != DDS::Security::VALIDATION_PENDING_HANDSHAKE_MESSAGE) {
01032       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01033         ACE_TEXT("Failed to begin handshake_request. Security Exception[%d.%d]: %C\n"),
01034           se.code, se.minor_code, se.message.in()));
01035       return;
01036     }
01037 
01038     DCPS::RepoId writer = guid_;
01039     writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
01040 
01041     DCPS::RepoId reader = guid;
01042     reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
01043 
01044     DDS::Security::ParticipantStatelessMessage msg;
01045     msg.message_identity.source_guid = guid_;
01046     msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE;
01047     msg.destination_participant_guid = guid;
01048     msg.destination_endpoint_guid = reader;
01049     msg.source_endpoint_guid = GUID_UNKNOWN;
01050     msg.related_message_identity.source_guid = GUID_UNKNOWN;
01051     msg.related_message_identity.sequence_number = 0;
01052     msg.message_data.length(1);
01053     msg.message_data[0] = hs_mt;
01054 
01055     if (sedp_.write_stateless_message(msg, reader) != DDS::RETCODE_OK) {
01056       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::attempt_authentication() - ")
01057         ACE_TEXT("Unable to write stateless message (handshake).\n")));
01058       return;
01059     }
01060     dp.has_last_stateless_msg_ = true;
01061     dp.last_stateless_msg_time_ = ACE_OS::gettimeofday();
01062     dp.last_stateless_msg_ = msg;
01063     dp.auth_state_ = DCPS::AS_HANDSHAKE_REQUEST_SENT;
01064   }
01065 
01066   return;
01067 }
01068 #endif
01069 
01070 void
01071 Spdp::remove_expired_participants()
01072 {
01073   // Find and remove any expired discovered participant
01074   ACE_GUARD (ACE_Thread_Mutex, g, lock_);
01075   // Iterate through a copy of the repo Ids, rather than the map
01076   //   as it gets unlocked in remove_discovered_participant()
01077   DCPS::RepoIdSet participant_ids;
01078   get_discovered_participant_ids(participant_ids);
01079   for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
01080        participant_id != participant_ids.end();
01081        ++participant_id)
01082   {
01083     DiscoveredParticipantIter part = participants_.find(*participant_id);
01084     if (part != participants_.end()) {
01085       if (part->second.last_seen_ <
01086           ACE_OS::gettimeofday() -
01087           ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) {
01088         if (DCPS::DCPS_debug_level > 1) {
01089           DCPS::GuidConverter conv(part->first);
01090           ACE_DEBUG((LM_WARNING,
01091             ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ")
01092             ACE_TEXT("participant %C exceeded lease duration, removing\n"),
01093             OPENDDS_STRING(conv).c_str()));
01094         }
01095         remove_discovered_participant(part);
01096       }
01097     }
01098   }
01099 }
01100 
01101 void
01102 Spdp::init_bit(const DDS::Subscriber_var& bit_subscriber)
01103 {
01104   bit_subscriber_ = bit_subscriber;
01105   tport_->open();
01106 }
01107 
01108 void
01109 Spdp::fini_bit()
01110 {
01111   bit_subscriber_ = 0;
01112   wait_for_acks_.reset();
01113   // request for SpdpTransport(actually Reactor) thread and Sedp::Task
01114   // to acknowledge
01115   tport_->acknowledge();
01116   sedp_.acknowledge();
01117   // wait for the 2 acknowledgements
01118   wait_for_acks_.wait_for_acks(2);
01119 }
01120 
01121 #ifndef DDS_HAS_MINIMUM_BIT
01122 DCPS::ParticipantBuiltinTopicDataDataReaderImpl*
01123 Spdp::part_bit()
01124 {
01125   if (!bit_subscriber_.in())
01126     return 0;
01127 
01128   DDS::DataReader_var d =
01129     bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
01130   return dynamic_cast<DCPS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
01131 }
01132 #endif /* DDS_HAS_MINIMUM_BIT */
01133 
01134 ACE_Reactor*
01135 Spdp::reactor() const
01136 {
01137   return disco_->reactor();
01138 }
01139 
01140 WaitForAcks&
01141 Spdp::wait_for_acks()
01142 {
01143   return wait_for_acks_;
01144 }
01145 
01146 bool
01147 Spdp::is_opendds(const GUID_t& participant) const
01148 {
01149   const DiscoveredParticipantConstIter iter = participants_.find(participant);
01150   if (iter == participants_.end()) {
01151     return false;
01152   }
01153   return 0 == std::memcmp(&iter->second.pdata_.participantProxy.vendorId,
01154                           DCPS::VENDORID_OCI, sizeof(VendorId_t));
01155 }
01156 
01157 Security::SPDPdiscoveredParticipantData
01158 Spdp::build_local_pdata(Security::DiscoveredParticipantDataKind kind)
01159 {
01160   BuiltinEndpointSet_t availableBuiltinEndpoints =
01161     DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER |
01162     DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR |
01163     DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER |
01164     DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR |
01165     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER |
01166     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR |
01167     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER |
01168     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER
01169     ;
01170 
01171 #if defined(OPENDDS_SECURITY)
01172   if (is_security_enabled()) {
01173     availableBuiltinEndpoints |=
01174       DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER |
01175       DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_READER |
01176       DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER |
01177       DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER |
01178       DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER |
01179       DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER |
01180       DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER |
01181       DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER |
01182       DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER |
01183       DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER |
01184       DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER |
01185       DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_READER
01186     ;
01187   }
01188 #endif
01189 
01190   // The RTPS spec has no constants for the builtinTopics{Writer,Reader}
01191 
01192   // This locator list should not be empty, but we won't actually be using it.
01193   // The OpenDDS publication/subscription data will have locators included.
01194   DCPS::LocatorSeq nonEmptyList(1);
01195   nonEmptyList.length(1);
01196   nonEmptyList[0].kind = LOCATOR_KIND_UDPv4;
01197   nonEmptyList[0].port = 12345;
01198   std::memset(nonEmptyList[0].address, 0, 12);
01199   nonEmptyList[0].address[12] = 127;
01200   nonEmptyList[0].address[13] = 0;
01201   nonEmptyList[0].address[14] = 0;
01202   nonEmptyList[0].address[15] = 1;
01203 
01204   const GuidPrefix_t& gp = guid_.guidPrefix;
01205 
01206   const Security::SPDPdiscoveredParticipantData pdata = {
01207     kind,
01208     { // ParticipantBuiltinTopicDataSecure
01209       { // ParticipantBuiltinTopicData (security enhanced)
01210         { // ParticipantBuiltinTopicData (original)
01211           DDS::BuiltinTopicKey_t() /*ignored*/,
01212           qos_.user_data
01213         },
01214 
01215 #if defined(OPENDDS_SECURITY)
01216         identity_token_,
01217         permissions_token_,
01218 #else
01219         DDS::Security::Token(),
01220         DDS::Security::Token(),
01221 #endif
01222 
01223         qos_.property,
01224 
01225 #if defined(OPENDDS_SECURITY)
01226         {
01227           security_attributes_to_bitmask(participant_sec_attr_),
01228           participant_sec_attr_.plugin_participant_attributes
01229         }
01230 #else
01231         DDS::Security::ParticipantSecurityInfo()
01232 #endif
01233 
01234       },
01235 
01236 #if defined(OPENDDS_SECURITY)
01237       identity_status_token_
01238 #else
01239       DDS::Security::Token()
01240 #endif
01241 
01242     },
01243     { // ParticipantProxy_t
01244       PROTOCOLVERSION,
01245       {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5],
01246        gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]},
01247       VENDORID_OPENDDS,
01248       false /*expectsIQoS*/,
01249       availableBuiltinEndpoints,
01250       sedp_unicast_,
01251       sedp_multicast_,
01252       nonEmptyList /*defaultMulticastLocatorList*/,
01253       nonEmptyList /*defaultUnicastLocatorList*/,
01254       {0 /*manualLivelinessCount*/}   //FUTURE: implement manual liveliness
01255     },
01256     { // Duration_t (leaseDuration)
01257       static_cast<CORBA::Long>((disco_->resend_period() * LEASE_MULT).sec()),
01258       0 // we are not supporting fractional seconds in the lease duration
01259     }
01260   };
01261 
01262   return pdata;
01263 }
01264 
01265 bool Spdp::announce_domain_participant_qos()
01266 {
01267 
01268 #if defined(OPENDDS_SECURITY)
01269   if (is_security_enabled())
01270     write_secure_updates();
01271 #endif
01272 
01273   return true;
01274 }
01275 
01276 Spdp::SpdpTransport::SpdpTransport(Spdp* outer, bool securityGuids)
01277   : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT)
01278   , buff_(64 * 1024)
01279   , wbuff_(64 * 1024)
01280 {
01281   hdr_.prefix[0] = 'R';
01282   hdr_.prefix[1] = 'T';
01283   hdr_.prefix[2] = 'P';
01284   hdr_.prefix[3] = 'S';
01285   hdr_.version = PROTOCOLVERSION;
01286   hdr_.vendorId = VENDORID_OPENDDS;
01287   std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t));
01288   data_.smHeader.submessageId = DATA;
01289   data_.smHeader.flags = FLAG_E | FLAG_D;
01290   data_.smHeader.submessageLength = 0; // last submessage in the Message
01291   data_.extraFlags = 0;
01292   data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS;
01293   data_.readerId = ENTITYID_UNKNOWN;
01294   data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER;
01295   data_.writerSN.high = 0;
01296   data_.writerSN.low = 0;
01297 
01298   // Ports are set by the formulas in RTPS v2.1 Table 9.8
01299   const u_short port_common = outer_->disco_->pb() +
01300                               (outer_->disco_->dg() * outer_->domain_),
01301     mc_port = port_common + outer_->disco_->d0();
01302 
01303   // with security enabled the meaning of the bytes in guidPrefix changes
01304   u_short participantId = securityGuids ? 0
01305     : (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11];
01306 
01307 #ifdef OPENDDS_SAFETY_PROFILE
01308   const u_short startingParticipantId = participantId;
01309 #endif
01310 
01311   while (!open_unicast_socket(port_common, participantId)) {
01312     ++participantId;
01313   }
01314 
01315 #ifdef OPENDDS_SAFETY_PROFILE
01316   if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
01317     // Since pids are not available, use the fact that we had to increment
01318     // participantId to modify the GUID's pid bytes.  This avoids GUID conflicts
01319     // between processes on the same host which start at the same time
01320     // (resulting in the same seed value for the random number generator).
01321     hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
01322     hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
01323     outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
01324     outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
01325   }
01326 #endif
01327 
01328   OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group();
01329   ACE_INET_Addr default_multicast;
01330   if (0 != default_multicast.set(mc_port, mc_addr.c_str())) {
01331     ACE_DEBUG((
01332           LM_ERROR,
01333           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
01334           ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"),
01335           mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set")));
01336     throw std::runtime_error("failed to set default_multicast address");
01337   }
01338 
01339   const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface();
01340 
01341   if (DCPS::DCPS_debug_level > 3) {
01342     ACE_DEBUG((LM_INFO,
01343                ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ")
01344                ACE_TEXT("joining group %C %C:%hd\n"),
01345                net_if.c_str (),
01346                mc_addr.c_str (),
01347                mc_port));
01348   }
01349 
01350 #ifdef ACE_HAS_MAC_OSX
01351   multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
01352                          ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
01353 #endif
01354 
01355   if (0 != multicast_socket_.join(default_multicast, 1,
01356                                   net_if.empty() ? 0 :
01357                                   ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) {
01358     ACE_ERROR((LM_ERROR,
01359         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
01360         ACE_TEXT("failed to join multicast group %C:%hd %p\n"),
01361         mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join")));
01362     throw std::runtime_error("failed to join multicast group");
01363   }
01364 
01365   send_addrs_.insert(default_multicast);
01366 
01367   typedef RtpsDiscovery::AddrVec::iterator iter;
01368   for (iter it = outer_->disco_->spdp_send_addrs().begin(),
01369        end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) {
01370     send_addrs_.insert(ACE_INET_Addr(it->c_str()));
01371   }
01372 }
01373 
01374 void
01375 Spdp::SpdpTransport::open()
01376 {
01377   ACE_Reactor* reactor = outer_->reactor();
01378   if (reactor->register_handler(unicast_socket_.get_handle(),
01379                                 this, ACE_Event_Handler::READ_MASK) != 0) {
01380     throw std::runtime_error("failed to register unicast input handler");
01381   }
01382 
01383   if (reactor->register_handler(multicast_socket_.get_handle(),
01384                                 this, ACE_Event_Handler::READ_MASK) != 0) {
01385     throw std::runtime_error("failed to register multicast input handler");
01386   }
01387 
01388   disco_resend_period_ = outer_->disco_->resend_period();
01389   last_disco_resend_ = 0;
01390 
01391   ACE_Time_Value timer_period = disco_resend_period_ < MAX_SPDP_TIMER_PERIOD ? disco_resend_period_ : MAX_SPDP_TIMER_PERIOD;
01392 
01393   if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), timer_period)) {
01394     throw std::runtime_error("failed to schedule timer with reactor");
01395   }
01396 }
01397 
01398 Spdp::SpdpTransport::~SpdpTransport()
01399 {
01400   if (DCPS::DCPS_debug_level > 3) {
01401     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
01402   }
01403   try {
01404     dispose_unregister();
01405   }
01406   catch (const CORBA::Exception& ex) {
01407     if (DCPS::DCPS_debug_level > 0) {
01408       ACE_DEBUG((LM_WARNING,
01409         ACE_TEXT("(%P|%t) WARNING: Exception caught in ")
01410         ACE_TEXT("SpdpTransport::~SpdpTransport: %C\n"),
01411         ex._info().c_str()));
01412     }
01413   }
01414   {
01415     // Acquire lock for modification of condition variable
01416     ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
01417     outer_->eh_shutdown_ = true;
01418   }
01419   outer_->shutdown_cond_.signal();
01420   unicast_socket_.close();
01421   multicast_socket_.close();
01422 }
01423 
01424 void
01425 Spdp::SpdpTransport::dispose_unregister()
01426 {
01427   // Send the dispose/unregister SPDP sample
01428   data_.writerSN.high = seq_.getHigh();
01429   data_.writerSN.low = seq_.getLow();
01430   data_.smHeader.flags = FLAG_E | FLAG_Q | FLAG_K_IN_DATA;
01431   data_.inlineQos.length(1);
01432   static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
01433   data_.inlineQos[0].status_info(dispose_unregister);
01434 
01435   ParameterList plist(1);
01436   plist.length(1);
01437   plist[0].guid(outer_->guid_);
01438   plist[0]._d(PID_PARTICIPANT_GUID);
01439 
01440   wbuff_.reset();
01441   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
01442   CORBA::UShort options = 0;
01443   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
01444       || !(ser << plist)) {
01445     ACE_ERROR((LM_ERROR,
01446       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
01447       ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
01448     return;
01449   }
01450 
01451   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
01452   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
01453     const ssize_t res =
01454       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
01455     if (res < 0) {
01456       ACE_TCHAR addr_buff[256] = {};
01457       iter->addr_to_string(addr_buff, 256, 0);
01458       ACE_ERROR((LM_ERROR,
01459         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
01460         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
01461     }
01462   }
01463 }
01464 
01465 void
01466 Spdp::SpdpTransport::close()
01467 {
01468   if (DCPS::DCPS_debug_level > 3) {
01469     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
01470   }
01471   ACE_Reactor* reactor = outer_->reactor();
01472   reactor->cancel_timer(this);
01473   const ACE_Reactor_Mask mask =
01474     ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
01475   reactor->remove_handler(multicast_socket_.get_handle(), mask);
01476   reactor->remove_handler(unicast_socket_.get_handle(), mask);
01477 }
01478 
01479 void
01480 Spdp::SpdpTransport::write()
01481 {
01482   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
01483   write_i();
01484 }
01485 
01486 void
01487 Spdp::SpdpTransport::write_i()
01488 {
01489 #if defined(OPENDDS_SECURITY)
01490   const Security::SPDPdiscoveredParticipantData& pdata =
01491     outer_->build_local_pdata(outer_->is_security_enabled() ?
01492                               Security::DPDK_ENHANCED :
01493                               Security::DPDK_ORIGINAL);
01494 #else
01495     const Security::SPDPdiscoveredParticipantData& pdata =
01496       outer_->build_local_pdata(Security::DPDK_ORIGINAL);
01497 #endif
01498 
01499   data_.writerSN.high = seq_.getHigh();
01500   data_.writerSN.low = seq_.getLow();
01501   ++seq_;
01502 
01503   ParameterList plist;
01504   if (ParameterListConverter::to_param_list(pdata, plist) < 0) {
01505     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
01506       ACE_TEXT("Spdp::SpdpTransport::write() - ")
01507       ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
01508       ACE_TEXT("to ParameterList\n")));
01509     return;
01510   }
01511 
01512   wbuff_.reset();
01513   CORBA::UShort options = 0;
01514   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
01515   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
01516       || !(ser << plist)) {
01517     ACE_ERROR((LM_ERROR,
01518       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
01519       ACE_TEXT("failed to serialize headers for SPDP\n")));
01520     return;
01521   }
01522 
01523   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
01524   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
01525     const ssize_t res =
01526       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
01527     if (res < 0) {
01528       ACE_TCHAR addr_buff[256] = {};
01529       iter->addr_to_string(addr_buff, 256, 0);
01530       ACE_ERROR((LM_ERROR,
01531         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
01532         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
01533     }
01534   }
01535 }
01536 
01537 int
01538 Spdp::SpdpTransport::handle_timeout(const ACE_Time_Value& tv, const void*)
01539 {
01540   if (tv > last_disco_resend_ + disco_resend_period_) {
01541     write();
01542     outer_->remove_expired_participants();
01543     last_disco_resend_ = tv;
01544   }
01545 
01546 #if defined(OPENDDS_SECURITY)
01547   outer_->check_auth_states(tv);
01548 #endif
01549 
01550   return 0;
01551 }
01552 
01553 int
01554 Spdp::SpdpTransport::handle_input(ACE_HANDLE h)
01555 {
01556   const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle())
01557                                  ? unicast_socket_ : multicast_socket_;
01558   ACE_INET_Addr remote;
01559   buff_.reset();
01560   const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
01561 
01562   if (bytes > 0) {
01563     buff_.wr_ptr(bytes);
01564   } else if (bytes == 0) {
01565     return -1;
01566   } else {
01567     ACE_DEBUG((
01568           LM_ERROR,
01569           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01570           ACE_TEXT("error reading from %C socket %p\n")
01571           , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
01572           ACE_TEXT("ACE_SOCK_Dgram::recv")));
01573     return -1;
01574   }
01575 
01576   // Handle some RTI protocol multicast to the same address
01577   if ((buff_.size() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) {
01578     return 0; // Ignore
01579   }
01580 
01581   DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR);
01582   Header header;
01583   if (!(ser >> header)) {
01584     ACE_ERROR((LM_ERROR,
01585                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01586                ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
01587     return 0;
01588   }
01589 
01590   while (buff_.length() > 3) {
01591     const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
01592     ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER);
01593     const size_t start = buff_.length();
01594     CORBA::UShort submessageLength = 0;
01595     switch (subm) {
01596     case DATA: {
01597       DataSubmessage data;
01598       if (!(ser >> data)) {
01599         ACE_ERROR((
01600               LM_ERROR,
01601               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01602               ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
01603         return 0;
01604       }
01605       submessageLength = data.smHeader.submessageLength;
01606 
01607       if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
01608         // Not our message: this could be the same multicast group used
01609         // for SEDP and other traffic.
01610         break;
01611       }
01612 
01613       ParameterList plist;
01614       if (data.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
01615         ser.swap_bytes(!ACE_CDR_BYTE_ORDER); // read "encap" itself in LE
01616         CORBA::UShort encap, options;
01617         if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE)) {
01618           ACE_ERROR((LM_ERROR,
01619             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01620             ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
01621           return 0;
01622         }
01623         ser >> options;
01624         // bit 8 in encap is on if it's PL_CDR_LE
01625         ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER);
01626         if (!(ser >> plist)) {
01627           ACE_ERROR((LM_ERROR,
01628             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01629             ACE_TEXT("failed to deserialize data payload for SPDP\n")));
01630           return 0;
01631         }
01632       } else {
01633         plist.length(1);
01634         RepoId guid;
01635         std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t));
01636         guid.entityId = ENTITYID_PARTICIPANT;
01637         plist[0].guid(guid);
01638         plist[0]._d(PID_PARTICIPANT_GUID);
01639       }
01640 
01641       outer_->data_received(data, plist);
01642       break;
01643     }
01644     default:
01645       SubmessageHeader smHeader;
01646       if (!(ser >> smHeader)) {
01647         ACE_ERROR((
01648               LM_ERROR,
01649               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01650               ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
01651         return 0;
01652       }
01653       submessageLength = smHeader.submessageLength;
01654       break;
01655     }
01656     if (submessageLength && buff_.length()) {
01657       const size_t read = start - buff_.length();
01658       if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
01659         if (!ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ
01660                                                  - read))) {
01661           ACE_ERROR((LM_ERROR,
01662             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01663             ACE_TEXT("failed to skip sub message length\n")));
01664           return 0;
01665         }
01666       }
01667     } else if (!submessageLength) {
01668       break; // submessageLength of 0 indicates the last submessage
01669     }
01670   }
01671 
01672   return 0;
01673 }
01674 
01675 int
01676 Spdp::SpdpTransport::handle_exception(ACE_HANDLE)
01677 {
01678   outer_->wait_for_acks().ack();
01679   return 0;
01680 }
01681 
01682 void
01683 Spdp::SpdpTransport::acknowledge()
01684 {
01685   ACE_Reactor* reactor = outer_->reactor();
01686   reactor->notify(this);
01687 }
01688 
01689 void
01690 Spdp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
01691 {
01692   sedp_.signal_liveliness(kind);
01693 }
01694 
01695 bool
01696 Spdp::SpdpTransport::open_unicast_socket(u_short port_common,
01697                                          u_short participant_id)
01698 {
01699   const u_short uni_port = port_common + outer_->disco_->d1() +
01700                            (outer_->disco_->pg() * participant_id);
01701 
01702   ACE_INET_Addr local_addr;
01703   OPENDDS_STRING spdpaddr = outer_->disco_->spdp_local_address().c_str();
01704 
01705   if (spdpaddr.empty()) {
01706     spdpaddr = "0.0.0.0";
01707   }
01708 
01709   if (0 != local_addr.set(uni_port, spdpaddr.c_str())) {
01710     ACE_DEBUG((
01711           LM_ERROR,
01712           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01713           ACE_TEXT("failed setting unicast local_addr to port %d %p\n"),
01714           uni_port, ACE_TEXT("ACE_INET_Addr::set")));
01715     throw std::runtime_error("failed to set unicast local address");
01716   }
01717 
01718   if (!DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) {
01719     if (DCPS::DCPS_debug_level > 3) {
01720       ACE_DEBUG((
01721             LM_WARNING,
01722             ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01723             ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p.  ")
01724             ACE_TEXT("Trying next participantId...\n"),
01725             uni_port, ACE_TEXT("ACE_SOCK_Dgram::open")));
01726     }
01727     return false;
01728 
01729   } else if (DCPS::DCPS_debug_level > 3) {
01730     ACE_DEBUG((
01731           LM_INFO,
01732           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01733           ACE_TEXT("opened unicast socket on port %d\n"),
01734           uni_port));
01735   }
01736 
01737   if (!DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) {
01738     ACE_ERROR((LM_ERROR,
01739                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
01740                ACE_TEXT("failed to set TTL value to %d ")
01741                ACE_TEXT("for port:%hd %p\n"),
01742                outer_->disco_->ttl(), uni_port, ACE_TEXT("DCPS::set_socket_multicast_ttl:")));
01743     throw std::runtime_error("failed to set TTL");
01744   }
01745   return true;
01746 }
01747 
01748 bool
01749 Spdp::get_default_locators(const RepoId& part_id, DCPS::LocatorSeq& target,
01750                            bool& inlineQos)
01751 {
01752   DiscoveredParticipantIter part_iter = participants_.find(part_id);
01753   if (part_iter == participants_.end()) {
01754     return false;
01755   } else {
01756     inlineQos = part_iter->second.pdata_.participantProxy.expectsInlineQos;
01757     DCPS::LocatorSeq& mc_source =
01758           part_iter->second.pdata_.participantProxy.defaultMulticastLocatorList;
01759     DCPS::LocatorSeq& uc_source =
01760           part_iter->second.pdata_.participantProxy.defaultUnicastLocatorList;
01761     CORBA::ULong mc_source_len = mc_source.length();
01762     CORBA::ULong uc_source_len = uc_source.length();
01763     CORBA::ULong target_len = target.length();
01764     target.length(mc_source_len + uc_source_len + target_len);
01765     // Copy multicast
01766     for (CORBA::ULong mci = 0; mci < mc_source.length(); ++mci) {
01767       target[target_len + mci] = mc_source[mci];
01768     }
01769     // Copy unicast
01770     for (CORBA::ULong uci = 0; uci < uc_source.length(); ++uci) {
01771       target[target_len + mc_source_len + uci] = uc_source[uci];
01772     }
01773   }
01774   return true;
01775 }
01776 
01777 bool
01778 Spdp::associated() const
01779 {
01780   return !participants_.empty();
01781 }
01782 
01783 bool
01784 Spdp::has_discovered_participant(const DCPS::RepoId& guid)
01785 {
01786   return participants_.find(guid) != participants_.end();
01787 }
01788 
01789 
01790 void
01791 Spdp::get_discovered_participant_ids(DCPS::RepoIdSet& results) const
01792 {
01793   DiscoveredParticipantMap::const_iterator idx;
01794   for (idx = participants_.begin(); idx != participants_.end(); ++idx)
01795   {
01796     results.insert(idx->first);
01797   }
01798 }
01799 
01800 #if defined(OPENDDS_SECURITY)
01801 Spdp::ParticipantCryptoInfoPair
01802 Spdp::lookup_participant_crypto_info(const DCPS::RepoId& id) const
01803 {
01804   ParticipantCryptoInfoPair result = ParticipantCryptoInfoPair(DDS::HANDLE_NIL, DDS::Security::SharedSecretHandle_var());
01805 
01806   ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
01807   DiscoveredParticipantConstIter pi = participants_.find(id);
01808   if (pi != participants_.end()) {
01809     result.first = pi->second.crypto_handle_;
01810     result.second = pi->second.shared_secret_handle_;
01811   }
01812   return result;
01813 }
01814 
01815 void
01816 Spdp::send_participant_crypto_tokens(const DCPS::RepoId& id)
01817 {
01818   if (crypto_tokens_.length() != 0) {
01819     DCPS::RepoId writer = guid_;
01820     writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
01821 
01822     DCPS::RepoId reader = id;
01823     reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
01824 
01825     DDS::Security::ParticipantVolatileMessageSecure msg;
01826     msg.message_identity.source_guid = writer;
01827     msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS;
01828     msg.destination_participant_guid = id;
01829     msg.destination_endpoint_guid = GUID_UNKNOWN; // unknown = whole participant
01830     msg.source_endpoint_guid = GUID_UNKNOWN;
01831     msg.message_data = reinterpret_cast<const DDS::Security::DataHolderSeq&>(crypto_tokens_);
01832 
01833     if (sedp_.write_volatile_message(msg, reader) != DDS::RETCODE_OK) {
01834       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::send_participant_crypto_tokens() - ")
01835         ACE_TEXT("Unable to write volatile message.\n")));
01836     }
01837   }
01838   return;
01839 }
01840 
01841 DDS::Security::PermissionsHandle
01842 Spdp::lookup_participant_permissions(const DCPS::RepoId& id) const
01843 {
01844   DDS::Security::PermissionsHandle result = DDS::HANDLE_NIL;
01845 
01846   ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
01847   DiscoveredParticipantConstIter pi = participants_.find(id);
01848   if (pi != participants_.end()) {
01849     result = pi->second.permissions_handle_;
01850   }
01851   return result;
01852 }
01853 
01854 DCPS::AuthState
01855 Spdp::lookup_participant_auth_state(const DCPS::RepoId& id) const
01856 {
01857   DCPS::AuthState result = DCPS::AS_UNKNOWN;
01858 
01859   ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
01860   DiscoveredParticipantConstIter pi = participants_.find(id);
01861   if (pi != participants_.end()) {
01862     result = pi->second.auth_state_;
01863   }
01864   return result;
01865 }
01866 #endif
01867 
01868 }
01869 }
01870 
01871 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1