00001
00002
00003
00004
00005
00006
00007
00008 #include "Sedp.h"
00009
00010 #include "MessageTypes.h"
00011 #include "ParameterListConverter.h"
00012 #include "RtpsDiscovery.h"
00013 #include "RtpsCoreTypeSupportImpl.h"
00014
00015 #if defined(OPENDDS_SECURITY)
00016 #include "SecurityHelpers.h"
00017 #endif
00018
00019 #include "Spdp.h"
00020
00021 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00022 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst.h"
00023 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst_rch.h"
00024
00025 #include "dds/DCPS/Serializer.h"
00026 #include "dds/DCPS/Definitions.h"
00027 #include "dds/DCPS/GuidConverter.h"
00028 #include "dds/DCPS/GuidUtils.h"
00029 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00030 #include "dds/DCPS/AssociationData.h"
00031 #include "dds/DCPS/Service_Participant.h"
00032 #include "dds/DCPS/Qos_Helper.h"
00033 #include "dds/DCPS/DataSampleHeader.h"
00034 #include "dds/DCPS/SendStateDataSampleList.h"
00035 #include "dds/DCPS/DataReaderCallbacks.h"
00036 #include "dds/DCPS/DataWriterCallbacks.h"
00037 #include "dds/DCPS/Marked_Default_Qos.h"
00038 #include "dds/DCPS/BuiltInTopicUtils.h"
00039 #include "dds/DCPS/DCPS_Utils.h"
00040 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00041
00042 #include <ace/Reverse_Lock_T.h>
00043 #include <ace/Auto_Ptr.h>
00044
00045 #include <cstring>
00046
00047 namespace {
00048 bool qosChanged(DDS::PublicationBuiltinTopicData& dest,
00049 const DDS::PublicationBuiltinTopicData& src)
00050 {
00051 #ifndef OPENDDS_SAFETY_PROFILE
00052 using OpenDDS::DCPS::operator!=;
00053 #endif
00054 bool changed = false;
00055
00056
00057
00058 if (dest.deadline != src.deadline) {
00059 changed = true;
00060 dest.deadline = src.deadline;
00061 }
00062
00063 if (dest.latency_budget != src.latency_budget) {
00064 changed = true;
00065 dest.latency_budget = src.latency_budget;
00066 }
00067
00068 if (dest.lifespan != src.lifespan) {
00069 changed = true;
00070 dest.lifespan = src.lifespan;
00071 }
00072
00073 if (dest.user_data != src.user_data) {
00074 changed = true;
00075 dest.user_data = src.user_data;
00076 }
00077
00078 if (dest.ownership_strength != src.ownership_strength) {
00079 changed = true;
00080 dest.ownership_strength = src.ownership_strength;
00081 }
00082
00083 if (dest.partition != src.partition) {
00084 changed = true;
00085 dest.partition = src.partition;
00086 }
00087
00088 if (dest.topic_data != src.topic_data) {
00089 changed = true;
00090 dest.topic_data = src.topic_data;
00091 }
00092
00093 if (dest.group_data != src.group_data) {
00094 changed = true;
00095 dest.group_data = src.group_data;
00096 }
00097
00098 return changed;
00099 }
00100
00101 bool qosChanged(DDS::SubscriptionBuiltinTopicData& dest,
00102 const DDS::SubscriptionBuiltinTopicData& src)
00103 {
00104 #ifndef OPENDDS_SAFETY_PROFILE
00105 using OpenDDS::DCPS::operator!=;
00106 #endif
00107 bool changed = false;
00108
00109
00110
00111 if (dest.deadline != src.deadline) {
00112 changed = true;
00113 dest.deadline = src.deadline;
00114 }
00115
00116 if (dest.latency_budget != src.latency_budget) {
00117 changed = true;
00118 dest.latency_budget = src.latency_budget;
00119 }
00120
00121 if (dest.user_data != src.user_data) {
00122 changed = true;
00123 dest.user_data = src.user_data;
00124 }
00125
00126 if (dest.time_based_filter != src.time_based_filter) {
00127 changed = true;
00128 dest.time_based_filter = src.time_based_filter;
00129 }
00130
00131 if (dest.partition != src.partition) {
00132 changed = true;
00133 dest.partition = src.partition;
00134 }
00135
00136 if (dest.topic_data != src.topic_data) {
00137 changed = true;
00138 dest.topic_data = src.topic_data;
00139 }
00140
00141 if (dest.group_data != src.group_data) {
00142 changed = true;
00143 dest.group_data = src.group_data;
00144 }
00145
00146 return changed;
00147 }
00148
00149 bool paramsChanged(OpenDDS::DCPS::ContentFilterProperty_t& dest,
00150 const OpenDDS::DCPS::ContentFilterProperty_t& src)
00151 {
00152 if (dest.expressionParameters.length() != src.expressionParameters.length()) {
00153 dest.expressionParameters = src.expressionParameters;
00154 return true;
00155 }
00156 for (CORBA::ULong i = 0; i < src.expressionParameters.length(); ++i) {
00157 if (0 != std::strcmp(dest.expressionParameters[i],
00158 src.expressionParameters[i])) {
00159 dest.expressionParameters = src.expressionParameters;
00160 return true;
00161 }
00162 }
00163 return false;
00164 }
00165
00166 }
00167
00168 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00169
00170 namespace OpenDDS {
00171 namespace RTPS {
00172 using DCPS::RepoId;
00173 using DCPS::make_rch;
00174
00175 const bool Sedp::host_is_bigendian_(!ACE_CDR_BYTE_ORDER);
00176
00177 Sedp::Sedp(const RepoId& participant_id, Spdp& owner, ACE_Thread_Mutex& lock) :
00178 DCPS::EndpointManager<Security::SPDPdiscoveredParticipantData>(participant_id, lock),
00179 spdp_(owner),
00180 publications_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), *this),
00181
00182 #if defined(OPENDDS_SECURITY)
00183 publications_secure_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER), *this),
00184 #endif
00185
00186 subscriptions_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), *this),
00187
00188 #if defined(OPENDDS_SECURITY)
00189 subscriptions_secure_writer_(make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER), *this),
00190 #endif
00191
00192 participant_message_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER), *this),
00193
00194 #if defined(OPENDDS_SECURITY)
00195 participant_message_secure_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER), *this),
00196 participant_stateless_message_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER), *this),
00197 participant_volatile_message_secure_writer_(make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER), *this),
00198 dcps_participant_secure_writer_(make_id(participant_id, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER), *this),
00199 #endif
00200
00201 publications_reader_(make_rch<Reader>(
00202 make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER),
00203 ref(*this))),
00204
00205 #if defined(OPENDDS_SECURITY)
00206 publications_secure_reader_(make_rch<Reader>(
00207 make_id(participant_id, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER),
00208 ref(*this))),
00209 #endif
00210
00211 subscriptions_reader_(make_rch<Reader>(
00212 make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER),
00213 ref(*this))),
00214
00215 #if defined(OPENDDS_SECURITY)
00216 subscriptions_secure_reader_(make_rch<Reader>(
00217 make_id(participant_id, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER),
00218 ref(*this))),
00219 #endif
00220
00221 participant_message_reader_(make_rch<Reader>(
00222 make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER),
00223 ref(*this))),
00224
00225 #if defined(OPENDDS_SECURITY)
00226 participant_message_secure_reader_(make_rch<Reader>(
00227 make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER),
00228 ref(*this))),
00229 participant_stateless_message_reader_(make_rch<Reader>(
00230 make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER),
00231 ref(*this))),
00232 participant_volatile_message_secure_reader_(make_rch<Reader>(
00233 make_id(participant_id, ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER),
00234 ref(*this))),
00235 dcps_participant_secure_reader_(make_rch<Reader>(
00236 make_id(participant_id, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER),
00237 ref(*this))),
00238 #endif
00239
00240 task_(this),
00241
00242 #if defined(OPENDDS_SECURITY)
00243 secure_automatic_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00244 secure_manual_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00245 #endif
00246
00247 automatic_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()),
00248 manual_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00249 {
00250 pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00251 sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00252 }
00253
00254 RepoId
00255 Sedp::make_id(const RepoId& participant_id, const EntityId_t& entity)
00256 {
00257 RepoId id = participant_id;
00258 id.entityId = entity;
00259 return id;
00260 }
00261
00262 DDS::ReturnCode_t
00263 Sedp::init(const RepoId& guid,
00264 const RtpsDiscovery& disco,
00265 DDS::DomainId_t domainId)
00266 {
00267 char domainStr[16];
00268 ACE_OS::snprintf(domainStr, 16, "%d", domainId);
00269
00270 OPENDDS_STRING key = DCPS::GuidConverter(guid).uniqueId();
00271
00272
00273 transport_inst_ = TheTransportRegistry->create_inst(
00274 DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00275 OPENDDS_STRING("_SEDPTransportInst_") + key.c_str() + domainStr,
00276 "rtps_udp");
00277
00278 DCPS::RtpsUdpInst_rch rtps_inst =
00279 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00280
00281
00282
00283
00284 static const double HANDSHAKE_MULTIPLIER = 5;
00285 rtps_inst->handshake_timeout_ = disco.resend_period() * HANDSHAKE_MULTIPLIER;
00286
00287 if (disco.sedp_multicast()) {
00288
00289 const u_short mc_port = disco.pb() + disco.dg() * domainId + disco.dx();
00290
00291 OPENDDS_STRING mc_addr = disco.default_multicast_group();
00292 if (rtps_inst->multicast_group_address_.set(mc_port, mc_addr.c_str())) {
00293 ACE_ERROR((LM_ERROR,
00294 ACE_TEXT("(%P|%t) ERROR: Sedp::init - ")
00295 ACE_TEXT("failed setting multicast local_addr to port %hd\n"),
00296 mc_port));
00297 return DDS::RETCODE_ERROR;
00298 }
00299
00300 rtps_inst->ttl_ = disco.ttl();
00301 rtps_inst->multicast_interface_ = disco.multicast_interface();
00302
00303 } else {
00304 rtps_inst->use_multicast_ = false;
00305 }
00306
00307 const OPENDDS_STRING sedp_addr = disco.sedp_local_address();
00308 if (!sedp_addr.empty()) {
00309 rtps_inst->local_address_config_str_ = sedp_addr;
00310 rtps_inst->local_address_.set(sedp_addr.c_str());
00311 }
00312
00313
00314 OPENDDS_STRING config_name = DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00315 OPENDDS_STRING("_SEDP_TransportCfg_") + key +
00316 domainStr;
00317 DCPS::TransportConfig_rch transport_cfg =
00318 TheTransportRegistry->create_config(config_name.c_str());
00319 transport_cfg->instances_.push_back(transport_inst_);
00320
00321
00322 rtps_inst->opendds_discovery_default_listener_ = publications_reader_;
00323 rtps_inst->opendds_discovery_guid_ = guid;
00324 const bool reliability = true, durability = true;
00325
00326 #if defined(OPENDDS_SECURITY)
00327 const bool besteffort = false, nondurable = false;
00328 #endif
00329
00330 publications_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00331 publications_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00332
00333 #if defined(OPENDDS_SECURITY)
00334 publications_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00335 publications_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00336 publications_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00337 publications_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00338 #endif
00339
00340 subscriptions_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00341 subscriptions_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00342
00343 #if defined(OPENDDS_SECURITY)
00344 subscriptions_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00345 subscriptions_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00346 subscriptions_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00347 subscriptions_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00348 #endif
00349
00350 participant_message_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00351 participant_message_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00352
00353 #if defined(OPENDDS_SECURITY)
00354 participant_message_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00355 participant_message_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00356 participant_message_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00357 participant_message_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00358
00359 participant_stateless_message_writer_.enable_transport_using_config(besteffort, nondurable, transport_cfg);
00360 participant_stateless_message_reader_->enable_transport_using_config(besteffort, nondurable, transport_cfg);
00361
00362 participant_volatile_message_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00363 participant_volatile_message_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00364 participant_volatile_message_secure_writer_.enable_transport_using_config(reliability, nondurable, transport_cfg);
00365 participant_volatile_message_secure_reader_->enable_transport_using_config(reliability, nondurable, transport_cfg);
00366
00367 dcps_participant_secure_writer_.set_crypto_handles(spdp_.crypto_handle());
00368 dcps_participant_secure_reader_->set_crypto_handles(spdp_.crypto_handle());
00369 dcps_participant_secure_writer_.enable_transport_using_config(reliability, durability, transport_cfg);
00370 dcps_participant_secure_reader_->enable_transport_using_config(reliability, durability, transport_cfg);
00371 #endif
00372
00373 return DDS::RETCODE_OK;
00374 }
00375
00376 #if defined(OPENDDS_SECURITY)
00377 DDS::ReturnCode_t Sedp::init_security(DDS::Security::IdentityHandle ,
00378 DDS::Security::PermissionsHandle perm_handle,
00379 DDS::Security::ParticipantCryptoHandle crypto_handle)
00380 {
00381 using namespace OpenDDS::Security;
00382 using namespace DDS::Security;
00383
00384 DDS::ReturnCode_t result = DDS::RETCODE_OK;
00385
00386 CryptoKeyFactory_var key_factory = spdp_.get_security_config()->get_crypto_key_factory();
00387 CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
00388 AccessControl_var acl = spdp_.get_security_config()->get_access_control();
00389 Authentication_var auth = spdp_.get_security_config()->get_authentication();
00390
00391 set_permissions_handle(perm_handle);
00392 set_access_control(acl);
00393 set_crypto_key_factory(key_factory);
00394 set_crypto_key_exchange(key_exchange);
00395 crypto_handle_ = crypto_handle;
00396
00397
00398 SecurityException ex = {"", 0, 0};
00399
00400 bool ok = acl->get_participant_sec_attributes(perm_handle, participant_sec_attr_, ex);
00401 if (ok) {
00402
00403 EndpointSecurityAttributes default_sec_attr;
00404 default_sec_attr.base.is_read_protected = false;
00405 default_sec_attr.base.is_write_protected = false;
00406 default_sec_attr.base.is_discovery_protected = false;
00407 default_sec_attr.base.is_liveliness_protected = false;
00408 default_sec_attr.is_submessage_protected = false;
00409 default_sec_attr.is_payload_protected = false;
00410 default_sec_attr.is_key_protected = false;
00411 default_sec_attr.plugin_endpoint_attributes = 0;
00412
00413 NativeCryptoHandle h = DDS::HANDLE_NIL;
00414
00415 const DDS::PartitionQosPolicy& default_part_qos = TheServiceParticipant->initial_PartitionQosPolicy();
00416 const DDS::Security::DataTagQosPolicy default_data_tag_qos;
00417
00418
00419 {
00420 PropertySeq writer_props(1), reader_props(1);
00421 writer_props.length(1);
00422 writer_props[0].name = "dds.sec.builtin_endpoint_name";
00423 writer_props[0].value = "BuiltinParticipantVolatileMessageSecureWriter";
00424
00425 reader_props.length(1);
00426 reader_props[0].name = "dds.sec.builtin_endpoint_name";
00427 reader_props[0].value = "BuiltinParticipantVolatileMessageSecureReader";
00428
00429 EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00430
00431 ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSParticipantVolatileMessageSecure",
00432 default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00433 if (!ok) {
00434 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00435 ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSParticipantVolatileMessageSecure'. ")
00436 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00437 result = DDS::RETCODE_ERROR;
00438 }
00439
00440 h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00441 participant_volatile_message_secure_writer_.set_crypto_handles(crypto_handle, h);
00442 local_writer_crypto_handles_[participant_volatile_message_secure_writer_.get_repo_id()] = h;
00443
00444 EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00445 ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSParticipantVolatileMessageSecure",
00446 default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00447 if (!ok) {
00448 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00449 ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSParticipantVolatileMessageSecure'.")
00450 ACE_TEXT(" Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00451 result = DDS::RETCODE_ERROR;
00452 }
00453
00454 h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00455 participant_volatile_message_secure_reader_->set_crypto_handles(crypto_handle, h);
00456 local_reader_crypto_handles_[participant_volatile_message_secure_reader_->get_repo_id()] = h;
00457 }
00458
00459
00460 {
00461 PropertySeq reader_props, writer_props;
00462
00463 EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00464 ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSParticipantMessageSecure",
00465 default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00466 if (!ok) {
00467 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00468 ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSParticipantMessageSecure'. ")
00469 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00470 result = DDS::RETCODE_ERROR;
00471 }
00472
00473 h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00474 participant_message_secure_writer_.set_crypto_handles(crypto_handle, h);
00475 local_writer_crypto_handles_[participant_message_secure_writer_.get_repo_id()] = h;
00476
00477 EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00478 ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSParticipantMessageSecure",
00479 default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00480 if (!ok) {
00481 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00482 ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSParticipantMessageSecure'. ")
00483 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00484 result = DDS::RETCODE_ERROR;
00485 }
00486
00487 h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00488 participant_message_secure_reader_->set_crypto_handles(crypto_handle, h);
00489 local_reader_crypto_handles_[participant_message_secure_reader_->get_repo_id()] = h;
00490 }
00491
00492
00493 {
00494 PropertySeq reader_props, writer_props;
00495
00496 EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00497 ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSPublicationsSecure",
00498 default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00499 if (!ok) {
00500 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00501 ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSPublicationsSecure'. ")
00502 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00503 result = DDS::RETCODE_ERROR;
00504 }
00505
00506 h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00507 publications_secure_writer_.set_crypto_handles(crypto_handle, h);
00508 local_writer_crypto_handles_[publications_secure_writer_.get_repo_id()] = h;
00509
00510 EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00511 ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSPublicationsSecure",
00512 default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00513 if (!ok) {
00514 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00515 ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSPublicationsSecure'. ")
00516 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00517 result = DDS::RETCODE_ERROR;
00518 }
00519
00520 h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00521 publications_secure_reader_->set_crypto_handles(crypto_handle, h);
00522 local_reader_crypto_handles_[publications_secure_reader_->get_repo_id()] = h;
00523 }
00524
00525
00526 {
00527 PropertySeq reader_props, writer_props;
00528
00529 EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00530 ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSSubscriptionsSecure",
00531 default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00532 if (!ok) {
00533 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00534 ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSSubscriptionsSecure'. ")
00535 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00536 result = DDS::RETCODE_ERROR;
00537 }
00538
00539 h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00540 subscriptions_secure_writer_.set_crypto_handles(crypto_handle, h);
00541 local_writer_crypto_handles_[subscriptions_secure_writer_.get_repo_id()] = h;
00542
00543 EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00544 ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSSubscriptionsSecure",
00545 default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00546 if (!ok) {
00547 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00548 ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSSubscriptionsSecure'. ")
00549 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00550 result = DDS::RETCODE_ERROR;
00551 }
00552
00553 h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00554 subscriptions_secure_reader_->set_crypto_handles(crypto_handle, h);
00555 local_reader_crypto_handles_[subscriptions_secure_reader_->get_repo_id()] = h;
00556 }
00557
00558
00559 {
00560 PropertySeq reader_props, writer_props;
00561
00562 EndpointSecurityAttributes dw_sec_attr(default_sec_attr);
00563 ok = acl->get_datawriter_sec_attributes(perm_handle, "DCPSParticipantSecure",
00564 default_part_qos, default_data_tag_qos, dw_sec_attr, ex);
00565 if (!ok) {
00566 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00567 ACE_TEXT("Failure calling get_datawriter_sec_attributes for topic 'DCPSParticipantSecure'. ")
00568 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00569 result = DDS::RETCODE_ERROR;
00570 }
00571
00572 h = key_factory->register_local_datawriter(crypto_handle, writer_props, dw_sec_attr, ex);
00573 dcps_participant_secure_writer_.set_crypto_handles(crypto_handle, h);
00574 local_writer_crypto_handles_[dcps_participant_secure_writer_.get_repo_id()] = h;
00575
00576 EndpointSecurityAttributes dr_sec_attr(default_sec_attr);
00577 ok = acl->get_datareader_sec_attributes(perm_handle, "DCPSParticipantSecure",
00578 default_part_qos, default_data_tag_qos, dr_sec_attr, ex);
00579 if (!ok) {
00580 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00581 ACE_TEXT("Failure calling get_datareader_sec_attributes for topic 'DCPSParticipantSecure'. ")
00582 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00583 result = DDS::RETCODE_ERROR;
00584 }
00585
00586 h = key_factory->register_local_datareader(crypto_handle, reader_props, dr_sec_attr, ex);
00587 dcps_participant_secure_reader_->set_crypto_handles(crypto_handle, h);
00588 local_reader_crypto_handles_[dcps_participant_secure_reader_->get_repo_id()] = h;
00589 }
00590
00591 } else {
00592 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::init_security() - ")
00593 ACE_TEXT("Failure calling get_participant_sec_attributes. ")
00594 ACE_TEXT("Security Exception[%d.%d]: %C\n"), ex.code, ex.minor_code, ex.message.in()));
00595 result = DDS::RETCODE_ERROR;
00596 }
00597 return result;
00598 }
00599 #endif
00600
00601 void
00602 Sedp::unicast_locators(DCPS::LocatorSeq& locators) const
00603 {
00604 DCPS::RtpsUdpInst_rch rtps_inst =
00605 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00606 using namespace OpenDDS::RTPS;
00607
00608 CORBA::ULong idx = 0;
00609
00610
00611 if (rtps_inst->use_multicast_ && rtps_inst->multicast_group_address_ != ACE_INET_Addr()) {
00612 idx = locators.length();
00613 locators.length(idx + 1);
00614 locators[idx].kind = address_to_kind(rtps_inst->multicast_group_address_);
00615 locators[idx].port = rtps_inst->multicast_group_address_.get_port_number();
00616 RTPS::address_to_bytes(locators[idx].address,
00617 rtps_inst->multicast_group_address_);
00618 }
00619
00620
00621
00622 if (rtps_inst->local_address_config_str_.empty() ||
00623 rtps_inst->local_address_config_str_.rfind(':') == 0) {
00624 typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector;
00625 AddrVector addrs;
00626 if (TheServiceParticipant->default_address ().empty ()) {
00627 DCPS::get_interface_addrs(addrs);
00628 } else {
00629 addrs.push_back(ACE_INET_Addr(static_cast<u_short>(0), TheServiceParticipant->default_address().c_str()));
00630 }
00631 for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) {
00632 idx = locators.length();
00633 locators.length(idx + 1);
00634 locators[idx].kind = address_to_kind(*adr_it);
00635 locators[idx].port = rtps_inst->local_address_.get_port_number();
00636 RTPS::address_to_bytes(locators[idx].address,
00637 *adr_it);
00638 }
00639 } else {
00640 idx = locators.length();
00641 locators.length(idx + 1);
00642 locators[idx].kind = address_to_kind(rtps_inst->local_address_);
00643 locators[idx].port = rtps_inst->local_address_.get_port_number();
00644 RTPS::address_to_bytes(locators[idx].address,
00645 rtps_inst->local_address_);
00646 }
00647 }
00648
00649 const ACE_INET_Addr&
00650 Sedp::local_address() const
00651 {
00652 DCPS::RtpsUdpInst_rch rtps_inst =
00653 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00654 return rtps_inst->local_address_;
00655 }
00656
00657 const ACE_INET_Addr&
00658 Sedp::multicast_group() const
00659 {
00660 DCPS::RtpsUdpInst_rch rtps_inst =
00661 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00662 return rtps_inst->multicast_group_address_;
00663 }
00664 bool
00665 Sedp::map_ipv4_to_ipv6() const
00666 {
00667 bool map = false;
00668 if (local_address().get_type() != AF_INET) {
00669 map = true;
00670 }
00671 return map;
00672 }
00673
00674 void
00675 Sedp::assign_bit_key(DiscoveredPublication& pub)
00676 {
00677 increment_key(pub_bit_key_);
00678 pub_key_to_id_[pub_bit_key_] = pub.writer_data_.writerProxy.remoteWriterGuid;
00679 pub.writer_data_.ddsPublicationData.key = pub_bit_key_;
00680 }
00681
00682 void
00683 Sedp::assign_bit_key(DiscoveredSubscription& sub)
00684 {
00685 increment_key(sub_bit_key_);
00686 sub_key_to_id_[sub_bit_key_] = sub.reader_data_.readerProxy.remoteReaderGuid;
00687 sub.reader_data_.ddsSubscriptionData.key = sub_bit_key_;
00688 }
00689
00690 void
00691 create_association_data_proto(DCPS::AssociationData& proto,
00692 const Security::SPDPdiscoveredParticipantData& pdata) {
00693 proto.publication_transport_priority_ = 0;
00694 proto.remote_reliable_ = true;
00695 proto.remote_durable_ = true;
00696 std::memcpy(proto.remote_id_.guidPrefix, pdata.participantProxy.guidPrefix,
00697 sizeof(GuidPrefix_t));
00698
00699 const DCPS::LocatorSeq& mll =
00700 pdata.participantProxy.metatrafficMulticastLocatorList;
00701 const DCPS::LocatorSeq& ull =
00702 pdata.participantProxy.metatrafficUnicastLocatorList;
00703 const CORBA::ULong locator_count = mll.length() + ull.length();
00704
00705 ACE_Message_Block mb_locator(4 + locator_count * sizeof(DCPS::Locator_t) + 1);
00706 using DCPS::Serializer;
00707 Serializer ser_loc(&mb_locator, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00708 ser_loc << locator_count;
00709
00710 for (CORBA::ULong i = 0; i < mll.length(); ++i) {
00711 ser_loc << mll[i];
00712 }
00713 for (CORBA::ULong i = 0; i < ull.length(); ++i) {
00714 ser_loc << ull[i];
00715 }
00716 ser_loc << ACE_OutputCDR::from_boolean(false);
00717
00718 proto.remote_data_.length(1);
00719 proto.remote_data_[0].transport_type = "rtps_udp";
00720 message_block_to_sequence (mb_locator, proto.remote_data_[0].data);
00721 }
00722
00723
00724 #if defined(OPENDDS_SECURITY)
00725 void
00726 Sedp::associate_preauth(const Security::SPDPdiscoveredParticipantData& pdata)
00727 {
00728
00729
00730 DCPS::AssociationData proto;
00731 create_association_data_proto(proto, pdata);
00732
00733 const BuiltinEndpointSet_t& avail =
00734 pdata.participantProxy.availableBuiltinEndpoints;
00735
00736
00737
00738
00739
00740 if (avail & DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER) {
00741 DCPS::AssociationData peer = proto;
00742 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER;
00743 participant_stateless_message_reader_->assoc(peer);
00744 }
00745
00746 if (avail & DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER) {
00747 DCPS::AssociationData peer = proto;
00748 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER;
00749 participant_stateless_message_writer_.assoc(peer);
00750 }
00751 }
00752 #endif
00753
00754 void
00755 Sedp::associate(const Security::SPDPdiscoveredParticipantData& pdata)
00756 {
00757
00758
00759 DCPS::AssociationData proto;
00760 create_association_data_proto(proto, pdata);
00761
00762 const BuiltinEndpointSet_t& avail =
00763 pdata.participantProxy.availableBuiltinEndpoints;
00764
00765
00766 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00767 DCPS::AssociationData peer = proto;
00768 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00769 publications_reader_->assoc(peer);
00770 }
00771 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00772 DCPS::AssociationData peer = proto;
00773 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00774 subscriptions_reader_->assoc(peer);
00775 }
00776 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00777 DCPS::AssociationData peer = proto;
00778 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00779 participant_message_reader_->assoc(peer);
00780 }
00781
00782 DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> dpd(
00783 new Security::SPDPdiscoveredParticipantData(pdata));
00784
00785 task_.enqueue(DCPS::SAMPLE_DATA, move(dpd));
00786 }
00787
00788 #if defined(OPENDDS_SECURITY)
00789 void Sedp::associate_volatile(const Security::SPDPdiscoveredParticipantData& pdata)
00790 {
00791 using namespace DDS::Security;
00792
00793 DCPS::AssociationData proto;
00794 create_association_data_proto(proto, pdata);
00795
00796 DCPS::RepoId part = proto.remote_id_;
00797 part.entityId = ENTITYID_PARTICIPANT;
00798
00799 const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00800
00801 if (avail & BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER) {
00802 DCPS::AssociationData peer = proto;
00803 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
00804 remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00805 part, participant_volatile_message_secure_reader_->get_endpoint_crypto_handle());
00806 peer.remote_data_ = add_security_info(
00807 peer.remote_data_, peer.remote_id_, participant_volatile_message_secure_reader_->get_repo_id());
00808 participant_volatile_message_secure_reader_->assoc(peer);
00809 }
00810 if (avail & BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER) {
00811 DCPS::AssociationData peer = proto;
00812 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
00813 remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00814 part, participant_volatile_message_secure_writer_.get_endpoint_crypto_handle(), false);
00815 peer.remote_data_ = add_security_info(
00816 peer.remote_data_, participant_volatile_message_secure_writer_.get_repo_id(), peer.remote_id_);
00817 participant_volatile_message_secure_writer_.assoc(peer);
00818 }
00819 }
00820
00821 void Sedp::associate_secure_writers_to_readers(const Security::SPDPdiscoveredParticipantData& pdata)
00822 {
00823 using namespace DDS::Security;
00824
00825 DCPS::AssociationData proto;
00826 create_association_data_proto(proto, pdata);
00827
00828 DCPS::RepoId part = proto.remote_id_;
00829 part.entityId = ENTITYID_PARTICIPANT;
00830
00831 const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00832
00833 if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER) {
00834 DCPS::AssociationData peer = proto;
00835 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER;
00836 remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00837 part, participant_message_secure_reader_->get_endpoint_crypto_handle());
00838 peer.remote_data_ = add_security_info(
00839 peer.remote_data_, peer.remote_id_, participant_message_secure_reader_->get_repo_id());
00840 participant_message_secure_reader_->assoc(peer);
00841 }
00842 if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER) {
00843 DCPS::AssociationData peer = proto;
00844 peer.remote_id_.entityId = ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER;
00845 remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00846 part, dcps_participant_secure_reader_->get_endpoint_crypto_handle());
00847 peer.remote_data_ = add_security_info(
00848 peer.remote_data_, peer.remote_id_, dcps_participant_secure_reader_->get_repo_id());
00849 dcps_participant_secure_reader_->assoc(peer);
00850 }
00851 if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) {
00852 DCPS::AssociationData peer = proto;
00853 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER;
00854 remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00855 part, publications_secure_reader_->get_endpoint_crypto_handle());
00856 peer.remote_data_ = add_security_info(
00857 peer.remote_data_, peer.remote_id_, publications_secure_reader_->get_repo_id());
00858 publications_secure_reader_->assoc(peer);
00859 }
00860 if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) {
00861 DCPS::AssociationData peer = proto;
00862 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER;
00863 remote_writer_crypto_handles_[peer.remote_id_] = generate_remote_matched_writer_crypto_handle(
00864 part, subscriptions_secure_reader_->get_endpoint_crypto_handle());
00865 peer.remote_data_ = add_security_info(
00866 peer.remote_data_, peer.remote_id_, subscriptions_secure_reader_->get_repo_id());
00867 subscriptions_secure_reader_->assoc(peer);
00868 }
00869 }
00870
00871 void Sedp::associate_secure_readers_to_writers(const Security::SPDPdiscoveredParticipantData& pdata)
00872 {
00873 using namespace DDS::Security;
00874
00875 DCPS::AssociationData proto;
00876 create_association_data_proto(proto, pdata);
00877
00878 DCPS::RepoId part = proto.remote_id_;
00879 part.entityId = ENTITYID_PARTICIPANT;
00880
00881 const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00882
00883 if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER) {
00884 DCPS::AssociationData peer = proto;
00885 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER;
00886 remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00887 part, participant_message_secure_writer_.get_endpoint_crypto_handle(), false);
00888 peer.remote_data_ = add_security_info(
00889 peer.remote_data_, participant_message_secure_writer_.get_repo_id(), peer.remote_id_);
00890 participant_message_secure_writer_.assoc(peer);
00891 }
00892 if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_READER) {
00893 DCPS::AssociationData peer = proto;
00894 peer.remote_id_.entityId = ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER;
00895 remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00896 part, dcps_participant_secure_writer_.get_endpoint_crypto_handle(), false);
00897 peer.remote_data_ = add_security_info(
00898 peer.remote_data_, dcps_participant_secure_writer_.get_repo_id(), peer.remote_id_);
00899 dcps_participant_secure_writer_.assoc(peer);
00900 }
00901 if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_READER) {
00902 DCPS::AssociationData peer = proto;
00903 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER;
00904 remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00905 part, publications_secure_writer_.get_endpoint_crypto_handle(), false);
00906 peer.remote_data_ = add_security_info(
00907 peer.remote_data_, publications_secure_writer_.get_repo_id(), peer.remote_id_);
00908 publications_secure_writer_.assoc(peer);
00909 }
00910 if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER) {
00911 DCPS::AssociationData peer = proto;
00912 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER;
00913 remote_reader_crypto_handles_[peer.remote_id_] = generate_remote_matched_reader_crypto_handle(
00914 part, subscriptions_secure_writer_.get_endpoint_crypto_handle(), false);
00915 peer.remote_data_ = add_security_info(
00916 peer.remote_data_, subscriptions_secure_writer_.get_repo_id(), peer.remote_id_);
00917 subscriptions_secure_writer_.assoc(peer);
00918 }
00919 }
00920
00921 void
00922 Sedp::send_builtin_crypto_tokens(const DCPS::RepoId& dstParticipant, const DCPS::EntityId_t& dstEntity, const DCPS::RepoId& src)
00923 {
00924 DCPS::RepoId dst = dstParticipant;
00925 dst.entityId = dstEntity;
00926 if (DCPS::GuidConverter(src).entityKind() == DCPS::KIND_READER) {
00927 create_and_send_datareader_crypto_tokens(local_reader_crypto_handles_[src],
00928 src, remote_writer_crypto_handles_[dst],
00929 dst);
00930 } else {
00931 create_and_send_datawriter_crypto_tokens(local_writer_crypto_handles_[src],
00932 src, remote_reader_crypto_handles_[dst],
00933 dst);
00934 }
00935 }
00936
00937 void
00938 Sedp::send_builtin_crypto_tokens(const Security::SPDPdiscoveredParticipantData& pdata)
00939 {
00940 using namespace DDS::Security;
00941
00942 DCPS::RepoId part;
00943 std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix, sizeof(GuidPrefix_t));
00944 part.entityId = ENTITYID_PARTICIPANT;
00945
00946 const BuiltinEndpointSet_t& avail = pdata.participantProxy.availableBuiltinEndpoints;
00947
00948 if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER) {
00949 send_builtin_crypto_tokens(part, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER,
00950 participant_message_secure_reader_->get_repo_id());
00951 }
00952
00953 if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER) {
00954 send_builtin_crypto_tokens(part, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER,
00955 dcps_participant_secure_reader_->get_repo_id());
00956 }
00957
00958 if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) {
00959 send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER,
00960 publications_secure_reader_->get_repo_id());
00961 }
00962
00963 if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) {
00964 send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER,
00965 subscriptions_secure_reader_->get_repo_id());
00966 }
00967
00968 if (avail & BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER) {
00969 send_builtin_crypto_tokens(part, ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER,
00970 participant_message_secure_writer_.get_repo_id());
00971 }
00972
00973 if (avail & SPDP_BUILTIN_PARTICIPANT_SECURE_READER) {
00974 send_builtin_crypto_tokens(part, ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER,
00975 dcps_participant_secure_writer_.get_repo_id());
00976 }
00977
00978 if (avail & SEDP_BUILTIN_PUBLICATIONS_SECURE_READER) {
00979 send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER,
00980 publications_secure_writer_.get_repo_id());
00981 }
00982
00983 if (avail & SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER) {
00984 send_builtin_crypto_tokens(part, ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER,
00985 subscriptions_secure_writer_.get_repo_id());
00986 }
00987 }
00988 #endif
00989
// Handle a discovered participant on the task: associate the local built-in
// SEDP writers with the participant's built-in readers, replay any deferred
// SEDP samples stored for it, record it in associated_participants_, write
// durable data to its readers and run deferred endpoint matching.
//
// ppdata is placed in a DCPS::unique_ptr on entry, so this function takes
// over ownership of it.
00990 void
00991 Sedp::Task::svc_i(const Security::SPDPdiscoveredParticipantData* ppdata)
00992 {
00993 DCPS::unique_ptr<const Security::SPDPdiscoveredParticipantData> pdata(ppdata);
00994
00995
00996
// Prototype association; each block below copies it and only changes the
// remote entity id.
00997 DCPS::AssociationData proto;
00998 create_association_data_proto(proto, *pdata);
00999
01000 const BuiltinEndpointSet_t& avail =
01001 pdata->participantProxy.availableBuiltinEndpoints;
01002
01003
// Associate each local writer with the remote built-in reader, if the
// participant advertises that reader.
01004 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
01005 DCPS::AssociationData peer = proto;
01006 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
01007 sedp_->publications_writer_.assoc(peer);
01008 }
01009 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
01010 DCPS::AssociationData peer = proto;
01011 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
01012 sedp_->subscriptions_writer_.assoc(peer);
01013 }
01014 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
01015 DCPS::AssociationData peer = proto;
01016 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
01017 sedp_->participant_message_writer_.assoc(peer);
01018 }
01019
// With security enabled, the secure writers are associated with the remote
// secure readers as well.
01020 #if defined(OPENDDS_SECURITY)
01021 if (sedp_->is_security_enabled()) {
01022 sedp_->associate_secure_readers_to_writers(*pdata);
01023 }
01024 #endif
01025
01026
01027
01028
// Replay every deferred subscription sample stored under proto.remote_id_.
// erase(pos++) advances the iterator before its element is removed.
// Note that this runs before sedp_->lock_ is acquired further down.
01029 for (DeferredSubscriptionMap::iterator pos = sedp_->deferred_subscriptions_.lower_bound(proto.remote_id_),
01030 limit = sedp_->deferred_subscriptions_.upper_bound(proto.remote_id_);
01031 pos != limit;
01032 ) {
01033 sedp_->data_received (pos->second.first, pos->second.second);
01034 sedp_->deferred_subscriptions_.erase (pos++);
01035 }
// Same replay for the deferred publication samples.
01036 for (DeferredPublicationMap::iterator pos = sedp_->deferred_publications_.lower_bound(proto.remote_id_),
01037 limit = sedp_->deferred_publications_.upper_bound(proto.remote_id_);
01038 pos != limit;
01039 ) {
01040 sedp_->data_received (pos->second.first, pos->second.second);
01041 sedp_->deferred_publications_.erase (pos++);
01042 }
01043
// Everything from here on runs with sedp_->lock_ held; nothing more is done
// if SPDP is already shutting down.
01044 ACE_GUARD(ACE_Thread_Mutex, g, sedp_->lock_);
01045 if (spdp_->shutting_down()) { return; }
01046
// Record the participant (keyed by its participant-entity GUID) as associated.
01047 proto.remote_id_.entityId = ENTITYID_PARTICIPANT;
01048 sedp_->associated_participants_.insert(proto.remote_id_);
01049
01050 #if defined(OPENDDS_SECURITY)
01051 if (sedp_->is_security_enabled()) {
01052 spdp_->send_participant_crypto_tokens(proto.remote_id_);
01053 sedp_->send_builtin_crypto_tokens(*pdata);
01054 }
01055 #endif
01056
01057
// Write durable data to each advertised remote reader. proto.remote_id_ is
// reused as the destination id, so its entityId is overwritten each time
// (the GUID prefix is left unchanged and is still used by the loop below).
01058 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
01059 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
01060 sedp_->write_durable_publication_data(proto.remote_id_);
01061 }
01062 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
01063 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
01064 sedp_->write_durable_subscription_data(proto.remote_id_);
01065 }
01066 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
01067 proto.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
01068 sedp_->write_durable_participant_message_data(proto.remote_id_);
01069 }
01070
// Process (and erase) the entries of defer_match_endpoints_ whose GUID prefix
// matches this participant; other entries are skipped.
01071 for (DCPS::RepoIdSet::iterator it = sedp_->defer_match_endpoints_.begin();
01072 it != sedp_->defer_match_endpoints_.end(); ) {
01073 if (0 == std::memcmp(it->guidPrefix, proto.remote_id_.guidPrefix,
01074 sizeof(GuidPrefix_t))) {
01075 OPENDDS_STRING topic;
// Bit 4 of entityKind selects the discovered-subscription table; otherwise
// the discovered-publication table is consulted. topic stays empty if the
// endpoint is not found.
01076 if (it->entityId.entityKind & 4) {
01077 DiscoveredSubscriptionIter dsi =
01078 sedp_->discovered_subscriptions_.find(*it);
01079 if (dsi != sedp_->discovered_subscriptions_.end()) {
01080 topic = dsi->second.reader_data_.ddsSubscriptionData.topic_name;
01081 }
01082 } else {
01083 DiscoveredPublicationIter dpi =
01084 sedp_->discovered_publications_.find(*it);
01085 if (dpi != sedp_->discovered_publications_.end()) {
01086 topic = dpi->second.writer_data_.ddsPublicationData.topic_name;
01087 }
01088 }
01089 if (DCPS::DCPS_debug_level > 3) {
01090 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
01091 ACE_TEXT("processing deferred endpoints for topic %C\n"),
01092 topic.c_str()));
01093 }
// Matching is only attempted when the topic name is known locally.
01094 if (!topic.empty()) {
01095 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator ti =
01096 sedp_->topics_.find(topic);
01097 if (ti != sedp_->topics_.end()) {
01098 if (DCPS::DCPS_debug_level > 3) {
01099 DCPS::GuidConverter conv(*it);
01100 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
01101 ACE_TEXT("calling match_endpoints %C\n"),
01102 OPENDDS_STRING(conv).c_str()));
01103 }
01104 sedp_->match_endpoints(*it, ti->second);
// Shutdown is re-checked after match_endpoints returns; on shutdown the
// current entry is left in the set.
01105 if (spdp_->shutting_down()) { return; }
01106 }
01107 }
// The entry is erased whether or not a match was attempted.
01108 sedp_->defer_match_endpoints_.erase(it++);
01109 } else {
01110 ++it;
01111 }
01112 }
01113 }
01114
01115 void
01116 Sedp::Task::svc_secure_i(DCPS::MessageId id,
01117 const Security::SPDPdiscoveredParticipantData* ppdata)
01118 {
01119 DCPS::unique_ptr<const Security::SPDPdiscoveredParticipantData> pdata(ppdata);
01120 spdp_->handle_participant_data(id, *pdata);
01121 }
01122
01123 namespace {
01124 void disassociate_helper(const BuiltinEndpointSet_t& avail, const CORBA::ULong flags,
01125 const RepoId& id, const EntityId_t& ent, DCPS::TransportClient& client)
01126 {
01127 if (avail & flags) {
01128 RepoId temp = id;
01129 temp.entityId = ent;
01130 client.disassociate(temp);
01131 }
01132 }
01133 }
01134
01135
// Tear down the built-in endpoint associations with a participant and, if
// SPDP still knows the participant, remove its discovered publications and
// subscriptions.
//
// Returns true when spdp_.has_discovered_participant() reports the
// participant (its discovered endpoints were then removed); false otherwise,
// or when the reverse-lock guard below could not be acquired.
//
// NOTE(review): the ACE_Reverse_Lock usage implies the caller already holds
// lock_ -- confirm against callers.
01136 bool
01137 Sedp::disassociate(const Security::SPDPdiscoveredParticipantData& pdata)
01138 {
// Build the participant GUID from the prefix in its proxy data.
01139 RepoId part;
01140 std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix,
01141 sizeof(GuidPrefix_t));
01142 part.entityId = ENTITYID_PARTICIPANT;
01143 associated_participants_.erase(part);
// avail is taken by value here (a copy, not a reference into pdata).
01144 const BuiltinEndpointSet_t avail =
01145 pdata.participantProxy.availableBuiltinEndpoints;
01146
01147 {
// The reverse lock releases lock_ for the duration of this scope and
// re-acquires it when rg is destroyed, so the disassociate calls below run
// without lock_ held.
01148 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01149 ACE_GUARD_RETURN(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock, false);
01150
// Each pair disassociates the local writer from the remote reader and the
// local reader from the remote writer, for the endpoints that were advertised.
01151 disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, part,
01152 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER, publications_writer_);
01153 disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, part,
01154 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, *publications_reader_);
01155
01156 disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, part,
01157 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER, subscriptions_writer_);
01158 disassociate_helper(avail, DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, part,
01159 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, *subscriptions_reader_);
01160
01161 disassociate_helper(avail, BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, part,
01162 ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, participant_message_writer_);
01163 disassociate_helper(avail, BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, part,
01164 ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, *participant_message_reader_);
01165
01166
01167
// Secure built-in endpoints, only compiled in security-enabled builds.
01168 #if defined(OPENDDS_SECURITY)
01169 using namespace DDS::Security;
01170
01171 disassociate_helper(avail, SEDP_BUILTIN_PUBLICATIONS_SECURE_READER, part,
01172 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER, publications_secure_writer_);
01173 disassociate_helper(avail, SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, part,
01174 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, *publications_secure_reader_);
01175
01176 disassociate_helper(avail, SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER, part,
01177 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER, subscriptions_secure_writer_);
01178 disassociate_helper(avail, SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, part,
01179 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, *subscriptions_secure_reader_);
01180
01181 disassociate_helper(avail, BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER, part,
01182 ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER, participant_message_secure_writer_);
01183 disassociate_helper(avail, BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, part,
01184 ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, *participant_message_secure_reader_);
01185
01186 disassociate_helper(avail, BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER, part,
01187 ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER, participant_stateless_message_writer_);
01188 disassociate_helper(avail, BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER, part,
01189 ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, *participant_stateless_message_reader_);
01190
01191 disassociate_helper(avail, BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER, part,
01192 ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER, participant_volatile_message_secure_writer_);
01193 disassociate_helper(avail, BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER, part,
01194 ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, *participant_volatile_message_secure_reader_);
01195
01196 disassociate_helper(avail, SPDP_BUILTIN_PARTICIPANT_SECURE_READER, part,
01197 ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER, dcps_participant_secure_writer_);
01198 disassociate_helper(avail, SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER, part,
01199 ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER, *dcps_participant_secure_reader_);
01200 #endif
01201
01202 }
01203
// The participant is looked up again after the scope above, and discovered
// endpoints are only removed if SPDP still has it.
01204 if (spdp_.has_discovered_participant(part)) {
01205 remove_entities_belonging_to(discovered_publications_, part);
01206 remove_entities_belonging_to(discovered_subscriptions_, part);
01207 return true;
01208 } else {
01209 return false;
01210 }
01211 }
01212
01213 template<typename Map>
01214 void
01215 Sedp::remove_entities_belonging_to(Map& m, RepoId participant)
01216 {
01217 participant.entityId.entityKey[0] = 0;
01218 participant.entityId.entityKey[1] = 0;
01219 participant.entityId.entityKey[2] = 0;
01220 participant.entityId.entityKind = 0;
01221 for (typename Map::iterator i = m.lower_bound(participant);
01222 i != m.end() && 0 == std::memcmp(i->first.guidPrefix,
01223 participant.guidPrefix,
01224 sizeof(GuidPrefix_t));) {
01225 OPENDDS_STRING topic_name = get_topic_name(i->second);
01226 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01227 topics_.find(topic_name);
01228 if (top_it != topics_.end()) {
01229 top_it->second.endpoints_.erase(i->first);
01230 if (DCPS::DCPS_debug_level > 3) {
01231 ACE_DEBUG((LM_DEBUG,
01232 ACE_TEXT("(%P|%t) Sedp::remove_entities_belonging_to - ")
01233 ACE_TEXT("calling match_endpoints remove\n")));
01234 }
01235 match_endpoints(i->first, top_it->second, true );
01236 if (spdp_.shutting_down()) { return; }
01237 }
01238 remove_from_bit(i->second);
01239 m.erase(i++);
01240 }
01241 }
01242
01243 void
01244 Sedp::remove_from_bit_i(const DiscoveredPublication& pub)
01245 {
01246 #ifndef DDS_HAS_MINIMUM_BIT
01247 task_.enqueue(Msg::MSG_REMOVE_FROM_PUB_BIT, pub.bit_ih_);
01248 #else
01249 ACE_UNUSED_ARG(pub);
01250 #endif
01251 }
01252
01253 void
01254 Sedp::remove_from_bit_i(const DiscoveredSubscription& sub)
01255 {
01256 #ifndef DDS_HAS_MINIMUM_BIT
01257 task_.enqueue(Msg::MSG_REMOVE_FROM_SUB_BIT, sub.bit_ih_);
01258 #else
01259 ACE_UNUSED_ARG(sub);
01260 #endif
01261 }
01262
01263 void
01264 Sedp::Task::svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
01265 {
01266 #ifndef DDS_HAS_MINIMUM_BIT
01267 switch (which_bit) {
01268 case Msg::MSG_REMOVE_FROM_PUB_BIT: {
01269 DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = sedp_->pub_bit();
01270
01271 if (bit && bit_ih != DDS::HANDLE_NIL) {
01272 bit->set_instance_state(bit_ih,
01273 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01274 }
01275 break;
01276 }
01277 case Msg::MSG_REMOVE_FROM_SUB_BIT: {
01278 DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sedp_->sub_bit();
01279
01280 if (bit && bit_ih != DDS::HANDLE_NIL) {
01281 bit->set_instance_state(bit_ih,
01282 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01283 }
01284 break;
01285 }
01286 default:
01287 break;
01288 }
01289 #else
01290 ACE_UNUSED_ARG(which_bit);
01291 ACE_UNUSED_ARG(bit_ih);
01292 #endif
01293 }
01294
01295 #ifndef DDS_HAS_MINIMUM_BIT
01296 DCPS::TopicBuiltinTopicDataDataReaderImpl*
01297 Sedp::topic_bit()
01298 {
01299 DDS::Subscriber_var sub = spdp_.bit_subscriber();
01300 if (!sub.in())
01301 return 0;
01302
01303 DDS::DataReader_var d =
01304 sub->lookup_datareader(DCPS::BUILT_IN_TOPIC_TOPIC);
01305 return dynamic_cast<DCPS::TopicBuiltinTopicDataDataReaderImpl*>(d.in());
01306 }
01307
01308 DCPS::PublicationBuiltinTopicDataDataReaderImpl*
01309 Sedp::pub_bit()
01310 {
01311 DDS::Subscriber_var sub = spdp_.bit_subscriber();
01312 if (!sub.in())
01313 return 0;
01314
01315 DDS::DataReader_var d =
01316 sub->lookup_datareader(DCPS::BUILT_IN_PUBLICATION_TOPIC);
01317 return dynamic_cast<DCPS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
01318 }
01319
01320 DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*
01321 Sedp::sub_bit()
01322 {
01323 DDS::Subscriber_var sub = spdp_.bit_subscriber();
01324 if (!sub.in())
01325 return 0;
01326
01327 DDS::DataReader_var d =
01328 sub->lookup_datareader(DCPS::BUILT_IN_SUBSCRIPTION_TOPIC);
01329 return dynamic_cast<DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
01330 }
01331 #endif
01332
01333 bool
01334 Sedp::update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
01335 OPENDDS_STRING& name)
01336 {
01337 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01338 OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator iter =
01339 topic_names_.find(topicId);
01340 if (iter == topic_names_.end()) {
01341 return false;
01342 }
01343 name = iter->second;
01344 TopicDetails& topic = topics_[name];
01345 using namespace DCPS;
01346
01347
01348 if (qos.topic_data != topic.qos_.topic_data) {
01349 topic.qos_ = qos;
01350
01351 for (RepoIdSet::iterator topic_endpoints = topic.endpoints_.begin();
01352 topic_endpoints != topic.endpoints_.end(); ++topic_endpoints) {
01353
01354 const RepoId& rid = *topic_endpoints;
01355 GuidConverter conv(rid);
01356 if (conv.isWriter()) {
01357
01358 LocalPublicationIter lp = local_publications_.find(rid);
01359 if (lp != local_publications_.end()) {
01360 write_publication_data(rid, lp->second);
01361 }
01362 } else if (conv.isReader()) {
01363
01364 LocalSubscriptionIter ls = local_subscriptions_.find(rid);
01365 if (ls != local_subscriptions_.end()) {
01366 write_subscription_data(rid, ls->second);
01367 }
01368 }
01369 }
01370 }
01371
01372 return true;
01373 }
01374
01375 void
01376 Sedp::inconsistent_topic(const DCPS::RepoIdSet& eps) const
01377 {
01378 using DCPS::RepoIdSet;
01379 for (RepoIdSet::const_iterator iter(eps.begin()); iter != eps.end(); ++iter) {
01380 if (0 == std::memcmp(participant_id_.guidPrefix, iter->guidPrefix,
01381 sizeof(GuidPrefix_t))) {
01382 const bool reader = iter->entityId.entityKind & 4;
01383 if (reader) {
01384 const LocalSubscriptionCIter lsi = local_subscriptions_.find(*iter);
01385 if (lsi != local_subscriptions_.end()) {
01386 lsi->second.subscription_->inconsistent_topic();
01387
01388
01389
01390 return;
01391 }
01392 } else {
01393 const LocalPublicationCIter lpi = local_publications_.find(*iter);
01394 if (lpi != local_publications_.end()) {
01395 lpi->second.publication_->inconsistent_topic();
01396 return;
01397 }
01398 }
01399 }
01400 }
01401 }
01402
01403 DDS::ReturnCode_t
01404 Sedp::remove_publication_i(const RepoId& publicationId)
01405 {
01406 return publications_writer_.write_unregister_dispose(publicationId);
01407 }
01408
01409 bool
01410 Sedp::update_publication_qos(const RepoId& publicationId,
01411 const DDS::DataWriterQos& qos,
01412 const DDS::PublisherQos& publisherQos)
01413 {
01414 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01415 LocalPublicationIter iter = local_publications_.find(publicationId);
01416 if (iter != local_publications_.end()) {
01417 LocalPublication& pb = iter->second;
01418 pb.qos_ = qos;
01419 pb.publisher_qos_ = publisherQos;
01420
01421 if (DDS::RETCODE_OK != write_publication_data(publicationId, pb)) {
01422 return false;
01423 }
01424
01425 OPENDDS_STRING topic_name = topic_names_[pb.topic_id_];
01426 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01427 topics_.find(topic_name);
01428 if (top_it != topics_.end()) {
01429 match_endpoints(publicationId, top_it->second);
01430 }
01431 return true;
01432 }
01433 return false;
01434 }
01435
01436 DDS::ReturnCode_t
01437 Sedp::remove_subscription_i(const RepoId& subscriptionId)
01438 {
01439 return subscriptions_writer_.write_unregister_dispose(subscriptionId);
01440 }
01441
01442 bool
01443 Sedp::update_subscription_qos(const RepoId& subscriptionId,
01444 const DDS::DataReaderQos& qos,
01445 const DDS::SubscriberQos& subscriberQos)
01446 {
01447 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01448 LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
01449 if (iter != local_subscriptions_.end()) {
01450 LocalSubscription& sb = iter->second;
01451 sb.qos_ = qos;
01452 sb.subscriber_qos_ = subscriberQos;
01453
01454 if (DDS::RETCODE_OK != write_subscription_data(subscriptionId, sb)) {
01455 return false;
01456 }
01457
01458 OPENDDS_STRING topic_name = topic_names_[sb.topic_id_];
01459 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01460 topics_.find(topic_name);
01461 if (top_it != topics_.end()) {
01462 match_endpoints(subscriptionId, top_it->second);
01463 }
01464 return true;
01465 }
01466 return false;
01467 }
01468
01469 bool
01470 Sedp::update_subscription_params(const RepoId& subId,
01471 const DDS::StringSeq& params)
01472 {
01473 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01474 const LocalSubscriptionIter iter = local_subscriptions_.find(subId);
01475 if (iter != local_subscriptions_.end()) {
01476 LocalSubscription& sb = iter->second;
01477 sb.filterProperties.expressionParameters = params;
01478
01479 if (DDS::RETCODE_OK != write_subscription_data(subId, sb)) {
01480 return false;
01481 }
01482
01483
01484 for (DCPS::RepoIdSet::iterator i = iter->second.matched_endpoints_.begin();
01485 i != iter->second.matched_endpoints_.end(); ++i) {
01486 const LocalPublicationIter lpi = local_publications_.find(*i);
01487 if (lpi != local_publications_.end()) {
01488 lpi->second.publication_->update_subscription_params(subId, params);
01489 }
01490 }
01491
01492 return true;
01493 }
01494 return false;
01495 }
01496
// Shut SEDP down: stop the task first, then flag the three non-secure
// built-in readers as shutting down.
// NOTE(review): the secure readers are not flagged here -- confirm whether
// that is intentional.
01497 void
01498 Sedp::shutdown()
01499 {
// Task::shutdown() queues a stop message and waits, so the task is shut down
// before the reader flags below are set.
01500 task_.shutdown();
01501 publications_reader_->shutting_down_ = true;
01502 subscriptions_reader_->shutting_down_ = true;
01503 participant_message_reader_->shutting_down_ = true;
01504 }
01505
01506 void
01507 Sedp::Task::acknowledge()
01508 {
01509
01510 putq(new Msg(Msg::MSG_FINI_BIT, DCPS::REQUEST_ACK, 0));
01511 }
01512
01513 void
01514 Sedp::Task::shutdown()
01515 {
01516 if (!shutting_down_) {
01517 shutting_down_ = true;
01518 putq(new Msg(Msg::MSG_STOP, DCPS::GRACEFUL_DISCONNECT, 0));
01519 wait();
01520 }
01521 }
01522
01523 void
01524 Sedp::Task::svc_i(DCPS::MessageId message_id,
01525 const DCPS::DiscoveredWriterData* pwdata)
01526 {
01527 DCPS::unique_ptr<const DCPS::DiscoveredWriterData> delete_the_data(pwdata);
01528 sedp_->data_received(message_id, *pwdata);
01529 }
01530
01531 void Sedp::process_discovered_writer_data(DCPS::MessageId message_id,
01532 const DCPS::DiscoveredWriterData& wdata,
01533 const RepoId& guid,
01534 const DDS::Security::EndpointSecurityInfo* security_info)
01535 {
01536
01537 #if ! defined(OPENDDS_SECURITY)
01538 ACE_UNUSED_ARG(security_info);
01539 #endif
01540
01541 OPENDDS_STRING topic_name;
01542
01543
01544 DiscoveredPublicationIter iter = discovered_publications_.find(guid);
01545
01546 if (message_id == DCPS::SAMPLE_DATA) {
01547 DCPS::DiscoveredWriterData wdata_copy;
01548
01549 if (iter == discovered_publications_.end()) {
01550
01551 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01552
01553 {
01554 DiscoveredPublication prepub(wdata);
01555
01556 topic_name = get_topic_name(prepub);
01557
01558 #if defined(OPENDDS_SECURITY)
01559 if (is_security_enabled()) {
01560
01561 DDS::Security::SecurityException ex = {"", 0, 0};
01562
01563 RepoId part = guid;
01564 part.entityId = ENTITYID_PARTICIPANT;
01565
01566 DDS::TopicBuiltinTopicData data;
01567 data.key = wdata.ddsPublicationData.key;
01568 data.name = wdata.ddsPublicationData.topic_name;
01569 data.type_name = wdata.ddsPublicationData.type_name;
01570 data.durability = wdata.ddsPublicationData.durability;
01571 data.durability_service = wdata.ddsPublicationData.durability_service;
01572 data.deadline = wdata.ddsPublicationData.deadline;
01573 data.latency_budget = wdata.ddsPublicationData.latency_budget;
01574 data.liveliness = wdata.ddsPublicationData.liveliness;
01575 data.reliability = wdata.ddsPublicationData.reliability;
01576 data.lifespan = wdata.ddsPublicationData.lifespan;
01577 data.destination_order = wdata.ddsPublicationData.destination_order;
01578 data.ownership = wdata.ddsPublicationData.ownership;
01579 data.topic_data = wdata.ddsPublicationData.topic_data;
01580
01581 DCPS::AuthState auth_state = spdp_.lookup_participant_auth_state(part);
01582 if (auth_state == DCPS::AS_AUTHENTICATED) {
01583
01584 DDS::Security::PermissionsHandle remote_permissions = spdp_.lookup_participant_permissions(part);
01585
01586 if (participant_sec_attr_.is_access_protected &&
01587 !get_access_control()->check_remote_topic(remote_permissions, spdp_.get_domain_id(), data, ex))
01588 {
01589 ACE_ERROR((LM_WARNING,
01590 ACE_TEXT("(%P|%t) WARNING: ")
01591 ACE_TEXT("Sedp::data_received(dwd) - ")
01592 ACE_TEXT("Unable to check remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01593 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01594 return;
01595 }
01596
01597 DDS::Security::TopicSecurityAttributes topic_sec_attr;
01598 if (!get_access_control()->get_topic_sec_attributes(remote_permissions, topic_name.data(), topic_sec_attr, ex))
01599 {
01600 ACE_ERROR((LM_WARNING,
01601 ACE_TEXT("(%P|%t) WARNING: ")
01602 ACE_TEXT("Sedp::data_received(dwd) - ")
01603 ACE_TEXT("Unable to get security attributes for remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01604 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01605 return;
01606 }
01607
01608 DDS::Security::PublicationBuiltinTopicDataSecure pub_data_sec;
01609 pub_data_sec.base.base = wdata.ddsPublicationData;
01610
01611 if (security_info != NULL) {
01612 pub_data_sec.base.security_info.endpoint_security_attributes = security_info->endpoint_security_attributes;
01613 pub_data_sec.base.security_info.plugin_endpoint_security_attributes = security_info->plugin_endpoint_security_attributes;
01614 }
01615
01616 if (topic_sec_attr.is_write_protected &&
01617 !get_access_control()->check_remote_datawriter(remote_permissions, spdp_.get_domain_id(), pub_data_sec, ex))
01618 {
01619 ACE_ERROR((LM_WARNING,
01620 ACE_TEXT("(%P|%t) WARNING: ")
01621 ACE_TEXT("Sedp::data_received(dwd) - ")
01622 ACE_TEXT("Unable to check remote datawriter '%C'. SecurityException[%d.%d]: %C\n"),
01623 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01624 return;
01625 }
01626 } else if (auth_state != DCPS::AS_UNAUTHENTICATED) {
01627 ACE_ERROR((LM_WARNING,
01628 ACE_TEXT("(%P|%t) WARNING: ")
01629 ACE_TEXT("Sedp::data_received(dwd) - ")
01630 ACE_TEXT("Unsupported remote participant authentication state for discovered datawriter '%C'. SecurityException[%d.%d]: %C\n"),
01631 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01632 return;
01633 }
01634 }
01635 #endif
01636
01637 DiscoveredPublication& pub = discovered_publications_[guid] = prepub;
01638
01639 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01640 topics_.find(topic_name);
01641 if (top_it == topics_.end()) {
01642 top_it =
01643 topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
01644 top_it->second.data_type_ = wdata.ddsPublicationData.type_name;
01645 top_it->second.qos_.topic_data = wdata.ddsPublicationData.topic_data;
01646 top_it->second.repo_id_ = make_topic_guid();
01647
01648 } else if (top_it->second.data_type_ !=
01649 wdata.ddsPublicationData.type_name.in()) {
01650 inconsistent_topic(top_it->second.endpoints_);
01651 if (DCPS::DCPS_debug_level) {
01652 ACE_DEBUG((LM_WARNING,
01653 ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - WARNING ")
01654 ACE_TEXT("topic %C discovered data type %C doesn't ")
01655 ACE_TEXT("match known data type %C, ignoring ")
01656 ACE_TEXT("discovered publication.\n"),
01657 topic_name.c_str(),
01658 wdata.ddsPublicationData.type_name.in(),
01659 top_it->second.data_type_.c_str()));
01660 }
01661 return;
01662 }
01663
01664 TopicDetails& td = top_it->second;
01665 topic_names_[td.repo_id_] = topic_name;
01666 td.endpoints_.insert(guid);
01667
01668 std::memcpy(pub.writer_data_.ddsPublicationData.participant_key.value,
01669 guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
01670 assign_bit_key(pub);
01671 wdata_copy = pub.writer_data_;
01672 }
01673
01674
01675 iter = discovered_publications_.end();
01676
01677 DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01678 #ifndef DDS_HAS_MINIMUM_BIT
01679 {
01680
01681 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01682 DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01683 if (bit) {
01684 instance_handle =
01685 bit->store_synthetic_data(wdata_copy.ddsPublicationData,
01686 DDS::NEW_VIEW_STATE);
01687 }
01688 }
01689 #endif
01690
01691 if (spdp_.shutting_down()) { return; }
01692
01693 iter = discovered_publications_.find(guid);
01694 if (iter != discovered_publications_.end()) {
01695 iter->second.bit_ih_ = instance_handle;
01696 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01697 topics_.find(topic_name);
01698 if (top_it != topics_.end()) {
01699 if (DCPS::DCPS_debug_level > 3) {
01700 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01701 ACE_TEXT("calling match_endpoints new\n")));
01702 }
01703 match_endpoints(guid, top_it->second);
01704 }
01705 }
01706
01707 } else if (qosChanged(iter->second.writer_data_.ddsPublicationData,
01708 wdata.ddsPublicationData)) {
01709 #ifndef DDS_HAS_MINIMUM_BIT
01710 DCPS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01711 if (bit) {
01712 bit->store_synthetic_data(iter->second.writer_data_.ddsPublicationData,
01713 DDS::NOT_NEW_VIEW_STATE);
01714 }
01715 #endif
01716
01717
01718 topic_name = get_topic_name(iter->second);
01719 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01720 topics_.find(topic_name);
01721 if (top_it != topics_.end()) {
01722 if (DCPS::DCPS_debug_level > 3) {
01723 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01724 ACE_TEXT("calling match_endpoints update\n")));
01725 }
01726 match_endpoints(guid, top_it->second);
01727 }
01728 }
01729
01730 } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01731 message_id == DCPS::DISPOSE_INSTANCE ||
01732 message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01733 if (iter != discovered_publications_.end()) {
01734
01735 topic_name = get_topic_name(iter->second);
01736 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01737 topics_.find(topic_name);
01738 if (top_it != topics_.end()) {
01739 top_it->second.endpoints_.erase(guid);
01740 match_endpoints(guid, top_it->second, true );
01741 if (spdp_.shutting_down()) { return; }
01742 }
01743 remove_from_bit(iter->second);
01744 if (DCPS::DCPS_debug_level > 3) {
01745 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01746 ACE_TEXT("calling match_endpoints disp/unreg\n")));
01747 }
01748 discovered_publications_.erase(iter);
01749 }
01750 }
01751 }
01752
01753 void
01754 Sedp::data_received(DCPS::MessageId message_id,
01755 const DCPS::DiscoveredWriterData& wdata)
01756 {
01757 if (spdp_.shutting_down()) { return; }
01758
01759 const RepoId& guid = wdata.writerProxy.remoteWriterGuid;
01760 RepoId guid_participant = guid;
01761 guid_participant.entityId = ENTITYID_PARTICIPANT;
01762
01763 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01764
01765 if (ignoring(guid)
01766 || ignoring(guid_participant)
01767 || ignoring(wdata.ddsPublicationData.topic_name)) {
01768 return;
01769 }
01770
01771 #if defined(OPENDDS_SECURITY)
01772 if (should_drop_message(wdata.ddsPublicationData.topic_name)) {
01773 return;
01774 }
01775 #endif
01776
01777 if (!spdp_.has_discovered_participant(guid_participant)) {
01778 deferred_publications_[guid] = std::make_pair(message_id, wdata);
01779 return;
01780 }
01781
01782 process_discovered_writer_data(message_id, wdata, guid);
01783 }
01784
01785 #if defined(OPENDDS_SECURITY)
01786 void
01787 Sedp::Task::svc_i(DCPS::MessageId message_id,
01788 const DiscoveredWriterData_SecurityWrapper* data)
01789 {
01790 DCPS::unique_ptr<const DiscoveredWriterData_SecurityWrapper> delete_the_data(data);
01791 sedp_->data_received(message_id, *data);
01792 }
01793
01794 void Sedp::data_received(DCPS::MessageId message_id,
01795 const DiscoveredWriterData_SecurityWrapper& wrapper)
01796 {
01797 if (spdp_.shutting_down()) { return; }
01798
01799 const RepoId& guid = wrapper.data.writerProxy.remoteWriterGuid;
01800 RepoId guid_participant = guid;
01801 guid_participant.entityId = ENTITYID_PARTICIPANT;
01802
01803 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01804
01805 if (ignoring(guid)
01806 || ignoring(guid_participant)
01807 || ignoring(wrapper.data.ddsPublicationData.topic_name)) {
01808 return;
01809 }
01810
01811 process_discovered_writer_data(message_id, wrapper.data, guid, &wrapper.security_info);
01812 }
01813 #endif
01814
01815 void Sedp::process_discovered_reader_data(DCPS::MessageId message_id,
01816 const DCPS::DiscoveredReaderData& rdata,
01817 const RepoId& guid,
01818 const DDS::Security::EndpointSecurityInfo* security_info)
01819 {
01820
01821 #if ! defined(OPENDDS_SECURITY)
01822 ACE_UNUSED_ARG(security_info);
01823 #endif
01824
01825 OPENDDS_STRING topic_name;
01826
01827
01828 DiscoveredSubscriptionIter iter = discovered_subscriptions_.find(guid);
01829
01830
01831 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01832
01833 if (message_id == DCPS::SAMPLE_DATA) {
01834 DCPS::DiscoveredReaderData rdata_copy;
01835
01836 if (iter == discovered_subscriptions_.end()) {
01837 {
01838 DiscoveredSubscription presub(rdata);
01839
01840 topic_name = get_topic_name(presub);
01841
01842 #if defined(OPENDDS_SECURITY)
01843 if (is_security_enabled()) {
01844
01845 DDS::Security::SecurityException ex = {"", 0, 0};
01846
01847 RepoId part = guid;
01848 part.entityId = ENTITYID_PARTICIPANT;
01849
01850 DDS::TopicBuiltinTopicData data;
01851 data.key = rdata.ddsSubscriptionData.key;
01852 data.name = rdata.ddsSubscriptionData.topic_name;
01853 data.type_name = rdata.ddsSubscriptionData.type_name;
01854 data.durability = rdata.ddsSubscriptionData.durability;
01855 data.deadline = rdata.ddsSubscriptionData.deadline;
01856 data.latency_budget = rdata.ddsSubscriptionData.latency_budget;
01857 data.liveliness = rdata.ddsSubscriptionData.liveliness;
01858 data.reliability = rdata.ddsSubscriptionData.reliability;
01859 data.destination_order = rdata.ddsSubscriptionData.destination_order;
01860 data.ownership = rdata.ddsSubscriptionData.ownership;
01861 data.topic_data = rdata.ddsSubscriptionData.topic_data;
01862
01863 DCPS::AuthState auth_state = spdp_.lookup_participant_auth_state(part);
01864 if (auth_state == DCPS::AS_AUTHENTICATED) {
01865
01866 DDS::Security::PermissionsHandle remote_permissions = spdp_.lookup_participant_permissions(part);
01867
01868 if (participant_sec_attr_.is_access_protected &&
01869 !get_access_control()->check_remote_topic(remote_permissions, spdp_.get_domain_id(), data, ex))
01870 {
01871 ACE_ERROR((LM_WARNING,
01872 ACE_TEXT("(%P|%t) WARNING: ")
01873 ACE_TEXT("Sedp::data_received(drd) - ")
01874 ACE_TEXT("Unable to check remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01875 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01876 return;
01877 }
01878
01879 DDS::Security::TopicSecurityAttributes topic_sec_attr;
01880 if (!get_access_control()->get_topic_sec_attributes(remote_permissions, topic_name.data(), topic_sec_attr, ex))
01881 {
01882 ACE_ERROR((LM_WARNING,
01883 ACE_TEXT("(%P|%t) WARNING: ")
01884 ACE_TEXT("Sedp::data_received(drd) - ")
01885 ACE_TEXT("Unable to get security attributes for remote topic '%C'. SecurityException[%d.%d]: %C\n"),
01886 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01887 return;
01888 }
01889
01890 DDS::Security::SubscriptionBuiltinTopicDataSecure sub_data_sec;
01891 sub_data_sec.base.base = rdata.ddsSubscriptionData;
01892
01893 if (security_info != NULL) {
01894 sub_data_sec.base.security_info.endpoint_security_attributes = security_info->endpoint_security_attributes;
01895 sub_data_sec.base.security_info.plugin_endpoint_security_attributes = security_info->plugin_endpoint_security_attributes;
01896 }
01897
01898 bool relay_only = false;
01899 if (topic_sec_attr.is_read_protected &&
01900 !get_access_control()->check_remote_datareader(remote_permissions, spdp_.get_domain_id(), sub_data_sec, relay_only, ex))
01901 {
01902 ACE_ERROR((LM_WARNING,
01903 ACE_TEXT("(%P|%t) WARNING: ")
01904 ACE_TEXT("Sedp::data_received(drd) - ")
01905 ACE_TEXT("Unable to check remote datareader '%C'. SecurityException[%d.%d]: %C\n"),
01906 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01907 return;
01908 }
01909
01910 if (relay_only) {
01911 relay_only_readers_.insert(guid);
01912 } else {
01913 relay_only_readers_.erase(guid);
01914 }
01915 } else if (auth_state != DCPS::AS_UNAUTHENTICATED) {
01916 ACE_ERROR((LM_WARNING,
01917 ACE_TEXT("(%P|%t) WARNING: ")
01918 ACE_TEXT("Sedp::data_received(dwd) - ")
01919 ACE_TEXT("Unsupported remote participant authentication state for discovered datawriter '%C'. SecurityException[%d.%d]: %C\n"),
01920 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
01921 return;
01922 }
01923 }
01924 #endif
01925
01926 DiscoveredSubscription& sub = discovered_subscriptions_[guid] = presub;
01927
01928 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01929 topics_.find(topic_name);
01930 if (top_it == topics_.end()) {
01931 top_it =
01932 topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
01933 top_it->second.data_type_ = rdata.ddsSubscriptionData.type_name;
01934 top_it->second.qos_.topic_data = rdata.ddsSubscriptionData.topic_data;
01935 top_it->second.repo_id_ = make_topic_guid();
01936
01937 } else if (top_it->second.data_type_ !=
01938 rdata.ddsSubscriptionData.type_name.in()) {
01939 inconsistent_topic(top_it->second.endpoints_);
01940 if (DCPS::DCPS_debug_level) {
01941 ACE_DEBUG((LM_WARNING,
01942 ACE_TEXT("(%P|%t) Sedp::data_received(drd) - WARNING ")
01943 ACE_TEXT("topic %C discovered data type %C doesn't ")
01944 ACE_TEXT("match known data type %C, ignoring ")
01945 ACE_TEXT("discovered subcription.\n"),
01946 topic_name.c_str(),
01947 rdata.ddsSubscriptionData.type_name.in(),
01948 top_it->second.data_type_.c_str()));
01949 }
01950 return;
01951 }
01952
01953 TopicDetails& td = top_it->second;
01954 topic_names_[td.repo_id_] = topic_name;
01955 td.endpoints_.insert(guid);
01956
01957 std::memcpy(sub.reader_data_.ddsSubscriptionData.participant_key.value,
01958 guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
01959 assign_bit_key(sub);
01960 rdata_copy = sub.reader_data_;
01961 }
01962
01963
01964 iter = discovered_subscriptions_.end();
01965
01966 DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01967 #ifndef DDS_HAS_MINIMUM_BIT
01968 {
01969
01970 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01971 DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01972 if (bit) {
01973 instance_handle =
01974 bit->store_synthetic_data(rdata_copy.ddsSubscriptionData,
01975 DDS::NEW_VIEW_STATE);
01976 }
01977 }
01978 #endif
01979
01980 if (spdp_.shutting_down()) { return; }
01981
01982 iter = discovered_subscriptions_.find(guid);
01983 if (iter != discovered_subscriptions_.end()) {
01984 iter->second.bit_ih_ = instance_handle;
01985 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01986 topics_.find(topic_name);
01987 if (top_it != topics_.end()) {
01988 if (DCPS::DCPS_debug_level > 3) {
01989 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01990 ACE_TEXT("calling match_endpoints new\n")));
01991 }
01992 match_endpoints(guid, top_it->second);
01993 }
01994 }
01995
01996 } else {
01997 if (qosChanged(iter->second.reader_data_.ddsSubscriptionData,
01998 rdata.ddsSubscriptionData)) {
01999 #ifndef DDS_HAS_MINIMUM_BIT
02000 DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
02001 if (bit) {
02002 bit->store_synthetic_data(
02003 iter->second.reader_data_.ddsSubscriptionData,
02004 DDS::NOT_NEW_VIEW_STATE);
02005 }
02006 #endif
02007
02008
02009 topic_name = get_topic_name(iter->second);
02010 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
02011 topics_.find(topic_name);
02012 if (top_it != topics_.end()) {
02013 if (DCPS::DCPS_debug_level > 3) {
02014 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
02015 ACE_TEXT("calling match_endpoints update\n")));
02016 }
02017 match_endpoints(guid, top_it->second);
02018 }
02019 }
02020
02021 if (paramsChanged(iter->second.reader_data_.contentFilterProperty,
02022 rdata.contentFilterProperty)) {
02023
02024 topic_name = get_topic_name(iter->second);
02025 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
02026 topics_.find(topic_name);
02027 using DCPS::RepoIdSet;
02028 const RepoIdSet& assoc =
02029 (top_it == topics_.end()) ? RepoIdSet() : top_it->second.endpoints_;
02030 for (RepoIdSet::const_iterator i = assoc.begin(); i != assoc.end(); ++i) {
02031 if (i->entityId.entityKind & 4) continue;
02032 const LocalPublicationIter lpi = local_publications_.find(*i);
02033 if (lpi != local_publications_.end()) {
02034 lpi->second.publication_->update_subscription_params(guid,
02035 rdata.contentFilterProperty.expressionParameters);
02036 }
02037 }
02038 }
02039 }
02040
02041 CORBA::ULong len = rdata.readerProxy.associatedWriters.length();
02042 for (CORBA::ULong writerIndex = 0; writerIndex < len; ++writerIndex)
02043 {
02044 GUID_t writerGuid = rdata.readerProxy.associatedWriters[writerIndex];
02045
02046
02047 LocalPublicationIter lp = local_publications_.find(writerGuid);
02048 if (lp != local_publications_.end()) {
02049
02050 if (lp->second.remote_opendds_associations_.insert(guid).second) {
02051
02052 lp->second.publication_->association_complete(guid);
02053 }
02054 }
02055 }
02056
02057 } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
02058 message_id == DCPS::DISPOSE_INSTANCE ||
02059 message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
02060 if (iter != discovered_subscriptions_.end()) {
02061
02062 topic_name = get_topic_name(iter->second);
02063 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
02064 topics_.find(topic_name);
02065 if (top_it != topics_.end()) {
02066 top_it->second.endpoints_.erase(guid);
02067 if (DCPS::DCPS_debug_level > 3) {
02068 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
02069 ACE_TEXT("calling match_endpoints disp/unreg\n")));
02070 }
02071 match_endpoints(guid, top_it->second, true );
02072 if (spdp_.shutting_down()) { return; }
02073 }
02074 remove_from_bit(iter->second);
02075 discovered_subscriptions_.erase(iter);
02076 }
02077 }
02078 }
02079
02080 void
02081 Sedp::Task::svc_i(DCPS::MessageId message_id,
02082 const DCPS::DiscoveredReaderData* prdata)
02083 {
02084 DCPS::unique_ptr<const DCPS::DiscoveredReaderData> delete_the_data(prdata);
02085 sedp_->data_received(message_id, *prdata);
02086 }
02087
02088 void
02089 Sedp::data_received(DCPS::MessageId message_id,
02090 const DCPS::DiscoveredReaderData& rdata)
02091 {
02092 if (spdp_.shutting_down()) { return; }
02093
02094 const RepoId& guid = rdata.readerProxy.remoteReaderGuid;
02095 RepoId guid_participant = guid;
02096 guid_participant.entityId = ENTITYID_PARTICIPANT;
02097
02098 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02099
02100 if (ignoring(guid)
02101 || ignoring(guid_participant)
02102 || ignoring(rdata.ddsSubscriptionData.topic_name)) {
02103 return;
02104 }
02105
02106 #if defined(OPENDDS_SECURITY)
02107 if (should_drop_message(rdata.ddsSubscriptionData.topic_name)) {
02108 return;
02109 }
02110 #endif
02111
02112 if (!spdp_.has_discovered_participant(guid_participant)) {
02113 deferred_subscriptions_[guid] = std::make_pair(message_id, rdata);
02114 return;
02115 }
02116
02117 process_discovered_reader_data(message_id, rdata, guid);
02118 }
02119
02120 #if defined(OPENDDS_SECURITY)
02121 void
02122 Sedp::Task::svc_i(DCPS::MessageId message_id,
02123 const DiscoveredReaderData_SecurityWrapper* data)
02124 {
02125 DCPS::unique_ptr<const DiscoveredReaderData_SecurityWrapper> delete_the_data(data);
02126 sedp_->data_received(message_id, *data);
02127 }
02128
02129 void Sedp::data_received(DCPS::MessageId message_id,
02130 const DiscoveredReaderData_SecurityWrapper& wrapper)
02131 {
02132 if (spdp_.shutting_down()) { return; }
02133
02134 const RepoId& guid = wrapper.data.readerProxy.remoteReaderGuid;
02135 RepoId guid_participant = guid;
02136 guid_participant.entityId = ENTITYID_PARTICIPANT;
02137
02138 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02139
02140 if (ignoring(guid)
02141 || ignoring(guid_participant)
02142 || ignoring(wrapper.data.ddsSubscriptionData.topic_name)) {
02143 return;
02144 }
02145
02146 process_discovered_reader_data(message_id, wrapper.data, guid, &wrapper.security_info);
02147 }
02148 #endif
02149
02150 void
02151 Sedp::Task::svc_i(DCPS::MessageId message_id,
02152 const ParticipantMessageData* data)
02153 {
02154 DCPS::unique_ptr<const ParticipantMessageData> delete_the_data(data);
02155 sedp_->data_received(message_id, *data);
02156 }
02157
02158 void
02159 Sedp::data_received(DCPS::MessageId ,
02160 const ParticipantMessageData& data)
02161 {
02162 if (spdp_.shutting_down()) { return; }
02163
02164 const RepoId& guid = data.participantGuid;
02165 RepoId guid_participant = guid;
02166 guid_participant.entityId = ENTITYID_PARTICIPANT;
02167 RepoId prefix = data.participantGuid;
02168 prefix.entityId = EntityId_t();
02169
02170 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02171
02172 if (ignoring(guid)
02173 || ignoring(guid_participant)) {
02174 return;
02175 }
02176
02177 if (!spdp_.has_discovered_participant(guid_participant)) {
02178 return;
02179 }
02180
02181 for (LocalSubscriptionMap::const_iterator sub_pos = local_subscriptions_.begin(),
02182 sub_limit = local_subscriptions_.end();
02183 sub_pos != sub_limit; ++sub_pos) {
02184 const DCPS::RepoIdSet::const_iterator pos =
02185 sub_pos->second.matched_endpoints_.lower_bound(prefix);
02186 if (pos != sub_pos->second.matched_endpoints_.end() &&
02187 DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) {
02188 sub_pos->second.subscription_->signal_liveliness(guid_participant);
02189 }
02190 }
02191 }
02192
02193 #if defined(OPENDDS_SECURITY)
02194 void
02195 Sedp::Task::svc_participant_message_data_secure(DCPS::MessageId message_id,
02196 const ParticipantMessageData* data)
02197 {
02198 DCPS::unique_ptr<const ParticipantMessageData> delete_the_data(data);
02199 sedp_->received_participant_message_data_secure(message_id, *data);
02200 }
02201
02202 void
02203 Sedp::received_participant_message_data_secure(DCPS::MessageId ,
02204 const ParticipantMessageData& data)
02205 {
02206 if (spdp_.shutting_down()) {
02207 return;
02208 }
02209
02210 const RepoId& guid = data.participantGuid;
02211 RepoId guid_participant = guid;
02212 guid_participant.entityId = ENTITYID_PARTICIPANT;
02213 RepoId prefix = data.participantGuid;
02214 prefix.entityId = EntityId_t();
02215
02216 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02217
02218 if (ignoring(guid) || ignoring(guid_participant)) {
02219 return;
02220 }
02221
02222 if (!spdp_.has_discovered_participant(guid_participant)) {
02223 return;
02224 }
02225
02226 LocalSubscriptionMap::const_iterator i, n;
02227 for (i = local_subscriptions_.begin(), n = local_subscriptions_.end(); i != n; ++i) {
02228 const DCPS::RepoIdSet::const_iterator pos = i->second.matched_endpoints_.lower_bound(prefix);
02229
02230 if (pos != i->second.matched_endpoints_.end() && DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) {
02231 (i->second.subscription_)->signal_liveliness(guid_participant);
02232 }
02233 }
02234 }
02235
02236 bool Sedp::should_drop_stateless_message(const DDS::Security::ParticipantGenericMessage& msg)
02237 {
02238 using DCPS::GUID_t;
02239 using DCPS::GUID_UNKNOWN;
02240
02241 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, true);
02242
02243 const GUID_t src_endpoint = msg.source_endpoint_guid;
02244 const GUID_t dst_endpoint = msg.destination_endpoint_guid;
02245 const GUID_t this_endpoint = participant_stateless_message_reader_->get_repo_id();
02246 const GUID_t dst_participant = msg.destination_participant_guid;
02247 const GUID_t this_participant = participant_id_;
02248
02249 if (ignoring(src_endpoint)) {
02250 return true;
02251 }
02252
02253 if (dst_participant != GUID_UNKNOWN && dst_participant != this_participant) {
02254 return true;
02255 }
02256
02257 if (dst_endpoint != GUID_UNKNOWN && dst_endpoint != this_endpoint) {
02258 return true;
02259 }
02260
02261 return false;
02262 }
02263
02264 bool Sedp::should_drop_volatile_message(const DDS::Security::ParticipantGenericMessage& msg)
02265 {
02266 using DCPS::GUID_t;
02267 using DCPS::GUID_UNKNOWN;
02268
02269 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, true);
02270
02271 const GUID_t src_endpoint = msg.source_endpoint_guid;
02272 const GUID_t dst_participant = msg.destination_participant_guid;
02273 const GUID_t this_participant = participant_id_;
02274
02275 if (ignoring(src_endpoint)) {
02276 return true;
02277 }
02278
02279 if (dst_participant != GUID_UNKNOWN && dst_participant != this_participant) {
02280 return true;
02281 }
02282
02283 return false;
02284 }
02285
02286 bool Sedp::should_drop_message(const char* unsecure_topic_name)
02287 {
02288 if (is_security_enabled()) {
02289 DDS::Security::TopicSecurityAttributes attribs;
02290 DDS::Security::SecurityException ex = {"", 0, 0};
02291
02292 bool ok = get_access_control()->get_topic_sec_attributes(
02293 get_permissions_handle(),
02294 unsecure_topic_name,
02295 attribs,
02296 ex);
02297
02298 if (!ok || attribs.is_discovery_protected) {
02299 return true;
02300 }
02301 }
02302
02303 return false;
02304 }
02305
02306 void
02307 Sedp::Task::svc_stateless_message(DCPS::MessageId id,
02308 const DDS::Security::ParticipantStatelessMessage* data)
02309 {
02310 DCPS::unique_ptr<const DDS::Security::ParticipantStatelessMessage> delete_the_data(data);
02311 sedp_->received_stateless_message(id, *data);
02312 }
02313
02314 void
02315 Sedp::received_stateless_message(DCPS::MessageId ,
02316 const DDS::Security::ParticipantStatelessMessage& msg)
02317 {
02318 if (spdp_.shutting_down()) {
02319 return;
02320 }
02321
02322 if (should_drop_stateless_message(msg)) {
02323 return;
02324 }
02325
02326 if (0 == std::strcmp(msg.message_class_id,
02327 DDS::Security::GMCLASSID_SECURITY_AUTH_REQUEST)) {
02328 spdp_.handle_auth_request(msg);
02329
02330 } else if (0 == std::strcmp(msg.message_class_id,
02331 DDS::Security::GMCLASSID_SECURITY_AUTH_HANDSHAKE)) {
02332 spdp_.handle_handshake_message(msg);
02333 }
02334 return;
02335 }
02336
02337 void
02338 Sedp::Task::svc_volatile_message_secure(DCPS::MessageId id,
02339 const DDS::Security::ParticipantVolatileMessageSecure* data)
02340 {
02341 DCPS::unique_ptr<const DDS::Security::ParticipantVolatileMessageSecure> delete_the_data(data);
02342 sedp_->received_volatile_message_secure(id, *data);
02343 }
02344
02345 void
02346 Sedp::received_volatile_message_secure(DCPS::MessageId ,
02347 const DDS::Security::ParticipantVolatileMessageSecure& msg)
02348 {
02349 if (spdp_.shutting_down()) {
02350 return;
02351 }
02352
02353 if (should_drop_volatile_message(msg)) {
02354 return;
02355 }
02356
02357 if (0 == std::strcmp(msg.message_class_id,
02358 DDS::Security::GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS)) {
02359 spdp_.handle_participant_crypto_tokens(msg);
02360 } else if (0 == std::strcmp(msg.message_class_id,
02361 DDS::Security::GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS)) {
02362 handle_datawriter_crypto_tokens(msg);
02363 } else if (0 == std::strcmp(msg.message_class_id,
02364 DDS::Security::GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS)) {
02365 handle_datareader_crypto_tokens(msg);
02366 }
02367 return;
02368 }
02369 #endif
02370
02371 bool
02372 Sedp::is_opendds(const GUID_t& endpoint) const
02373 {
02374 GUID_t participant = endpoint;
02375 participant.entityId = DCPS::ENTITYID_PARTICIPANT;
02376 return spdp_.is_opendds(participant);
02377 }
02378
02379 void
02380 Sedp::association_complete(const RepoId& localId,
02381 const RepoId& remoteId)
02382 {
02383
02384 if (is_opendds(remoteId)) {
02385 LocalSubscriptionIter sub = local_subscriptions_.find(localId);
02386
02387 if (sub != local_subscriptions_.end()) {
02388 std::pair<DCPS::RepoIdSet::iterator, bool> result =
02389 sub->second.remote_opendds_associations_.insert(remoteId);
02390
02391 if (result.second) {
02392
02393 write_subscription_data(localId, sub->second);
02394 }
02395 }
02396 }
02397 }
02398
02399 void Sedp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
02400 {
02401
02402 #if defined(OPENDDS_SECURITY)
02403 DDS::Security::SecurityException se = {"", 0, 0};
02404 DDS::Security::TopicSecurityAttributes attribs;
02405
02406 if (is_security_enabled()) {
02407
02408
02409 bool ok = get_access_control()->get_topic_sec_attributes(
02410 get_permissions_handle(), "DCPSParticipantMessageSecure", attribs, se);
02411
02412 if (ok) {
02413
02414 if (attribs.is_liveliness_protected) {
02415 signal_liveliness_secure(kind);
02416
02417 } else {
02418 signal_liveliness_unsecure(kind);
02419 }
02420
02421 } else {
02422 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::signal_liveliness() - ")
02423 ACE_TEXT("Failure calling get_topic_sec_attributes(). Security Exception[%d.%d]: %C\n"),
02424 se.code, se.minor_code, se.message.in()));
02425 }
02426
02427 } else {
02428 #endif
02429
02430 signal_liveliness_unsecure(kind);
02431
02432 #if defined(OPENDDS_SECURITY)
02433 }
02434 #endif
02435 }
02436
02437 void
02438 Sedp::signal_liveliness_unsecure(DDS::LivelinessQosPolicyKind kind)
02439 {
02440 ParticipantMessageData data;
02441 data.participantGuid = participant_id_;
02442
02443 switch (kind) {
02444 case DDS::AUTOMATIC_LIVELINESS_QOS:
02445 data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
02446 participant_message_writer_.write_participant_message(data, GUID_UNKNOWN, automatic_liveliness_seq_);
02447 break;
02448
02449 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02450 data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
02451 participant_message_writer_.write_participant_message(data, GUID_UNKNOWN, manual_liveliness_seq_);
02452 break;
02453
02454 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02455
02456 break;
02457 }
02458 }
02459
02460 #if defined(OPENDDS_SECURITY)
02461 void
02462 Sedp::signal_liveliness_secure(DDS::LivelinessQosPolicyKind kind)
02463 {
02464 ParticipantMessageData data;
02465 data.participantGuid = participant_id_;
02466
02467 switch (kind) {
02468 case DDS::AUTOMATIC_LIVELINESS_QOS:
02469 data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
02470 participant_message_secure_writer_.write_participant_message(data, GUID_UNKNOWN, secure_automatic_liveliness_seq_);
02471 break;
02472
02473 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
02474 data.participantGuid.entityId = DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
02475 participant_message_secure_writer_.write_participant_message(data, GUID_UNKNOWN, secure_manual_liveliness_seq_);
02476 break;
02477
02478 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
02479
02480 break;
02481 }
02482 }
02483 #endif
02484
02485 Sedp::Endpoint::~Endpoint()
02486 {
02487 }
02488
02489
02490 Sedp::Writer::Writer(const RepoId& pub_id, Sedp& sedp)
02491 : Endpoint(pub_id, sedp)
02492 {
02493 header_.prefix[0] = 'R';
02494 header_.prefix[1] = 'T';
02495 header_.prefix[2] = 'P';
02496 header_.prefix[3] = 'S';
02497 header_.version = PROTOCOLVERSION;
02498 header_.vendorId = VENDORID_OPENDDS;
02499 header_.guidPrefix[0] = pub_id.guidPrefix[0];
02500 header_.guidPrefix[1] = pub_id.guidPrefix[1],
02501 header_.guidPrefix[2] = pub_id.guidPrefix[2];
02502 header_.guidPrefix[3] = pub_id.guidPrefix[3];
02503 header_.guidPrefix[4] = pub_id.guidPrefix[4];
02504 header_.guidPrefix[5] = pub_id.guidPrefix[5];
02505 header_.guidPrefix[6] = pub_id.guidPrefix[6];
02506 header_.guidPrefix[7] = pub_id.guidPrefix[7];
02507 header_.guidPrefix[8] = pub_id.guidPrefix[8];
02508 header_.guidPrefix[9] = pub_id.guidPrefix[9];
02509 header_.guidPrefix[10] = pub_id.guidPrefix[10];
02510 header_.guidPrefix[11] = pub_id.guidPrefix[11];
02511
02512
02513
02514
02515 RcObject::_add_ref();
02516 }
02517
02518 Sedp::Writer::~Writer()
02519 {
02520 }
02521
02522 bool
02523 Sedp::Writer::assoc(const DCPS::AssociationData& subscription)
02524 {
02525 return associate(subscription, true);
02526 }
02527
02528 void
02529 Sedp::Writer::data_delivered(const DCPS::DataSampleElement* dsle)
02530 {
02531 delete dsle;
02532 }
02533
02534 void
02535 Sedp::Writer::data_dropped(const DCPS::DataSampleElement* dsle, bool)
02536 {
02537 delete dsle;
02538 }
02539
02540 void
02541 Sedp::Writer::control_delivered(const DCPS::Message_Block_Ptr& )
02542 {
02543 }
02544
02545 void
02546 Sedp::Writer::control_dropped(const DCPS::Message_Block_Ptr& , bool)
02547 {
02548 }
02549
02550 void Sedp::Writer::send_sample(const ACE_Message_Block& data,
02551 size_t size,
02552 const RepoId& reader,
02553 DCPS::SequenceNumber& sequence,
02554 bool historic)
02555 {
02556 DCPS::DataSampleElement* el = new DCPS::DataSampleElement(repo_id_, this, DCPS::PublicationInstance_rch());
02557 set_header_fields(el->get_header(), size, reader, sequence, historic);
02558
02559 DCPS::Message_Block_Ptr sample(new ACE_Message_Block(size));
02560 el->set_sample(DCPS::move(sample));
02561 *el->get_sample() << el->get_header();
02562 el->get_sample()->cont(data.duplicate());
02563
02564 if (reader != GUID_UNKNOWN) {
02565 el->set_sub_id(0, reader);
02566 el->set_num_subs(1);
02567 }
02568
02569 DCPS::SendStateDataSampleList list;
02570 list.enqueue_tail(el);
02571
02572 send(list);
02573 }
02574
02575 DDS::ReturnCode_t
02576 Sedp::Writer::write_parameter_list(const ParameterList& plist,
02577 const RepoId& reader,
02578 DCPS::SequenceNumber& sequence)
02579 {
02580 DDS::ReturnCode_t result = DDS::RETCODE_OK;
02581
02582
02583 size_t size = 0, padding = 0;
02584 DCPS::find_size_ulong(size, padding);
02585 DCPS::gen_find_size(plist, size, padding);
02586
02587
02588 ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
02589 ACE_Message_Block::MB_DATA,
02590 new ACE_Message_Block(size));
02591 using DCPS::Serializer;
02592 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02593 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
02594 (ser << ACE_OutputCDR::from_octet(3)) &&
02595 (ser << ACE_OutputCDR::from_octet(0)) &&
02596 (ser << ACE_OutputCDR::from_octet(0)) &&
02597 (ser << plist);
02598
02599 if (ok) {
02600 send_sample(payload, size, reader, sequence, reader != GUID_UNKNOWN);
02601
02602 } else {
02603 result = DDS::RETCODE_ERROR;
02604 }
02605
02606 delete payload.cont();
02607 return result;
02608 }
02609
02610 DDS::ReturnCode_t
02611 Sedp::Writer::write_participant_message(const ParticipantMessageData& pmd,
02612 const RepoId& reader,
02613 DCPS::SequenceNumber& sequence)
02614 {
02615 DDS::ReturnCode_t result = DDS::RETCODE_OK;
02616
02617
02618 size_t size = 0, padding = 0;
02619 DCPS::find_size_ulong(size, padding);
02620 DCPS::gen_find_size(pmd, size, padding);
02621
02622
02623 ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
02624 ACE_Message_Block::MB_DATA,
02625 new ACE_Message_Block(size));
02626 using DCPS::Serializer;
02627 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02628 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
02629 (ser << ACE_OutputCDR::from_octet(1)) &&
02630 (ser << ACE_OutputCDR::from_octet(0)) &&
02631 (ser << ACE_OutputCDR::from_octet(0)) &&
02632 (ser << pmd);
02633
02634 if (ok) {
02635 send_sample(payload, size, reader, sequence);
02636
02637 } else {
02638 result = DDS::RETCODE_ERROR;
02639 }
02640
02641 delete payload.cont();
02642 return result;
02643 }
02644
02645 #if defined(OPENDDS_SECURITY)
02646 DDS::ReturnCode_t
02647 Sedp::Writer::write_stateless_message(const DDS::Security::ParticipantStatelessMessage& msg,
02648 const RepoId& reader,
02649 DCPS::SequenceNumber& sequence)
02650 {
02651 using DCPS::Serializer;
02652
02653 DDS::ReturnCode_t result = DDS::RETCODE_OK;
02654
02655 size_t size = 0, padding = 0;
02656 DCPS::find_size_ulong(size, padding);
02657 DCPS::gen_find_size(msg, size, padding);
02658
02659 ACE_Message_Block payload(
02660 DCPS::DataSampleHeader::max_marshaled_size(),
02661 ACE_Message_Block::MB_DATA,
02662 new ACE_Message_Block(size + padding));
02663
02664 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02665 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
02666 (ser << ACE_OutputCDR::from_octet(1)) &&
02667 (ser << ACE_OutputCDR::from_octet(0)) &&
02668 (ser << ACE_OutputCDR::from_octet(0));
02669 ser.reset_alignment();
02670 ok &= (ser << msg);
02671
02672 if (ok) {
02673 send_sample(payload, size, reader, sequence);
02674
02675 } else {
02676 result = DDS::RETCODE_ERROR;
02677 }
02678
02679 delete payload.cont();
02680 return result;
02681 }
02682
02683 DDS::ReturnCode_t
02684 Sedp::Writer::write_volatile_message_secure(const DDS::Security::ParticipantVolatileMessageSecure& msg,
02685 const RepoId& reader,
02686 DCPS::SequenceNumber& sequence)
02687 {
02688 using DCPS::Serializer;
02689
02690 DDS::ReturnCode_t result = DDS::RETCODE_OK;
02691
02692 size_t size = 0, padding = 0;
02693 DCPS::find_size_ulong(size, padding);
02694 DCPS::gen_find_size(msg, size, padding);
02695
02696 ACE_Message_Block payload(
02697 DCPS::DataSampleHeader::max_marshaled_size(),
02698 ACE_Message_Block::MB_DATA,
02699 new ACE_Message_Block(size + padding));
02700
02701 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02702 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
02703 (ser << ACE_OutputCDR::from_octet(1)) &&
02704 (ser << ACE_OutputCDR::from_octet(0)) &&
02705 (ser << ACE_OutputCDR::from_octet(0));
02706 ser.reset_alignment();
02707 ok &= (ser << msg);
02708
02709 if (ok) {
02710 send_sample(payload, size, reader, sequence);
02711
02712 } else {
02713 result = DDS::RETCODE_ERROR;
02714 }
02715
02716 delete payload.cont();
02717 return result;
02718 }
02719
02720 DDS::ReturnCode_t
02721 Sedp::Writer::write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
02722 const RepoId& reader,
02723 DCPS::SequenceNumber& sequence)
02724 {
02725 using DCPS::Serializer;
02726
02727 DDS::ReturnCode_t result = DDS::RETCODE_OK;
02728
02729 ParameterList plist;
02730
02731 bool err = ParameterListConverter::to_param_list(msg, plist);
02732
02733 if (err) {
02734 ACE_ERROR((LM_ERROR,
02735 ACE_TEXT("(%P|%t) ERROR: Sedp::write_dcps_participant_secure - ")
02736 ACE_TEXT("Failed to convert SPDPdiscoveredParticipantData ")
02737 ACE_TEXT("to ParameterList\n")));
02738
02739 result = DDS::RETCODE_ERROR;
02740
02741 } else {
02742 result = write_parameter_list(plist, reader, sequence);
02743 }
02744
02745 return result;
02746 }
02747 #endif
02748
02749 DDS::ReturnCode_t
02750 Sedp::Writer::write_unregister_dispose(const RepoId& rid, CORBA::UShort pid)
02751 {
02752
02753 Parameter param;
02754 param.guid(rid);
02755 param._d(pid);
02756 ParameterList plist;
02757 plist.length(1);
02758 plist[0] = param;
02759
02760
02761 size_t size = 0, padding = 0;
02762 DCPS::find_size_ulong(size, padding);
02763 DCPS::gen_find_size(plist, size, padding);
02764
02765 DCPS::Message_Block_Ptr payload(new ACE_Message_Block(DCPS::DataSampleHeader::max_marshaled_size(),
02766 ACE_Message_Block::MB_DATA,
02767 new ACE_Message_Block(size)));
02768
02769 if (!payload) {
02770 ACE_ERROR((LM_ERROR,
02771 ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
02772 ACE_TEXT(" - Failed to allocate message block message\n")));
02773 return DDS::RETCODE_ERROR;
02774 }
02775
02776 using DCPS::Serializer;
02777 Serializer ser(payload->cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
02778 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
02779 (ser << ACE_OutputCDR::from_octet(3)) &&
02780 (ser << ACE_OutputCDR::from_octet(0)) &&
02781 (ser << ACE_OutputCDR::from_octet(0)) &&
02782 (ser << plist);
02783
02784 if (ok) {
02785
02786 write_control_msg(move(payload), size, DCPS::DISPOSE_UNREGISTER_INSTANCE);
02787 return DDS::RETCODE_OK;
02788 } else {
02789
02790 ACE_ERROR((LM_ERROR,
02791 ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
02792 ACE_TEXT(" - Failed to serialize RTPS control message\n")));
02793 return DDS::RETCODE_ERROR;
02794 }
02795 }
02796
02797 void
02798 Sedp::Writer::end_historic_samples(const RepoId& reader)
02799 {
02800 const void* pReader = static_cast<const void*>(&reader);
02801 DCPS::Message_Block_Ptr mb(new ACE_Message_Block (DCPS::DataSampleHeader::max_marshaled_size(),
02802 ACE_Message_Block::MB_DATA,
02803 new ACE_Message_Block(static_cast<const char*>(pReader),
02804 sizeof(reader))));
02805 if (mb.get()) {
02806 mb->cont()->wr_ptr(sizeof(reader));
02807
02808 write_control_msg(move(mb), sizeof(reader), DCPS::END_HISTORIC_SAMPLES,
02809 DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN());
02810 } else {
02811 ACE_ERROR((LM_ERROR,
02812 ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::end_historic_samples")
02813 ACE_TEXT(" - Failed to allocate message block message\n")));
02814 }
02815 }
02816
02817 void
02818 Sedp::Writer::write_control_msg(DCPS::Message_Block_Ptr payload,
02819 size_t size,
02820 DCPS::MessageId id,
02821 DCPS::SequenceNumber seq)
02822 {
02823 DCPS::DataSampleHeader header;
02824 set_header_fields(header, size, GUID_UNKNOWN, seq, false, id);
02825
02826 send_control(header, DCPS::move(payload));
02827 }
02828
02829 void
02830 Sedp::Writer::set_header_fields(DCPS::DataSampleHeader& dsh,
02831 size_t size,
02832 const RepoId& reader,
02833 DCPS::SequenceNumber& sequence,
02834 bool historic_sample,
02835 DCPS::MessageId id)
02836 {
02837 dsh.message_id_ = id;
02838 dsh.byte_order_ = ACE_CDR_BYTE_ORDER;
02839 dsh.message_length_ = static_cast<ACE_UINT32>(size);
02840 dsh.publication_id_ = repo_id_;
02841
02842 if (id != DCPS::END_HISTORIC_SAMPLES &&
02843 (reader == GUID_UNKNOWN ||
02844 sequence == DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())) {
02845 sequence = seq_++;
02846 }
02847
02848 if (historic_sample && reader != GUID_UNKNOWN) {
02849
02850 dsh.historic_sample_ = true;
02851 }
02852
02853 dsh.sequence_ = sequence;
02854
02855 const ACE_Time_Value now = ACE_OS::gettimeofday();
02856 dsh.source_timestamp_sec_ = static_cast<ACE_INT32>(now.sec());
02857 dsh.source_timestamp_nanosec_ = now.usec() * 1000;
02858 }
02859
02860
02861
02862 Sedp::Reader::~Reader()
02863 {}
02864
02865 bool
02866 Sedp::Reader::assoc(const DCPS::AssociationData& publication)
02867 {
02868 return associate(publication, false);
02869 }
02870
02871
02872
02873
02874 static bool
02875 decode_parameter_list(const DCPS::ReceivedDataSample& sample,
02876 DCPS::Serializer& ser,
02877 const ACE_CDR::Octet& encap,
02878 ParameterList& data)
02879 {
02880 if (sample.header_.key_fields_only_ && encap < 2) {
02881 GUID_t guid;
02882 if (!(ser >> guid)) return false;
02883 data.length(1);
02884 data[0].guid(guid);
02885 data[0]._d(PID_ENDPOINT_GUID);
02886 } else {
02887 return ser >> data;
02888 }
02889 return true;
02890 }
02891
02892 void
02893 Sedp::Reader::data_received(const DCPS::ReceivedDataSample& sample)
02894 {
02895 if (shutting_down_.value()) return;
02896
02897 switch (sample.header_.message_id_) {
02898 case DCPS::SAMPLE_DATA:
02899 case DCPS::DISPOSE_INSTANCE:
02900 case DCPS::UNREGISTER_INSTANCE:
02901 case DCPS::DISPOSE_UNREGISTER_INSTANCE: {
02902 const DCPS::MessageId id =
02903 static_cast<DCPS::MessageId>(sample.header_.message_id_);
02904
02905 DCPS::Serializer ser(sample.sample_.get(),
02906 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
02907 DCPS::Serializer::ALIGN_CDR);
02908 ACE_CDR::Octet encap, dummy;
02909 ACE_CDR::UShort options;
02910 const bool ok = (ser >> ACE_InputCDR::to_octet(dummy))
02911 && (ser >> ACE_InputCDR::to_octet(encap))
02912 && (ser >> options);
02913 if (!ok) {
02914 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02915 ACE_TEXT("failed to deserialize encap and options\n")));
02916 return;
02917 }
02918
02919
02920
02921
02922 if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
02923 ParameterList data;
02924 if (!decode_parameter_list(sample, ser, encap, data)) {
02925 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02926 ACE_TEXT("failed to deserialize data\n")));
02927 return;
02928 }
02929
02930 DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata(new DCPS::DiscoveredWriterData);
02931 if (ParameterListConverter::from_param_list(data, *wdata) < 0) {
02932 ACE_ERROR((LM_ERROR,
02933 ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
02934 ACE_TEXT("failed to convert from ParameterList ")
02935 ACE_TEXT("to DiscoveredWriterData\n")));
02936 return;
02937 }
02938 sedp_.task_.enqueue(id, move(wdata));
02939
02940 #if defined(OPENDDS_SECURITY)
02941 } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) {
02942 ParameterList data;
02943 if (!decode_parameter_list(sample, ser, encap, data)) {
02944 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02945 ACE_TEXT("failed to deserialize data\n")));
02946 return;
02947 }
02948
02949 DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wdata_secure(new DiscoveredWriterData_SecurityWrapper);
02950
02951 if (ParameterListConverter::from_param_list(data, *wdata_secure) < 0) {
02952 ACE_ERROR((LM_ERROR,
02953 ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
02954 ACE_TEXT("failed to convert from ParameterList ")
02955 ACE_TEXT("to DiscoveredWriterData_SecurityWrapper\n")));
02956 return;
02957 }
02958 sedp_.task_.enqueue(id, move(wdata_secure));
02959 #endif
02960
02961 } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
02962 ParameterList data;
02963 if (!decode_parameter_list(sample, ser, encap, data)) {
02964 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02965 ACE_TEXT("failed to deserialize data\n")));
02966 return;
02967 }
02968
02969 DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata(new DCPS::DiscoveredReaderData);
02970 if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
02971 ACE_ERROR((LM_ERROR,
02972 ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
02973 ACE_TEXT("failed to convert from ParameterList ")
02974 ACE_TEXT("to DiscoveredReaderData\n")));
02975 return;
02976 }
02977 if (rdata->readerProxy.expectsInlineQos) {
02978 set_inline_qos(rdata->readerProxy.allLocators);
02979 }
02980 sedp_.task_.enqueue(id, move(rdata));
02981
02982 #if defined(OPENDDS_SECURITY)
02983 } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) {
02984 ParameterList data;
02985 if (!decode_parameter_list(sample, ser, encap, data)) {
02986 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
02987 ACE_TEXT("failed to deserialize data\n")));
02988 return;
02989 }
02990
02991 DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> rdata(new DiscoveredReaderData_SecurityWrapper);
02992
02993 if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
02994 ACE_ERROR((LM_ERROR,
02995 ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
02996 ACE_TEXT("failed to convert from ParameterList ")
02997 ACE_TEXT("to DiscoveredReaderData_SecurityWrapper\n")));
02998 return;
02999 }
03000 if ((rdata->data).readerProxy.expectsInlineQos) {
03001 set_inline_qos((rdata->data).readerProxy.allLocators);
03002 }
03003 sedp_.task_.enqueue(id, move(rdata));
03004 #endif
03005
03006 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
03007 && !sample.header_.key_fields_only_) {
03008 DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData);
03009 if (!(ser >> *data)) {
03010 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03011 ACE_TEXT("failed to deserialize data\n")));
03012 return;
03013 }
03014 sedp_.task_.enqueue(id, move(data));
03015
03016 #if defined(OPENDDS_SECURITY)
03017 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER
03018 && !sample.header_.key_fields_only_) {
03019
03020 DCPS::unique_ptr<ParticipantMessageData> data(new ParticipantMessageData);
03021
03022 if (!(ser >> *data)) {
03023 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03024 ACE_TEXT("failed to deserialize data\n")));
03025 return;
03026 }
03027 sedp_.task_.enqueue_participant_message_secure(id, move(data));
03028
03029 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER) {
03030
03031 DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data(new DDS::Security::ParticipantStatelessMessage);
03032 ser.reset_alignment();
03033 if (!(ser >> *data)) {
03034 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03035 ACE_TEXT("failed to deserialize data\n")));
03036 return;
03037 }
03038 sedp_.task_.enqueue_stateless_message(id, move(data));
03039
03040 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER) {
03041
03042 DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data(new DDS::Security::ParticipantVolatileMessageSecure);
03043 ser.reset_alignment();
03044 if (!(ser >> *data)) {
03045 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03046 ACE_TEXT("failed to deserialize data\n")));
03047 return;
03048 }
03049 sedp_.task_.enqueue_volatile_message_secure(id, move(data));
03050
03051 } else if (sample.header_.publication_id_.entityId == ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER) {
03052
03053 ParameterList data;
03054 if (!decode_parameter_list(sample, ser, encap, data)) {
03055 ACE_ERROR((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
03056 ACE_TEXT("failed to deserialize data\n")));
03057 return;
03058 }
03059
03060 DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata(new Security::SPDPdiscoveredParticipantData);
03061
03062 if (ParameterListConverter::from_param_list(data, *pdata) < 0) {
03063 ACE_ERROR((LM_ERROR,
03064 ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
03065 ACE_TEXT("failed to convert from ParameterList ")
03066 ACE_TEXT("to Security::SPDPdiscoveredParticipantData\n")));
03067 return;
03068 }
03069 sedp_.task_.enqueue(id, move(pdata));
03070 #endif
03071
03072 }
03073 break;
03074 }
03075
03076 default:
03077 break;
03078 }
03079 }
03080
03081 void
03082 Sedp::populate_discovered_writer_msg(
03083 DCPS::DiscoveredWriterData& dwd,
03084 const RepoId& publication_id,
03085 const LocalPublication& pub)
03086 {
03087
03088
03089 OPENDDS_STRING topic_name = topic_names_[pub.topic_id_];
03090 dwd.ddsPublicationData.topic_name = topic_name.c_str();
03091 TopicDetails& topic_details = topics_[topic_name];
03092 dwd.ddsPublicationData.type_name = topic_details.data_type_.c_str();
03093 dwd.ddsPublicationData.durability = pub.qos_.durability;
03094 dwd.ddsPublicationData.durability_service = pub.qos_.durability_service;
03095 dwd.ddsPublicationData.deadline = pub.qos_.deadline;
03096 dwd.ddsPublicationData.latency_budget = pub.qos_.latency_budget;
03097 dwd.ddsPublicationData.liveliness = pub.qos_.liveliness;
03098 dwd.ddsPublicationData.reliability = pub.qos_.reliability;
03099 dwd.ddsPublicationData.lifespan = pub.qos_.lifespan;
03100 dwd.ddsPublicationData.user_data = pub.qos_.user_data;
03101 dwd.ddsPublicationData.ownership = pub.qos_.ownership;
03102 dwd.ddsPublicationData.ownership_strength = pub.qos_.ownership_strength;
03103 dwd.ddsPublicationData.destination_order = pub.qos_.destination_order;
03104 dwd.ddsPublicationData.presentation = pub.publisher_qos_.presentation;
03105 dwd.ddsPublicationData.partition = pub.publisher_qos_.partition;
03106 dwd.ddsPublicationData.topic_data = topic_details.qos_.topic_data;
03107 dwd.ddsPublicationData.group_data = pub.publisher_qos_.group_data;
03108 dwd.writerProxy.remoteWriterGuid = publication_id;
03109
03110
03111 dwd.writerProxy.allLocators = pub.trans_info_;
03112 }
03113
03114 void
03115 Sedp::populate_discovered_reader_msg(
03116 DCPS::DiscoveredReaderData& drd,
03117 const RepoId& subscription_id,
03118 const LocalSubscription& sub)
03119 {
03120
03121
03122 OPENDDS_STRING topic_name = topic_names_[sub.topic_id_];
03123 drd.ddsSubscriptionData.topic_name = topic_name.c_str();
03124 TopicDetails& topic_details = topics_[topic_name];
03125 drd.ddsSubscriptionData.type_name = topic_details.data_type_.c_str();
03126 drd.ddsSubscriptionData.durability = sub.qos_.durability;
03127 drd.ddsSubscriptionData.deadline = sub.qos_.deadline;
03128 drd.ddsSubscriptionData.latency_budget = sub.qos_.latency_budget;
03129 drd.ddsSubscriptionData.liveliness = sub.qos_.liveliness;
03130 drd.ddsSubscriptionData.reliability = sub.qos_.reliability;
03131 drd.ddsSubscriptionData.ownership = sub.qos_.ownership;
03132 drd.ddsSubscriptionData.destination_order = sub.qos_.destination_order;
03133 drd.ddsSubscriptionData.user_data = sub.qos_.user_data;
03134 drd.ddsSubscriptionData.time_based_filter = sub.qos_.time_based_filter;
03135 drd.ddsSubscriptionData.presentation = sub.subscriber_qos_.presentation;
03136 drd.ddsSubscriptionData.partition = sub.subscriber_qos_.partition;
03137 drd.ddsSubscriptionData.topic_data = topic_details.qos_.topic_data;
03138 drd.ddsSubscriptionData.group_data = sub.subscriber_qos_.group_data;
03139 drd.readerProxy.remoteReaderGuid = subscription_id;
03140 drd.readerProxy.expectsInlineQos = false;
03141
03142
03143 drd.readerProxy.allLocators = sub.trans_info_;
03144 drd.contentFilterProperty.contentFilteredTopicName =
03145 OPENDDS_STRING(DCPS::GuidConverter(subscription_id)).c_str();
03146 drd.contentFilterProperty.relatedTopicName = topic_name.c_str();
03147 drd.contentFilterProperty.filterClassName = "";
03148 drd.contentFilterProperty.filterExpression = sub.filterProperties.filterExpression;
03149 drd.contentFilterProperty.expressionParameters = sub.filterProperties.expressionParameters;
03150 for (DCPS::RepoIdSet::const_iterator writer =
03151 sub.remote_opendds_associations_.begin();
03152 writer != sub.remote_opendds_associations_.end();
03153 ++writer) {
03154 CORBA::ULong len = drd.readerProxy.associatedWriters.length();
03155 drd.readerProxy.associatedWriters.length(len + 1);
03156 drd.readerProxy.associatedWriters[len] = *writer;
03157 }
03158 }
03159
03160 void
03161 Sedp::write_durable_publication_data(const RepoId& reader)
03162 {
03163 LocalPublicationIter pub, end = local_publications_.end();
03164 for (pub = local_publications_.begin(); pub != end; ++pub) {
03165 write_publication_data(pub->first, pub->second, reader);
03166 }
03167 publications_writer_.end_historic_samples(reader);
03168
03169 #if defined(OPENDDS_SECURITY)
03170 publications_secure_writer_.end_historic_samples(reader);
03171 #endif
03172
03173 }
03174
03175 #if defined(OPENDDS_SECURITY)
03176 void
03177 Sedp::write_durable_publication_data_secure(const RepoId& reader)
03178 {
03179 LocalPublicationIter pub, end = local_publications_.end();
03180 for (pub = local_publications_.begin(); pub != end; ++pub) {
03181 write_publication_data(pub->first, pub->second, reader);
03182 }
03183 publications_secure_writer_.end_historic_samples(reader);
03184 }
03185 #endif
03186
03187 void
03188 Sedp::write_durable_subscription_data(const RepoId& reader)
03189 {
03190 LocalSubscriptionIter sub, end = local_subscriptions_.end();
03191 for (sub = local_subscriptions_.begin(); sub != end; ++sub) {
03192 write_subscription_data(sub->first, sub->second, reader);
03193 }
03194 subscriptions_writer_.end_historic_samples(reader);
03195
03196 #if defined(OPENDDS_SECURITY)
03197 subscriptions_secure_writer_.end_historic_samples(reader);
03198 #endif
03199
03200 }
03201
03202 #if defined(OPENDDS_SECURITY)
03203 void
03204 Sedp::write_durable_subscription_data_secure(const RepoId& reader)
03205 {
03206 LocalSubscriptionIter sub, end = local_subscriptions_.end();
03207 for (sub = local_subscriptions_.begin(); sub != end; ++sub) {
03208 write_subscription_data(sub->first, sub->second, reader);
03209 }
03210 subscriptions_secure_writer_.end_historic_samples(reader);
03211 }
03212 #endif
03213
03214 void
03215 Sedp::write_durable_participant_message_data(const RepoId& reader)
03216 {
03217 LocalParticipantMessageIter part, end = local_participant_messages_.end();
03218 for (part = local_participant_messages_.begin(); part != end; ++part) {
03219 write_participant_message_data(part->first, part->second, reader);
03220 }
03221 participant_message_writer_.end_historic_samples(reader);
03222 }
03223
03224 #if defined(OPENDDS_SECURITY)
03225 DDS::ReturnCode_t
03226 Sedp::write_stateless_message(DDS::Security::ParticipantStatelessMessage& msg,
03227 const RepoId& reader)
03228 {
03229 static DCPS::SequenceNumber sequence = 0;
03230 msg.message_identity.sequence_number = static_cast<unsigned long>((++sequence).getValue());
03231 return participant_stateless_message_writer_.write_stateless_message(msg, reader, sequence);
03232 }
03233
03234 DDS::ReturnCode_t
03235 Sedp::write_volatile_message(DDS::Security::ParticipantVolatileMessageSecure& msg,
03236 const RepoId& reader)
03237 {
03238 static DCPS::SequenceNumber sequence = 0;
03239 msg.message_identity.sequence_number = static_cast<unsigned long>((++sequence).getValue());
03240 return participant_volatile_message_secure_writer_.write_volatile_message_secure(msg, reader, sequence);
03241 }
03242
03243 DDS::ReturnCode_t
03244 Sedp::write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
03245 const RepoId& reader)
03246 {
03247 static DCPS::SequenceNumber sequence = 0;
03248
03249 DCPS::RepoId dcps_reader(reader);
03250 dcps_reader.entityId = ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER;
03251
03252 return dcps_participant_secure_writer_.write_dcps_participant_secure(msg, reader, ++sequence);
03253 }
03254
03255 DDS::ReturnCode_t
03256 Sedp::write_dcps_participant_dispose(const RepoId& part)
03257 {
03258 return dcps_participant_secure_writer_.write_unregister_dispose(part, PID_PARTICIPANT_GUID);
03259 }
03260 #endif
03261
03262 DDS::ReturnCode_t
03263 Sedp::write_publication_data(
03264 const RepoId& rid,
03265 LocalPublication& lp,
03266 const RepoId& reader)
03267 {
03268 DDS::ReturnCode_t result = DDS::RETCODE_OK;
03269
03270 #if defined(OPENDDS_SECURITY)
03271 if (lp.security_attribs_.base.is_discovery_protected) {
03272 result = write_publication_data_secure(rid, lp, reader);
03273
03274 } else {
03275 #endif
03276
03277 result = write_publication_data_unsecure(rid, lp, reader);
03278
03279 #if defined(OPENDDS_SECURITY)
03280 }
03281 #endif
03282
03283 return result;
03284 }
03285
03286 DDS::ReturnCode_t
03287 Sedp::write_publication_data_unsecure(
03288 const RepoId& rid,
03289 LocalPublication& lp,
03290 const RepoId& reader)
03291 {
03292 DDS::ReturnCode_t result = DDS::RETCODE_OK;
03293 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03294 !associated_participants_.empty())) {
03295 DCPS::DiscoveredWriterData dwd;
03296 ParameterList plist;
03297 populate_discovered_writer_msg(dwd, rid, lp);
03298
03299
03300 if (ParameterListConverter::to_param_list(dwd, plist, map_ipv4_to_ipv6())) {
03301 ACE_ERROR((LM_ERROR,
03302 ACE_TEXT("(%P|%t) ERROR: Sedp::write_publication_data - ")
03303 ACE_TEXT("Failed to convert DiscoveredWriterData ")
03304 ACE_TEXT(" to ParameterList\n")));
03305 result = DDS::RETCODE_ERROR;
03306 }
03307 if (DDS::RETCODE_OK == result) {
03308 result = publications_writer_.write_parameter_list(plist, reader, lp.sequence_);
03309 }
03310 } else if (DCPS::DCPS_debug_level > 3) {
03311 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_publication_data - ")
03312 ACE_TEXT("not currently associated, dropping msg.\n")));
03313 }
03314 return result;
03315 }
03316
03317 #if defined(OPENDDS_SECURITY)
03318 DDS::ReturnCode_t
03319 Sedp::write_publication_data_secure(
03320 const RepoId& rid,
03321 LocalPublication& lp,
03322 const RepoId& reader)
03323 {
03324 DDS::ReturnCode_t result = DDS::RETCODE_OK;
03325 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03326 !associated_participants_.empty())) {
03327
03328 DiscoveredWriterData_SecurityWrapper dwd;
03329 ParameterList plist;
03330 populate_discovered_writer_msg(dwd.data, rid, lp);
03331
03332 dwd.security_info.endpoint_security_attributes = security_attributes_to_bitmask(lp.security_attribs_);
03333 dwd.security_info.plugin_endpoint_security_attributes = lp.security_attribs_.plugin_endpoint_attributes;
03334
03335
03336 if (ParameterListConverter::to_param_list(dwd, plist, map_ipv4_to_ipv6())) {
03337 ACE_ERROR((LM_ERROR,
03338 ACE_TEXT("(%P|%t) ERROR: Sedp::write_publication_data - ")
03339 ACE_TEXT("Failed to convert DiscoveredWriterData ")
03340 ACE_TEXT("to ParameterList\n")));
03341 result = DDS::RETCODE_ERROR;
03342 }
03343 if (DDS::RETCODE_OK == result) {
03344 RepoId effective_reader = reader;
03345 if (reader != GUID_UNKNOWN)
03346 effective_reader.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER;
03347 result = publications_secure_writer_.write_parameter_list(plist, effective_reader, lp.sequence_);
03348 }
03349 } else if (DCPS::DCPS_debug_level > 3) {
03350 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_publication_data - ")
03351 ACE_TEXT("not currently associated, dropping msg.\n")));
03352 }
03353 return result;
03354 }
03355 #endif
03356
03357 DDS::ReturnCode_t
03358 Sedp::write_subscription_data(
03359 const RepoId& rid,
03360 LocalSubscription& ls,
03361 const RepoId& reader)
03362 {
03363 DDS::ReturnCode_t result = DDS::RETCODE_OK;
03364
03365 #if defined(OPENDDS_SECURITY)
03366 if (ls.security_attribs_.base.is_discovery_protected) {
03367 result = write_subscription_data_secure(rid, ls, reader);
03368
03369 } else {
03370 #endif
03371
03372 result = write_subscription_data_unsecure(rid, ls, reader);
03373
03374 #if defined(OPENDDS_SECURITY)
03375 }
03376 #endif
03377
03378 return result;
03379 }
03380
03381 DDS::ReturnCode_t
03382 Sedp::write_subscription_data_unsecure(
03383 const RepoId& rid,
03384 LocalSubscription& ls,
03385 const RepoId& reader)
03386 {
03387 DDS::ReturnCode_t result = DDS::RETCODE_OK;
03388 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03389 !associated_participants_.empty())) {
03390 DCPS::DiscoveredReaderData drd;
03391 ParameterList plist;
03392 populate_discovered_reader_msg(drd, rid, ls);
03393
03394
03395 if (ParameterListConverter::to_param_list(drd, plist, map_ipv4_to_ipv6())) {
03396 ACE_ERROR((LM_ERROR,
03397 ACE_TEXT("(%P|%t) ERROR: Sedp::write_subscription_data - ")
03398 ACE_TEXT("Failed to convert DiscoveredReaderData ")
03399 ACE_TEXT("to ParameterList\n")));
03400 result = DDS::RETCODE_ERROR;
03401 }
03402 if (DDS::RETCODE_OK == result) {
03403 result = subscriptions_writer_.write_parameter_list(plist, reader, ls.sequence_);
03404 }
03405 } else if (DCPS::DCPS_debug_level > 3) {
03406 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_subscription_data - ")
03407 ACE_TEXT("not currently associated, dropping msg.\n")));
03408 }
03409 return result;
03410 }
03411
03412 #if defined(OPENDDS_SECURITY)
03413 DDS::ReturnCode_t
03414 Sedp::write_subscription_data_secure(
03415 const RepoId& rid,
03416 LocalSubscription& ls,
03417 const RepoId& reader)
03418 {
03419 DDS::ReturnCode_t result = DDS::RETCODE_OK;
03420 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03421 !associated_participants_.empty())) {
03422
03423 DiscoveredReaderData_SecurityWrapper drd;
03424 ParameterList plist;
03425 populate_discovered_reader_msg(drd.data, rid, ls);
03426
03427 drd.security_info.endpoint_security_attributes = security_attributes_to_bitmask(ls.security_attribs_);
03428 drd.security_info.plugin_endpoint_security_attributes = ls.security_attribs_.plugin_endpoint_attributes;
03429
03430
03431 if (ParameterListConverter::to_param_list(drd, plist, map_ipv4_to_ipv6())) {
03432 ACE_ERROR((LM_ERROR,
03433 ACE_TEXT("(%P|%t) ERROR: Sedp::write_subscription_data - ")
03434 ACE_TEXT("Failed to convert DiscoveredReaderData ")
03435 ACE_TEXT("to ParameterList\n")));
03436 result = DDS::RETCODE_ERROR;
03437 }
03438 if (DDS::RETCODE_OK == result) {
03439 RepoId effective_reader = reader;
03440 if (reader != GUID_UNKNOWN)
03441 effective_reader.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER;
03442 result = subscriptions_secure_writer_.write_parameter_list(plist, effective_reader, ls.sequence_);
03443 }
03444 } else if (DCPS::DCPS_debug_level > 3) {
03445 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_subscription_data - ")
03446 ACE_TEXT("not currently associated, dropping msg.\n")));
03447 }
03448 return result;
03449 }
03450 #endif
03451
03452 DDS::ReturnCode_t
03453 Sedp::write_participant_message_data(
03454 const RepoId& rid,
03455 LocalParticipantMessage& pm,
03456 const RepoId& reader)
03457 {
03458 DDS::ReturnCode_t result = DDS::RETCODE_OK;
03459 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
03460 !associated_participants_.empty())) {
03461 ParticipantMessageData pmd;
03462 pmd.participantGuid = rid;
03463 result = participant_message_writer_.write_participant_message(pmd, reader, pm.sequence_);
03464 } else if (DCPS::DCPS_debug_level > 3) {
03465 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_participant_message_data - ")
03466 ACE_TEXT("not currently associated, dropping msg.\n")));
03467 }
03468 return result;
03469 }
03470
03471 void
03472 Sedp::set_inline_qos(DCPS::TransportLocatorSeq& locators)
03473 {
03474 const OPENDDS_STRING rtps_udp = "rtps_udp";
03475 for (CORBA::ULong i = 0; i < locators.length(); ++i) {
03476 if (locators[i].transport_type.in() == rtps_udp) {
03477 const CORBA::ULong len = locators[i].data.length();
03478 locators[i].data.length(len + 1);
03479 locators[i].data[len] = CORBA::Octet(1);
03480 }
03481 }
03482 }
03483
03484 void
03485 Sedp::acknowledge()
03486 {
03487 task_.acknowledge();
03488 }
03489
03490 void
03491 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata)
03492 {
03493 if (spdp_->shutting_down()) { return; }
03494
03495 Msg::MsgType type = Msg::MSG_PARTICIPANT;
03496
03497 #if defined(OPENDDS_SECURITY)
03498 if (pdata->dataKind == Security::DPDK_SECURE) {
03499 type = Msg::MSG_DCPS_PARTICIPANT_SECURE;
03500 }
03501 #endif
03502
03503 putq(new Msg(type, id, pdata.release()));
03504 }
03505
03506 void
03507 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata)
03508 {
03509 if (spdp_->shutting_down()) { return; }
03510 putq(new Msg(Msg::MSG_WRITER, id, wdata.release()));
03511 }
03512
03513 #if defined(OPENDDS_SECURITY)
03514 void
03515 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wrapper)
03516 {
03517 if (spdp_->shutting_down()) { return; }
03518 putq(new Msg(Msg::MSG_WRITER_SECURE, id, wrapper.release()));
03519 }
03520 #endif
03521
03522 void
03523 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata)
03524 {
03525 if (spdp_->shutting_down()) { return; }
03526 putq(new Msg(Msg::MSG_READER, id, rdata.release()));
03527 }
03528
03529 #if defined(OPENDDS_SECURITY)
03530 void
03531 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> wrapper)
03532 {
03533 if (spdp_->shutting_down()) { return; }
03534 putq(new Msg(Msg::MSG_READER_SECURE, id, wrapper.release()));
03535 }
03536 #endif
03537
03538 void
03539 Sedp::Task::enqueue(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data)
03540 {
03541 if (spdp_->shutting_down()) { return; }
03542 putq(new Msg(Msg::MSG_PARTICIPANT_DATA, id, data.release()));
03543 }
03544
03545 void
03546 Sedp::Task::enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
03547 {
03548 #ifndef DDS_HAS_MINIMUM_BIT
03549 if (spdp_->shutting_down()) { return; }
03550 putq(new Msg(which_bit, DCPS::DISPOSE_INSTANCE, bit_ih));
03551 #else
03552 ACE_UNUSED_ARG(which_bit);
03553 ACE_UNUSED_ARG(bit_ih);
03554 #endif
03555 }
03556
03557 #if defined(OPENDDS_SECURITY)
03558 void
03559 Sedp::Task::enqueue_participant_message_secure(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data)
03560 {
03561 if (spdp_->shutting_down()) { return; }
03562 putq(new Msg(Msg::MSG_PARTICIPANT_DATA_SECURE, id, data.release()));
03563 }
03564
03565 void
03566 Sedp::Task::enqueue_stateless_message(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data)
03567 {
03568 if (spdp_->shutting_down()) { return; }
03569 putq(new Msg(Msg::MSG_PARTICIPANT_STATELESS_DATA, id, data.release()));
03570 }
03571
03572 void
03573 Sedp::Task::enqueue_volatile_message_secure(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data)
03574 {
03575 if (spdp_->shutting_down()) { return; }
03576 putq(new Msg(Msg::MSG_PARTICIPANT_VOLATILE_SECURE, id, data.release()));
03577 }
03578 #endif
03579
03580 int
03581 Sedp::Task::svc()
03582 {
03583 for (Msg* msg = 0; getq(msg) != -1; ) {
03584 if (DCPS::DCPS_debug_level > 5) {
03585 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc "
03586 "got message from queue type %d\n", msg->type_));
03587 }
03588 DCPS::unique_ptr<Msg> delete_the_msg(msg);
03589 switch (msg->type_) {
03590 case Msg::MSG_PARTICIPANT:
03591 svc_i(msg->dpdata_);
03592 break;
03593
03594 case Msg::MSG_WRITER:
03595 svc_i(msg->id_, msg->wdata_);
03596 break;
03597
03598 #if defined(OPENDDS_SECURITY)
03599 case Msg::MSG_WRITER_SECURE:
03600 svc_i(msg->id_, msg->wdata_secure_);
03601 break;
03602 #endif
03603
03604 case Msg::MSG_READER:
03605 svc_i(msg->id_, msg->rdata_);
03606 break;
03607
03608 #if defined(OPENDDS_SECURITY)
03609 case Msg::MSG_READER_SECURE:
03610 svc_i(msg->id_, msg->rdata_secure_);
03611 break;
03612 #endif
03613
03614 case Msg::MSG_PARTICIPANT_DATA:
03615 svc_i(msg->id_, msg->pmdata_);
03616 break;
03617
03618 #if defined(OPENDDS_SECURITY)
03619 case Msg::MSG_PARTICIPANT_DATA_SECURE:
03620 svc_participant_message_data_secure(msg->id_, msg->pmdata_);
03621 break;
03622
03623 case Msg::MSG_PARTICIPANT_STATELESS_DATA:
03624 svc_stateless_message(msg->id_, msg->pgmdata_);
03625 break;
03626
03627 case Msg::MSG_PARTICIPANT_VOLATILE_SECURE:
03628 svc_volatile_message_secure(msg->id_, msg->pgmdata_);
03629 break;
03630
03631 case Msg::MSG_DCPS_PARTICIPANT_SECURE:
03632 svc_secure_i(msg->id_, msg->dpdata_);
03633 break;
03634 #endif
03635
03636 case Msg::MSG_REMOVE_FROM_PUB_BIT:
03637 case Msg::MSG_REMOVE_FROM_SUB_BIT:
03638 svc_i(msg->type_, msg->ih_);
03639 break;
03640
03641 case Msg::MSG_FINI_BIT:
03642
03643
03644
03645 spdp_->wait_for_acks().ack();
03646 break;
03647
03648 case Msg::MSG_STOP:
03649 if (DCPS::DCPS_debug_level > 3) {
03650 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
03651 ACE_TEXT("received MSG_STOP. Task exiting\n")));
03652 }
03653 return 0;
03654 }
03655
03656 if (DCPS::DCPS_debug_level > 5) {
03657 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc done with message\n"));
03658 }
03659 }
03660 if (DCPS::DCPS_debug_level > 3) {
03661 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
03662 ACE_TEXT("Task exiting.\n")));
03663 }
03664 return 0;
03665 }
03666
03667 Sedp::Task::~Task()
03668 {
03669 shutdown();
03670 }
03671
03672 bool
03673 Sedp::shutting_down() const
03674 {
03675 return spdp_.shutting_down();
03676 }
03677
03678 void
03679 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& rTls,
03680 DiscoveredSubscriptionIter& dsi,
03681 const RepoId& reader)
03682 {
03683 DCPS::LocatorSeq locs;
03684 bool participantExpectsInlineQos = false;
03685 RepoId remote_participant = reader;
03686 remote_participant.entityId = ENTITYID_PARTICIPANT;
03687 const bool participant_found =
03688 spdp_.get_default_locators(remote_participant, locs,
03689 participantExpectsInlineQos);
03690 if (!rTls->length()) {
03691 if (!participant_found) {
03692 defer_match_endpoints_.insert(reader);
03693 return;
03694 } else if (locs.length()) {
03695 size_t size = 0, padding = 0;
03696 DCPS::gen_find_size(locs, size, padding);
03697
03698 ACE_Message_Block mb_locator(size + 1);
03699 using DCPS::Serializer;
03700 Serializer ser_loc(&mb_locator,
03701 ACE_CDR_BYTE_ORDER,
03702 Serializer::ALIGN_CDR);
03703 ser_loc << locs;
03704 const bool readerExpectsInlineQos =
03705 dsi->second.reader_data_.readerProxy.expectsInlineQos;
03706 ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos
03707 || readerExpectsInlineQos);
03708
03709 DCPS::TransportLocator tl;
03710 tl.transport_type = "rtps_udp";
03711 message_block_to_sequence (mb_locator, tl.data);
03712 rTls->length(1);
03713 (*rTls)[0] = tl;
03714 } else {
03715 ACE_DEBUG((LM_WARNING,
03716 ACE_TEXT("(%P|%t) Sedp::match - ")
03717 ACE_TEXT("remote reader found with no locators ")
03718 ACE_TEXT("and no default locators\n")));
03719 }
03720 }
03721 }
03722
03723 void
03724 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& wTls,
03725 DiscoveredPublicationIter& ,
03726 const RepoId& writer)
03727 {
03728 DCPS::LocatorSeq locs;
03729 bool participantExpectsInlineQos = false;
03730 RepoId remote_participant = writer;
03731 remote_participant.entityId = ENTITYID_PARTICIPANT;
03732 const bool participant_found =
03733 spdp_.get_default_locators(remote_participant, locs,
03734 participantExpectsInlineQos);
03735 if (!wTls->length()) {
03736 if (!participant_found) {
03737 defer_match_endpoints_.insert(writer);
03738 return;
03739 } else if (locs.length()) {
03740 size_t size = 0, padding = 0;
03741 DCPS::gen_find_size(locs, size, padding);
03742
03743 ACE_Message_Block mb_locator(size + 1);
03744 using DCPS::Serializer;
03745 Serializer ser_loc(&mb_locator,
03746 ACE_CDR_BYTE_ORDER,
03747 Serializer::ALIGN_CDR);
03748 ser_loc << locs;
03749 ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos);
03750
03751 DCPS::TransportLocator tl;
03752 tl.transport_type = "rtps_udp";
03753 message_block_to_sequence (mb_locator, tl.data);
03754 wTls->length(1);
03755 (*wTls)[0] = tl;
03756 } else {
03757 ACE_DEBUG((LM_WARNING,
03758 ACE_TEXT("(%P|%t) Sedp::match - ")
03759 ACE_TEXT("remote writer found with no locators ")
03760 ACE_TEXT("and no default locators\n")));
03761 }
03762 }
03763 }
03764
03765 #if defined(OPENDDS_SECURITY)
03766 DCPS::TransportLocatorSeq
03767 Sedp::add_security_info(const DCPS::TransportLocatorSeq& locators,
03768 const RepoId& writer, const RepoId& reader)
03769 {
03770 using DCPS::Serializer;
03771
03772 if (std::memcmp(writer.guidPrefix, reader.guidPrefix,
03773 sizeof(DCPS::GuidPrefix_t)) == 0) {
03774
03775 return locators;
03776 }
03777
03778 DDS::Security::ParticipantCryptoHandle part_handle = DDS::HANDLE_NIL;
03779 DDS::Security::DatawriterCryptoHandle dw_handle = DDS::HANDLE_NIL;
03780 DDS::Security::DatareaderCryptoHandle dr_handle = DDS::HANDLE_NIL;
03781
03782 if (local_reader_crypto_handles_.find(reader) != local_reader_crypto_handles_.end() &&
03783 remote_writer_crypto_handles_.find(writer) != remote_writer_crypto_handles_.end()) {
03784 dr_handle = local_reader_crypto_handles_[reader];
03785 dw_handle = remote_writer_crypto_handles_[writer];
03786 DCPS::RepoId part = writer;
03787 part.entityId = ENTITYID_PARTICIPANT;
03788 part_handle = spdp_.lookup_participant_crypto_info(part).first;
03789 } else if (local_writer_crypto_handles_.find(writer) != local_writer_crypto_handles_.end() &&
03790 remote_reader_crypto_handles_.find(reader) != remote_reader_crypto_handles_.end()) {
03791 dw_handle = local_writer_crypto_handles_[writer];
03792 dr_handle = remote_reader_crypto_handles_[reader];
03793 DCPS::RepoId part = reader;
03794 part.entityId = ENTITYID_PARTICIPANT;
03795 part_handle = spdp_.lookup_participant_crypto_info(part).first;
03796 }
03797
03798 if (part_handle == DDS::HANDLE_NIL) {
03799
03800 return locators;
03801 }
03802
03803 DCPS::TransportLocatorSeq_var newLoc;
03804 DDS::OctetSeq added;
03805 for (unsigned int i = 0; i < locators.length(); ++i) {
03806 if (std::strcmp(locators[i].transport_type.in(), "rtps_udp") == 0) {
03807 if (!newLoc) {
03808 newLoc = new DCPS::TransportLocatorSeq(locators);
03809
03810 DDS::OctetSeq handleOctets = handle_to_octets(part_handle);
03811 const DDS::BinaryProperty_t prop = {BLOB_PROP_PART_CRYPTO_HANDLE,
03812 handleOctets, true };
03813 DDS::OctetSeq handleOctetsDw = handle_to_octets(dw_handle);
03814 const DDS::BinaryProperty_t dw_p = {BLOB_PROP_DW_CRYPTO_HANDLE,
03815 handleOctetsDw, true };
03816 DDS::OctetSeq handleOctetsDr = handle_to_octets(dr_handle);
03817 const DDS::BinaryProperty_t dr_p = {BLOB_PROP_DR_CRYPTO_HANDLE,
03818 handleOctetsDr, true };
03819 size_t size = 0, padding = 0;
03820 DCPS::gen_find_size(prop, size, padding);
03821 if (dw_handle != DDS::HANDLE_NIL) {
03822 DCPS::gen_find_size(dw_p, size, padding);
03823 }
03824 if (dr_handle != DDS::HANDLE_NIL) {
03825 DCPS::gen_find_size(dr_p, size, padding);
03826 }
03827 ACE_Message_Block mb(size + padding);
03828 Serializer ser(&mb, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
03829 ser << prop;
03830 if (dw_handle != DDS::HANDLE_NIL) {
03831 ser << dw_p;
03832 }
03833 if (dr_handle != DDS::HANDLE_NIL) {
03834 ser << dr_p;
03835 }
03836 added.length(mb.size());
03837 std::memcpy(added.get_buffer(), mb.rd_ptr(), mb.size());
03838 }
03839
03840 const unsigned int prevLen = newLoc[i].data.length();
03841 newLoc[i].data.length(prevLen + added.length());
03842 std::memcpy(newLoc[i].data.get_buffer() + prevLen, added.get_buffer(),
03843 added.length());
03844 }
03845 }
03846
03847 return newLoc ? newLoc : locators;
03848 }
03849 #endif
03850
03851 bool
03852 Sedp::defer_writer(const RepoId& writer,
03853 const RepoId& writer_participant)
03854 {
03855 if (!associated_participants_.count(writer_participant)) {
03856 if (DCPS::DCPS_debug_level > 3) {
03857 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
03858 ACE_TEXT("remote writer deferred\n")));
03859 }
03860 defer_match_endpoints_.insert(writer);
03861 return true;
03862 }
03863 return false;
03864 }
03865
03866 bool
03867 Sedp::defer_reader(const RepoId& reader,
03868 const RepoId& reader_participant)
03869 {
03870 if (!associated_participants_.count(reader_participant)) {
03871 if (DCPS::DCPS_debug_level > 3) {
03872 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
03873 ACE_TEXT("remote reader deferred\n")));
03874 }
03875 defer_match_endpoints_.insert(reader);
03876 return true;
03877 }
03878 return false;
03879 }
03880
03881 #if defined(OPENDDS_SECURITY)
03882 DDS::Security::DatawriterCryptoHandle
03883 Sedp::generate_remote_matched_writer_crypto_handle(const RepoId& writer_part, const DDS::Security::DatareaderCryptoHandle& drch)
03884 {
03885 DDS::Security::DatawriterCryptoHandle result = DDS::HANDLE_NIL;
03886
03887 DDS::Security::CryptoKeyFactory_var key_factory = spdp_.get_security_config()->get_crypto_key_factory();
03888
03889 Spdp::ParticipantCryptoInfoPair info = spdp_.lookup_participant_crypto_info(writer_part);
03890
03891 if (info.first != DDS::HANDLE_NIL && info.second) {
03892 DDS::Security::SecurityException se = {"", 0, 0};
03893 result = key_factory->register_matched_remote_datawriter(drch, info.first, info.second, se);
03894 if (result == DDS::HANDLE_NIL) {
03895 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_writer_crypto_handle() - ")
03896 ACE_TEXT("Failure calling register_matched_remote_datawriter(). Security Exception[%d.%d]: %C\n"),
03897 se.code, se.minor_code, se.message.in()));
03898 }
03899 } else {
03900 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_writer_crypto_handle() - ")
03901 ACE_TEXT("Unable to lookup remote participant crypto info.\n")));
03902 }
03903 return result;
03904 }
03905
03906 DDS::Security::DatareaderCryptoHandle
03907 Sedp::generate_remote_matched_reader_crypto_handle(const RepoId& reader_part,
03908 const DDS::Security::DatawriterCryptoHandle& dwch,
03909 bool relay_only)
03910 {
03911 DDS::Security::DatareaderCryptoHandle result = DDS::HANDLE_NIL;
03912
03913 DDS::Security::CryptoKeyFactory_var key_factory = spdp_.get_security_config()->get_crypto_key_factory();
03914
03915 Spdp::ParticipantCryptoInfoPair info = spdp_.lookup_participant_crypto_info(reader_part);
03916
03917 if (info.first != DDS::HANDLE_NIL && info.second) {
03918 DDS::Security::SecurityException se = {"", 0, 0};
03919 result = key_factory->register_matched_remote_datareader(dwch, info.first, info.second, relay_only, se);
03920 if (result == DDS::HANDLE_NIL) {
03921 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_reader_crypto_handle() - ")
03922 ACE_TEXT("Failure calling register_matched_remote_datareader(). Security Exception[%d.%d]: %C\n"),
03923 se.code, se.minor_code, se.message.in()));
03924 }
03925 } else {
03926 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::generate_remote_matched_reader_crypto_handle() - ")
03927 ACE_TEXT("Unable to lookup remote participant crypto info.\n")));
03928 }
03929 return result;
03930 }
03931
03932 void
03933 Sedp::create_and_send_datareader_crypto_tokens(const DDS::Security::DatareaderCryptoHandle& drch,
03934 const RepoId& local_reader,
03935 const DDS::Security::DatawriterCryptoHandle& dwch,
03936 const RepoId& remote_writer)
03937 {
03938 DDS::Security::SecurityException se = {"", 0, 0};
03939 DDS::Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
03940
03941 DDS::Security::DatareaderCryptoTokenSeq drcts;
03942
03943 if (!key_exchange->create_local_datareader_crypto_tokens(drcts, drch, dwch, se)) {
03944 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
03945 ACE_TEXT("Sedp::create_and_send_datareader_crypto_tokens() - ")
03946 ACE_TEXT("Unable to create local datareader crypto tokens with crypto key exchange plugin. ")
03947 ACE_TEXT("Security Exception[%d.%d]: %C\n"), se.code, se.minor_code, se.message.in()));
03948 return;
03949 }
03950
03951 if (drcts.length() != 0) {
03952 DCPS::RepoId remote_part = remote_writer;
03953 remote_part.entityId = ENTITYID_PARTICIPANT;
03954
03955 DCPS::RepoId local_volatile_writer = participant_id_;
03956 local_volatile_writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
03957
03958 DCPS::RepoId remote_volatile_reader = remote_part;
03959 remote_volatile_reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
03960
03961 DDS::Security::ParticipantVolatileMessageSecure msg;
03962 msg.message_identity.source_guid = local_volatile_writer;
03963 msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_DATAREADER_CRYPTO_TOKENS;
03964 msg.destination_participant_guid = remote_part;
03965 msg.destination_endpoint_guid = remote_writer;
03966 msg.source_endpoint_guid = local_reader;
03967 msg.message_data = reinterpret_cast<const DDS::Security::DataHolderSeq&>(drcts);
03968
03969 if (write_volatile_message(msg, remote_volatile_reader) != DDS::RETCODE_OK) {
03970 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::create_and_send_datareader_crypto_tokens() - ")
03971 ACE_TEXT("Unable to write volatile message.\n")));
03972 return;
03973 }
03974 }
03975 return;
03976 }
03977
03978 void
03979 Sedp::create_and_send_datawriter_crypto_tokens(const DDS::Security::DatawriterCryptoHandle& dwch,
03980 const RepoId& local_writer,
03981 const DDS::Security::DatareaderCryptoHandle& drch,
03982 const RepoId& remote_reader)
03983 {
03984 DDS::Security::SecurityException se = {"", 0, 0};
03985 DDS::Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
03986
03987 DDS::Security::DatawriterCryptoTokenSeq dwcts;
03988
03989 if (key_exchange->create_local_datawriter_crypto_tokens(dwcts, dwch, drch, se) == false) {
03990 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
03991 ACE_TEXT("Sedp::create_and_send_datawriter_crypto_tokens() - ")
03992 ACE_TEXT("Unable to create local datawriter crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
03993 se.code, se.minor_code, se.message.in()));
03994 return;
03995 }
03996
03997 if (dwcts.length() != 0) {
03998 DCPS::RepoId remote_part = remote_reader;
03999 remote_part.entityId = ENTITYID_PARTICIPANT;
04000
04001 DCPS::RepoId local_volatile_writer = participant_id_;
04002 local_volatile_writer.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
04003
04004 DCPS::RepoId remote_volatile_reader = remote_part;
04005 remote_volatile_reader.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
04006
04007 DDS::Security::ParticipantVolatileMessageSecure msg;
04008 msg.message_identity.source_guid = local_volatile_writer;
04009 msg.message_class_id = DDS::Security::GMCLASSID_SECURITY_DATAWRITER_CRYPTO_TOKENS;
04010 msg.destination_participant_guid = remote_part;
04011 msg.destination_endpoint_guid = remote_reader;
04012 msg.source_endpoint_guid = local_writer;
04013 msg.message_data = reinterpret_cast<const DDS::Security::DataHolderSeq&>(dwcts);
04014
04015 if (write_volatile_message(msg, remote_volatile_reader) != DDS::RETCODE_OK) {
04016 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Sedp::create_and_send_datawriter_crypto_tokens() - ")
04017 ACE_TEXT("Unable to write volatile message.\n")));
04018 return;
04019 }
04020 }
04021 return;
04022 }
04023
04024 void
04025 Sedp::handle_datawriter_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg) {
04026 DDS::Security::SecurityException se = {"", 0, 0};
04027 Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
04028
04029
04030 if (msg.destination_participant_guid != participant_id_ || !msg.message_data.length()) {
04031 return;
04032 }
04033
04034 ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
04035
04036 DCPS::DatawriterCryptoHandleMap::const_iterator w_iter = remote_writer_crypto_handles_.find(msg.source_endpoint_guid);
04037 DCPS::DatareaderCryptoHandleMap::const_iterator r_iter = local_reader_crypto_handles_.find(msg.destination_endpoint_guid);
04038
04039 DDS::Security::DatawriterCryptoTokenSeq dwcts;
04040 dwcts = reinterpret_cast<const DDS::Security::DatawriterCryptoTokenSeq&>(msg.message_data);
04041
04042 if (w_iter == remote_writer_crypto_handles_.end()) {
04043 ACE_DEBUG((LM_DEBUG,
04044 ACE_TEXT("(%P|%t) Sedp::handle_datawriter_crypto_tokens() - ")
04045 ACE_TEXT("received tokens for unknown remote writer. Caching.\n")));
04046
04047 pending_remote_writer_crypto_tokens_[msg.source_endpoint_guid] = dwcts;
04048 return;
04049 }
04050
04051 if (r_iter == local_reader_crypto_handles_.end()) {
04052 ACE_DEBUG((LM_WARNING,
04053 ACE_TEXT("(%P|%t) Sedp::handle_datawriter_crypto_tokens() - ")
04054 ACE_TEXT("received tokens for unknown local reader. Ignoring.\n")));
04055 return;
04056 }
04057
04058 if (!key_exchange->set_remote_datawriter_crypto_tokens(r_iter->second, w_iter->second, dwcts, se)) {
04059 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) ERROR: ")
04060 ACE_TEXT("(%P|%t) WARNING: Sedp::handle_datawriter_crypto_tokens() - ")
04061 ACE_TEXT("Unable to set remote datawriter crypto tokens with crypto key exchange plugin. ")
04062 ACE_TEXT("Security Exception[%d.%d]: %C\n"), se.code, se.minor_code, se.message.in()));
04063 return;
04064 }
04065 }
04066
04067 void
04068 Sedp::handle_datareader_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg) {
04069 DDS::Security::SecurityException se = {"", 0, 0};
04070 Security::CryptoKeyExchange_var key_exchange = spdp_.get_security_config()->get_crypto_key_exchange();
04071
04072
04073 if (msg.destination_participant_guid != participant_id_ || !msg.message_data.length()) {
04074 return;
04075 }
04076
04077 ACE_Guard<ACE_Thread_Mutex> g(lock_, false);
04078
04079 DCPS::DatareaderCryptoHandleMap::const_iterator r_iter = remote_reader_crypto_handles_.find(msg.source_endpoint_guid);
04080 DCPS::DatawriterCryptoHandleMap::const_iterator w_iter = local_writer_crypto_handles_.find(msg.destination_endpoint_guid);
04081
04082 DDS::Security::DatareaderCryptoTokenSeq drcts;
04083 drcts = reinterpret_cast<const DDS::Security::DatareaderCryptoTokenSeq&>(msg.message_data);
04084
04085 if (r_iter == remote_reader_crypto_handles_.end()) {
04086 ACE_DEBUG((LM_DEBUG,
04087 ACE_TEXT("(%P|%t) Sedp::handle_datareader_crypto_tokens() - ")
04088 ACE_TEXT("received tokens for unknown remote reader. Caching.\n")));
04089
04090 pending_remote_reader_crypto_tokens_[msg.source_endpoint_guid] = drcts;
04091 return;
04092 }
04093
04094 if (w_iter == local_writer_crypto_handles_.end()) {
04095 ACE_DEBUG((LM_WARNING,
04096 ACE_TEXT("(%P|%t) Sedp::handle_datareader_crypto_tokens() - ")
04097 ACE_TEXT("received tokens for unknown local writer. Ignoring.\n")));
04098 return;
04099 }
04100
04101 if (!key_exchange->set_remote_datareader_crypto_tokens(w_iter->second, r_iter->second, drcts, se)) {
04102 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) ERROR: ")
04103 ACE_TEXT("(%P|%t) WARNING: Sedp::handle_datareader_crypto_tokens() - ")
04104 ACE_TEXT("Unable to set remote datareader crypto tokens with crypto key exchange plugin. ")
04105 ACE_TEXT("Security Exception[%d.%d]: %C\n"), se.code, se.minor_code, se.message.in()));
04106 return;
04107 }
04108 }
04109
04110 DDS::DomainId_t Sedp::get_domain_id() const {
04111 return spdp_.get_domain_id();
04112 }
04113 #endif
04114
04115 WaitForAcks::WaitForAcks()
04116 : cond_(lock_)
04117 , acks_(0)
04118 {
04119 }
04120
04121 void
04122 WaitForAcks::ack()
04123 {
04124 {
04125 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
04126 ++acks_;
04127 }
04128 cond_.signal();
04129 }
04130
04131 void
04132 WaitForAcks::wait_for_acks(unsigned int num_acks)
04133 {
04134 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
04135 while (num_acks > acks_) {
04136 cond_.wait();
04137 }
04138 }
04139
04140 void
04141 WaitForAcks::reset()
04142 {
04143 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
04144 acks_ = 0;
04145
04146
04147 }
04148
04149 }
04150 }
04151
04152 OPENDDS_END_VERSIONED_NAMESPACE_DECL