Sedp.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Sedp.h"
00009 
00010 #include "MessageTypes.h"
00011 #include "ParameterListConverter.h"
00012 #include "RtpsDiscovery.h"
00013 #include "RtpsCoreTypeSupportImpl.h"
00014 
00015 #if defined(OPENDDS_SECURITY)
00016 #include "SecurityHelpers.h"
00017 #endif
00018 
00019 #include "Spdp.h"
00020 
00021 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00022 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst.h"
00023 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst_rch.h"
00024 
00025 #include "dds/DCPS/Serializer.h"
00026 #include "dds/DCPS/Definitions.h"
00027 #include "dds/DCPS/GuidConverter.h"
00028 #include "dds/DCPS/GuidUtils.h"
00029 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00030 #include "dds/DCPS/AssociationData.h"
00031 #include "dds/DCPS/Service_Participant.h"
00032 #include "dds/DCPS/Qos_Helper.h"
00033 #include "dds/DCPS/DataSampleHeader.h"
00034 #include "dds/DCPS/SendStateDataSampleList.h"
00035 #include "dds/DCPS/DataReaderCallbacks.h"
00036 #include "dds/DCPS/DataWriterCallbacks.h"
00037 #include "dds/DCPS/Marked_Default_Qos.h"
00038 #include "dds/DCPS/BuiltInTopicUtils.h"
00039 #include "dds/DCPS/DCPS_Utils.h"
00040 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00041 
00042 #include <ace/Reverse_Lock_T.h>
00043 #include <ace/Auto_Ptr.h>
00044 
00045 #include <cstring>
00046 
00047 namespace {
00048 bool qosChanged(DDS::PublicationBuiltinTopicData& dest,
00049                 const DDS::PublicationBuiltinTopicData& src)
00050 {
00051 #ifndef OPENDDS_SAFETY_PROFILE
00052   using OpenDDS::DCPS::operator!=;
00053 #endif
00054   bool changed = false;
00055 
00056   // check each Changeable QoS policy value in Publication BIT Data
00057 
00058   if (dest.deadline != src.deadline) {
00059     changed = true;
00060     dest.deadline = src.deadline;
00061   }
00062 
00063   if (dest.latency_budget != src.latency_budget) {
00064     changed = true;
00065     dest.latency_budget = src.latency_budget;
00066   }
00067 
00068   if (dest.lifespan != src.lifespan) {
00069     changed = true;
00070     dest.lifespan = src.lifespan;
00071   }
00072 
00073   if (dest.user_data != src.user_data) {
00074     changed = true;
00075     dest.user_data = src.user_data;
00076   }
00077 
00078   if (dest.ownership_strength != src.ownership_strength) {
00079     changed = true;
00080     dest.ownership_strength = src.ownership_strength;
00081   }
00082 
00083   if (dest.partition != src.partition) {
00084     changed = true;
00085     dest.partition = src.partition;
00086   }
00087 
00088   if (dest.topic_data != src.topic_data) {
00089     changed = true;
00090     dest.topic_data = src.topic_data;
00091   }
00092 
00093   if (dest.group_data != src.group_data) {
00094     changed = true;
00095     dest.group_data = src.group_data;
00096   }
00097 
00098   return changed;
00099 }
00100 
00101 bool qosChanged(DDS::SubscriptionBuiltinTopicData& dest,
00102                 const DDS::SubscriptionBuiltinTopicData& src)
00103 {
00104 #ifndef OPENDDS_SAFETY_PROFILE
00105   using OpenDDS::DCPS::operator!=;
00106 #endif
00107   bool changed = false;
00108 
00109   // check each Changeable QoS policy value in Subcription BIT Data
00110 
00111   if (dest.deadline != src.deadline) {
00112     changed = true;
00113     dest.deadline = src.deadline;
00114   }
00115 
00116   if (dest.latency_budget != src.latency_budget) {
00117     changed = true;
00118     dest.latency_budget = src.latency_budget;
00119   }
00120 
00121   if (dest.user_data != src.user_data) {
00122     changed = true;
00123     dest.user_data = src.user_data;
00124   }
00125 
00126   if (dest.time_based_filter != src.time_based_filter) {
00127     changed = true;
00128     dest.time_based_filter = src.time_based_filter;
00129   }
00130 
00131   if (dest.partition != src.partition) {
00132     changed = true;
00133     dest.partition = src.partition;
00134   }
00135 
00136   if (dest.topic_data != src.topic_data) {
00137     changed = true;
00138     dest.topic_data = src.topic_data;
00139   }
00140 
00141   if (dest.group_data != src.group_data) {
00142     changed = true;
00143     dest.group_data = src.group_data;
00144   }
00145 
00146   return changed;
00147 }
00148 
00149 bool paramsChanged(OpenDDS::DCPS::ContentFilterProperty_t& dest,
00150                    const OpenDDS::DCPS::ContentFilterProperty_t& src)
00151 {
00152   if (dest.expressionParameters.length() != src.expressionParameters.length()) {
00153     dest.expressionParameters = src.expressionParameters;
00154     return true;
00155   }
00156   for (CORBA::ULong i = 0; i < src.expressionParameters.length(); ++i) {
00157     if (0 != std::strcmp(dest.expressionParameters[i],
00158                          src.expressionParameters[i])) {
00159       dest.expressionParameters = src.expressionParameters;
00160       return true;
00161     }
00162   }
00163   return false;
00164 }
00165 
00166 }
00167 
00168 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00169 
00170 namespace OpenDDS {
00171 namespace RTPS {
00172 using DCPS::RepoId;
00173 using DCPS::make_rch;
00174 
00175 const bool Sedp::host_is_bigendian_(!ACE_CDR_BYTE_ORDER);
00176 
00177 Sedp::Sedp(const RepoId& participant_id, Spdp& owner, ACE_Thread_Mutex& lock) :
00178   DCPS::EndpointManager<Security::SPDPdiscoveredParticipantData>(participant_id, lock),
00179   spdp_(owner),
00180   publications_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), *this),
00181 
00182 #if defined(OPENDDS_SECURITY)
00183   publications_secure_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER), *this),
00184 #endif
00185 
00186   subscriptions_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), *this),
00187 
00188 #if defined(OPENDDS_SECURITY)
00189   subscriptions_secure_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER), *this),
00190 #endif
00191 
00192   participant_message_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER), *this),
00193 
00194 #if defined(OPENDDS_SECURITY)
00195   participant_message_secure_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER), *this),
00196   participant_stateless_message_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER), *this),
00197   participant_volatile_message_secure_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER), *this),
00198   dcps_participant_secure_writer_(make_id(participant_id, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER), *this),
00199 #endif
00200 
00201   publications_reader_(make_rch<Reader>(
00202       make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER),
00203       ref(*this))),
00204 
00205 #if defined(OPENDDS_SECURITY)
00206   publications_secure_reader_(make_rch<Reader>(
00207       make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER),
00208       ref(*this))),
00209 #endif
00210 
00211   subscriptions_reader_(make_rch<Reader>(
00212       make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER),
00213       ref(*this))),
00214 
00215 #if defined(OPENDDS_SECURITY)
00216   subscriptions_secure_reader_(make_rch<Reader>(
00217       make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER),
00218       ref(*this))),
00219 #endif
00220 
00221   participant_message_reader_(make_rch<Reader>(
00222       make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER),
00223       ref(*this))),
00224 
00225 #if defined(OPENDDS_SECURITY)
00226   participant_message_secure_reader_(make_rch<Reader>(
00227       make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER),
00228       ref(*this))),
00229   participant_stateless_message_reader_(make_rch<Reader>(
00230       make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER),
00231       ref(*this))),
00232   participant_volatile_message_secure_reader_(make_rch<Reader>(
00233       make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER),
00234       ref(*this))),
00235   dcps_participant_secure_reader_(make_rch<Reader>(
00236       make_id(participant_id, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER),
00237       ref(*this))),
00238 #endif
00239 
00240   task_(this),
00241 
00242 #if defined(OPENDDS_SECURITY)
00243   secure_automatic_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00244   secure_manual_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00245 #endif
00246 
00247   automatic_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00248   manual_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00249 {
00250   pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00251   sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00252 }
00253 
00254 RepoId
00255 Sedp::make_id(const RepoId& participant_id, const EntityId_t& entity)
00256 {
00257   RepoId id = participant_id;
00258   id.entityId = entity;
00259   return id;
00260 }
00261 
00262 DDS::ReturnCode_t
00263 Sedp::init(const RepoId& guid,
00264            const RtpsDiscovery& disco,
00265            DDS::DomainId_t domainId)
00266 {
00267   char domainStr[16];
00268   ACE_OS::snprintf(domainStr, 16, "%d", domainId);
00269 
00270   OPENDDS_STRING key = DCPS::GuidConverter(guid).uniqueId();
00271 
00272   // configure one transport
00273   transport_inst_ = TheTransportRegistry->create_inst(
00274                        DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00275                        OPENDDS_STRING("_SEDPTransportInst_") + key.c_str() + domainStr,
00276                        "rtps_udp");
00277   // Use a static cast to avoid dependency on the RtpsUdp library
00278   DCPS::RtpsUdpInst_rch rtps_inst =
00279       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00280   // The SEDP endpoints may need to wait at least one resend period before
00281   // the handshake completes (allows time for our SPDP multicast to be
00282   // received by the other side).  Arbitrary constant of 5 to account for
00283   // possible network lossiness.
00284   static const double HANDSHAKE_MULTIPLIER = 5;
00285   rtps_inst->handshake_timeout_ = disco.resend_period() * HANDSHAKE_MULTIPLIER;
00286 
00287   if (disco.sedp_multicast()) {
00288     // Bind to a specific multicast group
00289     const u_short mc_port = disco.pb() + disco.dg() * domainId + disco.dx();
00290 
00291     OPENDDS_STRING mc_addr = disco.default_multicast_group();
00292     if (rtps_inst->multicast_group_address_.set(mc_port, mc_addr.c_str())) {
00293       ACE_ERROR((LM_ERROR,
00294                  ACE_TEXT("(%P|%t) ERROR: Sedp::init - ")
00295                  ACE_TEXT("failed setting multicast local_addr to port %hd\n"),
00296                           mc_port));
00297       return DDS::RETCODE_ERROR;
00298     }
00299 
00300     rtps_inst->ttl_ = disco.ttl();
00301     rtps_inst->multicast_interface_ = disco.multicast_interface();
00302 
00303   } else {
00304     rtps_inst->use_multicast_ = false;
00305   }
00306 
00307   const OPENDDS_STRING sedp_addr = disco.sedp_local_address();
00308   if (!sedp_addr.empty()) {
00309     rtps_inst->local_address_config_str_ = sedp_addr;
00310     rtps_inst->local_address_.set(sedp_addr.c_str());
00311   }
00312 
00313   // Create a config
00314   OPENDDS_STRING config_name = DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00315                             OPENDDS_STRING("_SEDP_TransportCfg_") + key +
00316                             domainStr;
00317   DCPS::TransportConfig_rch transport_cfg =
00318     TheTransportRegistry->create_config(config_name.c_str());
00319   transport_cfg->instances_.push_back(transport_inst_);
00320 
00321   // Configure and enable each reader/writer
00322   rtps_inst->opendds_discovery_default_listener_ = publications_reader_;
00323   rtps_inst->opendds_discovery_guid_ = guid;
00324   const bool reliability = true, durability = true;
00325 
00326 #if defined(OPENDDS_SECURITY)
00327   const bool besteffort = false, nondurable = false;
00328 #endif
00329 
00330   publications_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00331   publications_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00332 
00333 #if defined(OPENDDS_SECURITY)
00334   publications_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00335   publications_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00336   publications_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00337   publications_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00338 #endif
00339 
00340   subscriptions_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00341   subscriptions_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00342 
00343 #if defined(OPENDDS_SECURITY)
00344   subscriptions_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00345   subscriptions_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00346   subscriptions_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00347   subscriptions_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00348 #endif
00349 
00350   participant_message_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00351   participant_message_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00352 
00353 #if defined(OPENDDS_SECURITY)
00354   participant_message_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00355   participant_message_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00356   participant_message_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00357   participant_message_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00358 
00359   participant_stateless_message_writer_.enable_transport_using_config(besteffort, nondurable, transport_cfg);
00360   participant_stateless_message_reader_->enable_transport_using_config(besteffort, nondurable, transport_cfg);
00361 
00362   participant_volatile_message_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00363   participant_volatile_message_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00364   participant_volatile_message_secure_writer_.enable_transport_using_config(reliability, nondurable, transport_cfg);
00365   participant_volatile_message_secure_reader_->enable_transport_using_config(reliability, nondurable, transport_cfg);
00366 
00367   dcps_participant_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00368   dcps_participant_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00369   dcps_participant_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00370   dcps_participant_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00371 #endif
00372 
00373   return DDS::RETCODE_OK;
00374 }
00375 
00376 #if defined(OPENDDS_SECURITY)
00377 DDS::ReturnCode_t Sedp::init_security(DDS::Security::IdentityHandle /* id_handle */,
00378                                       DDS::Security::PermissionsHandle perm_handle,
00379                                       DDS::Security::ParticipantCryptoHandle crypto_handle)
00380 {
00381   using namespace OpenDDS::Security;
00382   using namespace DDS::Security;
00383 
00384   DDS::ReturnCode_t result = DDS::RETCODE_OK;
00385 
00386   CryptoKeyFactory_var key_factory = spdp_.get_security_config()->get_crypto_key_factory();
00387   CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
00388   AccessControl_var acl = spdp_.get_security_config()->get_access_control();
00389   Authentication_var auth = spdp_.get_security_config()->get_authentication();
00390 
00391   set_permissions_handle(perm_handle);
00392   set_access_control(acl);
00393   set_crypto_key_factory(key_factory);
00394   set_crypto_key_exchange(key_exchange);
00395   crypto_handle_ = crypto_handle;
00396 
00397   // TODO: Handle all exceptions below once error-codes have been defined, etc.
00398   SecurityException ex = {"", 0, 0};
00399 
00400   bool ok = acl->get_participant_sec_attributes(perm_handle, participant_sec_attr_, ex);
00401   if (ok) {
00402 
00403     EndpointSecurityAttributes default_sec_attr;
00404     default_sec_attr.base.is_read_protected = false;
00405     default_sec_attr.base.is_write_protected = false;
00406     default_sec_attr.base.is_discovery_protected = false;
00407     default_sec_attr.base.is_liveliness_protected = false;
00408     default_sec_attr.is_submessage_protected = false;
00409     default_sec_attr.is_payload_protected = false;
00410     default_sec_attr.is_key_protected = false;
00411     default_sec_attr.plugin_endpoint_attributes = 0;
00412 
00413     NativeCryptoHandle h = DDS::HANDLE_NIL;
00414 
00415     const DDS::PartitionQosPolicy& default_part_qos = TheServiceParticipant->initial_PartitionQosPolicy();
00416     const DDS::Security::DataTagQosPolicy default_data_tag_qos; // default is empty sequence
00417 
00418     // Volatile-Message-Secure
00419     {
00420       PropertySeq writer_props(1), reader_props(1);
00421       writer_props.length(1);
00422       writer_props[0].name = "dds.sec.builtin_endpoint_name";
00423       writer_props[0].value = "BuiltinParticipantVolatileMessageSecureWriter";
00424 
00425       reader_props.length(1);
00426       reader_props[0].name = "dds.sec.builtin_endpoint_name";
00427       reader_props[0].value = "BuiltinParticipantVolatileMessageSecureReader";
00428 
00429       EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00430 
00431       ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSParticipantVolatileMessageSecure",
00432                                               default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00433       if (!ok) {
00434         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00435           ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSParticipantVolatileMessageSecure'. ")
00436           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00437         result = DDS::RETCODE_ERROR;
00438       }
00439 
00440       h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00441       participant_volatile_message_secure_writer_.set_crypto_handles(crypto_handle, h);
00442       local_writer_crypto_handles_[participant_volatile_message_secure_writer_.get_repo_id()] = h;
00443 
00444       EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00445       ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSParticipantVolatileMessageSecure",
00446                                               default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00447       if (!ok) {
00448         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00449           ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSParticipantVolatileMessageSecure'.")
00450           ACE_TEXT(" Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00451         result = DDS::RETCODE_ERROR;
00452       }
00453 
00454       h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00455       participant_volatile_message_secure_reader_->set_crypto_handles(crypto_handle, h);
00456       local_reader_crypto_handles_[participant_volatile_message_secure_reader_->get_repo_id()] = h;
00457     }
00458 
00459     // DCPS-Participant-Message-Secure
00460     {
00461       PropertySeq reader_props, writer_props;
00462 
00463       EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00464       ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSParticipantMessageSecure",
00465                                               default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00466       if (!ok) {
00467         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00468           ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSParticipantMessageSecure'. ")
00469           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00470         result = DDS::RETCODE_ERROR;
00471       }
00472 
00473       h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00474       participant_message_secure_writer_.set_crypto_handles(crypto_handle, h);
00475       local_writer_crypto_handles_[participant_message_secure_writer_.get_repo_id()] = h;
00476 
00477       EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00478       ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSParticipantMessageSecure",
00479                                               default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00480       if (!ok) {
00481         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00482           ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSParticipantMessageSecure'. ")
00483           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00484         result = DDS::RETCODE_ERROR;
00485       }
00486 
00487       h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00488       participant_message_secure_reader_->set_crypto_handles(crypto_handle, h);
00489       local_reader_crypto_handles_[participant_message_secure_reader_->get_repo_id()] = h;
00490     }
00491 
00492     // DCPS-Publications-Secure
00493     {
00494       PropertySeq reader_props, writer_props;
00495 
00496       EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00497       ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSPublicationsSecure",
00498                                               default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00499       if (!ok) {
00500         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00501           ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSPublicationsSecure'. ")
00502           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00503         result = DDS::RETCODE_ERROR;
00504       }
00505 
00506       h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00507       publications_secure_writer_.set_crypto_handles(crypto_handle, h);
00508       local_writer_crypto_handles_[publications_secure_writer_.get_repo_id()] = h;
00509 
00510       EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00511       ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSPublicationsSecure",
00512                                               default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00513       if (!ok) {
00514         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00515           ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSPublicationsSecure'. ")
00516           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00517         result = DDS::RETCODE_ERROR;
00518       }
00519 
00520       h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00521       publications_secure_reader_->set_crypto_handles(crypto_handle, h);
00522       local_reader_crypto_handles_[publications_secure_reader_->get_repo_id()] = h;
00523     }
00524 
00525     // DCPS-Subscriptions-Secure
00526     {
00527       PropertySeq reader_props, writer_props;
00528 
00529       EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00530       ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSSubscriptionsSecure",
00531                                               default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00532       if (!ok) {
00533         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00534           ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSSubscriptionsSecure'. ")
00535           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00536         result = DDS::RETCODE_ERROR;
00537       }
00538 
00539       h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00540       subscriptions_secure_writer_.set_crypto_handles(crypto_handle, h);
00541       local_writer_crypto_handles_[subscriptions_secure_writer_.get_repo_id()] = h;
00542 
00543       EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00544       ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSSubscriptionsSecure",
00545                                               default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00546       if (!ok) {
00547         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00548           ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSSubscriptionsSecure'. ")
00549           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00550         result = DDS::RETCODE_ERROR;
00551       }
00552 
00553       h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00554       subscriptions_secure_reader_->set_crypto_handles(crypto_handle, h);
00555       local_reader_crypto_handles_[subscriptions_secure_reader_->get_repo_id()] = h;
00556     }
00557 
00558     // DCPS-Participants-Secure
00559     {
00560       PropertySeq reader_props, writer_props;
00561 
00562       EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00563       ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSParticipantSecure",
00564                                               default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00565       if (!ok) {
00566         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00567           ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSParticipantSecure'. ")
00568           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00569         result = DDS::RETCODE_ERROR;
00570       }
00571 
00572       h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00573       dcps_participant_secure_writer_.set_crypto_handles(crypto_handle, h);
00574       local_writer_crypto_handles_[dcps_participant_secure_writer_.get_repo_id()] = h;
00575 
00576       EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00577       ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSParticipantSecure",
00578                                               default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00579       if (!ok) {
00580         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00581           ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSParticipantSecure'. ")
00582           ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00583         result = DDS::RETCODE_ERROR;
00584       }
00585 
00586       h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00587       dcps_participant_secure_reader_->set_crypto_handles(crypto_handle, h);
00588       local_reader_crypto_handles_[dcps_participant_secure_reader_->get_repo_id()] = h;
00589     }
00590 
00591   } else {
00592     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00593       ACE_TEXT("Failure calling get_participant_sec_attributes. ")
00594       ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00595     result = DDS::RETCODE_ERROR;
00596   }
00597   return result;
00598 }
00599 #endif
00600 
00601 void
00602 Sedp::unicast_locators(DCPS::LocatorSeq& locators) const
00603 {
00604   DCPS::RtpsUdpInst_rch rtps_inst =
00605     DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00606   using namespace OpenDDS::RTPS;
00607 
00608   CORBA::ULong idx = 0;
00609 
00610   // multicast first so it's preferred by remote peers
00611   if (rtps_inst->use_multicast_ && rtps_inst->multicast_group_address_ != ACE_INET_Addr()) {
00612     idx = locators.length();
00613     locators.length(idx + 1);
00614     locators[idx].kind = address_to_kind(rtps_inst->multicast_group_address_);
00615     locators[idx].port = rtps_inst->multicast_group_address_.get_port_number();
00616     RTPS::address_to_bytes(locators[idx].address,
00617       rtps_inst->multicast_group_address_);
00618   }
00619 
00620   //if local_address_string is empty, or only the port has been set
00621   //need to get interface addresses to populate into the locator
00622   if (rtps_inst->local_address_config_str_.empty() ||
00623       rtps_inst->local_address_config_str_.rfind(':') == 0) {
00624     typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector;
00625     AddrVector addrs;
00626     if (TheServiceParticipant->default_address ().empty ()) {
00627       DCPS::get_interface_addrs(addrs);
00628     } else {
00629       addrs.push_back(ACE_INET_Addr(static_cast<u_short>(0), TheServiceParticipant->default_address().c_str()));
00630     }
00631     for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) {
00632       idx = locators.length();
00633       locators.length(idx + 1);
00634       locators[idx].kind = address_to_kind(*adr_it);
00635       locators[idx].port = rtps_inst->local_address_.get_port_number();
00636       RTPS::address_to_bytes(locators[idx].address,
00637         *adr_it);
00638     }
00639   } else {
00640     idx = locators.length();
00641     locators.length(idx + 1);
00642     locators[idx].kind = address_to_kind(rtps_inst->local_address_);
00643     locators[idx].port = rtps_inst->local_address_.get_port_number();
00644     RTPS::address_to_bytes(locators[idx].address,
00645       rtps_inst->local_address_);
00646   }
00647 }
00648 
00649 const ACE_INET_Addr&
00650 Sedp::local_address() const
00651 {
00652   DCPS::RtpsUdpInst_rch rtps_inst =
00653       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00654   return rtps_inst->local_address_;
00655 }
00656 
00657 const ACE_INET_Addr&
00658 Sedp::multicast_group() const
00659 {
00660   DCPS::RtpsUdpInst_rch rtps_inst =
00661       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00662   return rtps_inst->multicast_group_address_;
00663 }
00664 bool
00665 Sedp::map_ipv4_to_ipv6() const
00666 {
00667   bool map = false;
00668   if (local_address().get_type() != AF_INET) {
00669     map = true;
00670   }
00671   return map;
00672 }
00673 
00674 void
00675 Sedp::assign_bit_key(DiscoveredPublication& pub)
00676 {
00677   increment_key(pub_bit_key_);
00678   pub_key_to_id_[pub_bit_key_] = pub.writer_data_.writerProxy.remoteWriterGuid;
00679   pub.writer_data_.ddsPublicationData.key = pub_bit_key_;
00680 }
00681 
00682 void
00683 Sedp::assign_bit_key(DiscoveredSubscription& sub)
00684 {
00685   increment_key(sub_bit_key_);
00686   sub_key_to_id_[sub_bit_key_] = sub.reader_data_.readerProxy.remoteReaderGuid;
00687   sub.reader_data_.ddsSubscriptionData.key = sub_bit_key_;
00688 }
00689 
00690 void
00691 create_association_data_proto(DCPS::AssociationData& proto,
00692                               const Security::SPDPdiscoveredParticipantData& pdata) {
00693   proto.publication_transport_priority_ = 0;
00694   proto.remote_reliable_ = true;
00695   proto.remote_durable_ = true;
00696   std::memcpy(proto.remote_id_.guidPrefix, pdata.participantProxy.guidPrefix,
00697               sizeof(GuidPrefix_t));
00698 
00699   const DCPS::LocatorSeq& mll =
00700     pdata.participantProxy.metatrafficMulticastLocatorList;
00701   const DCPS::LocatorSeq& ull =
00702     pdata.participantProxy.metatrafficUnicastLocatorList;
00703   const CORBA::ULong locator_count = mll.length() + ull.length();
00704 
00705   ACE_Message_Block mb_locator(4 + locator_count * sizeof(DCPS::Locator_t) + 1);
00706   using DCPS::Serializer;
00707   Serializer ser_loc(&mb_locator, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00708   ser_loc << locator_count;
00709 
00710   for (CORBA::ULong i = 0; i < mll.length(); ++i) {
00711     ser_loc << mll[i];
00712   }
00713   for (CORBA::ULong i = 0; i < ull.length(); ++i) {
00714     ser_loc << ull[i];
00715   }
00716   ser_loc << ACE_OutputCDR::from_boolean(false); // requires_inline_qos
00717 
00718   proto.remote_data_.length(1);
00719   proto.remote_data_[0].transport_type = "rtps_udp";
00720   message_block_to_sequence (mb_locator, proto.remote_data_[0].data);
00721 }
00722 
00723 
00724 #if defined(OPENDDS_SECURITY)
00725 void
00726 Sedp::associate_preauth(const Security::SPDPdiscoveredParticipantData& pdata)
00727 {
00728   // First create a 'prototypical' instance of AssociationData.  It will
00729   // be copied and modified for each of the (up to) four SEDP Endpoints.
00730   DCPS::AssociationData proto;
00731   create_association_data_proto(proto, pdata);
00732 
00733   const BuiltinEndpointSet_t& avail =
00734     pdata.participantProxy.availableBuiltinEndpoints;
00735   /*
00736    * Stateless messages are associated here because they are the first step in the
00737    * security-enablement process and as such they are sent in the clear.
00738    */
00739 
00740   if (avail & DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER) {
00741     DCPS::AssociationData peer = proto;
00742     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
00743     participant_stateless_message_reader_->assoc(peer);
00744   }
00745 
00746   if (avail & DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER) {
00747     DCPS::AssociationData peer = proto;
00748     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00749     participant_stateless_message_writer_.assoc(peer);
00750   }
00751 }
00752 #endif
00753 
00754 void
00755 Sedp::associate(const Security::SPDPdiscoveredParticipantData& pdata)
00756 {
00757   // First create a 'prototypical' instance of AssociationData.  It will
00758   // be copied and modified for each of the (up to) four SEDP Endpoints.
00759   DCPS::AssociationData proto;
00760   create_association_data_proto(proto, pdata);
00761 
00762   const BuiltinEndpointSet_t& avail =
00763     pdata.participantProxy.availableBuiltinEndpoints;
00764 
00765   // See RTPS v2.1 section 8.5.5.1
00766   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00767     DCPS::AssociationData peer = proto;
00768     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00769     publications_reader_->assoc(peer);
00770   }
00771   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00772     DCPS::AssociationData peer = proto;
00773     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00774     subscriptions_reader_->assoc(peer);
00775   }
00776   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00777     DCPS::AssociationData peer = proto;
00778     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00779     participant_message_reader_->assoc(peer);
00780   }
00781 
00782   DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> dpd(
00783     new Security::SPDPdiscoveredParticipantData(pdata));
00784 
00785   task_.enqueue(DCPS::SAMPLE_DATA, move(dpd));
00786 }
00787 
00788 #if defined(OPENDDS_SECURITY)
00789 void Sedp::associate_volatile(const Security::SPDPdiscoveredParticipantData& pdata)
00790 {
00791   using namespace DDS::Security;
00792 
00793   DCPS::AssociationData proto;
00794   create_association_data_proto(proto, pdata);
00795 
00796   DCPS::RepoId part = proto.remote_id_;
00797   part.entityId = ENTITYID_PARTICIPANT;
00798 
00799   const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00800 
00801   if (avail & BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER) {
00802     DCPS::AssociationData peer = proto;
00803     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
00804     remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00805       part, participant_volatile_message_secure_reader_->get_endpoint_crypto_handle());
00806     peer.remote_data_ = add_security_info(
00807       peer.remote_data_, peer.remote_id_, participant_volatile_message_secure_reader_->get_repo_id());
00808     participant_volatile_message_secure_reader_->assoc(peer);
00809   }
00810   if (avail & BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER) {
00811     DCPS::AssociationData peer = proto;
00812     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
00813     remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00814       part, participant_volatile_message_secure_writer_.get_endpoint_crypto_handle(), false);
00815     peer.remote_data_ = add_security_info(
00816       peer.remote_data_, participant_volatile_message_secure_writer_.get_repo_id(), peer.remote_id_);
00817     participant_volatile_message_secure_writer_.assoc(peer);
00818   }
00819 }
00820 
00821 void Sedp::associate_secure_writers_to_readers(const Security::SPDPdiscoveredParticipantData& pdata)
00822 {
00823   using namespace DDS::Security;
00824 
00825   DCPS::AssociationData proto;
00826   create_association_data_proto(proto, pdata);
00827 
00828   DCPS::RepoId part = proto.remote_id_;
00829   part.entityId = ENTITYID_PARTICIPANT;
00830 
00831   const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00832 
00833   if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER) {
00834     DCPS::AssociationData peer = proto;
00835     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER;
00836     remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00837       part, participant_message_secure_reader_->get_endpoint_crypto_handle());
00838     peer.remote_data_ = add_security_info(
00839       peer.remote_data_, peer.remote_id_, participant_message_secure_reader_->get_repo_id());
00840     participant_message_secure_reader_->assoc(peer);
00841   }
00842   if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER) {
00843     DCPS::AssociationData peer = proto;
00844     peer.remote_id_.entityId = ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER;
00845     remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00846       part, dcps_participant_secure_reader_->get_endpoint_crypto_handle());
00847     peer.remote_data_ = add_security_info(
00848       peer.remote_data_, peer.remote_id_, dcps_participant_secure_reader_->get_repo_id());
00849     dcps_participant_secure_reader_->assoc(peer);
00850   }
00851   if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) {
00852     DCPS::AssociationData peer = proto;
00853     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER;
00854     remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00855       part, publications_secure_reader_->get_endpoint_crypto_handle());
00856     peer.remote_data_ = add_security_info(
00857       peer.remote_data_, peer.remote_id_, publications_secure_reader_->get_repo_id());
00858     publications_secure_reader_->assoc(peer);
00859   }
00860   if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) {
00861     DCPS::AssociationData peer = proto;
00862     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER;
00863     remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00864       part, subscriptions_secure_reader_->get_endpoint_crypto_handle());
00865     peer.remote_data_ = add_security_info(
00866       peer.remote_data_, peer.remote_id_, subscriptions_secure_reader_->get_repo_id());
00867     subscriptions_secure_reader_->assoc(peer);
00868   }
00869 }
00870 
00871 void Sedp::associate_secure_readers_to_writers(const Security::SPDPdiscoveredParticipantData& pdata)
00872 {
00873   using namespace DDS::Security;
00874 
00875   DCPS::AssociationData proto;
00876   create_association_data_proto(proto, pdata);
00877 
00878   DCPS::RepoId part = proto.remote_id_;
00879   part.entityId = ENTITYID_PARTICIPANT;
00880 
00881   const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00882 
00883   if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER) {
00884     DCPS::AssociationData peer = proto;
00885     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER;
00886     remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00887       part, participant_message_secure_writer_.get_endpoint_crypto_handle(), false);
00888     peer.remote_data_ = add_security_info(
00889       peer.remote_data_, participant_message_secure_writer_.get_repo_id(), peer.remote_id_);
00890     participant_message_secure_writer_.assoc(peer);
00891   }
00892   if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_READER) {
00893     DCPS::AssociationData peer = proto;
00894     peer.remote_id_.entityId = ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER;
00895     remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00896       part, dcps_participant_secure_writer_.get_endpoint_crypto_handle(), false);
00897     peer.remote_data_ = add_security_info(
00898       peer.remote_data_, dcps_participant_secure_writer_.get_repo_id(), peer.remote_id_);
00899     dcps_participant_secure_writer_.assoc(peer);
00900   }
00901   if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_READER) {
00902     DCPS::AssociationData peer = proto;
00903     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER;
00904     remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00905       part, publications_secure_writer_.get_endpoint_crypto_handle(), false);
00906     peer.remote_data_ = add_security_info(
00907       peer.remote_data_, publications_secure_writer_.get_repo_id(), peer.remote_id_);
00908     publications_secure_writer_.assoc(peer);
00909   }
00910   if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER) {
00911     DCPS::AssociationData peer = proto;
00912     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER;
00913     remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00914       part, subscriptions_secure_writer_.get_endpoint_crypto_handle(), false);
00915     peer.remote_data_ = add_security_info(
00916       peer.remote_data_, subscriptions_secure_writer_.get_repo_id(), peer.remote_id_);
00917     subscriptions_secure_writer_.assoc(peer);
00918   }
00919 }
00920 
00921 void
00922 Sedp::send_builtin_crypto_tokens(const DCPS::RepoId& dstParticipant, const DCPS::EntityId_t& dstEntity, const DCPS::RepoId& src)
00923 {
00924   DCPS::RepoId dst = dstParticipant;
00925   dst.entityId = dstEntity;
00926   if (DCPS::GuidConverter(src).entityKind() == DCPS::KIND_READER) {
00927     create_and_send_datareader_crypto_tokens(local_reader_crypto_handles_[src],
00928                                              src, remote_writer_crypto_handles_[dst],
00929                                              dst);
00930   } else {
00931     create_and_send_datawriter_crypto_tokens(local_writer_crypto_handles_[src],
00932                                              src, remote_reader_crypto_handles_[dst],
00933                                              dst);
00934   }
00935 }
00936 
00937 void
00938 Sedp::send_builtin_crypto_tokens(const Security::SPDPdiscoveredParticipantData& pdata)
00939 {
00940   using namespace DDS::Security;
00941 
00942   DCPS::RepoId part;
00943   std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix, sizeof(GuidPrefix_t));
00944   part.entityId = ENTITYID_PARTICIPANT;
00945 
00946   const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00947 
00948   if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER) {
00949     send_builtin_crypto_tokens(part, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER,
00950                                participant_message_secure_reader_->get_repo_id());
00951   }
00952 
00953   if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER) {
00954     send_builtin_crypto_tokens(part, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER,
00955                                dcps_participant_secure_reader_->get_repo_id());
00956   }
00957 
00958   if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) {
00959     send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER,
00960                                publications_secure_reader_->get_repo_id());
00961   }
00962 
00963   if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) {
00964     send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER,
00965                                subscriptions_secure_reader_->get_repo_id());
00966   }
00967 
00968   if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER) {
00969     send_builtin_crypto_tokens(part, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER,
00970                                participant_message_secure_writer_.get_repo_id());
00971   }
00972 
00973   if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_READER) {
00974     send_builtin_crypto_tokens(part, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER,
00975                                dcps_participant_secure_writer_.get_repo_id());
00976   }
00977 
00978   if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_READER) {
00979     send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER,
00980                                publications_secure_writer_.get_repo_id());
00981   }
00982 
00983   if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER) {
00984     send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER,
00985                                subscriptions_secure_writer_.get_repo_id());
00986   }
00987 }
00988 #endif
00989 
00990 void
00991 Sedp::Task::svc_i(const Security::SPDPdiscoveredParticipantData* ppdata)
00992 {
00993   DCPS::unique_ptr<const Security::SPDPdiscoveredParticipantData> pdata(ppdata);
00994 
00995   // First create a 'prototypical' instance of AssociationData.  It will
00996   // be copied and modified for each of the (up to) four SEDP Endpoints.
00997   DCPS::AssociationData proto;
00998   create_association_data_proto(proto, *pdata);
00999 
01000   const BuiltinEndpointSet_t& avail =
01001     pdata->participantProxy.availableBuiltinEndpoints;
01002 
01003   // See RTPS v2.1 section 8.5.5.1
01004   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
01005     DCPS::AssociationData peer = proto;
01006     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
01007     sedp_->publications_writer_.assoc(peer);
01008   }
01009   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
01010     DCPS::AssociationData peer = proto;
01011     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
01012     sedp_->subscriptions_writer_.assoc(peer);
01013   }
01014   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
01015     DCPS::AssociationData peer = proto;
01016     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
01017     sedp_->participant_message_writer_.assoc(peer);
01018   }
01019 
01020 #if defined(OPENDDS_SECURITY)
01021   if (sedp_->is_security_enabled()) {
01022     sedp_->associate_secure_readers_to_writers(*pdata);
01023   }
01024 #endif
01025 
01026   //FUTURE: if/when topic propagation is supported, add it here
01027 
01028   // Process deferred publications and subscriptions.
01029   for (DeferredSubscriptionMap::iterator pos = sedp_->deferred_subscriptions_.lower_bound(proto.remote_id_),
01030          limit = sedp_->deferred_subscriptions_.upper_bound(proto.remote_id_);
01031        pos != limit;
01032        /* Increment in body. */) {
01033     sedp_->data_received (pos->second.first, pos->second.second);
01034     sedp_->deferred_subscriptions_.erase (pos++);
01035   }
01036   for (DeferredPublicationMap::iterator pos = sedp_->deferred_publications_.lower_bound(proto.remote_id_),
01037          limit = sedp_->deferred_publications_.upper_bound(proto.remote_id_);
01038        pos != limit;
01039        /* Increment in body. */) {
01040     sedp_->data_received (pos->second.first, pos->second.second);
01041     sedp_->deferred_publications_.erase (pos++);
01042   }
01043 
01044   ACE_GUARD(ACE_Thread_Mutex, g, sedp_->lock_);
01045   if (spdp_->shutting_down()) { return; }
01046 
01047   proto.remote_id_.entityId = ENTITYID_PARTICIPANT;
01048   sedp_->associated_participants_.insert(proto.remote_id_);
01049 
01050 #if defined(OPENDDS_SECURITY)
01051   if (sedp_->is_security_enabled()) {
01052     spdp_->send_participant_crypto_tokens(proto.remote_id_);
01053     sedp_->send_builtin_crypto_tokens(*pdata);
01054   }
01055 #endif
01056 
01057   // Write durable data
01058   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
01059     proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
01060     sedp_->write_durable_publication_data(proto.remote_id_);
01061   }
01062   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
01063     proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
01064     sedp_->write_durable_subscription_data(proto.remote_id_);
01065   }
01066   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
01067     proto.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
01068     sedp_->write_durable_participant_message_data(proto.remote_id_);
01069   }
01070 
01071   for (DCPS::RepoIdSet::iterator it = sedp_->defer_match_endpoints_.begin();
01072        it != sedp_->defer_match_endpoints_.end(); /*incremented in body*/) {
01073     if (0 == std::memcmp(it->guidPrefix, proto.remote_id_.guidPrefix,
01074                          sizeof(GuidPrefix_t))) {
01075       OPENDDS_STRING topic;
01076       if (it->entityId.entityKind & 4) {
01077         DiscoveredSubscriptionIter dsi =
01078           sedp_->discovered_subscriptions_.find(*it);
01079         if (dsi != sedp_->discovered_subscriptions_.end()) {
01080           topic = dsi->second.reader_data_.ddsSubscriptionData.topic_name;
01081         }
01082       } else {
01083         DiscoveredPublicationIter dpi =
01084           sedp_->discovered_publications_.find(*it);
01085         if (dpi != sedp_->discovered_publications_.end()) {
01086           topic = dpi->second.writer_data_.ddsPublicationData.topic_name;
01087         }
01088       }
01089       if (DCPS::DCPS_debug_level > 3) {
01090         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
01091           ACE_TEXT("processing deferred endpoints for topic %C\n"),
01092           topic.c_str()));
01093       }
01094       if (!topic.empty()) {
01095         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator ti =
01096           sedp_->topics_.find(topic);
01097         if (ti != sedp_->topics_.end()) {
01098           if (DCPS::DCPS_debug_level > 3) {
01099             DCPS::GuidConverter conv(*it);
01100             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
01101               ACE_TEXT("calling match_endpoints %C\n"),
01102               OPENDDS_STRING(conv).c_str()));
01103           }
01104           sedp_->match_endpoints(*it, ti->second);
01105           if (spdp_->shutting_down()) { return; }
01106         }
01107       }
01108       sedp_->defer_match_endpoints_.erase(it++);
01109     } else {
01110       ++it;
01111     }
01112   }
01113 }
01114 
01115 void
01116 Sedp::Task::svc_secure_i(DCPS::MessageId id,
01117                          const Security::SPDPdiscoveredParticipantData* ppdata)
01118 {
01119   DCPS::unique_ptr<const Security::SPDPdiscoveredParticipantData> pdata(ppdata);
01120   spdp_->handle_participant_data(id, *pdata);
01121 }
01122 
01123 namespace {
01124   void disassociate_helper(const BuiltinEndpointSet_t& avail, const CORBA::ULong flags,
01125                            const RepoId& id, const EntityId_t& ent, DCPS::TransportClient& client)
01126   {
01127     if (avail & flags) {
01128       RepoId temp = id;
01129       temp.entityId = ent;
01130       client.disassociate(temp);
01131     }
01132   }
01133 }
01134 
01135 
01136 bool
01137 Sedp::disassociate(const Security::SPDPdiscoveredParticipantData& pdata)
01138 {
01139   RepoId part;
01140   std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix,
01141               sizeof(GuidPrefix_t));
01142   part.entityId = ENTITYID_PARTICIPANT;
01143   associated_participants_.erase(part);
01144   const BuiltinEndpointSet_t avail =
01145     pdata.participantProxy.availableBuiltinEndpoints;
01146 
01147   { // Release lock, so we can call into transport
01148     ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01149     ACE_GUARD_RETURN(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock, false);
01150 
01151     disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, part,
01152       ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER, publications_writer_);
01153     disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, part,
01154       ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, *publications_reader_);
01155 
01156     disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, part,
01157       ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER, subscriptions_writer_);
01158     disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, part,
01159       ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, *subscriptions_reader_);
01160 
01161     disassociate_helper(avail, BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, part,
01162       ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, participant_message_writer_);
01163     disassociate_helper(avail, BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, part,
01164       ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, *participant_message_reader_);
01165 
01166     //FUTURE: if/when topic propagation is supported, add it here
01167 
01168 #if defined(OPENDDS_SECURITY)
01169     using namespace DDS::Security;
01170 
01171     disassociate_helper(avail, SEDP_BUILTIN_PUBLICATIONS_SECURE_READER, part,
01172       ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER, publications_secure_writer_);
01173     disassociate_helper(avail, SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, part,
01174       ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, *publications_secure_reader_);
01175 
01176     disassociate_helper(avail, SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER, part,
01177       ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER, subscriptions_secure_writer_);
01178     disassociate_helper(avail, SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, part,
01179       ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, *subscriptions_secure_reader_);
01180 
01181     disassociate_helper(avail, BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER, part,
01182       ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER, participant_message_secure_writer_);
01183     disassociate_helper(avail, BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, part,
01184       ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, *participant_message_secure_reader_);
01185 
01186     disassociate_helper(avail, BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER, part,
01187       ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER, participant_stateless_message_writer_);
01188     disassociate_helper(avail, BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER, part,
01189       ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, *participant_stateless_message_reader_);
01190 
01191     disassociate_helper(avail, BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER, part,
01192       ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER, participant_volatile_message_secure_writer_);
01193     disassociate_helper(avail, BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER, part,
01194       ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, *participant_volatile_message_secure_reader_);
01195 
01196     disassociate_helper(avail, SPDP_BUILTIN_PARTICIPANT_SECURE_READER, part,
01197       ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER, dcps_participant_secure_writer_);
01198     disassociate_helper(avail, SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER, part,
01199       ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER, *dcps_participant_secure_reader_);
01200 #endif
01201 
01202   }
01203 
01204   if (spdp_.has_discovered_participant(part)) {
01205     remove_entities_belonging_to(discovered_publications_, part);
01206     remove_entities_belonging_to(discovered_subscriptions_, part);
01207     return true;
01208   } else {
01209     return false;
01210   }
01211 }
01212 
01213 template<typename Map>
01214 void
01215 Sedp::remove_entities_belonging_to(Map& m, RepoId participant)
01216 {
01217   participant.entityId.entityKey[0] = 0;
01218   participant.entityId.entityKey[1] = 0;
01219   participant.entityId.entityKey[2] = 0;
01220   participant.entityId.entityKind = 0;
01221   for (typename Map::iterator i = m.lower_bound(participant);
01222        i != m.end() && 0 == std::memcmp(i->first.guidPrefix,
01223                                         participant.guidPrefix,
01224                                         sizeof(GuidPrefix_t));) {
01225     OPENDDS_STRING topic_name = get_topic_name(i->second);
01226     OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01227       topics_.find(topic_name);
01228     if (top_it != topics_.end()) {
01229       top_it->second.endpoints_.erase(i->first);
01230       if (DCPS::DCPS_debug_level > 3) {
01231         ACE_DEBUG((LM_DEBUG,
01232                    ACE_TEXT("(%P|%t) Sedp::remove_entities_belonging_to - ")
01233                    ACE_TEXT("calling match_endpoints remove\n")));
01234       }
01235       match_endpoints(i->first, top_it->second, true /*remove*/);
01236       if (spdp_.shutting_down()) { return; }
01237     }
01238     remove_from_bit(i->second);
01239     m.erase(i++);
01240   }
01241 }
01242 
01243 void
01244 Sedp::remove_from_bit_i(const DiscoveredPublication& pub)
01245 {
01246 #ifndef DDS_HAS_MINIMUM_BIT
01247   task_.enqueue(Msg::MSG_REMOVE_FROM_PUB_BIT, pub.bit_ih_);
01248 #else
01249   ACE_UNUSED_ARG(pub);
01250 #endif /* DDS_HAS_MINIMUM_BIT */
01251 }
01252 
01253 void
01254 Sedp::remove_from_bit_i(const DiscoveredSubscription& sub)
01255 {
01256 #ifndef DDS_HAS_MINIMUM_BIT
01257   task_.enqueue(Msg::MSG_REMOVE_FROM_SUB_BIT, sub.bit_ih_);
01258 #else
01259   ACE_UNUSED_ARG(sub);
01260 #endif /* DDS_HAS_MINIMUM_BIT */
01261 }
01262 
01263 void
01264 Sedp::Task::svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
01265 {
01266 #ifndef DDS_HAS_MINIMUM_BIT
01267   switch (which_bit) {
01268   case Msg::MSG_REMOVE_FROM_PUB_BIT: {
01269     DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = sedp_->pub_bit();
01270     // bit may be null if the DomainParticipant is shutting down
01271     if (bit && bit_ih != DDS::HANDLE_NIL) {
01272       bit->set_instance_state(bit_ih,
01273                               DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01274     }
01275     break;
01276   }
01277   case Msg::MSG_REMOVE_FROM_SUB_BIT: {
01278     DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sedp_->sub_bit();
01279     // bit may be null if the DomainParticipant is shutting down
01280     if (bit && bit_ih != DDS::HANDLE_NIL) {
01281       bit->set_instance_state(bit_ih,
01282                               DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01283     }
01284     break;
01285   }
01286   default:
01287     break;
01288   }
01289 #else
01290   ACE_UNUSED_ARG(which_bit);
01291   ACE_UNUSED_ARG(bit_ih);
01292 #endif /* DDS_HAS_MINIMUM_BIT */
01293 }
01294 
01295 #ifndef DDS_HAS_MINIMUM_BIT
01296 DCPS::TopicBuiltinTopicDataDataReaderImpl*
01297 Sedp::topic_bit()
01298 {
01299   DDS::Subscriber_var sub = spdp_.bit_subscriber();
01300   if (!sub.in())
01301     return 0;
01302 
01303   DDS::DataReader_var d =
01304     sub->lookup_datareader(DCPS::BUILT_IN_TOPIC_TOPIC);
01305   return dynamic_cast<DCPS::TopicBuiltinTopicDataDataReaderImpl*>(d.in());
01306 }
01307 
01308 DCPS::PublicationBuiltinTopicDataDataReaderImpl*
01309 Sedp::pub_bit()
01310 {
01311   DDS::Subscriber_var sub = spdp_.bit_subscriber();
01312   if (!sub.in())
01313     return 0;
01314 
01315   DDS::DataReader_var d =
01316     sub->lookup_datareader(DCPS::BUILT_IN_PUBLICATION_TOPIC);
01317   return dynamic_cast<DCPS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
01318 }
01319 
01320 DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*
01321 Sedp::sub_bit()
01322 {
01323   DDS::Subscriber_var sub = spdp_.bit_subscriber();
01324   if (!sub.in())
01325     return 0;
01326 
01327   DDS::DataReader_var d =
01328     sub->lookup_datareader(DCPS::BUILT_IN_SUBSCRIPTION_TOPIC);
01329   return dynamic_cast<DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
01330 }
01331 #endif /* DDS_HAS_MINIMUM_BIT */
01332 
01333 bool
01334 Sedp::update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
01335                        OPENDDS_STRING& name)
01336 {
01337   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01338   OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator iter =
01339     topic_names_.find(topicId);
01340   if (iter == topic_names_.end()) {
01341     return false;
01342   }
01343   name = iter->second;
01344   TopicDetails& topic = topics_[name];
01345   using namespace DCPS;
01346   // If the TOPIC_DATA QoS changed our local endpoints must be resent
01347   // with new QoS
01348   if (qos.topic_data != topic.qos_.topic_data) {
01349     topic.qos_ = qos;
01350     // For each endpoint associated on this topic
01351     for (RepoIdSet::iterator topic_endpoints = topic.endpoints_.begin();
01352          topic_endpoints != topic.endpoints_.end(); ++topic_endpoints) {
01353 
01354       const RepoId& rid = *topic_endpoints;
01355       GuidConverter conv(rid);
01356       if (conv.isWriter()) {
01357         // This may be our local publication, verify
01358         LocalPublicationIter lp = local_publications_.find(rid);
01359         if (lp != local_publications_.end()) {
01360           write_publication_data(rid, lp->second);
01361         }
01362       } else if (conv.isReader()) {
01363         // This may be our local subscription, verify
01364         LocalSubscriptionIter ls = local_subscriptions_.find(rid);
01365         if (ls != local_subscriptions_.end()) {
01366           write_subscription_data(rid, ls->second);
01367         }
01368       }
01369     }
01370   }
01371 
01372   return true;
01373 }
01374 
01375 void
01376 Sedp::inconsistent_topic(const DCPS::RepoIdSet& eps) const
01377 {
01378   using DCPS::RepoIdSet;
01379   for (RepoIdSet::const_iterator iter(eps.begin()); iter != eps.end(); ++iter) {
01380     if (0 == std::memcmp(participant_id_.guidPrefix, iter->guidPrefix,
01381                          sizeof(GuidPrefix_t))) {
01382       const bool reader = iter->entityId.entityKind & 4;
01383       if (reader) {
01384         const LocalSubscriptionCIter lsi = local_subscriptions_.find(*iter);
01385         if (lsi != local_subscriptions_.end()) {
01386           lsi->second.subscription_->inconsistent_topic();
01387           // Only make one callback per inconsistent topic, even if we have
01388           // more than one reader/writer on the topic -- it's the Topic object
01389           // that will actually see the InconsistentTopicStatus change.
01390           return;
01391         }
01392       } else {
01393         const LocalPublicationCIter lpi = local_publications_.find(*iter);
01394         if (lpi != local_publications_.end()) {
01395           lpi->second.publication_->inconsistent_topic();
01396           return; // see comment above
01397         }
01398       }
01399     }
01400   }
01401 }
01402 
01403 DDS::ReturnCode_t
01404 Sedp::remove_publication_i(const RepoId& publicationId)
01405 {
01406   return publications_writer_.write_unregister_dispose(publicationId);
01407 }
01408 
01409 bool
01410 Sedp::update_publication_qos(const RepoId& publicationId,
01411                              const DDS::DataWriterQos& qos,
01412                              const DDS::PublisherQos& publisherQos)
01413 {
01414   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01415   LocalPublicationIter iter = local_publications_.find(publicationId);
01416   if (iter != local_publications_.end()) {
01417     LocalPublication& pb = iter->second;
01418     pb.qos_ = qos;
01419     pb.publisher_qos_ = publisherQos;
01420 
01421     if (DDS::RETCODE_OK != write_publication_data(publicationId, pb)) {
01422       return false;
01423     }
01424     // Match/unmatch with subscriptions
01425     OPENDDS_STRING topic_name = topic_names_[pb.topic_id_];
01426     OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01427           topics_.find(topic_name);
01428     if (top_it != topics_.end()) {
01429       match_endpoints(publicationId, top_it->second);
01430     }
01431     return true;
01432   }
01433   return false;
01434 }
01435 
01436 DDS::ReturnCode_t
01437 Sedp::remove_subscription_i(const RepoId& subscriptionId)
01438 {
01439   return subscriptions_writer_.write_unregister_dispose(subscriptionId);
01440 }
01441 
01442 bool
01443 Sedp::update_subscription_qos(const RepoId& subscriptionId,
01444                               const DDS::DataReaderQos& qos,
01445                               const DDS::SubscriberQos& subscriberQos)
01446 {
01447   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01448   LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
01449   if (iter != local_subscriptions_.end()) {
01450     LocalSubscription& sb = iter->second;
01451     sb.qos_ = qos;
01452     sb.subscriber_qos_ = subscriberQos;
01453 
01454     if (DDS::RETCODE_OK != write_subscription_data(subscriptionId, sb)) {
01455       return false;
01456     }
01457     // Match/unmatch with subscriptions
01458     OPENDDS_STRING topic_name = topic_names_[sb.topic_id_];
01459     OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01460           topics_.find(topic_name);
01461     if (top_it != topics_.end()) {
01462       match_endpoints(subscriptionId, top_it->second);
01463     }
01464     return true;
01465   }
01466   return false;
01467 }
01468 
01469 bool
01470 Sedp::update_subscription_params(const RepoId& subId,
01471                                  const DDS::StringSeq& params)
01472 {
01473   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01474   const LocalSubscriptionIter iter = local_subscriptions_.find(subId);
01475   if (iter != local_subscriptions_.end()) {
01476     LocalSubscription& sb = iter->second;
01477     sb.filterProperties.expressionParameters = params;
01478 
01479     if (DDS::RETCODE_OK != write_subscription_data(subId, sb)) {
01480       return false;
01481     }
01482 
01483     // Let any associated local publications know about the change
01484     for (DCPS::RepoIdSet::iterator i = iter->second.matched_endpoints_.begin();
01485          i != iter->second.matched_endpoints_.end(); ++i) {
01486       const LocalPublicationIter lpi = local_publications_.find(*i);
01487       if (lpi != local_publications_.end()) {
01488         lpi->second.publication_->update_subscription_params(subId, params);
01489       }
01490     }
01491 
01492     return true;
01493   }
01494   return false;
01495 }
01496 
01497 void
01498 Sedp::shutdown()
01499 {
01500   task_.shutdown();
01501   publications_reader_->shutting_down_ = true;
01502   subscriptions_reader_->shutting_down_ = true;
01503   participant_message_reader_->shutting_down_ = true;
01504 }
01505 
01506 void
01507 Sedp::Task::acknowledge()
01508 {
01509   // id is really a don't care, but just set to REQUEST_ACK
01510   putq(new Msg(Msg::MSG_FINI_BIT, DCPS::REQUEST_ACK, 0));
01511 }
01512 
01513 void
01514 Sedp::Task::shutdown()
01515 {
01516   if (!shutting_down_) {
01517     shutting_down_ = true;
01518     putq(new Msg(Msg::MSG_STOP, DCPS::GRACEFUL_DISCONNECT, 0));
01519     wait();
01520   }
01521 }
01522 
01523 void
01524 Sedp::Task::svc_i(DCPS::MessageId message_id,
01525                   const DCPS::DiscoveredWriterData* pwdata)
01526 {
01527   DCPS::unique_ptr<const DCPS::DiscoveredWriterData> delete_the_data(pwdata);
01528   sedp_->data_received(message_id, *pwdata);
01529 }
01530 
01531 void Sedp::process_discovered_writer_data(DCPS::MessageId message_id,
01532                                           const DCPS::DiscoveredWriterData& wdata,
01533                                           const RepoId& guid,
01534                                           const DDS::Security::EndpointSecurityInfo* security_info)
01535 {
01536 
01537 #if ! defined(OPENDDS_SECURITY)
01538   ACE_UNUSED_ARG(security_info);
01539 #endif
01540 
01541   OPENDDS_STRING topic_name;
01542 
01543   // Find the publication - iterator valid only as long as we hold the lock
01544   DiscoveredPublicationIter iter = discovered_publications_.find(guid);
01545 
01546   if (message_id == DCPS::SAMPLE_DATA) {
01547     DCPS::DiscoveredWriterData wdata_copy;
01548 
01549     if (iter == discovered_publications_.end()) { // add new
01550       // Must unlock when calling into pub_bit() as it may call back into us
01551       ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01552 
01553       { // Reduce scope of pub and td
01554         DiscoveredPublication prepub(wdata);
01555 
01556         topic_name = get_topic_name(prepub);
01557 
01558 #if defined(OPENDDS_SECURITY)
01559         if (is_security_enabled()) {
01560 
01561           DDS::Security::SecurityException ex = {"", 0, 0};
01562 
01563           RepoId part = guid;
01564           part.entityId = ENTITYID_PARTICIPANT;
01565 
01566           DDS::TopicBuiltinTopicData data;
01567           data.key = wdata.ddsPublicationData.key;
01568           data.name = wdata.ddsPublicationData.topic_name;
01569           data.type_name = wdata.ddsPublicationData.type_name;
01570           data.durability = wdata.ddsPublicationData.durability;
01571           data.durability_service = wdata.ddsPublicationData.durability_service;
01572           data.deadline = wdata.ddsPublicationData.deadline;
01573           data.latency_budget = wdata.ddsPublicationData.latency_budget;
01574           data.liveliness = wdata.ddsPublicationData.liveliness;
01575           data.reliability = wdata.ddsPublicationData.reliability;
01576           data.lifespan = wdata.ddsPublicationData.lifespan;
01577           data.destination_order = wdata.ddsPublicationData.destination_order;
01578           data.ownership = wdata.ddsPublicationData.ownership;
01579           data.topic_data = wdata.ddsPublicationData.topic_data;
01580 
01581           DCPS::AuthState auth_state = spdp_.lookup_participant_auth_state(part);
01582           if (auth_state == DCPS::AS_AUTHENTICATED) {
01583 
01584             DDS::Security::PermissionsHandle remote_permissions = spdp_.lookup_participant_permissions(part);
01585 
01586             if (participant_sec_attr_.is_access_protected &&
01587                 !get_access_control()->check_remote_topic(remote_permissions, spdp_.get_domain_id(), data, ex))
01588             {
01589               ACE_ERROR((LM_WARNING,
01590                 ACE_TEXT("(%P|%t) WARNING: ")
01591                 ACE_TEXT("Sedp::data_received(dwd) - ")
01592                 ACE_TEXT("Unable to check remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01593                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01594               return;
01595             }
01596 
01597             DDS::Security::TopicSecurityAttributes topic_sec_attr;
01598             if (!get_access_control()->get_topic_sec_attributes(remote_permissions, topic_name.data(), topic_sec_attr, ex))
01599             {
01600               ACE_ERROR((LM_WARNING,
01601                 ACE_TEXT("(%P|%t) WARNING: ")
01602                 ACE_TEXT("Sedp::data_received(dwd) - ")
01603                 ACE_TEXT("Unable to get security attributes for remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01604                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01605               return;
01606             }
01607 
01608             DDS::Security::PublicationBuiltinTopicDataSecure pub_data_sec;
01609             pub_data_sec.base.base = wdata.ddsPublicationData;
01610 
01611             if (security_info != NULL) {
01612               pub_data_sec.base.security_info.endpoint_security_attributes = security_info->endpoint_security_attributes;
01613               pub_data_sec.base.security_info.plugin_endpoint_security_attributes = security_info->plugin_endpoint_security_attributes;
01614             }
01615 
01616             if (topic_sec_attr.is_write_protected &&
01617               !get_access_control()->check_remote_datawriter(remote_permissions, spdp_.get_domain_id(), pub_data_sec, ex))
01618             {
01619               ACE_ERROR((LM_WARNING,
01620                 ACE_TEXT("(%P|%t) WARNING: ")
01621                 ACE_TEXT("Sedp::data_received(dwd) - ")
01622                 ACE_TEXT("Unable to check remote datawriter '%C'. SecurityException[%d.%d]: %C\n"),
01623                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01624               return;
01625             }
01626           } else if (auth_state != DCPS::AS_UNAUTHENTICATED) {
01627             ACE_ERROR((LM_WARNING,
01628               ACE_TEXT("(%P|%t) WARNING: ")
01629               ACE_TEXT("Sedp::data_received(dwd) - ")
01630               ACE_TEXT("Unsupported remote participant authentication state for discovered datawriter '%C'. SecurityException[%d.%d]: %C\n"),
01631               topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01632             return;
01633           }
01634         }
01635 #endif
01636 
01637         DiscoveredPublication& pub = discovered_publications_[guid] = prepub;
01638 
01639         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01640           topics_.find(topic_name);
01641         if (top_it == topics_.end()) {
01642           top_it =
01643             topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
01644           top_it->second.data_type_ = wdata.ddsPublicationData.type_name;
01645           top_it->second.qos_.topic_data = wdata.ddsPublicationData.topic_data;
01646           top_it->second.repo_id_ = make_topic_guid();
01647 
01648         } else if (top_it->second.data_type_ !=
01649                    wdata.ddsPublicationData.type_name.in()) {
01650           inconsistent_topic(top_it->second.endpoints_);
01651           if (DCPS::DCPS_debug_level) {
01652             ACE_DEBUG((LM_WARNING,
01653               ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - WARNING ")
01654               ACE_TEXT("topic %C discovered data type %C doesn't ")
01655               ACE_TEXT("match known data type %C, ignoring ")
01656               ACE_TEXT("discovered publication.\n"),
01657               topic_name.c_str(),
01658               wdata.ddsPublicationData.type_name.in(),
01659               top_it->second.data_type_.c_str()));
01660           }
01661           return;
01662         }
01663 
01664         TopicDetails& td = top_it->second;
01665         topic_names_[td.repo_id_] = topic_name;
01666         td.endpoints_.insert(guid);
01667 
01668         std::memcpy(pub.writer_data_.ddsPublicationData.participant_key.value,
01669                     guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
01670         assign_bit_key(pub);
01671         wdata_copy = pub.writer_data_;
01672       }
01673 
01674       // Iter no longer valid once lock released
01675       iter = discovered_publications_.end();
01676 
01677       DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01678 #ifndef DDS_HAS_MINIMUM_BIT
01679       {
01680         // Release lock for call into pub_bit
01681         ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01682         DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01683         if (bit) { // bit may be null if the DomainParticipant is shutting down
01684           instance_handle =
01685             bit->store_synthetic_data(wdata_copy.ddsPublicationData,
01686                                       DDS::NEW_VIEW_STATE);
01687         }
01688       }
01689 #endif /* DDS_HAS_MINIMUM_BIT */
01690 
01691       if (spdp_.shutting_down()) { return; }
01692       // Publication may have been removed while lock released
01693       iter = discovered_publications_.find(guid);
01694       if (iter != discovered_publications_.end()) {
01695         iter->second.bit_ih_ = instance_handle;
01696         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01697             topics_.find(topic_name);
01698         if (top_it != topics_.end()) {
01699           if (DCPS::DCPS_debug_level > 3) {
01700             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01701                                  ACE_TEXT("calling match_endpoints new\n")));
01702           }
01703           match_endpoints(guid, top_it->second);
01704         }
01705       }
01706 
01707     } else if (qosChanged(iter->second.writer_data_.ddsPublicationData,
01708                           wdata.ddsPublicationData)) { // update existing
01709 #ifndef DDS_HAS_MINIMUM_BIT
01710       DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01711       if (bit) { // bit may be null if the DomainParticipant is shutting down
01712         bit->store_synthetic_data(iter->second.writer_data_.ddsPublicationData,
01713                                   DDS::NOT_NEW_VIEW_STATE);
01714       }
01715 #endif /* DDS_HAS_MINIMUM_BIT */
01716 
01717       // Match/unmatch local subscription(s)
01718       topic_name = get_topic_name(iter->second);
01719       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01720           topics_.find(topic_name);
01721       if (top_it != topics_.end()) {
01722         if (DCPS::DCPS_debug_level > 3) {
01723           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01724                                ACE_TEXT("calling match_endpoints update\n")));
01725         }
01726         match_endpoints(guid, top_it->second);
01727       }
01728     }
01729 
01730   } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01731              message_id == DCPS::DISPOSE_INSTANCE ||
01732              message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01733     if (iter != discovered_publications_.end()) {
01734       // Unmatch local subscription(s)
01735       topic_name = get_topic_name(iter->second);
01736       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01737           topics_.find(topic_name);
01738       if (top_it != topics_.end()) {
01739         top_it->second.endpoints_.erase(guid);
01740         match_endpoints(guid, top_it->second, true /*remove*/);
01741         if (spdp_.shutting_down()) { return; }
01742       }
01743       remove_from_bit(iter->second);
01744       if (DCPS::DCPS_debug_level > 3) {
01745         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01746                              ACE_TEXT("calling match_endpoints disp/unreg\n")));
01747       }
01748       discovered_publications_.erase(iter);
01749     }
01750   }
01751 }
01752 
01753 void
01754 Sedp::data_received(DCPS::MessageId message_id,
01755                     const DCPS::DiscoveredWriterData& wdata)
01756 {
01757   if (spdp_.shutting_down()) { return; }
01758 
01759   const RepoId& guid = wdata.writerProxy.remoteWriterGuid;
01760   RepoId guid_participant = guid;
01761   guid_participant.entityId = ENTITYID_PARTICIPANT;
01762 
01763   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01764 
01765   if (ignoring(guid)
01766       || ignoring(guid_participant)
01767       || ignoring(wdata.ddsPublicationData.topic_name)) {
01768     return;
01769   }
01770 
01771 #if defined(OPENDDS_SECURITY)
01772   if (should_drop_message(wdata.ddsPublicationData.topic_name)) {
01773     return;
01774   }
01775 #endif
01776 
01777   if (!spdp_.has_discovered_participant(guid_participant)) {
01778     deferred_publications_[guid] = std::make_pair(message_id, wdata);
01779     return;
01780   }
01781 
01782   process_discovered_writer_data(message_id, wdata, guid);
01783 }
01784 
01785 #if defined(OPENDDS_SECURITY)
01786 void
01787 Sedp::Task::svc_i(DCPS::MessageId message_id,
01788                   const DiscoveredWriterData_SecurityWrapper* data)
01789 {
01790   DCPS::unique_ptr<const DiscoveredWriterData_SecurityWrapper> delete_the_data(data);
01791   sedp_->data_received(message_id, *data);
01792 }
01793 
01794 void Sedp::data_received(DCPS::MessageId message_id,
01795                          const DiscoveredWriterData_SecurityWrapper& wrapper)
01796 {
01797   if (spdp_.shutting_down()) { return; }
01798 
01799   const RepoId& guid = wrapper.data.writerProxy.remoteWriterGuid;
01800   RepoId guid_participant = guid;
01801   guid_participant.entityId = ENTITYID_PARTICIPANT;
01802 
01803   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01804 
01805   if (ignoring(guid)
01806       || ignoring(guid_participant)
01807       || ignoring(wrapper.data.ddsPublicationData.topic_name)) {
01808     return;
01809   }
01810 
01811   process_discovered_writer_data(message_id, wrapper.data, guid, &wrapper.security_info);
01812 }
01813 #endif
01814 
01815 void Sedp::process_discovered_reader_data(DCPS::MessageId message_id,
01816                                     const DCPS::DiscoveredReaderData& rdata,
01817                                     const RepoId& guid,
01818                                     const DDS::Security::EndpointSecurityInfo* security_info)
01819 {
01820 
01821 #if ! defined(OPENDDS_SECURITY)
01822   ACE_UNUSED_ARG(security_info);
01823 #endif
01824 
01825   OPENDDS_STRING topic_name;
01826 
01827   // Find the subscripion - iterator valid only as long as we hold the lock
01828   DiscoveredSubscriptionIter iter = discovered_subscriptions_.find(guid);
01829 
01830   // Must unlock when calling into sub_bit() as it may call back into us
01831   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01832 
01833   if (message_id == DCPS::SAMPLE_DATA) {
01834     DCPS::DiscoveredReaderData rdata_copy;
01835 
01836     if (iter == discovered_subscriptions_.end()) { // add new
01837       { // Reduce scope of sub and td
01838         DiscoveredSubscription presub(rdata);
01839 
01840         topic_name = get_topic_name(presub);
01841 
01842 #if defined(OPENDDS_SECURITY)
01843         if (is_security_enabled()) {
01844 
01845           DDS::Security::SecurityException ex = {"", 0, 0};
01846 
01847           RepoId part = guid;
01848           part.entityId = ENTITYID_PARTICIPANT;
01849 
01850           DDS::TopicBuiltinTopicData data;
01851           data.key = rdata.ddsSubscriptionData.key;
01852           data.name = rdata.ddsSubscriptionData.topic_name;
01853           data.type_name = rdata.ddsSubscriptionData.type_name;
01854           data.durability = rdata.ddsSubscriptionData.durability;
01855           data.deadline = rdata.ddsSubscriptionData.deadline;
01856           data.latency_budget = rdata.ddsSubscriptionData.latency_budget;
01857           data.liveliness = rdata.ddsSubscriptionData.liveliness;
01858           data.reliability = rdata.ddsSubscriptionData.reliability;
01859           data.destination_order = rdata.ddsSubscriptionData.destination_order;
01860           data.ownership = rdata.ddsSubscriptionData.ownership;
01861           data.topic_data = rdata.ddsSubscriptionData.topic_data;
01862 
01863           DCPS::AuthState auth_state = spdp_.lookup_participant_auth_state(part);
01864           if (auth_state == DCPS::AS_AUTHENTICATED) {
01865 
01866             DDS::Security::PermissionsHandle remote_permissions = spdp_.lookup_participant_permissions(part);
01867 
01868             if (participant_sec_attr_.is_access_protected &&
01869                 !get_access_control()->check_remote_topic(remote_permissions, spdp_.get_domain_id(), data, ex))
01870             {
01871               ACE_ERROR((LM_WARNING,
01872                 ACE_TEXT("(%P|%t) WARNING: ")
01873                 ACE_TEXT("Sedp::data_received(drd) - ")
01874                 ACE_TEXT("Unable to check remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01875                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01876               return;
01877             }
01878 
01879             DDS::Security::TopicSecurityAttributes topic_sec_attr;
01880             if (!get_access_control()->get_topic_sec_attributes(remote_permissions, topic_name.data(), topic_sec_attr, ex))
01881             {
01882               ACE_ERROR((LM_WARNING,
01883                 ACE_TEXT("(%P|%t) WARNING: ")
01884                 ACE_TEXT("Sedp::data_received(drd) - ")
01885                 ACE_TEXT("Unable to get security attributes for remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01886                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01887               return;
01888             }
01889 
01890             DDS::Security::SubscriptionBuiltinTopicDataSecure sub_data_sec;
01891             sub_data_sec.base.base = rdata.ddsSubscriptionData;
01892 
01893             if (security_info != NULL) {
01894               sub_data_sec.base.security_info.endpoint_security_attributes = security_info->endpoint_security_attributes;
01895               sub_data_sec.base.security_info.plugin_endpoint_security_attributes = security_info->plugin_endpoint_security_attributes;
01896             }
01897 
01898             bool relay_only = false;
01899             if (topic_sec_attr.is_read_protected &&
01900                 !get_access_control()->check_remote_datareader(remote_permissions, spdp_.get_domain_id(), sub_data_sec, relay_only, ex))
01901             {
01902               ACE_ERROR((LM_WARNING,
01903                 ACE_TEXT("(%P|%t) WARNING: ")
01904                 ACE_TEXT("Sedp::data_received(drd) - ")
01905                 ACE_TEXT("Unable to check remote datareader '%C'. SecurityException[%d.%d]: %C\n"),
01906                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01907               return;
01908             }
01909 
01910             if (relay_only) {
01911               relay_only_readers_.insert(guid);
01912             } else {
01913               relay_only_readers_.erase(guid);
01914             }
01915           } else if (auth_state != DCPS::AS_UNAUTHENTICATED) {
01916             ACE_ERROR((LM_WARNING,
01917               ACE_TEXT("(%P|%t) WARNING: ")
01918               ACE_TEXT("Sedp::data_received(dwd) - ")
01919               ACE_TEXT("Unsupported remote participant authentication state for discovered datawriter '%C'. SecurityException[%d.%d]: %C\n"),
01920               topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01921             return;
01922           }
01923         }
01924 #endif
01925 
01926         DiscoveredSubscription& sub = discovered_subscriptions_[guid] = presub;
01927 
01928         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01929           topics_.find(topic_name);
01930         if (top_it == topics_.end()) {
01931           top_it =
01932             topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
01933           top_it->second.data_type_ = rdata.ddsSubscriptionData.type_name;
01934           top_it->second.qos_.topic_data = rdata.ddsSubscriptionData.topic_data;
01935           top_it->second.repo_id_ = make_topic_guid();
01936 
01937         } else if (top_it->second.data_type_ !=
01938                    rdata.ddsSubscriptionData.type_name.in()) {
01939           inconsistent_topic(top_it->second.endpoints_);
01940           if (DCPS::DCPS_debug_level) {
01941             ACE_DEBUG((LM_WARNING,
01942                        ACE_TEXT("(%P|%t) Sedp::data_received(drd) - WARNING ")
01943                        ACE_TEXT("topic %C discovered data type %C doesn't ")
01944                        ACE_TEXT("match known data type %C, ignoring ")
01945                        ACE_TEXT("discovered subcription.\n"),
01946                        topic_name.c_str(),
01947                        rdata.ddsSubscriptionData.type_name.in(),
01948                        top_it->second.data_type_.c_str()));
01949           }
01950           return;
01951         }
01952 
01953         TopicDetails& td = top_it->second;
01954         topic_names_[td.repo_id_] = topic_name;
01955         td.endpoints_.insert(guid);
01956 
01957         std::memcpy(sub.reader_data_.ddsSubscriptionData.participant_key.value,
01958                     guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
01959         assign_bit_key(sub);
01960         rdata_copy = sub.reader_data_;
01961       }
01962 
01963       // Iter no longer valid once lock released
01964       iter = discovered_subscriptions_.end();
01965 
01966       DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01967 #ifndef DDS_HAS_MINIMUM_BIT
01968       {
01969         // Release lock for call into sub_bit
01970         ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01971         DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01972         if (bit) { // bit may be null if the DomainParticipant is shutting down
01973           instance_handle =
01974             bit->store_synthetic_data(rdata_copy.ddsSubscriptionData,
01975                                       DDS::NEW_VIEW_STATE);
01976         }
01977       }
01978 #endif /* DDS_HAS_MINIMUM_BIT */
01979 
01980       if (spdp_.shutting_down()) { return; }
01981       // Subscription may have been removed while lock released
01982       iter = discovered_subscriptions_.find(guid);
01983       if (iter != discovered_subscriptions_.end()) {
01984         iter->second.bit_ih_ = instance_handle;
01985         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01986             topics_.find(topic_name);
01987         if (top_it != topics_.end()) {
01988           if (DCPS::DCPS_debug_level > 3) {
01989             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01990                                  ACE_TEXT("calling match_endpoints new\n")));
01991           }
01992           match_endpoints(guid, top_it->second);
01993         }
01994       }
01995 
01996     } else { // update existing
01997       if (qosChanged(iter->second.reader_data_.ddsSubscriptionData,
01998                      rdata.ddsSubscriptionData)) {
01999 #ifndef DDS_HAS_MINIMUM_BIT
02000         DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
02001         if (bit) { // bit may be null if the DomainParticipant is shutting down
02002           bit->store_synthetic_data(
02003                 iter->second.reader_data_.ddsSubscriptionData,
02004                 DDS::NOT_NEW_VIEW_STATE);
02005         }
02006 #endif /* DDS_HAS_MINIMUM_BIT */
02007 
02008         // Match/unmatch local publication(s)
02009         topic_name = get_topic_name(iter->second);
02010         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
02011             topics_.find(topic_name);
02012         if (top_it != topics_.end()) {
02013           if (DCPS::DCPS_debug_level > 3) {
02014             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
02015                                  ACE_TEXT("calling match_endpoints update\n")));
02016           }
02017           match_endpoints(guid, top_it->second);
02018         }
02019       }
02020 
02021       if (paramsChanged(iter->second.reader_data_.contentFilterProperty,
02022                         rdata.contentFilterProperty)) {
02023         // Let any associated local publications know about the change
02024         topic_name = get_topic_name(iter->second);
02025         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
02026             topics_.find(topic_name);
02027         using DCPS::RepoIdSet;
02028         const RepoIdSet& assoc =
02029           (top_it == topics_.end()) ? RepoIdSet() : top_it->second.endpoints_;
02030         for (RepoIdSet::const_iterator i = assoc.begin(); i != assoc.end(); ++i) {
02031           if (i->entityId.entityKind & 4) continue; // subscription
02032           const LocalPublicationIter lpi = local_publications_.find(*i);
02033           if (lpi != local_publications_.end()) {
02034             lpi->second.publication_->update_subscription_params(guid,
02035               rdata.contentFilterProperty.expressionParameters);
02036           }
02037         }
02038       }
02039     }
02040     // For each associated opendds writer to this reader
02041     CORBA::ULong len = rdata.readerProxy.associatedWriters.length();
02042     for (CORBA::ULong writerIndex = 0; writerIndex < len; ++writerIndex)
02043     {
02044       GUID_t writerGuid = rdata.readerProxy.associatedWriters[writerIndex];
02045 
02046       // If the associated writer is in this participant
02047       LocalPublicationIter lp = local_publications_.find(writerGuid);
02048       if (lp != local_publications_.end()) {
02049         // If the local writer is not fully associated with the reader
02050         if (lp->second.remote_opendds_associations_.insert(guid).second) {
02051           // This is a new association
02052           lp->second.publication_->association_complete(guid);
02053         }
02054       }
02055     }
02056 
02057   } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
02058              message_id == DCPS::DISPOSE_INSTANCE ||
02059              message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
02060     if (iter != discovered_subscriptions_.end()) {
02061       // Unmatch local publication(s)
02062       topic_name = get_topic_name(iter->second);
02063       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
02064           topics_.find(topic_name);
02065       if (top_it != topics_.end()) {
02066         top_it->second.endpoints_.erase(guid);
02067         if (DCPS::DCPS_debug_level > 3) {
02068           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
02069                                ACE_TEXT("calling match_endpoints disp/unreg\n")));
02070         }
02071         match_endpoints(guid, top_it->second, true /*remove*/);
02072         if (spdp_.shutting_down()) { return; }
02073       }
02074       remove_from_bit(iter->second);
02075       discovered_subscriptions_.erase(iter);
02076     }
02077   }
02078 }
02079 
02080 void
02081 Sedp::Task::svc_i(DCPS::MessageId message_id,
02082                   const DCPS::DiscoveredReaderData* prdata)
02083 {
02084   DCPS::unique_ptr<const DCPS::DiscoveredReaderData> delete_the_data(prdata);
02085   sedp_->data_received(message_id, *prdata);
02086 }
02087 
02088 void
02089 Sedp::data_received(DCPS::MessageId message_id,
02090                     const DCPS::DiscoveredReaderData& rdata)
02091 {
02092   if (spdp_.shutting_down()) { return; }
02093 
02094   const RepoId& guid = rdata.readerProxy.remoteReaderGuid;
02095   RepoId guid_participant = guid;
02096   guid_participant.entityId = ENTITYID_PARTICIPANT;
02097 
02098   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02099 
02100   if (ignoring(guid)
02101       || ignoring(guid_participant)
02102       || ignoring(rdata.ddsSubscriptionData.topic_name)) {
02103     return;
02104   }
02105 
02106 #if defined(OPENDDS_SECURITY)
02107   if (should_drop_message(rdata.ddsSubscriptionData.topic_name)) {
02108     return;
02109   }
02110 #endif
02111 
02112   if (!spdp_.has_discovered_participant(guid_participant)) {
02113     deferred_subscriptions_[guid] = std::make_pair(message_id, rdata);
02114     return;
02115   }
02116 
02117   process_discovered_reader_data(message_id, rdata, guid);
02118 }
02119 
02120 #if defined(OPENDDS_SECURITY)
02121 void
02122 Sedp::Task::svc_i(DCPS::MessageId message_id,
02123                   const DiscoveredReaderData_SecurityWrapper* data)
02124 {
02125   DCPS::unique_ptr<const DiscoveredReaderData_SecurityWrapper> delete_the_data(data);
02126   sedp_->data_received(message_id, *data);
02127 }
02128 
02129 void Sedp::data_received(DCPS::MessageId message_id,
02130                          const DiscoveredReaderData_SecurityWrapper& wrapper)
02131 {
02132   if (spdp_.shutting_down()) { return; }
02133 
02134   const RepoId& guid = wrapper.data.readerProxy.remoteReaderGuid;
02135   RepoId guid_participant = guid;
02136   guid_participant.entityId = ENTITYID_PARTICIPANT;
02137 
02138   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02139 
02140   if (ignoring(guid)
02141       || ignoring(guid_participant)
02142       || ignoring(wrapper.data.ddsSubscriptionData.topic_name)) {
02143     return;
02144   }
02145 
02146   process_discovered_reader_data(message_id, wrapper.data, guid, &wrapper.security_info);
02147 }
02148 #endif
02149 
02150 void
02151 Sedp::Task::svc_i(DCPS::MessageId message_id,
02152                   const ParticipantMessageData* data)
02153 {
02154   DCPS::unique_ptr<const ParticipantMessageData> delete_the_data(data);
02155   sedp_->data_received(message_id, *data);
02156 }
02157 
02158 void
02159 Sedp::data_received(DCPS::MessageId /*message_id*/,
02160                     const ParticipantMessageData& data)
02161 {
02162   if (spdp_.shutting_down()) { return; }
02163 
02164   const RepoId& guid = data.participantGuid;
02165   RepoId guid_participant = guid;
02166   guid_participant.entityId = ENTITYID_PARTICIPANT;
02167   RepoId prefix = data.participantGuid;
02168   prefix.entityId = EntityId_t(); // Clear the entityId so lower bound will work.
02169 
02170   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02171 
02172   if (ignoring(guid)
02173       || ignoring(guid_participant)) {
02174     return;
02175   }
02176 
02177   if (!spdp_.has_discovered_participant(guid_participant)) {
02178     return;
02179   }
02180 
02181   for (LocalSubscriptionMap::const_iterator sub_pos = local_subscriptions_.begin(),
02182          sub_limit = local_subscriptions_.end();
02183        sub_pos != sub_limit; ++sub_pos) {
02184     const DCPS::RepoIdSet::const_iterator pos =
02185       sub_pos->second.matched_endpoints_.lower_bound(prefix);
02186     if (pos != sub_pos->second.matched_endpoints_.end() &&
02187         DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) {
02188       sub_pos->second.subscription_->signal_liveliness(guid_participant);
02189     }
02190   }
02191 }
02192 
02193 #if defined(OPENDDS_SECURITY)
02194 void
02195 Sedp::Task::svc_participant_message_data_secure(DCPS::MessageId message_id,
02196             const ParticipantMessageData* data)
02197 {
02198   DCPS::unique_ptr<const ParticipantMessageData> delete_the_data(data);
02199   sedp_->received_participant_message_data_secure(message_id, *data);
02200 }
02201 
02202 void
02203 Sedp::received_participant_message_data_secure(DCPS::MessageId /*message_id*/,
02204             const ParticipantMessageData& data)
02205 {
02206   if (spdp_.shutting_down()) {
02207       return;
02208   }
02209 
02210   const RepoId& guid = data.participantGuid;
02211   RepoId guid_participant = guid;
02212   guid_participant.entityId = ENTITYID_PARTICIPANT;
02213   RepoId prefix = data.participantGuid;
02214   prefix.entityId = EntityId_t(); // Clear the entityId so lower bound will work.
02215 
02216   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02217 
02218   if (ignoring(guid) || ignoring(guid_participant)) {
02219     return;
02220   }
02221 
02222   if (!spdp_.has_discovered_participant(guid_participant)) {
02223     return;
02224   }
02225 
02226   LocalSubscriptionMap::const_iterator i, n;
02227   for (i = local_subscriptions_.begin(), n = local_subscriptions_.end(); i != n; ++i) {
02228     const DCPS::RepoIdSet::const_iterator pos = i->second.matched_endpoints_.lower_bound(prefix);
02229 
02230     if (pos != i->second.matched_endpoints_.end() && DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) {
02231       (i->second.subscription_)->signal_liveliness(guid_participant);
02232     }
02233   }
02234 }
02235 
02236 bool Sedp::should_drop_stateless_message(const DDS::Security::ParticipantGenericMessage& msg)
02237 {
02238   using DCPS::GUID_t;
02239   using DCPS::GUID_UNKNOWN;
02240 
02241   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, true);
02242 
02243   const GUID_t src_endpoint = msg.source_endpoint_guid;
02244   const GUID_t dst_endpoint = msg.destination_endpoint_guid;
02245   const GUID_t this_endpoint = participant_stateless_message_reader_->get_repo_id();
02246   const GUID_t dst_participant = msg.destination_participant_guid;
02247   const GUID_t this_participant = participant_id_;
02248 
02249   if (ignoring(src_endpoint)) {
02250     return true;
02251   }
02252 
02253   if (dst_participant != GUID_UNKNOWN && dst_participant != this_participant) {
02254     return true;
02255   }
02256 
02257   if (dst_endpoint != GUID_UNKNOWN && dst_endpoint != this_endpoint) {
02258     return true;
02259   }
02260 
02261   return false;
02262 }
02263 
02264 bool Sedp::should_drop_volatile_message(const DDS::Security::ParticipantGenericMessage& msg)
02265 {
02266   using DCPS::GUID_t;
02267   using DCPS::GUID_UNKNOWN;
02268 
02269   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, true);
02270 
02271   const GUID_t src_endpoint = msg.source_endpoint_guid;
02272   const GUID_t dst_participant = msg.destination_participant_guid;
02273   const GUID_t this_participant = participant_id_;
02274 
02275   if (ignoring(src_endpoint)) {
02276     return true;
02277   }
02278 
02279   if (dst_participant != GUID_UNKNOWN && dst_participant != this_participant) {
02280     return true;
02281   }
02282 
02283   return false;
02284 }
02285 
02286 bool Sedp::should_drop_message(const char* unsecure_topic_name)
02287 {
02288   if (is_security_enabled()) {
02289     DDS::Security::TopicSecurityAttributes attribs;
02290     DDS::Security::SecurityException ex = {"", 0, 0};
02291 
02292     bool ok = get_access_control()->get_topic_sec_attributes(
02293       get_permissions_handle(),
02294       unsecure_topic_name,
02295       attribs,
02296       ex);
02297 
02298     if (!ok || attribs.is_discovery_protected) {
02299       return true;
02300     }
02301   }
02302 
02303   return false;
02304 }
02305 
02306 void
02307 Sedp::Task::svc_stateless_message(DCPS::MessageId id,
02308                   const DDS::Security::ParticipantStatelessMessage* data)
02309 {
02310   DCPS::unique_ptr<const DDS::Security::ParticipantStatelessMessage> delete_the_data(data);
02311   sedp_->received_stateless_message(id, *data);
02312 }
02313 
02314 void
02315 Sedp::received_stateless_message(DCPS::MessageId /*message_id*/,
02316                     const DDS::Security::ParticipantStatelessMessage& msg)
02317 {
02318   if (spdp_.shutting_down()) {
02319     return;
02320   }
02321 
02322   if (should_drop_stateless_message(msg)) {
02323     return;
02324   }
02325 
02326   if (0 == std::strcmp(msg.message_class_id,
02327                        DDS::Security::GMCLASSID_SECURITY_AUTH_REQUEST)) {
02328     spdp_.handle_auth_request(msg);
02329 
02330   } else if (0 == std::strcmp(msg.message_class_id,
02331                               DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE)) {
02332     spdp_.handle_handshake_message(msg);
02333   }
02334   return;
02335 }
02336 
02337 void
02338 Sedp::Task::svc_volatile_message_secure(DCPS::MessageId id,
02339                   const DDS::Security::ParticipantVolatileMessageSecure* data)
02340 {
02341   DCPS::unique_ptr<const DDS::Security::ParticipantVolatileMessageSecure> delete_the_data(data);
02342   sedp_->received_volatile_message_secure(id, *data);
02343 }
02344 
02345 void
02346 Sedp::received_volatile_message_secure(DCPS::MessageId /* message_id */,
02347                     const DDS::Security::ParticipantVolatileMessageSecure& msg)
02348 {
02349   if (spdp_.shutting_down()) {
02350     return;
02351   }
02352 
02353   if (should_drop_volatile_message(msg)) {
02354     return;
02355   }
02356 
02357   if (0 == std::strcmp(msg.message_class_id,
02358                        DDS::Security::GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS)) {
02359     spdp_.handle_participant_crypto_tokens(msg);
02360   } else if (0 == std::strcmp(msg.message_class_id,
02361                               DDS::Security::GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS)) {
02362     handle_datawriter_crypto_tokens(msg);
02363   } else if (0 == std::strcmp(msg.message_class_id,
02364                               DDS::Security::GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS)) {
02365     handle_datareader_crypto_tokens(msg);
02366   }
02367   return;
02368 }
02369 #endif
02370 
02371 bool
02372 Sedp::is_opendds(const GUID_t& endpoint) const
02373 {
02374   GUID_t participant = endpoint;
02375   participant.entityId = DCPS::ENTITYID_PARTICIPANT;
02376   return spdp_.is_opendds(participant);
02377 }
02378 
02379 void
02380 Sedp::association_complete(const RepoId& localId,
02381                            const RepoId& remoteId)
02382 {
02383   // If the remote endpoint is an opendds endpoint
02384   if (is_opendds(remoteId)) {
02385     LocalSubscriptionIter sub = local_subscriptions_.find(localId);
02386     // If the local endpoint is a reader
02387     if (sub != local_subscriptions_.end()) {
02388       std::pair<DCPS::RepoIdSet::iterator, bool> result =
02389           sub->second.remote_opendds_associations_.insert(remoteId);
02390       // If this is a new association for the local reader
02391       if (result.second) {
02392         // Tell other participants
02393         write_subscription_data(localId, sub->second);
02394       }
02395     }
02396   }
02397 }
02398 
02399 void Sedp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
02400 {
02401 
02402 #if defined(OPENDDS_SECURITY)
02403   DDS::Security::SecurityException se = {"", 0, 0};
02404   DDS::Security::TopicSecurityAttributes attribs;
02405 
02406   if (is_security_enabled()) {
02407     // TODO: Pending issue DDSSEC12-28 Topic security attributes
02408     // may get changed to a different set of security attributes.
02409     bool ok = get_access_control()->get_topic_sec_attributes(
02410       get_permissions_handle(), "DCPSParticipantMessageSecure", attribs, se);
02411 
02412     if (ok) {
02413 
02414       if (attribs.is_liveliness_protected) {
02415         signal_liveliness_secure(kind);
02416 
02417       } else {
02418         signal_liveliness_unsecure(kind);
02419       }
02420 
02421     } else {
02422       ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::signal_liveliness() - ")
02423         ACE_TEXT("Failure calling get_topic_sec_attributes(). Security Exception[%d.%d]: %C\n"),
02424           se.code, se.minor_code, se.message.in()));
02425     }
02426 
02427   } else {
02428 #endif
02429 
02430     signal_liveliness_unsecure(kind);
02431 
02432 #if defined(OPENDDS_SECURITY)
02433   }
02434 #endif
02435 }
02436 
02437 void
02438 Sedp::signal_liveliness_unsecure(DDS::LivelinessQosPolicyKind kind)
02439 {
02440   ParticipantMessageData data;
02441   data.participantGuid = participant_id_;
02442 
02443   switch (kind) {
02444   case DDS::AUTOMATIC_LIVELINESS_QOS:
02445     data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
02446     participant_message_writer_.write_participant_message(data, GUID_UNKNOWN, automatic_liveliness_seq_);
02447     break;
02448 
02449   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02450     data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
02451     participant_message_writer_.write_participant_message(data, GUID_UNKNOWN, manual_liveliness_seq_);
02452     break;
02453 
02454   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02455     // Do nothing.
02456     break;
02457   }
02458 }
02459 
02460 #if defined(OPENDDS_SECURITY)
02461 void
02462 Sedp::signal_liveliness_secure(DDS::LivelinessQosPolicyKind kind)
02463 {
02464   ParticipantMessageData data;
02465   data.participantGuid = participant_id_;
02466 
02467   switch (kind) {
02468   case DDS::AUTOMATIC_LIVELINESS_QOS:
02469     data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
02470     participant_message_secure_writer_.write_participant_message(data, GUID_UNKNOWN, secure_automatic_liveliness_seq_);
02471     break;
02472 
02473   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02474     data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
02475     participant_message_secure_writer_.write_participant_message(data, GUID_UNKNOWN, secure_manual_liveliness_seq_);
02476     break;
02477 
02478   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02479     // Do nothing.
02480     break;
02481   }
02482 }
02483 #endif
02484 
02485 Sedp::Endpoint::~Endpoint()
02486 {
02487 }
02488 
02489 //---------------------------------------------------------------
02490 Sedp::Writer::Writer(const RepoId& pub_id, Sedp& sedp)
02491   : Endpoint(pub_id, sedp)
02492 {
02493   header_.prefix[0] = 'R';
02494   header_.prefix[1] = 'T';
02495   header_.prefix[2] = 'P';
02496   header_.prefix[3] = 'S';
02497   header_.version = PROTOCOLVERSION;
02498   header_.vendorId = VENDORID_OPENDDS;
02499   header_.guidPrefix[0] = pub_id.guidPrefix[0];
02500   header_.guidPrefix[1] = pub_id.guidPrefix[1],
02501   header_.guidPrefix[2] = pub_id.guidPrefix[2];
02502   header_.guidPrefix[3] = pub_id.guidPrefix[3];
02503   header_.guidPrefix[4] = pub_id.guidPrefix[4];
02504   header_.guidPrefix[5] = pub_id.guidPrefix[5];
02505   header_.guidPrefix[6] = pub_id.guidPrefix[6];
02506   header_.guidPrefix[7] = pub_id.guidPrefix[7];
02507   header_.guidPrefix[8] = pub_id.guidPrefix[8];
02508   header_.guidPrefix[9] = pub_id.guidPrefix[9];
02509   header_.guidPrefix[10] = pub_id.guidPrefix[10];
02510   header_.guidPrefix[11] = pub_id.guidPrefix[11];
02511 
02512   // The reference count is explicited incremented to avoid been explcitly deleted
02513   // via the RcHandle<TransportClient> because the object is always been created
02514   // on the stack.
02515   RcObject::_add_ref();
02516 }
02517 
02518 Sedp::Writer::~Writer()
02519 {
02520 }
02521 
02522 bool
02523 Sedp::Writer::assoc(const DCPS::AssociationData& subscription)
02524 {
02525   return associate(subscription, true);
02526 }
02527 
02528 void
02529 Sedp::Writer::data_delivered(const DCPS::DataSampleElement* dsle)
02530 {
02531   delete dsle;
02532 }
02533 
02534 void
02535 Sedp::Writer::data_dropped(const DCPS::DataSampleElement* dsle, bool)
02536 {
02537   delete dsle;
02538 }
02539 
02540 void
02541 Sedp::Writer::control_delivered(const DCPS::Message_Block_Ptr& /* sample */)
02542 {
02543 }
02544 
02545 void
02546 Sedp::Writer::control_dropped(const DCPS::Message_Block_Ptr& /* sample */, bool)
02547 {
02548 }
02549 
02550 void Sedp::Writer::send_sample(const ACE_Message_Block& data,
02551                                size_t size,
02552                                const RepoId& reader,
02553                                DCPS::SequenceNumber& sequence,
02554                                bool historic)
02555 {
02556   DCPS::DataSampleElement* el = new DCPS::DataSampleElement(repo_id_, this, DCPS::PublicationInstance_rch());
02557   set_header_fields(el->get_header(), size, reader, sequence, historic);
02558 
02559   DCPS::Message_Block_Ptr sample(new ACE_Message_Block(size));
02560   el->set_sample(DCPS::move(sample));
02561   *el->get_sample() << el->get_header();
02562   el->get_sample()->cont(data.duplicate());
02563 
02564   if (reader != GUID_UNKNOWN) {
02565     el->set_sub_id(0, reader);
02566     el->set_num_subs(1);
02567   }
02568 
02569   DCPS::SendStateDataSampleList list;
02570   list.enqueue_tail(el);
02571 
02572   send(list);
02573 }
02574 
02575 DDS::ReturnCode_t
02576 Sedp::Writer::write_parameter_list(const ParameterList& plist,
02577                                    const RepoId& reader,
02578                                    DCPS::SequenceNumber& sequence)
02579 {
02580   DDS::ReturnCode_t result = DDS::RETCODE_OK;
02581 
02582   // Determine message length
02583   size_t size = 0, padding = 0;
02584   DCPS::find_size_ulong(size, padding);
02585   DCPS::gen_find_size(plist, size, padding);
02586 
02587   // Build RTPS message
02588   ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
02589                             ACE_Message_Block::MB_DATA,
02590                             new ACE_Message_Block(size));
02591   using DCPS::Serializer;
02592   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02593   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // PL_CDR_LE = 0x0003
02594             (ser << ACE_OutputCDR::from_octet(3)) &&
02595             (ser << ACE_OutputCDR::from_octet(0)) &&
02596             (ser << ACE_OutputCDR::from_octet(0)) &&
02597             (ser << plist);
02598 
02599   if (ok) {
02600     send_sample(payload, size, reader, sequence, reader != GUID_UNKNOWN);
02601 
02602   } else {
02603     result = DDS::RETCODE_ERROR;
02604   }
02605 
02606   delete payload.cont();
02607   return result;
02608 }
02609 
02610 DDS::ReturnCode_t
02611 Sedp::Writer::write_participant_message(const ParticipantMessageData& pmd,
02612                                         const RepoId& reader,
02613                                         DCPS::SequenceNumber& sequence)
02614 {
02615   DDS::ReturnCode_t result = DDS::RETCODE_OK;
02616 
02617   // Determine message length
02618   size_t size = 0, padding = 0;
02619   DCPS::find_size_ulong(size, padding);
02620   DCPS::gen_find_size(pmd, size, padding);
02621 
02622   // Build RTPS message
02623   ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
02624                             ACE_Message_Block::MB_DATA,
02625                             new ACE_Message_Block(size));
02626   using DCPS::Serializer;
02627   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02628   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // CDR_LE = 0x0001
02629             (ser << ACE_OutputCDR::from_octet(1)) &&
02630             (ser << ACE_OutputCDR::from_octet(0)) &&
02631             (ser << ACE_OutputCDR::from_octet(0)) &&
02632             (ser << pmd);
02633 
02634   if (ok) {
02635       send_sample(payload, size, reader, sequence);
02636 
02637   } else {
02638       result = DDS::RETCODE_ERROR;
02639   }
02640 
02641   delete payload.cont();
02642   return result;
02643 }
02644 
02645 #if defined(OPENDDS_SECURITY)
02646 DDS::ReturnCode_t
02647 Sedp::Writer::write_stateless_message(const DDS::Security::ParticipantStatelessMessage& msg,
02648                                       const RepoId& reader,
02649                                       DCPS::SequenceNumber& sequence)
02650 {
02651   using DCPS::Serializer;
02652 
02653   DDS::ReturnCode_t result = DDS::RETCODE_OK;
02654 
02655   size_t size = 0, padding = 0;
02656   DCPS::find_size_ulong(size, padding);
02657   DCPS::gen_find_size(msg, size, padding);
02658 
02659   ACE_Message_Block payload(
02660       DCPS::DataSampleHeader::max_marshaled_size(),
02661       ACE_Message_Block::MB_DATA,
02662       new ACE_Message_Block(size + padding));
02663 
02664   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02665   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // CDR_LE = 0x0001
02666             (ser << ACE_OutputCDR::from_octet(1)) &&
02667             (ser << ACE_OutputCDR::from_octet(0)) &&
02668             (ser << ACE_OutputCDR::from_octet(0));
02669   ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63
02670   ok &= (ser << msg);
02671 
02672   if (ok) {
02673     send_sample(payload, size, reader, sequence);
02674 
02675   } else {
02676     result = DDS::RETCODE_ERROR;
02677   }
02678 
02679   delete payload.cont();
02680   return result;
02681 }
02682 
02683 DDS::ReturnCode_t
02684 Sedp::Writer::write_volatile_message_secure(const DDS::Security::ParticipantVolatileMessageSecure& msg,
02685                                             const RepoId& reader,
02686                                             DCPS::SequenceNumber& sequence)
02687 {
02688   using DCPS::Serializer;
02689 
02690   DDS::ReturnCode_t result = DDS::RETCODE_OK;
02691 
02692   size_t size = 0, padding = 0;
02693   DCPS::find_size_ulong(size, padding);
02694   DCPS::gen_find_size(msg, size, padding);
02695 
02696   ACE_Message_Block payload(
02697       DCPS::DataSampleHeader::max_marshaled_size(),
02698       ACE_Message_Block::MB_DATA,
02699       new ACE_Message_Block(size + padding));
02700 
02701   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02702   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // CDR_LE = 0x0001
02703             (ser << ACE_OutputCDR::from_octet(1)) &&
02704             (ser << ACE_OutputCDR::from_octet(0)) &&
02705             (ser << ACE_OutputCDR::from_octet(0));
02706   ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63
02707   ok &= (ser << msg);
02708 
02709   if (ok) {
02710     send_sample(payload, size, reader, sequence);
02711 
02712   } else {
02713     result = DDS::RETCODE_ERROR;
02714   }
02715 
02716   delete payload.cont();
02717   return result;
02718 }
02719 
02720 DDS::ReturnCode_t
02721 Sedp::Writer::write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
02722                                             const RepoId& reader,
02723                                             DCPS::SequenceNumber& sequence)
02724 {
02725   using DCPS::Serializer;
02726 
02727   DDS::ReturnCode_t result = DDS::RETCODE_OK;
02728 
02729   ParameterList plist;
02730 
02731   bool err = ParameterListConverter::to_param_list(msg, plist);
02732 
02733   if (err) {
02734     ACE_ERROR((LM_ERROR,
02735                ACE_TEXT("(%P|%t) ERROR: Sedp::write_dcps_participant_secure - ")
02736                ACE_TEXT("Failed to convert SPDPdiscoveredParticipantData ")
02737                ACE_TEXT("to ParameterList\n")));
02738 
02739     result = DDS::RETCODE_ERROR;
02740 
02741   } else {
02742     result = write_parameter_list(plist, reader, sequence);
02743   }
02744 
02745   return result;
02746 }
02747 #endif
02748 
02749 DDS::ReturnCode_t
02750 Sedp::Writer::write_unregister_dispose(const RepoId& rid, CORBA::UShort pid)
02751 {
02752   // Build param list for message
02753   Parameter param;
02754   param.guid(rid);
02755   param._d(pid);
02756   ParameterList plist;
02757   plist.length(1);
02758   plist[0] = param;
02759 
02760   // Determine message length
02761   size_t size = 0, padding = 0;
02762   DCPS::find_size_ulong(size, padding);
02763   DCPS::gen_find_size(plist, size, padding);
02764 
02765   DCPS::Message_Block_Ptr payload(new ACE_Message_Block(DCPS::DataSampleHeader::max_marshaled_size(),
02766                             ACE_Message_Block::MB_DATA,
02767                             new ACE_Message_Block(size)));
02768 
02769   if (!payload) {
02770     ACE_ERROR((LM_ERROR,
02771                ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
02772                ACE_TEXT(" - Failed to allocate message block message\n")));
02773     return DDS::RETCODE_ERROR;
02774   }
02775 
02776   using DCPS::Serializer;
02777   Serializer ser(payload->cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02778   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // PL_CDR_LE = 0x0003
02779             (ser << ACE_OutputCDR::from_octet(3)) &&
02780             (ser << ACE_OutputCDR::from_octet(0)) &&
02781             (ser << ACE_OutputCDR::from_octet(0)) &&
02782             (ser << plist);
02783 
02784   if (ok) {
02785     // Send
02786     write_control_msg(move(payload), size, DCPS::DISPOSE_UNREGISTER_INSTANCE);
02787     return DDS::RETCODE_OK;
02788   } else {
02789     // Error
02790     ACE_ERROR((LM_ERROR,
02791                ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
02792                ACE_TEXT(" - Failed to serialize RTPS control message\n")));
02793     return DDS::RETCODE_ERROR;
02794   }
02795 }
02796 
02797 void
02798 Sedp::Writer::end_historic_samples(const RepoId& reader)
02799 {
02800   const void* pReader = static_cast<const void*>(&reader);
02801   DCPS::Message_Block_Ptr mb(new ACE_Message_Block (DCPS::DataSampleHeader::max_marshaled_size(),
02802                                                  ACE_Message_Block::MB_DATA,
02803                                                  new ACE_Message_Block(static_cast<const char*>(pReader),
02804                                                   sizeof(reader))));
02805   if (mb.get()) {
02806     mb->cont()->wr_ptr(sizeof(reader));
02807     // 'mb' would contain the DSHeader, but we skip it. mb.cont() has the data
02808     write_control_msg(move(mb), sizeof(reader), DCPS::END_HISTORIC_SAMPLES,
02809                       DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN());
02810   } else {
02811     ACE_ERROR((LM_ERROR,
02812                ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::end_historic_samples")
02813                ACE_TEXT(" - Failed to allocate message block message\n")));
02814   }
02815 }
02816 
02817 void
02818 Sedp::Writer::write_control_msg(DCPS::Message_Block_Ptr payload,
02819                                 size_t size,
02820                                 DCPS::MessageId id,
02821                                 DCPS::SequenceNumber seq)
02822 {
02823   DCPS::DataSampleHeader header;
02824   set_header_fields(header, size, GUID_UNKNOWN, seq, false, id);
02825   // no need to serialize header since rtps_udp transport ignores it
02826   send_control(header, DCPS::move(payload));
02827 }
02828 
02829 void
02830 Sedp::Writer::set_header_fields(DCPS::DataSampleHeader& dsh,
02831                                 size_t size,
02832                                 const RepoId& reader,
02833                                 DCPS::SequenceNumber& sequence,
02834                                 bool historic_sample,
02835                                 DCPS::MessageId id)
02836 {
02837   dsh.message_id_ = id;
02838   dsh.byte_order_ = ACE_CDR_BYTE_ORDER;
02839   dsh.message_length_ = static_cast<ACE_UINT32>(size);
02840   dsh.publication_id_ = repo_id_;
02841 
02842   if (id != DCPS::END_HISTORIC_SAMPLES &&
02843       (reader == GUID_UNKNOWN ||
02844       sequence == DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())) {
02845     sequence = seq_++;
02846   }
02847 
02848   if (historic_sample && reader != GUID_UNKNOWN) {
02849     // retransmit with same seq# for durability
02850     dsh.historic_sample_ = true;
02851   }
02852 
02853   dsh.sequence_ = sequence;
02854 
02855   const ACE_Time_Value now = ACE_OS::gettimeofday();
02856   dsh.source_timestamp_sec_ = static_cast<ACE_INT32>(now.sec());
02857   dsh.source_timestamp_nanosec_ = now.usec() * 1000;
02858 }
02859 
02860 //-------------------------------------------------------------------------
02861 
02862 Sedp::Reader::~Reader()
02863 {}
02864 
02865 bool
02866 Sedp::Reader::assoc(const DCPS::AssociationData& publication)
02867 {
02868   return associate(publication, false);
02869 }
02870 
02871 
02872 // Implementing TransportReceiveListener
02873 
02874 static bool
02875 decode_parameter_list(const DCPS::ReceivedDataSample& sample,
02876                       DCPS::Serializer& ser,
02877                       const ACE_CDR::Octet& encap,
02878                       ParameterList& data)
02879 {
02880   if (sample.header_.key_fields_only_ && encap < 2) {
02881     GUID_t guid;
02882     if (!(ser >> guid)) return false;
02883     data.length(1);
02884     data[0].guid(guid);
02885     data[0]._d(PID_ENDPOINT_GUID);
02886   } else {
02887     return ser >> data;
02888   }
02889   return true;
02890 }
02891 
02892 void
02893 Sedp::Reader::data_received(const DCPS::ReceivedDataSample& sample)
02894 {
02895   if (shutting_down_.value()) return;
02896 
02897   switch (sample.header_.message_id_) {
02898   case DCPS::SAMPLE_DATA:
02899   case DCPS::DISPOSE_INSTANCE:
02900   case DCPS::UNREGISTER_INSTANCE:
02901   case DCPS::DISPOSE_UNREGISTER_INSTANCE: {
02902     const DCPS::MessageId id =
02903       static_cast<DCPS::MessageId>(sample.header_.message_id_);
02904 
02905     DCPS::Serializer ser(sample.sample_.get(),
02906                          sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
02907                          DCPS::Serializer::ALIGN_CDR);
02908     ACE_CDR::Octet encap, dummy;
02909     ACE_CDR::UShort options;
02910     const bool ok = (ser >> ACE_InputCDR::to_octet(dummy))
02911               && (ser >> ACE_InputCDR::to_octet(encap))
02912               && (ser >> options);
02913     if (!ok) {
02914       ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02915                  ACE_TEXT("failed to deserialize encap and options\n")));
02916       return;
02917     }
02918 
02919     // Ignore the 'encap' byte order since we use sample.header_.byte_order_
02920     // to determine whether or not to swap bytes.
02921 
02922     if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
02923       ParameterList data;
02924       if (!decode_parameter_list(sample, ser, encap, data)) {
02925         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02926                    ACE_TEXT("failed to deserialize data\n")));
02927         return;
02928       }
02929 
02930       DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata(new DCPS::DiscoveredWriterData);
02931       if (ParameterListConverter::from_param_list(data, *wdata) < 0) {
02932         ACE_ERROR((LM_ERROR,
02933                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
02934                    ACE_TEXT("failed to convert from ParameterList ")
02935                    ACE_TEXT("to DiscoveredWriterData\n")));
02936         return;
02937       }
02938       sedp_.task_.enqueue(id, move(wdata));
02939 
02940 #if defined(OPENDDS_SECURITY)
02941     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) {
02942       ParameterList data;
02943       if (!decode_parameter_list(sample, ser, encap, data)) {
02944         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02945                    ACE_TEXT("failed to deserialize data\n")));
02946         return;
02947       }
02948 
02949       DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wdata_secure(new DiscoveredWriterData_SecurityWrapper);
02950 
02951       if (ParameterListConverter::from_param_list(data, *wdata_secure) < 0) {
02952         ACE_ERROR((LM_ERROR,
02953                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
02954                    ACE_TEXT("failed to convert from ParameterList ")
02955                    ACE_TEXT("to DiscoveredWriterData_SecurityWrapper\n")));
02956         return;
02957       }
02958       sedp_.task_.enqueue(id, move(wdata_secure));
02959 #endif
02960 
02961     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
02962       ParameterList data;
02963       if (!decode_parameter_list(sample, ser, encap, data)) {
02964         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02965                    ACE_TEXT("failed to deserialize data\n")));
02966         return;
02967       }
02968 
02969       DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata(new DCPS::DiscoveredReaderData);
02970       if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
02971         ACE_ERROR((LM_ERROR,
02972                    ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
02973                    ACE_TEXT("failed to convert from ParameterList ")
02974                    ACE_TEXT("to DiscoveredReaderData\n")));
02975         return;
02976       }
02977       if (rdata->readerProxy.expectsInlineQos) {
02978         set_inline_qos(rdata->readerProxy.allLocators);
02979       }
02980       sedp_.task_.enqueue(id, move(rdata));
02981 
02982 #if defined(OPENDDS_SECURITY)
02983     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) {
02984       ParameterList data;
02985       if (!decode_parameter_list(sample, ser, encap, data)) {
02986         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02987                    ACE_TEXT("failed to deserialize data\n")));
02988         return;
02989       }
02990 
02991       DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> rdata(new DiscoveredReaderData_SecurityWrapper);
02992 
02993       if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
02994         ACE_ERROR((LM_ERROR,
02995                    ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
02996                    ACE_TEXT("failed to convert from ParameterList ")
02997                    ACE_TEXT("to DiscoveredReaderData_SecurityWrapper\n")));
02998         return;
02999       }
03000       if ((rdata->data).readerProxy.expectsInlineQos) {
03001         set_inline_qos((rdata->data).readerProxy.allLocators);
03002       }
03003       sedp_.task_.enqueue(id, move(rdata));
03004 #endif
03005 
03006     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
03007                && !sample.header_.key_fields_only_) {
03008       DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData);
03009       if (!(ser >> *data)) {
03010         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03011                    ACE_TEXT("failed to deserialize data\n")));
03012         return;
03013       }
03014       sedp_.task_.enqueue(id, move(data));
03015 
03016 #if defined(OPENDDS_SECURITY)
03017     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER
03018                && !sample.header_.key_fields_only_) {
03019 
03020       DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData);
03021 
03022       if (!(ser >> *data)) {
03023         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03024                    ACE_TEXT("failed to deserialize data\n")));
03025         return;
03026       }
03027       sedp_.task_.enqueue_participant_message_secure(id, move(data));
03028 
03029     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER) {
03030 
03031       DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data(new DDS::Security::ParticipantStatelessMessage);
03032       ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63
03033       if (!(ser >> *data)) {
03034         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03035                    ACE_TEXT("failed to deserialize data\n")));
03036         return;
03037       }
03038       sedp_.task_.enqueue_stateless_message(id, move(data));
03039 
03040     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER) {
03041 
03042       DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data(new DDS::Security::ParticipantVolatileMessageSecure);
03043       ser.reset_alignment(); // https://issues.omg.org/browse/DDSIRTP23-63
03044       if (!(ser >> *data)) {
03045         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03046                    ACE_TEXT("failed to deserialize data\n")));
03047         return;
03048       }
03049       sedp_.task_.enqueue_volatile_message_secure(id, move(data));
03050 
03051     } else if (sample.header_.publication_id_.entityId == ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER) {
03052 
03053       ParameterList data;
03054       if (!decode_parameter_list(sample, ser, encap, data)) {
03055         ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03056                    ACE_TEXT("failed to deserialize data\n")));
03057         return;
03058       }
03059 
03060       DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata(new Security::SPDPdiscoveredParticipantData);
03061 
03062       if (ParameterListConverter::from_param_list(data, *pdata) < 0) {
03063         ACE_ERROR((LM_ERROR,
03064                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
03065                    ACE_TEXT("failed to convert from ParameterList ")
03066                    ACE_TEXT("to Security::SPDPdiscoveredParticipantData\n")));
03067         return;
03068       }
03069       sedp_.task_.enqueue(id, move(pdata));
03070 #endif
03071 
03072     }
03073     break;
03074   }
03075 
03076   default:
03077     break;
03078   }
03079 }
03080 
03081 void
03082 Sedp::populate_discovered_writer_msg(
03083     DCPS::DiscoveredWriterData& dwd,
03084     const RepoId& publication_id,
03085     const LocalPublication& pub)
03086 {
03087   // Ignored on the wire dwd.ddsPublicationData.key
03088   // Ignored on the wire dwd.ddsPublicationData.participant_key
03089   OPENDDS_STRING topic_name = topic_names_[pub.topic_id_];
03090   dwd.ddsPublicationData.topic_name = topic_name.c_str();
03091   TopicDetails& topic_details = topics_[topic_name];
03092   dwd.ddsPublicationData.type_name = topic_details.data_type_.c_str();
03093   dwd.ddsPublicationData.durability = pub.qos_.durability;
03094   dwd.ddsPublicationData.durability_service = pub.qos_.durability_service;
03095   dwd.ddsPublicationData.deadline = pub.qos_.deadline;
03096   dwd.ddsPublicationData.latency_budget = pub.qos_.latency_budget;
03097   dwd.ddsPublicationData.liveliness = pub.qos_.liveliness;
03098   dwd.ddsPublicationData.reliability = pub.qos_.reliability;
03099   dwd.ddsPublicationData.lifespan = pub.qos_.lifespan;
03100   dwd.ddsPublicationData.user_data = pub.qos_.user_data;
03101   dwd.ddsPublicationData.ownership = pub.qos_.ownership;
03102   dwd.ddsPublicationData.ownership_strength = pub.qos_.ownership_strength;
03103   dwd.ddsPublicationData.destination_order = pub.qos_.destination_order;
03104   dwd.ddsPublicationData.presentation = pub.publisher_qos_.presentation;
03105   dwd.ddsPublicationData.partition = pub.publisher_qos_.partition;
03106   dwd.ddsPublicationData.topic_data = topic_details.qos_.topic_data;
03107   dwd.ddsPublicationData.group_data = pub.publisher_qos_.group_data;
03108   dwd.writerProxy.remoteWriterGuid = publication_id;
03109   // Ignore dwd.writerProxy.unicastLocatorList;
03110   // Ignore dwd.writerProxy.multicastLocatorList;
03111   dwd.writerProxy.allLocators = pub.trans_info_;
03112 }
03113 
03114 void
03115 Sedp::populate_discovered_reader_msg(
03116     DCPS::DiscoveredReaderData& drd,
03117     const RepoId& subscription_id,
03118     const LocalSubscription& sub)
03119 {
03120   // Ignored on the wire drd.ddsSubscription.key
03121   // Ignored on the wire drd.ddsSubscription.participant_key
03122   OPENDDS_STRING topic_name = topic_names_[sub.topic_id_];
03123   drd.ddsSubscriptionData.topic_name = topic_name.c_str();
03124   TopicDetails& topic_details = topics_[topic_name];
03125   drd.ddsSubscriptionData.type_name = topic_details.data_type_.c_str();
03126   drd.ddsSubscriptionData.durability = sub.qos_.durability;
03127   drd.ddsSubscriptionData.deadline = sub.qos_.deadline;
03128   drd.ddsSubscriptionData.latency_budget = sub.qos_.latency_budget;
03129   drd.ddsSubscriptionData.liveliness = sub.qos_.liveliness;
03130   drd.ddsSubscriptionData.reliability = sub.qos_.reliability;
03131   drd.ddsSubscriptionData.ownership = sub.qos_.ownership;
03132   drd.ddsSubscriptionData.destination_order = sub.qos_.destination_order;
03133   drd.ddsSubscriptionData.user_data = sub.qos_.user_data;
03134   drd.ddsSubscriptionData.time_based_filter = sub.qos_.time_based_filter;
03135   drd.ddsSubscriptionData.presentation = sub.subscriber_qos_.presentation;
03136   drd.ddsSubscriptionData.partition = sub.subscriber_qos_.partition;
03137   drd.ddsSubscriptionData.topic_data = topic_details.qos_.topic_data;
03138   drd.ddsSubscriptionData.group_data = sub.subscriber_qos_.group_data;
03139   drd.readerProxy.remoteReaderGuid = subscription_id;
03140   drd.readerProxy.expectsInlineQos = false;  // We never expect inline qos
03141   // Ignore drd.readerProxy.unicastLocatorList;
03142   // Ignore drd.readerProxy.multicastLocatorList;
03143   drd.readerProxy.allLocators = sub.trans_info_;
03144   drd.contentFilterProperty.contentFilteredTopicName =
03145     OPENDDS_STRING(DCPS::GuidConverter(subscription_id)).c_str();
03146   drd.contentFilterProperty.relatedTopicName = topic_name.c_str();
03147   drd.contentFilterProperty.filterClassName = ""; // PLConverter adds default
03148   drd.contentFilterProperty.filterExpression = sub.filterProperties.filterExpression;
03149   drd.contentFilterProperty.expressionParameters = sub.filterProperties.expressionParameters;
03150   for (DCPS::RepoIdSet::const_iterator writer =
03151         sub.remote_opendds_associations_.begin();
03152        writer != sub.remote_opendds_associations_.end();
03153        ++writer) {
03154     CORBA::ULong len = drd.readerProxy.associatedWriters.length();
03155     drd.readerProxy.associatedWriters.length(len + 1);
03156     drd.readerProxy.associatedWriters[len] = *writer;
03157   }
03158 }
03159 
03160 void
03161 Sedp::write_durable_publication_data(const RepoId& reader)
03162 {
03163   LocalPublicationIter pub, end = local_publications_.end();
03164   for (pub = local_publications_.begin(); pub != end; ++pub) {
03165     write_publication_data(pub->first, pub->second, reader);
03166   }
03167   publications_writer_.end_historic_samples(reader);
03168 
03169 #if defined(OPENDDS_SECURITY)
03170   publications_secure_writer_.end_historic_samples(reader);
03171 #endif
03172 
03173 }
03174 
03175 #if defined(OPENDDS_SECURITY)
03176 void
03177 Sedp::write_durable_publication_data_secure(const RepoId& reader)
03178 {
03179   LocalPublicationIter pub, end = local_publications_.end();
03180   for (pub = local_publications_.begin(); pub != end; ++pub) {
03181     write_publication_data(pub->first, pub->second, reader);
03182   }
03183   publications_secure_writer_.end_historic_samples(reader);
03184 }
03185 #endif
03186 
03187 void
03188 Sedp::write_durable_subscription_data(const RepoId& reader)
03189 {
03190   LocalSubscriptionIter sub, end = local_subscriptions_.end();
03191   for (sub = local_subscriptions_.begin(); sub != end; ++sub) {
03192     write_subscription_data(sub->first, sub->second, reader);
03193   }
03194   subscriptions_writer_.end_historic_samples(reader);
03195 
03196 #if defined(OPENDDS_SECURITY)
03197   subscriptions_secure_writer_.end_historic_samples(reader);
03198 #endif
03199 
03200 }
03201 
03202 #if defined(OPENDDS_SECURITY)
03203 void
03204 Sedp::write_durable_subscription_data_secure(const RepoId& reader)
03205 {
03206   LocalSubscriptionIter sub, end = local_subscriptions_.end();
03207   for (sub = local_subscriptions_.begin(); sub != end; ++sub) {
03208     write_subscription_data(sub->first, sub->second, reader);
03209   }
03210   subscriptions_secure_writer_.end_historic_samples(reader);
03211 }
03212 #endif
03213 
03214 void
03215 Sedp::write_durable_participant_message_data(const RepoId& reader)
03216 {
03217   LocalParticipantMessageIter part, end = local_participant_messages_.end();
03218   for (part = local_participant_messages_.begin(); part != end; ++part) {
03219     write_participant_message_data(part->first, part->second, reader);
03220   }
03221   participant_message_writer_.end_historic_samples(reader);
03222 }
03223 
03224 #if defined(OPENDDS_SECURITY)
03225 DDS::ReturnCode_t
03226 Sedp::write_stateless_message(DDS::Security::ParticipantStatelessMessage& msg,
03227                               const RepoId& reader)
03228 {
03229   static DCPS::SequenceNumber sequence = 0;
03230   msg.message_identity.sequence_number = static_cast<unsigned long>((++sequence).getValue());
03231   return participant_stateless_message_writer_.write_stateless_message(msg, reader, sequence);
03232 }
03233 
03234 DDS::ReturnCode_t
03235 Sedp::write_volatile_message(DDS::Security::ParticipantVolatileMessageSecure& msg,
03236                              const RepoId& reader)
03237 {
03238   static DCPS::SequenceNumber sequence = 0;
03239   msg.message_identity.sequence_number = static_cast<unsigned long>((++sequence).getValue());
03240   return participant_volatile_message_secure_writer_.write_volatile_message_secure(msg, reader, sequence);
03241 }
03242 
03243 DDS::ReturnCode_t
03244 Sedp::write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
03245                                     const RepoId& reader)
03246 {
03247   static DCPS::SequenceNumber sequence = 0;
03248 
03249   DCPS::RepoId dcps_reader(reader);
03250   dcps_reader.entityId = ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER;
03251 
03252   return dcps_participant_secure_writer_.write_dcps_participant_secure(msg, reader, ++sequence);
03253 }
03254 
03255 DDS::ReturnCode_t
03256 Sedp::write_dcps_participant_dispose(const RepoId& part)
03257 {
03258   return dcps_participant_secure_writer_.write_unregister_dispose(part, PID_PARTICIPANT_GUID);
03259 }
03260 #endif
03261 
03262 DDS::ReturnCode_t
03263 Sedp::write_publication_data(
03264     const RepoId& rid,
03265     LocalPublication& lp,
03266     const RepoId& reader)
03267 {
03268   DDS::ReturnCode_t result = DDS::RETCODE_OK;
03269 
03270 #if defined(OPENDDS_SECURITY)
03271   if (lp.security_attribs_.base.is_discovery_protected) {
03272     result = write_publication_data_secure(rid, lp, reader);
03273 
03274   } else {
03275 #endif
03276 
03277     result = write_publication_data_unsecure(rid, lp, reader);
03278 
03279 #if defined(OPENDDS_SECURITY)
03280   }
03281 #endif
03282 
03283   return result;
03284 }
03285 
03286 DDS::ReturnCode_t
03287 Sedp::write_publication_data_unsecure(
03288     const RepoId& rid,
03289     LocalPublication& lp,
03290     const RepoId& reader)
03291 {
03292   DDS::ReturnCode_t result = DDS::RETCODE_OK;
03293   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03294                              !associated_participants_.empty())) {
03295     DCPS::DiscoveredWriterData dwd;
03296     ParameterList plist;
03297     populate_discovered_writer_msg(dwd, rid, lp);
03298 
03299     // Convert to parameter list
03300     if (ParameterListConverter::to_param_list(dwd, plist, map_ipv4_to_ipv6())) {
03301       ACE_ERROR((LM_ERROR,
03302                  ACE_TEXT("(%P|%t) ERROR: Sedp::write_publication_data - ")
03303                  ACE_TEXT("Failed to convert DiscoveredWriterData ")
03304                  ACE_TEXT(" to ParameterList\n")));
03305       result = DDS::RETCODE_ERROR;
03306     }
03307     if (DDS::RETCODE_OK == result) {
03308       result = publications_writer_.write_parameter_list(plist, reader, lp.sequence_);
03309     }
03310   } else if (DCPS::DCPS_debug_level > 3) {
03311     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_publication_data - ")
03312                         ACE_TEXT("not currently associated, dropping msg.\n")));
03313   }
03314   return result;
03315 }
03316 
03317 #if defined(OPENDDS_SECURITY)
03318 DDS::ReturnCode_t
03319 Sedp::write_publication_data_secure(
03320     const RepoId& rid,
03321     LocalPublication& lp,
03322     const RepoId& reader)
03323 {
03324   DDS::ReturnCode_t result = DDS::RETCODE_OK;
03325   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03326                              !associated_participants_.empty())) {
03327 
03328     DiscoveredWriterData_SecurityWrapper dwd;
03329     ParameterList plist;
03330     populate_discovered_writer_msg(dwd.data, rid, lp);
03331 
03332     dwd.security_info.endpoint_security_attributes = security_attributes_to_bitmask(lp.security_attribs_);
03333     dwd.security_info.plugin_endpoint_security_attributes = lp.security_attribs_.plugin_endpoint_attributes;
03334 
03335     // Convert to parameter list
03336     if (ParameterListConverter::to_param_list(dwd, plist, map_ipv4_to_ipv6())) {
03337       ACE_ERROR((LM_ERROR,
03338                  ACE_TEXT("(%P|%t) ERROR: Sedp::write_publication_data - ")
03339                  ACE_TEXT("Failed to convert DiscoveredWriterData ")
03340                  ACE_TEXT("to ParameterList\n")));
03341       result = DDS::RETCODE_ERROR;
03342     }
03343     if (DDS::RETCODE_OK == result) {
03344       RepoId effective_reader = reader;
03345       if (reader != GUID_UNKNOWN)
03346         effective_reader.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER;
03347       result = publications_secure_writer_.write_parameter_list(plist, effective_reader, lp.sequence_);
03348     }
03349   } else if (DCPS::DCPS_debug_level > 3) {
03350     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_publication_data - ")
03351                         ACE_TEXT("not currently associated, dropping msg.\n")));
03352   }
03353   return result;
03354 }
03355 #endif
03356 
03357 DDS::ReturnCode_t
03358 Sedp::write_subscription_data(
03359     const RepoId& rid,
03360     LocalSubscription& ls,
03361     const RepoId& reader)
03362 {
03363   DDS::ReturnCode_t result = DDS::RETCODE_OK;
03364 
03365 #if defined(OPENDDS_SECURITY)
03366   if (ls.security_attribs_.base.is_discovery_protected) {
03367     result = write_subscription_data_secure(rid, ls, reader);
03368 
03369   } else {
03370 #endif
03371 
03372     result = write_subscription_data_unsecure(rid, ls, reader);
03373 
03374 #if defined(OPENDDS_SECURITY)
03375   }
03376 #endif
03377 
03378   return result;
03379 }
03380 
03381 DDS::ReturnCode_t
03382 Sedp::write_subscription_data_unsecure(
03383     const RepoId& rid,
03384     LocalSubscription& ls,
03385     const RepoId& reader)
03386 {
03387   DDS::ReturnCode_t result = DDS::RETCODE_OK;
03388   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03389                              !associated_participants_.empty())) {
03390     DCPS::DiscoveredReaderData drd;
03391     ParameterList plist;
03392     populate_discovered_reader_msg(drd, rid, ls);
03393 
03394     // Convert to parameter list
03395     if (ParameterListConverter::to_param_list(drd, plist, map_ipv4_to_ipv6())) {
03396       ACE_ERROR((LM_ERROR,
03397                  ACE_TEXT("(%P|%t) ERROR: Sedp::write_subscription_data - ")
03398                  ACE_TEXT("Failed to convert DiscoveredReaderData ")
03399                  ACE_TEXT("to ParameterList\n")));
03400       result = DDS::RETCODE_ERROR;
03401     }
03402     if (DDS::RETCODE_OK == result) {
03403       result = subscriptions_writer_.write_parameter_list(plist, reader, ls.sequence_);
03404     }
03405   } else if (DCPS::DCPS_debug_level > 3) {
03406     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_subscription_data - ")
03407                         ACE_TEXT("not currently associated, dropping msg.\n")));
03408   }
03409   return result;
03410 }
03411 
03412 #if defined(OPENDDS_SECURITY)
03413 DDS::ReturnCode_t
03414 Sedp::write_subscription_data_secure(
03415     const RepoId& rid,
03416     LocalSubscription& ls,
03417     const RepoId& reader)
03418 {
03419   DDS::ReturnCode_t result = DDS::RETCODE_OK;
03420   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03421                              !associated_participants_.empty())) {
03422 
03423     DiscoveredReaderData_SecurityWrapper drd;
03424     ParameterList plist;
03425     populate_discovered_reader_msg(drd.data, rid, ls);
03426 
03427     drd.security_info.endpoint_security_attributes = security_attributes_to_bitmask(ls.security_attribs_);
03428     drd.security_info.plugin_endpoint_security_attributes = ls.security_attribs_.plugin_endpoint_attributes;
03429 
03430     // Convert to parameter list
03431     if (ParameterListConverter::to_param_list(drd, plist, map_ipv4_to_ipv6())) {
03432       ACE_ERROR((LM_ERROR,
03433                  ACE_TEXT("(%P|%t) ERROR: Sedp::write_subscription_data - ")
03434                  ACE_TEXT("Failed to convert DiscoveredReaderData ")
03435                  ACE_TEXT("to ParameterList\n")));
03436       result = DDS::RETCODE_ERROR;
03437     }
03438     if (DDS::RETCODE_OK == result) {
03439       RepoId effective_reader = reader;
03440       if (reader != GUID_UNKNOWN)
03441         effective_reader.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER;
03442       result = subscriptions_secure_writer_.write_parameter_list(plist, effective_reader, ls.sequence_);
03443     }
03444   } else if (DCPS::DCPS_debug_level > 3) {
03445     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_subscription_data - ")
03446                         ACE_TEXT("not currently associated, dropping msg.\n")));
03447   }
03448   return result;
03449 }
03450 #endif
03451 
03452 DDS::ReturnCode_t
03453 Sedp::write_participant_message_data(
03454     const RepoId& rid,
03455     LocalParticipantMessage& pm,
03456     const RepoId& reader)
03457 {
03458   DDS::ReturnCode_t result = DDS::RETCODE_OK;
03459   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03460                              !associated_participants_.empty())) {
03461     ParticipantMessageData pmd;
03462     pmd.participantGuid = rid;
03463     result = participant_message_writer_.write_participant_message(pmd, reader, pm.sequence_);
03464   } else if (DCPS::DCPS_debug_level > 3) {
03465     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_participant_message_data - ")
03466                ACE_TEXT("not currently associated, dropping msg.\n")));
03467   }
03468   return result;
03469 }
03470 
03471 void
03472 Sedp::set_inline_qos(DCPS::TransportLocatorSeq& locators)
03473 {
03474   const OPENDDS_STRING rtps_udp = "rtps_udp";
03475   for (CORBA::ULong i = 0; i < locators.length(); ++i) {
03476     if (locators[i].transport_type.in() == rtps_udp) {
03477       const CORBA::ULong len = locators[i].data.length();
03478       locators[i].data.length(len + 1);
03479       locators[i].data[len] = CORBA::Octet(1);
03480     }
03481   }
03482 }
03483 
03484 void
03485 Sedp::acknowledge()
03486 {
03487   task_.acknowledge();
03488 }
03489 
03490 void
03491 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata)
03492 {
03493   if (spdp_->shutting_down()) { return; }
03494 
03495   Msg::MsgType type = Msg::MSG_PARTICIPANT;
03496 
03497 #if defined(OPENDDS_SECURITY)
03498   if (pdata->dataKind == Security::DPDK_SECURE) {
03499     type = Msg::MSG_DCPS_PARTICIPANT_SECURE;
03500   }
03501 #endif
03502 
03503   putq(new Msg(type, id, pdata.release()));
03504 }
03505 
03506 void
03507 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata)
03508 {
03509   if (spdp_->shutting_down()) { return; }
03510   putq(new Msg(Msg::MSG_WRITER, id, wdata.release()));
03511 }
03512 
03513 #if defined(OPENDDS_SECURITY)
03514 void
03515 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wrapper)
03516 {
03517   if (spdp_->shutting_down()) { return; }
03518   putq(new Msg(Msg::MSG_WRITER_SECURE, id, wrapper.release()));
03519 }
03520 #endif
03521 
03522 void
03523 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata)
03524 {
03525   if (spdp_->shutting_down()) { return; }
03526   putq(new Msg(Msg::MSG_READER, id, rdata.release()));
03527 }
03528 
03529 #if defined(OPENDDS_SECURITY)
03530 void
03531 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> wrapper)
03532 {
03533   if (spdp_->shutting_down()) { return; }
03534   putq(new Msg(Msg::MSG_READER_SECURE, id, wrapper.release()));
03535 }
03536 #endif
03537 
03538 void
03539 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data)
03540 {
03541   if (spdp_->shutting_down()) { return; }
03542   putq(new Msg(Msg::MSG_PARTICIPANT_DATA, id, data.release()));
03543 }
03544 
03545 void
03546 Sedp::Task::enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
03547 {
03548 #ifndef DDS_HAS_MINIMUM_BIT
03549   if (spdp_->shutting_down()) { return; }
03550   putq(new Msg(which_bit, DCPS::DISPOSE_INSTANCE, bit_ih));
03551 #else
03552   ACE_UNUSED_ARG(which_bit);
03553   ACE_UNUSED_ARG(bit_ih);
03554 #endif /* DDS_HAS_MINIMUM_BIT */
03555 }
03556 
03557 #if defined(OPENDDS_SECURITY)
03558 void
03559 Sedp::Task::enqueue_participant_message_secure(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data)
03560 {
03561   if (spdp_->shutting_down()) { return; }
03562   putq(new Msg(Msg::MSG_PARTICIPANT_DATA_SECURE, id, data.release()));
03563 }
03564 
03565 void
03566 Sedp::Task::enqueue_stateless_message(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data)
03567 {
03568   if (spdp_->shutting_down()) { return; }
03569   putq(new Msg(Msg::MSG_PARTICIPANT_STATELESS_DATA, id, data.release()));
03570 }
03571 
03572 void
03573 Sedp::Task::enqueue_volatile_message_secure(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data)
03574 {
03575   if (spdp_->shutting_down()) { return; }
03576   putq(new Msg(Msg::MSG_PARTICIPANT_VOLATILE_SECURE, id, data.release()));
03577 }
03578 #endif
03579 
03580 int
03581 Sedp::Task::svc()
03582 {
03583   for (Msg* msg = 0; getq(msg) != -1; /*no increment*/) {
03584     if (DCPS::DCPS_debug_level > 5) {
03585       ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc "
03586         "got message from queue type %d\n", msg->type_));
03587     }
03588     DCPS::unique_ptr<Msg> delete_the_msg(msg);
03589     switch (msg->type_) {
03590       case Msg::MSG_PARTICIPANT:
03591       svc_i(msg->dpdata_);
03592       break;
03593 
03594     case Msg::MSG_WRITER:
03595       svc_i(msg->id_, msg->wdata_);
03596       break;
03597 
03598 #if defined(OPENDDS_SECURITY)
03599     case Msg::MSG_WRITER_SECURE:
03600       svc_i(msg->id_, msg->wdata_secure_);
03601       break;
03602 #endif
03603 
03604     case Msg::MSG_READER:
03605       svc_i(msg->id_, msg->rdata_);
03606       break;
03607 
03608 #if defined(OPENDDS_SECURITY)
03609     case Msg::MSG_READER_SECURE:
03610       svc_i(msg->id_, msg->rdata_secure_);
03611       break;
03612 #endif
03613 
03614     case Msg::MSG_PARTICIPANT_DATA:
03615       svc_i(msg->id_, msg->pmdata_);
03616       break;
03617 
03618 #if defined(OPENDDS_SECURITY)
03619     case Msg::MSG_PARTICIPANT_DATA_SECURE:
03620       svc_participant_message_data_secure(msg->id_, msg->pmdata_);
03621       break;
03622 
03623     case Msg::MSG_PARTICIPANT_STATELESS_DATA:
03624       svc_stateless_message(msg->id_, msg->pgmdata_);
03625       break;
03626 
03627     case Msg::MSG_PARTICIPANT_VOLATILE_SECURE:
03628       svc_volatile_message_secure(msg->id_, msg->pgmdata_);
03629       break;
03630 
03631     case Msg::MSG_DCPS_PARTICIPANT_SECURE:
03632       svc_secure_i(msg->id_, msg->dpdata_);
03633       break;
03634 #endif
03635 
03636     case Msg::MSG_REMOVE_FROM_PUB_BIT:
03637     case Msg::MSG_REMOVE_FROM_SUB_BIT:
03638       svc_i(msg->type_, msg->ih_);
03639       break;
03640 
03641     case Msg::MSG_FINI_BIT:
03642       // acknowledge that fini_bit has been called (this just ensures that
03643       // this task is not in the act of using one of BIT Subscriber's Data
03644       // Readers while it is being deleted
03645       spdp_->wait_for_acks().ack();
03646       break;
03647 
03648     case Msg::MSG_STOP:
03649       if (DCPS::DCPS_debug_level > 3) {
03650         ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
03651                             ACE_TEXT("received MSG_STOP. Task exiting\n")));
03652       }
03653       return 0;
03654     }
03655 
03656     if (DCPS::DCPS_debug_level > 5) {
03657       ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc done with message\n"));
03658     }
03659   }
03660   if (DCPS::DCPS_debug_level > 3) {
03661     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
03662                         ACE_TEXT("Task exiting.\n")));
03663   }
03664   return 0;
03665 }
03666 
03667 Sedp::Task::~Task()
03668 {
03669   shutdown();
03670 }
03671 
03672 bool
03673 Sedp::shutting_down() const
03674 {
03675   return spdp_.shutting_down();
03676 }
03677 
03678 void
03679 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& rTls,
03680                                           DiscoveredSubscriptionIter& dsi,
03681                                           const RepoId& reader)
03682 {
03683   DCPS::LocatorSeq locs;
03684   bool participantExpectsInlineQos = false;
03685   RepoId remote_participant = reader;
03686   remote_participant.entityId = ENTITYID_PARTICIPANT;
03687   const bool participant_found =
03688     spdp_.get_default_locators(remote_participant, locs,
03689                                participantExpectsInlineQos);
03690   if (!rTls->length()) {     // if no locators provided, add the default
03691     if (!participant_found) {
03692       defer_match_endpoints_.insert(reader);
03693       return;
03694     } else if (locs.length()) {
03695       size_t size = 0, padding = 0;
03696       DCPS::gen_find_size(locs, size, padding);
03697 
03698       ACE_Message_Block mb_locator(size + 1);   // Add space for boolean
03699       using DCPS::Serializer;
03700       Serializer ser_loc(&mb_locator,
03701                          ACE_CDR_BYTE_ORDER,
03702                          Serializer::ALIGN_CDR);
03703       ser_loc << locs;
03704       const bool readerExpectsInlineQos =
03705         dsi->second.reader_data_.readerProxy.expectsInlineQos;
03706       ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos
03707                                              || readerExpectsInlineQos);
03708 
03709       DCPS::TransportLocator tl;
03710       tl.transport_type = "rtps_udp";
03711       message_block_to_sequence (mb_locator, tl.data);
03712       rTls->length(1);
03713       (*rTls)[0] = tl;
03714     } else {
03715       ACE_DEBUG((LM_WARNING,
03716                  ACE_TEXT("(%P|%t) Sedp::match - ")
03717                  ACE_TEXT("remote reader found with no locators ")
03718                  ACE_TEXT("and no default locators\n")));
03719     }
03720   }
03721 }
03722 
03723 void
03724 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& wTls,
03725                                           DiscoveredPublicationIter& /*dpi*/,
03726                                           const RepoId& writer)
03727 {
03728   DCPS::LocatorSeq locs;
03729   bool participantExpectsInlineQos = false;
03730   RepoId remote_participant = writer;
03731   remote_participant.entityId = ENTITYID_PARTICIPANT;
03732   const bool participant_found =
03733     spdp_.get_default_locators(remote_participant, locs,
03734                                participantExpectsInlineQos);
03735   if (!wTls->length()) {     // if no locators provided, add the default
03736     if (!participant_found) {
03737       defer_match_endpoints_.insert(writer);
03738       return;
03739     } else if (locs.length()) {
03740       size_t size = 0, padding = 0;
03741       DCPS::gen_find_size(locs, size, padding);
03742 
03743       ACE_Message_Block mb_locator(size + 1);   // Add space for boolean
03744       using DCPS::Serializer;
03745       Serializer ser_loc(&mb_locator,
03746                          ACE_CDR_BYTE_ORDER,
03747                          Serializer::ALIGN_CDR);
03748       ser_loc << locs;
03749       ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos);
03750 
03751       DCPS::TransportLocator tl;
03752       tl.transport_type = "rtps_udp";
03753       message_block_to_sequence (mb_locator, tl.data);
03754       wTls->length(1);
03755       (*wTls)[0] = tl;
03756     } else {
03757       ACE_DEBUG((LM_WARNING,
03758                  ACE_TEXT("(%P|%t) Sedp::match - ")
03759                  ACE_TEXT("remote writer found with no locators ")
03760                  ACE_TEXT("and no default locators\n")));
03761     }
03762   }
03763 }
03764 
03765 #if defined(OPENDDS_SECURITY)
03766 DCPS::TransportLocatorSeq
03767 Sedp::add_security_info(const DCPS::TransportLocatorSeq& locators,
03768                         const RepoId& writer, const RepoId& reader)
03769 {
03770   using DCPS::Serializer;
03771 
03772   if (std::memcmp(writer.guidPrefix, reader.guidPrefix,
03773                   sizeof(DCPS::GuidPrefix_t)) == 0) {
03774     // writer and reader belong to the same participant, no security needed
03775     return locators;
03776   }
03777 
03778   DDS::Security::ParticipantCryptoHandle part_handle = DDS::HANDLE_NIL;
03779   DDS::Security::DatawriterCryptoHandle dw_handle = DDS::HANDLE_NIL;
03780   DDS::Security::DatareaderCryptoHandle dr_handle = DDS::HANDLE_NIL;
03781 
03782   if (local_reader_crypto_handles_.find(reader) != local_reader_crypto_handles_.end() &&
03783       remote_writer_crypto_handles_.find(writer) != remote_writer_crypto_handles_.end()) {
03784     dr_handle = local_reader_crypto_handles_[reader];
03785     dw_handle = remote_writer_crypto_handles_[writer];
03786     DCPS::RepoId part = writer;
03787     part.entityId = ENTITYID_PARTICIPANT;
03788     part_handle = spdp_.lookup_participant_crypto_info(part).first;
03789   } else if (local_writer_crypto_handles_.find(writer) != local_writer_crypto_handles_.end() &&
03790              remote_reader_crypto_handles_.find(reader) != remote_reader_crypto_handles_.end()) {
03791     dw_handle = local_writer_crypto_handles_[writer];
03792     dr_handle = remote_reader_crypto_handles_[reader];
03793     DCPS::RepoId part = reader;
03794     part.entityId = ENTITYID_PARTICIPANT;
03795     part_handle = spdp_.lookup_participant_crypto_info(part).first;
03796   }
03797 
03798   if (part_handle == DDS::HANDLE_NIL) {
03799     // security not enabled for this discovered participant
03800     return locators;
03801   }
03802 
03803   DCPS::TransportLocatorSeq_var newLoc;
03804   DDS::OctetSeq added;
03805   for (unsigned int i = 0; i < locators.length(); ++i) {
03806     if (std::strcmp(locators[i].transport_type.in(), "rtps_udp") == 0) {
03807       if (!newLoc) {
03808         newLoc = new DCPS::TransportLocatorSeq(locators);
03809 
03810         DDS::OctetSeq handleOctets = handle_to_octets(part_handle);
03811         const DDS::BinaryProperty_t prop = {BLOB_PROP_PART_CRYPTO_HANDLE,
03812                                             handleOctets, true /*serialize*/};
03813         DDS::OctetSeq handleOctetsDw = handle_to_octets(dw_handle);
03814         const DDS::BinaryProperty_t dw_p = {BLOB_PROP_DW_CRYPTO_HANDLE,
03815                                             handleOctetsDw, true /*serialize*/};
03816         DDS::OctetSeq handleOctetsDr = handle_to_octets(dr_handle);
03817         const DDS::BinaryProperty_t dr_p = {BLOB_PROP_DR_CRYPTO_HANDLE,
03818                                             handleOctetsDr, true /*serialize*/};
03819         size_t size = 0, padding = 0;
03820         DCPS::gen_find_size(prop, size, padding);
03821         if (dw_handle != DDS::HANDLE_NIL) {
03822           DCPS::gen_find_size(dw_p, size, padding);
03823         }
03824         if (dr_handle != DDS::HANDLE_NIL) {
03825           DCPS::gen_find_size(dr_p, size, padding);
03826         }
03827         ACE_Message_Block mb(size + padding);
03828         Serializer ser(&mb, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
03829         ser << prop;
03830         if (dw_handle != DDS::HANDLE_NIL) {
03831           ser << dw_p;
03832         }
03833         if (dr_handle != DDS::HANDLE_NIL) {
03834           ser << dr_p;
03835         }
03836         added.length(mb.size());
03837         std::memcpy(added.get_buffer(), mb.rd_ptr(), mb.size());
03838       }
03839 
03840       const unsigned int prevLen = newLoc[i].data.length();
03841       newLoc[i].data.length(prevLen + added.length());
03842       std::memcpy(newLoc[i].data.get_buffer() + prevLen, added.get_buffer(),
03843                   added.length());
03844     }
03845   }
03846 
03847   return newLoc ? newLoc : locators;
03848 }
03849 #endif
03850 
03851 bool
03852 Sedp::defer_writer(const RepoId& writer,
03853                    const RepoId& writer_participant)
03854 {
03855   if (!associated_participants_.count(writer_participant)) {
03856     if (DCPS::DCPS_debug_level > 3) {
03857       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
03858                  ACE_TEXT("remote writer deferred\n")));
03859     }
03860     defer_match_endpoints_.insert(writer);
03861     return true;
03862   }
03863   return false;
03864 }
03865 
03866 bool
03867 Sedp::defer_reader(const RepoId& reader,
03868                    const RepoId& reader_participant)
03869 {
03870   if (!associated_participants_.count(reader_participant)) {
03871     if (DCPS::DCPS_debug_level > 3) {
03872       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
03873                  ACE_TEXT("remote reader deferred\n")));
03874     }
03875     defer_match_endpoints_.insert(reader);
03876     return true;
03877   }
03878   return false;
03879 }
03880 
03881 #if defined(OPENDDS_SECURITY)
03882 DDS::Security::DatawriterCryptoHandle
03883 Sedp::generate_remote_matched_writer_crypto_handle(const RepoId& writer_part, const DDS::Security::DatareaderCryptoHandle& drch)
03884 {
03885   DDS::Security::DatawriterCryptoHandle result = DDS::HANDLE_NIL;
03886 
03887   DDS::Security::CryptoKeyFactory_var key_factory = spdp_.get_security_config()->get_crypto_key_factory();
03888 
03889   Spdp::ParticipantCryptoInfoPair info = spdp_.lookup_participant_crypto_info(writer_part);
03890 
03891   if (info.first != DDS::HANDLE_NIL && info.second) {
03892     DDS::Security::SecurityException se = {"", 0, 0};
03893     result = key_factory->register_matched_remote_datawriter(drch, info.first, info.second, se);
03894     if (result == DDS::HANDLE_NIL) {
03895       ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_writer_crypto_handle() - ")
03896         ACE_TEXT("Failure calling register_matched_remote_datawriter(). Security Exception[%d.%d]: %C\n"),
03897           se.code, se.minor_code, se.message.in()));
03898     }
03899   } else {
03900     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_writer_crypto_handle() - ")
03901       ACE_TEXT("Unable to lookup remote participant crypto info.\n")));
03902   }
03903   return result;
03904 }
03905 
03906 DDS::Security::DatareaderCryptoHandle
03907 Sedp::generate_remote_matched_reader_crypto_handle(const RepoId& reader_part,
03908                                                    const DDS::Security::DatawriterCryptoHandle& dwch,
03909                                                    bool relay_only)
03910 {
03911   DDS::Security::DatareaderCryptoHandle result = DDS::HANDLE_NIL;
03912 
03913   DDS::Security::CryptoKeyFactory_var key_factory = spdp_.get_security_config()->get_crypto_key_factory();
03914 
03915   Spdp::ParticipantCryptoInfoPair info = spdp_.lookup_participant_crypto_info(reader_part);
03916 
03917   if (info.first != DDS::HANDLE_NIL && info.second) {
03918     DDS::Security::SecurityException se = {"", 0, 0};
03919     result = key_factory->register_matched_remote_datareader(dwch, info.first, info.second, relay_only, se);
03920     if (result == DDS::HANDLE_NIL) {
03921       ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_reader_crypto_handle() - ")
03922         ACE_TEXT("Failure calling register_matched_remote_datareader(). Security Exception[%d.%d]: %C\n"),
03923           se.code, se.minor_code, se.message.in()));
03924     }
03925   } else {
03926     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_reader_crypto_handle() - ")
03927       ACE_TEXT("Unable to lookup remote participant crypto info.\n")));
03928   }
03929   return result;
03930 }
03931 
03932 void
03933 Sedp::create_and_send_datareader_crypto_tokens(const DDS::Security::DatareaderCryptoHandle& drch,
03934                                                const RepoId& local_reader,
03935                                                const DDS::Security::DatawriterCryptoHandle& dwch,
03936                                                const RepoId& remote_writer)
03937 {
03938   DDS::Security::SecurityException se = {"", 0, 0};
03939   DDS::Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
03940 
03941   DDS::Security::DatareaderCryptoTokenSeq drcts;
03942 
03943   if (!key_exchange->create_local_datareader_crypto_tokens(drcts, drch, dwch, se)) {
03944     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
03945       ACE_TEXT("Sedp::create_and_send_datareader_crypto_tokens() - ")
03946       ACE_TEXT("Unable to create local datareader crypto tokens with crypto key exchange plugin. ")
03947       ACE_TEXT("Security Exception[%d.%d]: %C\n"), se.code, se.minor_code, se.message.in()));
03948     return;
03949   }
03950 
03951   if (drcts.length() != 0) {
03952     DCPS::RepoId remote_part = remote_writer;
03953     remote_part.entityId = ENTITYID_PARTICIPANT;
03954 
03955     DCPS::RepoId local_volatile_writer = participant_id_;
03956     local_volatile_writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
03957 
03958     DCPS::RepoId remote_volatile_reader = remote_part;
03959     remote_volatile_reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
03960 
03961     DDS::Security::ParticipantVolatileMessageSecure msg;
03962     msg.message_identity.source_guid = local_volatile_writer;
03963     msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS;
03964     msg.destination_participant_guid = remote_part;
03965     msg.destination_endpoint_guid = remote_writer;
03966     msg.source_endpoint_guid = local_reader;
03967     msg.message_data = reinterpret_cast<const DDS::Security::DataHolderSeq&>(drcts);
03968 
03969     if (write_volatile_message(msg, remote_volatile_reader) != DDS::RETCODE_OK) {
03970       ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::create_and_send_datareader_crypto_tokens() - ")
03971         ACE_TEXT("Unable to write volatile message.\n")));
03972       return;
03973     }
03974   }
03975   return;
03976 }
03977 
03978 void
03979 Sedp::create_and_send_datawriter_crypto_tokens(const DDS::Security::DatawriterCryptoHandle& dwch,
03980                                                const RepoId& local_writer,
03981                                                const DDS::Security::DatareaderCryptoHandle& drch,
03982                                                const RepoId& remote_reader)
03983 {
03984   DDS::Security::SecurityException se = {"", 0, 0};
03985   DDS::Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
03986 
03987   DDS::Security::DatawriterCryptoTokenSeq dwcts;
03988 
03989   if (key_exchange->create_local_datawriter_crypto_tokens(dwcts, dwch, drch, se) == false) {
03990     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
03991       ACE_TEXT("Sedp::create_and_send_datawriter_crypto_tokens() - ")
03992       ACE_TEXT("Unable to create local datawriter crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
03993         se.code, se.minor_code, se.message.in()));
03994     return;
03995   }
03996 
03997   if (dwcts.length() != 0) {
03998     DCPS::RepoId remote_part = remote_reader;
03999     remote_part.entityId = ENTITYID_PARTICIPANT;
04000 
04001     DCPS::RepoId local_volatile_writer = participant_id_;
04002     local_volatile_writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
04003 
04004     DCPS::RepoId remote_volatile_reader = remote_part;
04005     remote_volatile_reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
04006 
04007     DDS::Security::ParticipantVolatileMessageSecure msg;
04008     msg.message_identity.source_guid = local_volatile_writer;
04009     msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS;
04010     msg.destination_participant_guid = remote_part;
04011     msg.destination_endpoint_guid = remote_reader;
04012     msg.source_endpoint_guid = local_writer;
04013     msg.message_data = reinterpret_cast<const DDS::Security::DataHolderSeq&>(dwcts);
04014 
04015     if (write_volatile_message(msg, remote_volatile_reader) != DDS::RETCODE_OK) {
04016       ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::create_and_send_datawriter_crypto_tokens() - ")
04017         ACE_TEXT("Unable to write volatile message.\n")));
04018       return;
04019     }
04020   }
04021   return;
04022 }
04023 
04024 void
04025 Sedp::handle_datawriter_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg) {
04026   DDS::Security::SecurityException se = {"", 0, 0};
04027   Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
04028 
04029   // If this message wasn't intended for us, ignore volatile message
04030   if (msg.destination_participant_guid != participant_id_ || !msg.message_data.length()) {
04031     return;
04032   }
04033 
04034   ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
04035 
04036   DCPS::DatawriterCryptoHandleMap::const_iterator w_iter = remote_writer_crypto_handles_.find(msg.source_endpoint_guid);
04037   DCPS::DatareaderCryptoHandleMap::const_iterator r_iter = local_reader_crypto_handles_.find(msg.destination_endpoint_guid);
04038 
04039   DDS::Security::DatawriterCryptoTokenSeq dwcts;
04040   dwcts = reinterpret_cast<const DDS::Security::DatawriterCryptoTokenSeq&>(msg.message_data);
04041 
04042   if (w_iter == remote_writer_crypto_handles_.end()) {
04043     ACE_DEBUG((LM_DEBUG,
04044       ACE_TEXT("(%P|%t) Sedp::handle_datawriter_crypto_tokens() - ")
04045       ACE_TEXT("received tokens for unknown remote writer. Caching.\n")));
04046 
04047     pending_remote_writer_crypto_tokens_[msg.source_endpoint_guid] = dwcts;
04048     return;
04049   }
04050 
04051   if (r_iter == local_reader_crypto_handles_.end()) {
04052     ACE_DEBUG((LM_WARNING,
04053       ACE_TEXT("(%P|%t) Sedp::handle_datawriter_crypto_tokens() - ")
04054       ACE_TEXT("received tokens for unknown local reader. Ignoring.\n")));
04055     return;
04056   }
04057 
04058   if (!key_exchange->set_remote_datawriter_crypto_tokens(r_iter->second, w_iter->second, dwcts, se)) {
04059     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) ERROR: ")
04060       ACE_TEXT("(%P|%t) WARNING: Sedp::handle_datawriter_crypto_tokens() - ")
04061       ACE_TEXT("Unable to set remote datawriter crypto tokens with crypto key exchange plugin. ")
04062       ACE_TEXT("Security Exception[%d.%d]: %C\n"), se.code, se.minor_code, se.message.in()));
04063     return;
04064   }
04065 }
04066 
04067 void
04068 Sedp::handle_datareader_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg) {
04069   DDS::Security::SecurityException se = {"", 0, 0};
04070   Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
04071 
04072   // If this message wasn't intended for us, ignore volatile message
04073   if (msg.destination_participant_guid != participant_id_ || !msg.message_data.length()) {
04074     return;
04075   }
04076 
04077   ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
04078 
04079   DCPS::DatareaderCryptoHandleMap::const_iterator r_iter = remote_reader_crypto_handles_.find(msg.source_endpoint_guid);
04080   DCPS::DatawriterCryptoHandleMap::const_iterator w_iter = local_writer_crypto_handles_.find(msg.destination_endpoint_guid);
04081 
04082   DDS::Security::DatareaderCryptoTokenSeq drcts;
04083   drcts = reinterpret_cast<const DDS::Security::DatareaderCryptoTokenSeq&>(msg.message_data);
04084 
04085   if (r_iter == remote_reader_crypto_handles_.end()) {
04086     ACE_DEBUG((LM_DEBUG,
04087       ACE_TEXT("(%P|%t) Sedp::handle_datareader_crypto_tokens() - ")
04088       ACE_TEXT("received tokens for unknown remote reader. Caching.\n")));
04089 
04090     pending_remote_reader_crypto_tokens_[msg.source_endpoint_guid] = drcts;
04091     return;
04092   }
04093 
04094   if (w_iter == local_writer_crypto_handles_.end()) {
04095     ACE_DEBUG((LM_WARNING,
04096       ACE_TEXT("(%P|%t) Sedp::handle_datareader_crypto_tokens() - ")
04097       ACE_TEXT("received tokens for unknown local writer. Ignoring.\n")));
04098     return;
04099   }
04100 
04101   if (!key_exchange->set_remote_datareader_crypto_tokens(w_iter->second, r_iter->second, drcts, se)) {
04102     ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) ERROR: ")
04103       ACE_TEXT("(%P|%t) WARNING: Sedp::handle_datareader_crypto_tokens() - ")
04104       ACE_TEXT("Unable to set remote datareader crypto tokens with crypto key exchange plugin. ")
04105       ACE_TEXT("Security Exception[%d.%d]: %C\n"), se.code, se.minor_code, se.message.in()));
04106     return;
04107   }
04108 }
04109 
04110 DDS::DomainId_t Sedp::get_domain_id() const {
04111   return spdp_.get_domain_id();
04112 }
04113 #endif
04114 
04115 WaitForAcks::WaitForAcks()
04116 : cond_(lock_)
04117 , acks_(0)
04118 {
04119 }
04120 
04121 void
04122 WaitForAcks::ack()
04123 {
04124   {
04125     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
04126     ++acks_;
04127   }
04128   cond_.signal();
04129 }
04130 
04131 void
04132 WaitForAcks::wait_for_acks(unsigned int num_acks)
04133 {
04134   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
04135   while (num_acks > acks_) {
04136     cond_.wait();
04137   }
04138 }
04139 
04140 void
04141 WaitForAcks::reset()
04142 {
04143   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
04144   acks_ = 0;
04145   // no need to signal, going back to zero won't ever
04146   // cause wait_for_acks() to exit it's loop
04147 }
04148 
04149 }
04150 }
04151 
04152 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1