00001
00002
00003
00004
00005
00006
00007
00008 #include "Sedp.h"
00009 #include "Spdp.h"
00010 #include "MessageTypes.h"
00011 #include "RtpsDiscovery.h"
00012 #include "RtpsCoreTypeSupportImpl.h"
00013 #include "ParameterListConverter.h"
00014
00015 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00016 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst.h"
00017 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst_rch.h"
00018
00019 #include "dds/DCPS/Serializer.h"
00020 #include "dds/DCPS/Definitions.h"
00021 #include "dds/DCPS/GuidConverter.h"
00022 #include "dds/DCPS/GuidUtils.h"
00023 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00024 #include "dds/DCPS/AssociationData.h"
00025 #include "dds/DCPS/Service_Participant.h"
00026 #include "dds/DCPS/Qos_Helper.h"
00027 #include "dds/DCPS/DataSampleHeader.h"
00028 #include "dds/DCPS/SendStateDataSampleList.h"
00029 #include "dds/DCPS/DataReaderCallbacks.h"
00030 #include "dds/DCPS/DataWriterCallbacks.h"
00031 #include "dds/DCPS/Marked_Default_Qos.h"
00032 #include "dds/DCPS/BuiltInTopicUtils.h"
00033 #include "dds/DCPS/DCPS_Utils.h"
00034 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00035
00036 #include <ace/Reverse_Lock_T.h>
00037 #include <ace/Auto_Ptr.h>
00038
00039 #include <cstring>
00040
00041 namespace {
00042 bool qosChanged(DDS::PublicationBuiltinTopicData& dest,
00043 const DDS::PublicationBuiltinTopicData& src)
00044 {
00045 #ifndef OPENDDS_SAFETY_PROFILE
00046 using OpenDDS::DCPS::operator!=;
00047 #endif
00048 bool changed = false;
00049
00050
00051
00052 if (dest.deadline != src.deadline) {
00053 changed = true;
00054 dest.deadline = src.deadline;
00055 }
00056
00057 if (dest.latency_budget != src.latency_budget) {
00058 changed = true;
00059 dest.latency_budget = src.latency_budget;
00060 }
00061
00062 if (dest.lifespan != src.lifespan) {
00063 changed = true;
00064 dest.lifespan = src.lifespan;
00065 }
00066
00067 if (dest.user_data != src.user_data) {
00068 changed = true;
00069 dest.user_data = src.user_data;
00070 }
00071
00072 if (dest.ownership_strength != src.ownership_strength) {
00073 changed = true;
00074 dest.ownership_strength = src.ownership_strength;
00075 }
00076
00077 if (dest.partition != src.partition) {
00078 changed = true;
00079 dest.partition = src.partition;
00080 }
00081
00082 if (dest.topic_data != src.topic_data) {
00083 changed = true;
00084 dest.topic_data = src.topic_data;
00085 }
00086
00087 if (dest.group_data != src.group_data) {
00088 changed = true;
00089 dest.group_data = src.group_data;
00090 }
00091
00092 return changed;
00093 }
00094
00095 bool qosChanged(DDS::SubscriptionBuiltinTopicData& dest,
00096 const DDS::SubscriptionBuiltinTopicData& src)
00097 {
00098 #ifndef OPENDDS_SAFETY_PROFILE
00099 using OpenDDS::DCPS::operator!=;
00100 #endif
00101 bool changed = false;
00102
00103
00104
00105 if (dest.deadline != src.deadline) {
00106 changed = true;
00107 dest.deadline = src.deadline;
00108 }
00109
00110 if (dest.latency_budget != src.latency_budget) {
00111 changed = true;
00112 dest.latency_budget = src.latency_budget;
00113 }
00114
00115 if (dest.user_data != src.user_data) {
00116 changed = true;
00117 dest.user_data = src.user_data;
00118 }
00119
00120 if (dest.time_based_filter != src.time_based_filter) {
00121 changed = true;
00122 dest.time_based_filter = src.time_based_filter;
00123 }
00124
00125 if (dest.partition != src.partition) {
00126 changed = true;
00127 dest.partition = src.partition;
00128 }
00129
00130 if (dest.topic_data != src.topic_data) {
00131 changed = true;
00132 dest.topic_data = src.topic_data;
00133 }
00134
00135 if (dest.group_data != src.group_data) {
00136 changed = true;
00137 dest.group_data = src.group_data;
00138 }
00139
00140 return changed;
00141 }
00142
00143 bool paramsChanged(OpenDDS::DCPS::ContentFilterProperty_t& dest,
00144 const OpenDDS::DCPS::ContentFilterProperty_t& src)
00145 {
00146 if (dest.expressionParameters.length() != src.expressionParameters.length()) {
00147 dest.expressionParameters = src.expressionParameters;
00148 return true;
00149 }
00150 for (CORBA::ULong i = 0; i < src.expressionParameters.length(); ++i) {
00151 if (0 != std::strcmp(dest.expressionParameters[i],
00152 src.expressionParameters[i])) {
00153 dest.expressionParameters = src.expressionParameters;
00154 return true;
00155 }
00156 }
00157 return false;
00158 }
00159
00160 }
00161
00162 namespace OpenDDS {
00163 namespace RTPS {
00164 using DCPS::RepoId;
00165
00166 const bool Sedp::host_is_bigendian_(!ACE_CDR_BYTE_ORDER);
00167
00168 Sedp::Sedp(const RepoId& participant_id, Spdp& owner, ACE_Thread_Mutex& lock)
00169 : OpenDDS::DCPS::EndpointManager<SPDPdiscoveredParticipantData>(participant_id, lock)
00170 , spdp_(owner)
00171 , publications_writer_(make_id(participant_id,
00172 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER),
00173 *this)
00174 , subscriptions_writer_(make_id(participant_id,
00175 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER),
00176 *this)
00177 , participant_message_writer_(make_id(participant_id,
00178 ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER),
00179 *this)
00180 , publications_reader_(new Reader(make_id(participant_id,
00181 ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER),
00182 *this))
00183 , subscriptions_reader_(new Reader(make_id(participant_id,
00184 ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER),
00185 *this))
00186 , participant_message_reader_(new Reader(make_id(participant_id,
00187 ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER),
00188 *this))
00189 , task_(this)
00190 , automatic_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00191 , manual_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00192 {
00193 pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00194 sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00195 }
00196
00197 RepoId
00198 Sedp::make_id(const RepoId& participant_id, const EntityId_t& entity)
00199 {
00200 RepoId id = participant_id;
00201 id.entityId = entity;
00202 return id;
00203 }
00204
00205 DDS::ReturnCode_t
00206 Sedp::init(const RepoId& guid, const RtpsDiscovery& disco,
00207 DDS::DomainId_t domainId)
00208 {
00209 char domainStr[16];
00210 ACE_OS::snprintf(domainStr, 16, "%d", domainId);
00211
00212 OPENDDS_STRING key = OpenDDS::DCPS::GuidConverter(guid).uniqueId();
00213
00214
00215 transport_inst_ = TheTransportRegistry->create_inst(
00216 DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00217 OPENDDS_STRING("_SEDPTransportInst_") + key.c_str() + domainStr,
00218 "rtps_udp");
00219
00220 DCPS::RtpsUdpInst_rch rtps_inst =
00221 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00222
00223
00224
00225
00226 static const double HANDSHAKE_MULTIPLIER = 5;
00227 rtps_inst->handshake_timeout_ = disco.resend_period() * HANDSHAKE_MULTIPLIER;
00228
00229 if (disco.sedp_multicast()) {
00230
00231 const u_short mc_port = disco.pb() + disco.dg() * domainId + disco.dx();
00232
00233 OPENDDS_STRING mc_addr = disco.default_multicast_group();
00234 if (rtps_inst->multicast_group_address_.set(mc_port, mc_addr.c_str())) {
00235 ACE_DEBUG((LM_ERROR,
00236 ACE_TEXT("(%P|%t) ERROR: Sedp::init - ")
00237 ACE_TEXT("failed setting multicast local_addr to port %hd\n"),
00238 mc_port));
00239 return DDS::RETCODE_ERROR;
00240 }
00241
00242 rtps_inst->ttl_ = disco.ttl();
00243 rtps_inst->multicast_interface_ = disco.multicast_interface();
00244
00245 } else {
00246 rtps_inst->use_multicast_ = false;
00247 }
00248
00249 const OPENDDS_STRING sedp_addr = disco.sedp_local_address();
00250 if (!sedp_addr.empty()) {
00251 rtps_inst->local_address_config_str_ = sedp_addr;
00252 rtps_inst->local_address_.set(sedp_addr.c_str());
00253 }
00254
00255
00256 OPENDDS_STRING config_name = DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00257 OPENDDS_STRING("_SEDP_TransportCfg_") + key +
00258 domainStr;
00259 DCPS::TransportConfig_rch transport_cfg =
00260 TheTransportRegistry->create_config(config_name.c_str());
00261 transport_cfg->instances_.push_back(transport_inst_);
00262
00263
00264 rtps_inst->opendds_discovery_default_listener_ = publications_reader_.in();
00265 rtps_inst->opendds_discovery_guid_ = guid;
00266 const bool reliability = true, durability = true;
00267 publications_writer_.enable_transport_using_config(reliability, durability,
00268 transport_cfg);
00269 publications_reader_->enable_transport_using_config(reliability, durability,
00270 transport_cfg);
00271 subscriptions_writer_.enable_transport_using_config(reliability, durability,
00272 transport_cfg);
00273 subscriptions_reader_->enable_transport_using_config(reliability, durability,
00274 transport_cfg);
00275 participant_message_writer_.enable_transport_using_config(reliability, durability,
00276 transport_cfg);
00277 participant_message_reader_->enable_transport_using_config(reliability, durability,
00278 transport_cfg);
00279 return DDS::RETCODE_OK;
00280 }
00281
00282 void
00283 Sedp::unicast_locators(OpenDDS::DCPS::LocatorSeq& locators) const
00284 {
00285 DCPS::RtpsUdpInst_rch rtps_inst =
00286 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00287 using namespace OpenDDS::RTPS;
00288
00289 CORBA::ULong idx = 0;
00290
00291
00292 if (rtps_inst->use_multicast_ && rtps_inst->multicast_group_address_ != ACE_INET_Addr()) {
00293 idx = locators.length();
00294 locators.length(idx + 1);
00295 locators[idx].kind = address_to_kind(rtps_inst->multicast_group_address_);
00296 locators[idx].port = rtps_inst->multicast_group_address_.get_port_number();
00297 RTPS::address_to_bytes(locators[idx].address,
00298 rtps_inst->multicast_group_address_);
00299 }
00300
00301
00302
00303 if (rtps_inst->local_address_config_str_.empty() ||
00304 rtps_inst->local_address_config_str_.rfind(':') == 0) {
00305 typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector;
00306 AddrVector addrs;
00307 if (TheServiceParticipant->default_address ().empty ()) {
00308 OpenDDS::DCPS::get_interface_addrs(addrs);
00309 } else {
00310 addrs.push_back (ACE_INET_Addr (static_cast<u_short> (0), TheServiceParticipant->default_address ().c_str ()));
00311 }
00312 for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) {
00313 idx = locators.length();
00314 locators.length(idx + 1);
00315 locators[idx].kind = address_to_kind(*adr_it);
00316 locators[idx].port = rtps_inst->local_address_.get_port_number();
00317 RTPS::address_to_bytes(locators[idx].address,
00318 *adr_it);
00319 }
00320 } else {
00321 idx = locators.length();
00322 locators.length(idx + 1);
00323 locators[idx].kind = address_to_kind(rtps_inst->local_address_);
00324 locators[idx].port = rtps_inst->local_address_.get_port_number();
00325 RTPS::address_to_bytes(locators[idx].address,
00326 rtps_inst->local_address_);
00327 }
00328 }
00329
00330 const ACE_INET_Addr&
00331 Sedp::local_address() const
00332 {
00333 DCPS::RtpsUdpInst_rch rtps_inst =
00334 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00335 return rtps_inst->local_address_;
00336 }
00337
00338 const ACE_INET_Addr&
00339 Sedp::multicast_group() const
00340 {
00341 DCPS::RtpsUdpInst_rch rtps_inst =
00342 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00343 return rtps_inst->multicast_group_address_;
00344 }
00345 bool
00346 Sedp::map_ipv4_to_ipv6() const
00347 {
00348 bool map = false;
00349 if (local_address().get_type() != AF_INET) {
00350 map = true;
00351 }
00352 return map;
00353 }
00354
00355 void
00356 Sedp::assign_bit_key(DiscoveredPublication& pub)
00357 {
00358 increment_key(pub_bit_key_);
00359 pub_key_to_id_[pub_bit_key_] = pub.writer_data_.writerProxy.remoteWriterGuid;
00360 pub.writer_data_.ddsPublicationData.key = pub_bit_key_;
00361 }
00362
00363 void
00364 Sedp::assign_bit_key(DiscoveredSubscription& sub)
00365 {
00366 increment_key(sub_bit_key_);
00367 sub_key_to_id_[sub_bit_key_] = sub.reader_data_.readerProxy.remoteReaderGuid;
00368 sub.reader_data_.ddsSubscriptionData.key = sub_bit_key_;
00369 }
00370
00371 void
00372 create_association_data_proto(DCPS::AssociationData& proto,
00373 const SPDPdiscoveredParticipantData& pdata) {
00374 proto.publication_transport_priority_ = 0;
00375 proto.remote_reliable_ = true;
00376 proto.remote_durable_ = true;
00377 std::memcpy(proto.remote_id_.guidPrefix, pdata.participantProxy.guidPrefix,
00378 sizeof(GuidPrefix_t));
00379
00380 const OpenDDS::DCPS::LocatorSeq& mll =
00381 pdata.participantProxy.metatrafficMulticastLocatorList;
00382 const OpenDDS::DCPS::LocatorSeq& ull =
00383 pdata.participantProxy.metatrafficUnicastLocatorList;
00384 const CORBA::ULong locator_count = mll.length() + ull.length();
00385
00386 ACE_Message_Block mb_locator(4 + locator_count * sizeof(OpenDDS::DCPS::Locator_t) + 1);
00387 using DCPS::Serializer;
00388 Serializer ser_loc(&mb_locator, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00389 ser_loc << locator_count;
00390
00391 for (CORBA::ULong i = 0; i < mll.length(); ++i) {
00392 ser_loc << mll[i];
00393 }
00394 for (CORBA::ULong i = 0; i < ull.length(); ++i) {
00395 ser_loc << ull[i];
00396 }
00397 ser_loc << ACE_OutputCDR::from_boolean(false);
00398
00399 proto.remote_data_.length(1);
00400 proto.remote_data_[0].transport_type = "rtps_udp";
00401 message_block_to_sequence (mb_locator, proto.remote_data_[0].data);
00402 }
00403
00404 void
00405 Sedp::associate(const SPDPdiscoveredParticipantData& pdata)
00406 {
00407
00408
00409 DCPS::AssociationData proto;
00410 create_association_data_proto(proto, pdata);
00411
00412 const BuiltinEndpointSet_t& avail =
00413 pdata.participantProxy.availableBuiltinEndpoints;
00414
00415
00416 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00417 DCPS::AssociationData peer = proto;
00418 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00419 publications_reader_->assoc(peer);
00420 }
00421 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00422 DCPS::AssociationData peer = proto;
00423 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00424 subscriptions_reader_->assoc(peer);
00425 }
00426 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00427 DCPS::AssociationData peer = proto;
00428 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00429 participant_message_reader_->assoc(peer);
00430 }
00431
00432 SPDPdiscoveredParticipantData* dpd =
00433 new SPDPdiscoveredParticipantData(pdata);
00434 task_.enqueue(dpd);
00435 }
00436
00437 void
00438 Sedp::Task::svc_i(const SPDPdiscoveredParticipantData* ppdata)
00439 {
00440 ACE_Auto_Basic_Ptr<const SPDPdiscoveredParticipantData> delete_the_data(ppdata);
00441 const SPDPdiscoveredParticipantData& pdata = *ppdata;
00442
00443
00444 DCPS::AssociationData proto;
00445 create_association_data_proto(proto, pdata);
00446
00447 const BuiltinEndpointSet_t& avail =
00448 pdata.participantProxy.availableBuiltinEndpoints;
00449
00450
00451 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00452 DCPS::AssociationData peer = proto;
00453 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00454 sedp_->publications_writer_.assoc(peer);
00455 }
00456 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00457 DCPS::AssociationData peer = proto;
00458 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00459 sedp_->subscriptions_writer_.assoc(peer);
00460 }
00461 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00462 DCPS::AssociationData peer = proto;
00463 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00464 sedp_->participant_message_writer_.assoc(peer);
00465 }
00466
00467
00468
00469
00470 for (DeferredSubscriptionMap::iterator pos = sedp_->deferred_subscriptions_.lower_bound (proto.remote_id_),
00471 limit = sedp_->deferred_subscriptions_.upper_bound (proto.remote_id_);
00472 pos != limit;
00473 ) {
00474 sedp_->data_received (pos->second.first, pos->second.second);
00475 sedp_->deferred_subscriptions_.erase (pos++);
00476 }
00477 for (DeferredPublicationMap::iterator pos = sedp_->deferred_publications_.lower_bound (proto.remote_id_),
00478 limit = sedp_->deferred_publications_.upper_bound (proto.remote_id_);
00479 pos != limit;
00480 ) {
00481 sedp_->data_received (pos->second.first, pos->second.second);
00482 sedp_->deferred_publications_.erase (pos++);
00483 }
00484
00485 ACE_GUARD(ACE_Thread_Mutex, g, sedp_->lock_);
00486 if (spdp_->shutting_down()) { return; }
00487
00488 proto.remote_id_.entityId = ENTITYID_PARTICIPANT;
00489 sedp_->associated_participants_.insert(proto.remote_id_);
00490
00491
00492 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00493 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00494 sedp_->write_durable_publication_data(proto.remote_id_);
00495 }
00496 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00497 proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00498 sedp_->write_durable_subscription_data(proto.remote_id_);
00499 }
00500 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00501 proto.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00502 sedp_->write_durable_participant_message_data(proto.remote_id_);
00503 }
00504
00505 for (DCPS::RepoIdSet::iterator it = sedp_->defer_match_endpoints_.begin();
00506 it != sedp_->defer_match_endpoints_.end(); ) {
00507 if (0 == std::memcmp(it->guidPrefix, proto.remote_id_.guidPrefix,
00508 sizeof(GuidPrefix_t))) {
00509 OPENDDS_STRING topic;
00510 if (it->entityId.entityKind & 4) {
00511 DiscoveredSubscriptionIter dsi =
00512 sedp_->discovered_subscriptions_.find(*it);
00513 if (dsi != sedp_->discovered_subscriptions_.end()) {
00514 topic = dsi->second.reader_data_.ddsSubscriptionData.topic_name;
00515 }
00516 } else {
00517 DiscoveredPublicationIter dpi =
00518 sedp_->discovered_publications_.find(*it);
00519 if (dpi != sedp_->discovered_publications_.end()) {
00520 topic = dpi->second.writer_data_.ddsPublicationData.topic_name;
00521 }
00522 }
00523 if (DCPS::DCPS_debug_level > 3) {
00524 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
00525 ACE_TEXT("processing deferred endpoints for topic %C\n"),
00526 topic.c_str()));
00527 }
00528 if (!topic.empty()) {
00529 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator ti =
00530 sedp_->topics_.find(topic);
00531 if (ti != sedp_->topics_.end()) {
00532 if (DCPS::DCPS_debug_level > 3) {
00533 DCPS::GuidConverter conv(*it);
00534 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
00535 ACE_TEXT("calling match_endpoints %C\n"),
00536 OPENDDS_STRING(conv).c_str()));
00537 }
00538 sedp_->match_endpoints(*it, ti->second);
00539 if (spdp_->shutting_down()) { return; }
00540 }
00541 }
00542 sedp_->defer_match_endpoints_.erase(it++);
00543 } else {
00544 ++it;
00545 }
00546 }
00547 }
00548
00549 bool
00550 Sedp::disassociate(const SPDPdiscoveredParticipantData& pdata)
00551 {
00552 RepoId part;
00553 std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix,
00554 sizeof(GuidPrefix_t));
00555 part.entityId = ENTITYID_PARTICIPANT;
00556 associated_participants_.erase(part);
00557 const BuiltinEndpointSet_t avail =
00558 pdata.participantProxy.availableBuiltinEndpoints;
00559
00560 {
00561 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00562 ACE_GUARD_RETURN(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock, false);
00563
00564
00565 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00566 RepoId id = part;
00567 id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00568 publications_writer_.disassociate(id);
00569 }
00570 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00571 RepoId id = part;
00572 id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00573 publications_reader_->disassociate(id);
00574 }
00575 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00576 RepoId id = part;
00577 id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00578 subscriptions_writer_.disassociate(id);
00579 }
00580 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00581 RepoId id = part;
00582 id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00583 subscriptions_reader_->disassociate(id);
00584 }
00585 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00586 RepoId id = part;
00587 id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00588 participant_message_writer_.disassociate(id);
00589 }
00590 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00591 RepoId id = part;
00592 id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00593 participant_message_reader_->disassociate(id);
00594 }
00595
00596 }
00597 if (spdp_.has_discovered_participant(part)) {
00598 remove_entities_belonging_to(discovered_publications_, part);
00599 remove_entities_belonging_to(discovered_subscriptions_, part);
00600 return true;
00601 } else {
00602 return false;
00603 }
00604 }
00605
00606 template<typename Map>
00607 void
00608 Sedp::remove_entities_belonging_to(Map& m, RepoId participant)
00609 {
00610 participant.entityId.entityKey[0] = 0;
00611 participant.entityId.entityKey[1] = 0;
00612 participant.entityId.entityKey[2] = 0;
00613 participant.entityId.entityKind = 0;
00614 for (typename Map::iterator i = m.lower_bound(participant);
00615 i != m.end() && 0 == std::memcmp(i->first.guidPrefix,
00616 participant.guidPrefix,
00617 sizeof(GuidPrefix_t));) {
00618 OPENDDS_STRING topic_name = get_topic_name(i->second);
00619 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00620 topics_.find(topic_name);
00621 if (top_it != topics_.end()) {
00622 top_it->second.endpoints_.erase(i->first);
00623 if (DCPS::DCPS_debug_level > 3) {
00624 ACE_DEBUG((LM_DEBUG,
00625 ACE_TEXT("(%P|%t) Sedp::remove_entities_belonging_to - ")
00626 ACE_TEXT("calling match_endpoints remove\n")));
00627 }
00628 match_endpoints(i->first, top_it->second, true );
00629 if (spdp_.shutting_down()) { return; }
00630 }
00631 remove_from_bit(i->second);
00632 m.erase(i++);
00633 }
00634 }
00635
00636 void
00637 Sedp::remove_from_bit_i(const DiscoveredPublication& pub)
00638 {
00639 #ifndef DDS_HAS_MINIMUM_BIT
00640 task_.enqueue(Msg::MSG_REMOVE_FROM_PUB_BIT, pub.bit_ih_);
00641 #else
00642 ACE_UNUSED_ARG(pub);
00643 #endif
00644 }
00645
00646 void
00647 Sedp::remove_from_bit_i(const DiscoveredSubscription& sub)
00648 {
00649 #ifndef DDS_HAS_MINIMUM_BIT
00650 task_.enqueue(Msg::MSG_REMOVE_FROM_SUB_BIT, sub.bit_ih_);
00651 #else
00652 ACE_UNUSED_ARG(sub);
00653 #endif
00654 }
00655
00656 void
00657 Sedp::Task::svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
00658 {
00659 #ifndef DDS_HAS_MINIMUM_BIT
00660 switch (which_bit) {
00661 case Msg::MSG_REMOVE_FROM_PUB_BIT: {
00662 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = sedp_->pub_bit();
00663
00664 if (bit && bit_ih != DDS::HANDLE_NIL) {
00665 bit->set_instance_state(bit_ih,
00666 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
00667 }
00668 break;
00669 }
00670 case Msg::MSG_REMOVE_FROM_SUB_BIT: {
00671 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sedp_->sub_bit();
00672
00673 if (bit && bit_ih != DDS::HANDLE_NIL) {
00674 bit->set_instance_state(bit_ih,
00675 DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
00676 }
00677 break;
00678 }
00679 default:
00680 break;
00681 }
00682 #else
00683 ACE_UNUSED_ARG(which_bit);
00684 ACE_UNUSED_ARG(bit_ih);
00685 #endif
00686 }
00687
00688 #ifndef DDS_HAS_MINIMUM_BIT
00689 DDS::TopicBuiltinTopicDataDataReaderImpl*
00690 Sedp::topic_bit()
00691 {
00692 DDS::Subscriber_var sub = spdp_.bit_subscriber();
00693 if (!sub.in())
00694 return 0;
00695
00696 DDS::DataReader_var d =
00697 sub->lookup_datareader(DCPS::BUILT_IN_TOPIC_TOPIC);
00698 return dynamic_cast<DDS::TopicBuiltinTopicDataDataReaderImpl*>(d.in());
00699 }
00700
00701 DDS::PublicationBuiltinTopicDataDataReaderImpl*
00702 Sedp::pub_bit()
00703 {
00704 DDS::Subscriber_var sub = spdp_.bit_subscriber();
00705 if (!sub.in())
00706 return 0;
00707
00708 DDS::DataReader_var d =
00709 sub->lookup_datareader(DCPS::BUILT_IN_PUBLICATION_TOPIC);
00710 return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00711 }
00712
00713 DDS::SubscriptionBuiltinTopicDataDataReaderImpl*
00714 Sedp::sub_bit()
00715 {
00716 DDS::Subscriber_var sub = spdp_.bit_subscriber();
00717 if (!sub.in())
00718 return 0;
00719
00720 DDS::DataReader_var d =
00721 sub->lookup_datareader(DCPS::BUILT_IN_SUBSCRIPTION_TOPIC);
00722 return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00723 }
00724 #endif
00725
00726 bool
00727 Sedp::update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
00728 OPENDDS_STRING& name)
00729 {
00730 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00731 OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator iter =
00732 topic_names_.find(topicId);
00733 if (iter == topic_names_.end()) {
00734 return false;
00735 }
00736 name = iter->second;
00737 TopicDetails& topic = topics_[name];
00738 using namespace DCPS;
00739
00740
00741 if (qos.topic_data != topic.qos_.topic_data) {
00742 topic.qos_ = qos;
00743
00744 for (RepoIdSet::iterator topic_endpoints = topic.endpoints_.begin();
00745 topic_endpoints != topic.endpoints_.end(); ++topic_endpoints) {
00746
00747 const RepoId& rid = *topic_endpoints;
00748 EntityKind kind = GuidConverter(rid).entityKind();
00749 if (KIND_WRITER == kind) {
00750
00751 LocalPublicationIter lp = local_publications_.find(rid);
00752 if (lp != local_publications_.end()) {
00753 write_publication_data(rid, lp->second);
00754 }
00755 } else if (KIND_READER == kind) {
00756
00757 LocalSubscriptionIter ls = local_subscriptions_.find(rid);
00758 if (ls != local_subscriptions_.end()) {
00759 write_subscription_data(rid, ls->second);
00760 }
00761 }
00762 }
00763 }
00764
00765 return true;
00766 }
00767
00768 void
00769 Sedp::inconsistent_topic(const DCPS::RepoIdSet& eps) const
00770 {
00771 using DCPS::RepoIdSet;
00772 for (RepoIdSet::const_iterator iter(eps.begin()); iter != eps.end(); ++iter) {
00773 if (0 == std::memcmp(participant_id_.guidPrefix, iter->guidPrefix,
00774 sizeof(GuidPrefix_t))) {
00775 const bool reader = iter->entityId.entityKind & 4;
00776 if (reader) {
00777 const LocalSubscriptionCIter lsi = local_subscriptions_.find(*iter);
00778 if (lsi != local_subscriptions_.end()) {
00779 lsi->second.subscription_->inconsistent_topic();
00780
00781
00782
00783 return;
00784 }
00785 } else {
00786 const LocalPublicationCIter lpi = local_publications_.find(*iter);
00787 if (lpi != local_publications_.end()) {
00788 lpi->second.publication_->inconsistent_topic();
00789 return;
00790 }
00791 }
00792 }
00793 }
00794 }
00795
00796 DDS::ReturnCode_t
00797 Sedp::remove_publication_i(const RepoId& publicationId)
00798 {
00799 return publications_writer_.write_unregister_dispose(publicationId);
00800 }
00801
00802 bool
00803 Sedp::update_publication_qos(const RepoId& publicationId,
00804 const DDS::DataWriterQos& qos,
00805 const DDS::PublisherQos& publisherQos)
00806 {
00807 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00808 LocalPublicationIter iter = local_publications_.find(publicationId);
00809 if (iter != local_publications_.end()) {
00810 LocalPublication& pb = iter->second;
00811 pb.qos_ = qos;
00812 pb.publisher_qos_ = publisherQos;
00813
00814 if (DDS::RETCODE_OK != write_publication_data(publicationId, pb)) {
00815 return false;
00816 }
00817
00818 OPENDDS_STRING topic_name = topic_names_[pb.topic_id_];
00819 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00820 topics_.find(topic_name);
00821 if (top_it != topics_.end()) {
00822 match_endpoints(publicationId, top_it->second);
00823 }
00824 return true;
00825 }
00826 return false;
00827 }
00828
00829 DDS::ReturnCode_t
00830 Sedp::remove_subscription_i(const RepoId& subscriptionId)
00831 {
00832 return subscriptions_writer_.write_unregister_dispose(subscriptionId);
00833 }
00834
00835 bool
00836 Sedp::update_subscription_qos(const RepoId& subscriptionId,
00837 const DDS::DataReaderQos& qos,
00838 const DDS::SubscriberQos& subscriberQos)
00839 {
00840 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00841 LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
00842 if (iter != local_subscriptions_.end()) {
00843 LocalSubscription& sb = iter->second;
00844 sb.qos_ = qos;
00845 sb.subscriber_qos_ = subscriberQos;
00846
00847 if (DDS::RETCODE_OK != write_subscription_data(subscriptionId, sb)) {
00848 return false;
00849 }
00850
00851 OPENDDS_STRING topic_name = topic_names_[sb.topic_id_];
00852 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00853 topics_.find(topic_name);
00854 if (top_it != topics_.end()) {
00855 match_endpoints(subscriptionId, top_it->second);
00856 }
00857 return true;
00858 }
00859 return false;
00860 }
00861
00862 bool
00863 Sedp::update_subscription_params(const RepoId& subId,
00864 const DDS::StringSeq& params)
00865 {
00866 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00867 const LocalSubscriptionIter iter = local_subscriptions_.find(subId);
00868 if (iter != local_subscriptions_.end()) {
00869 LocalSubscription& sb = iter->second;
00870 sb.filterProperties.expressionParameters = params;
00871
00872 if (DDS::RETCODE_OK != write_subscription_data(subId, sb)) {
00873 return false;
00874 }
00875
00876
00877 for (DCPS::RepoIdSet::iterator i = iter->second.matched_endpoints_.begin();
00878 i != iter->second.matched_endpoints_.end(); ++i) {
00879 const LocalPublicationIter lpi = local_publications_.find(*i);
00880 if (lpi != local_publications_.end()) {
00881 lpi->second.publication_->update_subscription_params(subId, params);
00882 }
00883 }
00884
00885 return true;
00886 }
00887 return false;
00888 }
00889
00890 void
00891 Sedp::shutdown()
00892 {
00893 task_.shutdown();
00894 publications_reader_->shutting_down_ = 1;
00895 subscriptions_reader_->shutting_down_ = 1;
00896 participant_message_reader_->shutting_down_ = 1;
00897 }
00898
00899 void
00900 Sedp::Task::acknowledge()
00901 {
00902
00903 putq(new Msg(Msg::MSG_FINI_BIT, DCPS::REQUEST_ACK, 0));
00904 }
00905
00906 void
00907 Sedp::Task::shutdown()
00908 {
00909 if (!shutting_down_) {
00910 shutting_down_ = true;
00911 putq(new Msg(Msg::MSG_STOP, DCPS::GRACEFUL_DISCONNECT, 0));
00912 wait();
00913 }
00914 }
00915
00916 void
00917 Sedp::Task::svc_i(DCPS::MessageId message_id,
00918 const OpenDDS::DCPS::DiscoveredWriterData* pwdata)
00919 {
00920 ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredWriterData> delete_the_data(pwdata);
00921 sedp_->data_received(message_id, *pwdata);
00922 }
00923
00924 void
00925 Sedp::data_received(DCPS::MessageId message_id,
00926 const OpenDDS::DCPS::DiscoveredWriterData& wdata)
00927 {
00928 if (spdp_.shutting_down()) { return; }
00929
00930 const RepoId& guid = wdata.writerProxy.remoteWriterGuid;
00931 RepoId guid_participant = guid;
00932 guid_participant.entityId = ENTITYID_PARTICIPANT;
00933
00934 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00935 if (ignoring(guid)
00936 || ignoring(guid_participant)
00937 || ignoring(wdata.ddsPublicationData.topic_name)) {
00938 return;
00939 }
00940
00941 if (!spdp_.has_discovered_participant (guid_participant)) {
00942 deferred_publications_[guid] = std::make_pair (message_id, wdata);
00943 return;
00944 }
00945
00946 OPENDDS_STRING topic_name;
00947
00948 DiscoveredPublicationIter iter = discovered_publications_.find(guid);
00949
00950 if (message_id == DCPS::SAMPLE_DATA) {
00951 OpenDDS::DCPS::DiscoveredWriterData wdata_copy;
00952
00953 if (iter == discovered_publications_.end()) {
00954
00955 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00956
00957 {
00958 DiscoveredPublication& pub =
00959 discovered_publications_[guid] = DiscoveredPublication(wdata);
00960
00961 topic_name = get_topic_name(pub);
00962 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00963 topics_.find(topic_name);
00964 if (top_it == topics_.end()) {
00965 top_it =
00966 topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
00967 top_it->second.data_type_ = wdata.ddsPublicationData.type_name;
00968 top_it->second.qos_.topic_data = wdata.ddsPublicationData.topic_data;
00969 top_it->second.repo_id_ = make_topic_guid();
00970
00971 } else if (top_it->second.data_type_ !=
00972 wdata.ddsPublicationData.type_name.in()) {
00973 inconsistent_topic(top_it->second.endpoints_);
00974 if (DCPS::DCPS_debug_level) {
00975 ACE_DEBUG((LM_WARNING,
00976 ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - WARNING ")
00977 ACE_TEXT("topic %C discovered data type %C doesn't ")
00978 ACE_TEXT("match known data type %C, ignoring ")
00979 ACE_TEXT("discovered publication.\n"),
00980 topic_name.c_str(),
00981 wdata.ddsPublicationData.type_name.in(),
00982 top_it->second.data_type_.c_str()));
00983 }
00984 return;
00985 }
00986
00987 TopicDetails& td = top_it->second;
00988 topic_names_[td.repo_id_] = topic_name;
00989 td.endpoints_.insert(guid);
00990
00991 std::memcpy(pub.writer_data_.ddsPublicationData.participant_key.value,
00992 guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
00993 assign_bit_key(pub);
00994 wdata_copy = pub.writer_data_;
00995 }
00996
00997
00998 iter = discovered_publications_.end();
00999
01000 DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01001 #ifndef DDS_HAS_MINIMUM_BIT
01002 {
01003
01004 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01005 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01006 if (bit) {
01007 instance_handle =
01008 bit->store_synthetic_data(wdata_copy.ddsPublicationData,
01009 DDS::NEW_VIEW_STATE);
01010 }
01011 }
01012 #endif
01013
01014 if (spdp_.shutting_down()) { return; }
01015
01016 iter = discovered_publications_.find(guid);
01017 if (iter != discovered_publications_.end()) {
01018 iter->second.bit_ih_ = instance_handle;
01019 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01020 topics_.find(topic_name);
01021 if (top_it != topics_.end()) {
01022 if (DCPS::DCPS_debug_level > 3) {
01023 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01024 ACE_TEXT("calling match_endpoints new\n")));
01025 }
01026 match_endpoints(guid, top_it->second);
01027 }
01028 }
01029
01030 } else if (qosChanged(iter->second.writer_data_.ddsPublicationData,
01031 wdata.ddsPublicationData)) {
01032 #ifndef DDS_HAS_MINIMUM_BIT
01033 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01034 if (bit) {
01035 bit->store_synthetic_data(iter->second.writer_data_.ddsPublicationData,
01036 DDS::NOT_NEW_VIEW_STATE);
01037 }
01038 #endif
01039
01040
01041 topic_name = get_topic_name(iter->second);
01042 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01043 topics_.find(topic_name);
01044 if (top_it != topics_.end()) {
01045 if (DCPS::DCPS_debug_level > 3) {
01046 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01047 ACE_TEXT("calling match_endpoints update\n")));
01048 }
01049 match_endpoints(guid, top_it->second);
01050 }
01051 }
01052
01053 } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01054 message_id == DCPS::DISPOSE_INSTANCE ||
01055 message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01056 if (iter != discovered_publications_.end()) {
01057
01058 topic_name = get_topic_name(iter->second);
01059 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01060 topics_.find(topic_name);
01061 if (top_it != topics_.end()) {
01062 top_it->second.endpoints_.erase(guid);
01063 match_endpoints(guid, top_it->second, true );
01064 if (spdp_.shutting_down()) { return; }
01065 }
01066 remove_from_bit(iter->second);
01067 if (DCPS::DCPS_debug_level > 3) {
01068 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01069 ACE_TEXT("calling match_endpoints disp/unreg\n")));
01070 }
01071 discovered_publications_.erase(iter);
01072 }
01073 }
01074 }
01075
01076 void
01077 Sedp::Task::svc_i(DCPS::MessageId message_id,
01078 const OpenDDS::DCPS::DiscoveredReaderData* prdata)
01079 {
01080 ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredReaderData> delete_the_data(prdata);
01081 sedp_->data_received(message_id, *prdata);
01082 }
01083
01084 void
01085 Sedp::data_received(DCPS::MessageId message_id,
01086 const OpenDDS::DCPS::DiscoveredReaderData& rdata)
01087 {
01088 if (spdp_.shutting_down()) { return; }
01089
01090 const RepoId& guid = rdata.readerProxy.remoteReaderGuid;
01091 RepoId guid_participant = guid;
01092 guid_participant.entityId = ENTITYID_PARTICIPANT;
01093
01094 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01095
01096 if (ignoring(guid)
01097 || ignoring(guid_participant)
01098 || ignoring(rdata.ddsSubscriptionData.topic_name)) {
01099 return;
01100 }
01101
01102 if (!spdp_.has_discovered_participant (guid_participant)) {
01103 deferred_subscriptions_[guid] = std::make_pair (message_id, rdata);
01104 return;
01105 }
01106
01107 OPENDDS_STRING topic_name;
01108
01109 DiscoveredSubscriptionIter iter = discovered_subscriptions_.find(guid);
01110
01111
01112 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01113
01114 if (message_id == DCPS::SAMPLE_DATA) {
01115 OpenDDS::DCPS::DiscoveredReaderData rdata_copy;
01116
01117 if (iter == discovered_subscriptions_.end()) {
01118 {
01119 DiscoveredSubscription& sub =
01120 discovered_subscriptions_[guid] = DiscoveredSubscription(rdata);
01121
01122 topic_name = get_topic_name(sub);
01123 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01124 topics_.find(topic_name);
01125 if (top_it == topics_.end()) {
01126 top_it =
01127 topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
01128 top_it->second.data_type_ = rdata.ddsSubscriptionData.type_name;
01129 top_it->second.qos_.topic_data = rdata.ddsSubscriptionData.topic_data;
01130 top_it->second.repo_id_ = make_topic_guid();
01131
01132 } else if (top_it->second.data_type_ !=
01133 rdata.ddsSubscriptionData.type_name.in()) {
01134 inconsistent_topic(top_it->second.endpoints_);
01135 if (DCPS::DCPS_debug_level) {
01136 ACE_DEBUG((LM_WARNING,
01137 ACE_TEXT("(%P|%t) Sedp::data_received(drd) - WARNING ")
01138 ACE_TEXT("topic %C discovered data type %C doesn't ")
01139 ACE_TEXT("match known data type %C, ignoring ")
01140 ACE_TEXT("discovered subcription.\n"),
01141 topic_name.c_str(),
01142 rdata.ddsSubscriptionData.type_name.in(),
01143 top_it->second.data_type_.c_str()));
01144 }
01145 return;
01146 }
01147
01148 TopicDetails& td = top_it->second;
01149 topic_names_[td.repo_id_] = topic_name;
01150 td.endpoints_.insert(guid);
01151
01152 std::memcpy(sub.reader_data_.ddsSubscriptionData.participant_key.value,
01153 guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
01154 assign_bit_key(sub);
01155 rdata_copy = sub.reader_data_;
01156 }
01157
01158
01159 iter = discovered_subscriptions_.end();
01160
01161 DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01162 #ifndef DDS_HAS_MINIMUM_BIT
01163 {
01164
01165 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01166 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01167 if (bit) {
01168 instance_handle =
01169 bit->store_synthetic_data(rdata_copy.ddsSubscriptionData,
01170 DDS::NEW_VIEW_STATE);
01171 }
01172 }
01173 #endif
01174
01175 if (spdp_.shutting_down()) { return; }
01176
01177 iter = discovered_subscriptions_.find(guid);
01178 if (iter != discovered_subscriptions_.end()) {
01179 iter->second.bit_ih_ = instance_handle;
01180 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01181 topics_.find(topic_name);
01182 if (top_it != topics_.end()) {
01183 if (DCPS::DCPS_debug_level > 3) {
01184 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01185 ACE_TEXT("calling match_endpoints new\n")));
01186 }
01187 match_endpoints(guid, top_it->second);
01188 }
01189 }
01190
01191 } else {
01192 if (qosChanged(iter->second.reader_data_.ddsSubscriptionData,
01193 rdata.ddsSubscriptionData)) {
01194 #ifndef DDS_HAS_MINIMUM_BIT
01195 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01196 if (bit) {
01197 bit->store_synthetic_data(
01198 iter->second.reader_data_.ddsSubscriptionData,
01199 DDS::NOT_NEW_VIEW_STATE);
01200 }
01201 #endif
01202
01203
01204 topic_name = get_topic_name(iter->second);
01205 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01206 topics_.find(topic_name);
01207 if (top_it != topics_.end()) {
01208 if (DCPS::DCPS_debug_level > 3) {
01209 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01210 ACE_TEXT("calling match_endpoints update\n")));
01211 }
01212 match_endpoints(guid, top_it->second);
01213 }
01214 }
01215
01216 if (paramsChanged(iter->second.reader_data_.contentFilterProperty,
01217 rdata.contentFilterProperty)) {
01218
01219 topic_name = get_topic_name(iter->second);
01220 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01221 topics_.find(topic_name);
01222 using DCPS::RepoIdSet;
01223 const RepoIdSet& assoc =
01224 (top_it == topics_.end()) ? RepoIdSet() : top_it->second.endpoints_;
01225 for (RepoIdSet::const_iterator i = assoc.begin(); i != assoc.end(); ++i) {
01226 if (i->entityId.entityKind & 4) continue;
01227 const LocalPublicationIter lpi = local_publications_.find(*i);
01228 if (lpi != local_publications_.end()) {
01229 lpi->second.publication_->update_subscription_params(guid,
01230 rdata.contentFilterProperty.expressionParameters);
01231 }
01232 }
01233 }
01234 }
01235
01236 CORBA::ULong len = rdata.readerProxy.associatedWriters.length();
01237 for (CORBA::ULong writerIndex = 0; writerIndex < len; ++writerIndex)
01238 {
01239 GUID_t writerGuid = rdata.readerProxy.associatedWriters[writerIndex];
01240
01241
01242 LocalPublicationIter lp = local_publications_.find(writerGuid);
01243 if (lp != local_publications_.end()) {
01244
01245 if (lp->second.remote_opendds_associations_.insert(guid).second) {
01246
01247 lp->second.publication_->association_complete(guid);
01248 }
01249 }
01250 }
01251
01252 } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01253 message_id == DCPS::DISPOSE_INSTANCE ||
01254 message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01255 if (iter != discovered_subscriptions_.end()) {
01256
01257 topic_name = get_topic_name(iter->second);
01258 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01259 topics_.find(topic_name);
01260 if (top_it != topics_.end()) {
01261 top_it->second.endpoints_.erase(guid);
01262 if (DCPS::DCPS_debug_level > 3) {
01263 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01264 ACE_TEXT("calling match_endpoints disp/unreg\n")));
01265 }
01266 match_endpoints(guid, top_it->second, true );
01267 if (spdp_.shutting_down()) { return; }
01268 }
01269 remove_from_bit(iter->second);
01270 discovered_subscriptions_.erase(iter);
01271 }
01272 }
01273 }
01274
01275 void
01276 Sedp::Task::svc_i(DCPS::MessageId message_id,
01277 const ParticipantMessageData* data)
01278 {
01279 ACE_Auto_Basic_Ptr<const ParticipantMessageData> delete_the_data(data);
01280 sedp_->data_received(message_id, *data);
01281 }
01282
01283 void
01284 Sedp::data_received(DCPS::MessageId ,
01285 const ParticipantMessageData& data)
01286 {
01287 if (spdp_.shutting_down()) { return; }
01288
01289 const RepoId& guid = data.participantGuid;
01290 RepoId guid_participant = guid;
01291 guid_participant.entityId = ENTITYID_PARTICIPANT;
01292 RepoId prefix = data.participantGuid;
01293 prefix.entityId = EntityId_t();
01294
01295 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01296
01297 if (ignoring(guid)
01298 || ignoring(guid_participant)) {
01299 return;
01300 }
01301
01302 if (!spdp_.has_discovered_participant (guid_participant)) {
01303 return;
01304 }
01305
01306 for (LocalSubscriptionMap::const_iterator sub_pos = local_subscriptions_.begin(),
01307 sub_limit = local_subscriptions_.end();
01308 sub_pos != sub_limit; ++sub_pos) {
01309 const DCPS::RepoIdSet::const_iterator pos =
01310 sub_pos->second.matched_endpoints_.lower_bound(prefix);
01311 if (pos != sub_pos->second.matched_endpoints_.end() &&
01312 OpenDDS::DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) {
01313 sub_pos->second.subscription_->signal_liveliness(guid_participant);
01314 }
01315 }
01316 }
01317
01318
01319
01320 void
01321 Sedp::association_complete(const RepoId& localId,
01322 const RepoId& remoteId)
01323 {
01324
01325 if (is_opendds(remoteId)) {
01326 LocalSubscriptionIter sub = local_subscriptions_.find(localId);
01327
01328 if (sub != local_subscriptions_.end()) {
01329 std::pair<DCPS::RepoIdSet::iterator, bool> result =
01330 sub->second.remote_opendds_associations_.insert(remoteId);
01331
01332 if (result.second) {
01333
01334 write_subscription_data(localId, sub->second);
01335 }
01336 }
01337 }
01338 }
01339
01340 void
01341 Sedp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
01342 {
01343 ParticipantMessageData pmd;
01344 pmd.participantGuid = participant_id_;
01345 switch (kind) {
01346 case DDS::AUTOMATIC_LIVELINESS_QOS:
01347 pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
01348 participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, automatic_liveliness_seq_);
01349 break;
01350 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01351 pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
01352 participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, manual_liveliness_seq_);
01353 break;
01354 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01355
01356 break;
01357 }
01358 }
01359
01360 Sedp::Endpoint::~Endpoint()
01361 {
01362 }
01363
01364
01365 Sedp::Writer::Writer(const RepoId& pub_id, Sedp& sedp)
01366 : Endpoint(pub_id, sedp),
01367 alloc_(2, sizeof(DCPS::TransportSendElementAllocator))
01368 {
01369 header_.prefix[0] = 'R';
01370 header_.prefix[1] = 'T';
01371 header_.prefix[2] = 'P';
01372 header_.prefix[3] = 'S';
01373 header_.version = PROTOCOLVERSION;
01374 header_.vendorId = VENDORID_OPENDDS;
01375 header_.guidPrefix[0] = pub_id.guidPrefix[0];
01376 header_.guidPrefix[1] = pub_id.guidPrefix[1],
01377 header_.guidPrefix[2] = pub_id.guidPrefix[2];
01378 header_.guidPrefix[3] = pub_id.guidPrefix[3];
01379 header_.guidPrefix[4] = pub_id.guidPrefix[4];
01380 header_.guidPrefix[5] = pub_id.guidPrefix[5];
01381 header_.guidPrefix[6] = pub_id.guidPrefix[6];
01382 header_.guidPrefix[7] = pub_id.guidPrefix[7];
01383 header_.guidPrefix[8] = pub_id.guidPrefix[8];
01384 header_.guidPrefix[9] = pub_id.guidPrefix[9];
01385 header_.guidPrefix[10] = pub_id.guidPrefix[10];
01386 header_.guidPrefix[11] = pub_id.guidPrefix[11];
01387 }
01388
01389 Sedp::Writer::~Writer()
01390 {
01391 }
01392
01393 bool
01394 Sedp::Writer::assoc(const DCPS::AssociationData& subscription)
01395 {
01396 return associate(subscription, true);
01397 }
01398
01399 void
01400 Sedp::Writer::data_delivered(const DCPS::DataSampleElement* dsle)
01401 {
01402 delete dsle;
01403 }
01404
01405 void
01406 Sedp::Writer::data_dropped(const DCPS::DataSampleElement* dsle, bool)
01407 {
01408 delete dsle;
01409 }
01410
01411 void
01412 Sedp::Writer::control_delivered(ACE_Message_Block* mb)
01413 {
01414 if (mb->flags() == ACE_Message_Block::DONT_DELETE) {
01415
01416 delete mb->cont();
01417 } else {
01418 mb->release();
01419 }
01420 }
01421
01422 void
01423 Sedp::Writer::control_dropped(ACE_Message_Block* mb, bool)
01424 {
01425 if (mb->flags() == ACE_Message_Block::DONT_DELETE) {
01426
01427 delete mb->cont();
01428 } else {
01429 mb->release();
01430 }
01431 }
01432
01433 DDS::ReturnCode_t
01434 Sedp::Writer::write_sample(const ParameterList& plist,
01435 const DCPS::RepoId& reader,
01436 DCPS::SequenceNumber& sequence)
01437 {
01438 DDS::ReturnCode_t result = DDS::RETCODE_OK;
01439
01440
01441 size_t size = 0, padding = 0;
01442 DCPS::find_size_ulong(size, padding);
01443 DCPS::gen_find_size(plist, size, padding);
01444
01445
01446 ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
01447 ACE_Message_Block::MB_DATA,
01448 new ACE_Message_Block(size));
01449 using DCPS::Serializer;
01450 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01451 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
01452 (ser << ACE_OutputCDR::from_octet(3)) &&
01453 (ser << ACE_OutputCDR::from_octet(0)) &&
01454 (ser << ACE_OutputCDR::from_octet(0)) &&
01455 (ser << plist);
01456 if (!ok) {
01457 result = DDS::RETCODE_ERROR;
01458 }
01459
01460 if (result == DDS::RETCODE_OK) {
01461
01462 DCPS::DataSampleElement* list_el =
01463 new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0);
01464 set_header_fields(list_el->get_header(), size, reader, sequence);
01465
01466 list_el->set_sample(new ACE_Message_Block(size));
01467 *list_el->get_sample() << list_el->get_header();
01468 list_el->get_sample()->cont(payload.duplicate());
01469
01470 if (reader != GUID_UNKNOWN) {
01471 list_el->set_sub_id(0, reader);
01472 list_el->set_num_subs(1);
01473 }
01474
01475 DCPS::SendStateDataSampleList list;
01476 list.enqueue_tail(list_el);
01477
01478 send(list);
01479 }
01480 delete payload.cont();
01481 return result;
01482 }
01483
01484 DDS::ReturnCode_t
01485 Sedp::Writer::write_sample(const ParticipantMessageData& pmd,
01486 const DCPS::RepoId& reader,
01487 DCPS::SequenceNumber& sequence)
01488 {
01489 DDS::ReturnCode_t result = DDS::RETCODE_OK;
01490
01491
01492 size_t size = 0, padding = 0;
01493 DCPS::find_size_ulong(size, padding);
01494 DCPS::gen_find_size(pmd, size, padding);
01495
01496
01497 ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
01498 ACE_Message_Block::MB_DATA,
01499 new ACE_Message_Block(size));
01500 using DCPS::Serializer;
01501 Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01502 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
01503 (ser << ACE_OutputCDR::from_octet(1)) &&
01504 (ser << ACE_OutputCDR::from_octet(0)) &&
01505 (ser << ACE_OutputCDR::from_octet(0)) &&
01506 (ser << pmd);
01507 if (!ok) {
01508 result = DDS::RETCODE_ERROR;
01509 }
01510
01511 if (result == DDS::RETCODE_OK) {
01512
01513 DCPS::DataSampleElement* list_el =
01514 new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0);
01515 set_header_fields(list_el->get_header(), size, reader, sequence);
01516
01517 list_el->set_sample(new ACE_Message_Block(size));
01518 *list_el->get_sample() << list_el->get_header();
01519 list_el->get_sample()->cont(payload.duplicate());
01520
01521 if (reader != GUID_UNKNOWN) {
01522 list_el->set_sub_id(0, reader);
01523 list_el->set_num_subs(1);
01524 }
01525
01526 DCPS::SendStateDataSampleList list;
01527 list.enqueue_tail(list_el);
01528
01529 send(list);
01530 }
01531 delete payload.cont();
01532 return result;
01533 }
01534
01535 DDS::ReturnCode_t
01536 Sedp::Writer::write_unregister_dispose(const RepoId& rid)
01537 {
01538
01539 Parameter param;
01540 param.guid(rid);
01541 param._d(PID_ENDPOINT_GUID);
01542 ParameterList plist;
01543 plist.length(1);
01544 plist[0] = param;
01545
01546
01547 size_t size = 0, padding = 0;
01548 DCPS::find_size_ulong(size, padding);
01549 DCPS::gen_find_size(plist, size, padding);
01550
01551 ACE_Message_Block* payload = new ACE_Message_Block(DCPS::DataSampleHeader::max_marshaled_size(),
01552 ACE_Message_Block::MB_DATA,
01553 new ACE_Message_Block(size));
01554
01555 if (!payload) {
01556 ACE_DEBUG((LM_ERROR,
01557 ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
01558 ACE_TEXT(" - Failed to allocate message block message\n")));
01559 return DDS::RETCODE_ERROR;
01560 }
01561
01562 using DCPS::Serializer;
01563 Serializer ser(payload->cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01564 bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&
01565 (ser << ACE_OutputCDR::from_octet(3)) &&
01566 (ser << ACE_OutputCDR::from_octet(0)) &&
01567 (ser << ACE_OutputCDR::from_octet(0)) &&
01568 (ser << plist);
01569
01570 if (ok) {
01571
01572 write_control_msg(*payload, size, DCPS::DISPOSE_UNREGISTER_INSTANCE);
01573 return DDS::RETCODE_OK;
01574 } else {
01575
01576 ACE_DEBUG((LM_ERROR,
01577 ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
01578 ACE_TEXT(" - Failed to serialize RTPS control message\n")));
01579 return DDS::RETCODE_ERROR;
01580 }
01581 }
01582
01583 void
01584 Sedp::Writer::end_historic_samples(const DCPS::RepoId& reader)
01585 {
01586 const void* pReader = static_cast<const void*>(&reader);
01587 ACE_Message_Block mb(DCPS::DataSampleHeader::max_marshaled_size(),
01588 ACE_Message_Block::MB_DATA,
01589 new ACE_Message_Block(static_cast<const char*>(pReader),
01590 sizeof(reader)));
01591 mb.set_flags(ACE_Message_Block::DONT_DELETE);
01592 mb.cont()->wr_ptr(sizeof(reader));
01593
01594 write_control_msg(mb, sizeof(reader), DCPS::END_HISTORIC_SAMPLES,
01595 DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN());
01596 }
01597
01598 void
01599 Sedp::Writer::write_control_msg(ACE_Message_Block& payload,
01600 size_t size,
01601 DCPS::MessageId id,
01602 DCPS::SequenceNumber seq)
01603 {
01604 DCPS::DataSampleHeader header;
01605 set_header_fields(header, size, GUID_UNKNOWN, seq, id);
01606
01607 send_control(header, &payload);
01608 }
01609
01610 void
01611 Sedp::Writer::set_header_fields(DCPS::DataSampleHeader& dsh,
01612 size_t size,
01613 const DCPS::RepoId& reader,
01614 DCPS::SequenceNumber& sequence,
01615 DCPS::MessageId id)
01616 {
01617 dsh.message_id_ = id;
01618 dsh.byte_order_ = ACE_CDR_BYTE_ORDER;
01619 dsh.message_length_ = static_cast<ACE_UINT32>(size);
01620 dsh.publication_id_ = repo_id_;
01621
01622 if (reader == GUID_UNKNOWN ||
01623 sequence == DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01624 sequence = seq_++;
01625 }
01626
01627 if (reader != GUID_UNKNOWN) {
01628
01629 dsh.historic_sample_ = true;
01630 }
01631
01632 dsh.sequence_ = sequence;
01633
01634 const ACE_Time_Value now = ACE_OS::gettimeofday();
01635 dsh.source_timestamp_sec_ = static_cast<ACE_INT32>(now.sec());
01636 dsh.source_timestamp_nanosec_ = now.usec() * 1000;
01637 }
01638
01639
01640
01641 Sedp::Reader::~Reader()
01642 {}
01643
01644 bool
01645 Sedp::Reader::assoc(const DCPS::AssociationData& publication)
01646 {
01647 return associate(publication, false);
01648 }
01649
01650
01651
01652
01653 static bool
01654 decode_parameter_list(const DCPS::ReceivedDataSample& sample,
01655 DCPS::Serializer& ser,
01656 const ACE_CDR::Octet& encap,
01657 ParameterList& data)
01658 {
01659 bool ok = true;
01660 if (ok && sample.header_.key_fields_only_ && encap < 2) {
01661 GUID_t guid;
01662 ok &= (ser >> guid);
01663 data.length(1);
01664 data[0].guid(guid);
01665 data[0]._d(PID_ENDPOINT_GUID);
01666 } else {
01667 ok &= (ser >> data);
01668 }
01669 return ok;
01670 }
01671
01672 void
01673 Sedp::Reader::data_received(const DCPS::ReceivedDataSample& sample)
01674 {
01675 if (shutting_down_.value()) return;
01676
01677 switch (sample.header_.message_id_) {
01678 case DCPS::SAMPLE_DATA:
01679 case DCPS::DISPOSE_INSTANCE:
01680 case DCPS::UNREGISTER_INSTANCE:
01681 case DCPS::DISPOSE_UNREGISTER_INSTANCE: {
01682 const DCPS::MessageId id =
01683 static_cast<DCPS::MessageId>(sample.header_.message_id_);
01684
01685 DCPS::Serializer ser(sample.sample_,
01686 sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
01687 DCPS::Serializer::ALIGN_CDR);
01688 bool ok = true;
01689 ACE_CDR::Octet encap, dummy;
01690 ACE_CDR::UShort options;
01691 ok &= (ser >> ACE_InputCDR::to_octet(dummy))
01692 && (ser >> ACE_InputCDR::to_octet(encap))
01693 && (ser >> options);
01694
01695
01696
01697
01698 if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
01699 ParameterList data;
01700 if (!decode_parameter_list(sample, ser, encap, data)) {
01701 ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01702 ACE_TEXT("failed to deserialize data\n")));
01703 return;
01704 }
01705
01706 ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredWriterData> wdata(new OpenDDS::DCPS::DiscoveredWriterData);
01707 if (ParameterListConverter::from_param_list(data, *wdata) < 0) {
01708 ACE_DEBUG((LM_ERROR,
01709 ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
01710 ACE_TEXT("failed to convert from ParameterList ")
01711 ACE_TEXT("to DiscoveredWriterData\n")));
01712 return;
01713 }
01714 sedp_.task_.enqueue(id, wdata.release());
01715
01716 } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
01717 ParameterList data;
01718 if (!decode_parameter_list(sample, ser, encap, data)) {
01719 ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01720 ACE_TEXT("failed to deserialize data\n")));
01721 return;
01722 }
01723
01724 ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredReaderData> rdata(new OpenDDS::DCPS::DiscoveredReaderData);
01725 if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
01726 ACE_DEBUG((LM_ERROR,
01727 ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
01728 ACE_TEXT("failed to convert from ParameterList ")
01729 ACE_TEXT("to DiscoveredReaderData\n")));
01730 return;
01731 }
01732 if (rdata->readerProxy.expectsInlineQos) {
01733 set_inline_qos(rdata->readerProxy.allLocators);
01734 }
01735 sedp_.task_.enqueue(id, rdata.release());
01736
01737 } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
01738 && !sample.header_.key_fields_only_) {
01739 ACE_Auto_Ptr<ParticipantMessageData> data(new ParticipantMessageData);
01740 if (!(ser >> *data)) {
01741 ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01742 ACE_TEXT("failed to deserialize data\n")));
01743 return;
01744 }
01745
01746 sedp_.task_.enqueue(id, data.release());
01747
01748 }
01749 }
01750 break;
01751
01752 default:
01753 break;
01754 }
01755 }
01756
01757 void
01758 Sedp::populate_discovered_writer_msg(
01759 OpenDDS::DCPS::DiscoveredWriterData& dwd,
01760 const RepoId& publication_id,
01761 const LocalPublication& pub)
01762 {
01763
01764
01765 OPENDDS_STRING topic_name = topic_names_[pub.topic_id_];
01766 dwd.ddsPublicationData.topic_name = topic_name.c_str();
01767 TopicDetails& topic_details = topics_[topic_name];
01768 dwd.ddsPublicationData.type_name = topic_details.data_type_.c_str();
01769 dwd.ddsPublicationData.durability = pub.qos_.durability;
01770 dwd.ddsPublicationData.durability_service = pub.qos_.durability_service;
01771 dwd.ddsPublicationData.deadline = pub.qos_.deadline;
01772 dwd.ddsPublicationData.latency_budget = pub.qos_.latency_budget;
01773 dwd.ddsPublicationData.liveliness = pub.qos_.liveliness;
01774 dwd.ddsPublicationData.reliability = pub.qos_.reliability;
01775 dwd.ddsPublicationData.lifespan = pub.qos_.lifespan;
01776 dwd.ddsPublicationData.user_data = pub.qos_.user_data;
01777 dwd.ddsPublicationData.ownership = pub.qos_.ownership;
01778 dwd.ddsPublicationData.ownership_strength = pub.qos_.ownership_strength;
01779 dwd.ddsPublicationData.destination_order = pub.qos_.destination_order;
01780 dwd.ddsPublicationData.presentation = pub.publisher_qos_.presentation;
01781 dwd.ddsPublicationData.partition = pub.publisher_qos_.partition;
01782 dwd.ddsPublicationData.topic_data = topic_details.qos_.topic_data;
01783 dwd.ddsPublicationData.group_data = pub.publisher_qos_.group_data;
01784 dwd.writerProxy.remoteWriterGuid = publication_id;
01785
01786
01787 dwd.writerProxy.allLocators = pub.trans_info_;
01788 }
01789
01790 void
01791 Sedp::populate_discovered_reader_msg(
01792 OpenDDS::DCPS::DiscoveredReaderData& drd,
01793 const RepoId& subscription_id,
01794 const LocalSubscription& sub)
01795 {
01796
01797
01798 OPENDDS_STRING topic_name = topic_names_[sub.topic_id_];
01799 drd.ddsSubscriptionData.topic_name = topic_name.c_str();
01800 TopicDetails& topic_details = topics_[topic_name];
01801 drd.ddsSubscriptionData.type_name = topic_details.data_type_.c_str();
01802 drd.ddsSubscriptionData.durability = sub.qos_.durability;
01803 drd.ddsSubscriptionData.deadline = sub.qos_.deadline;
01804 drd.ddsSubscriptionData.latency_budget = sub.qos_.latency_budget;
01805 drd.ddsSubscriptionData.liveliness = sub.qos_.liveliness;
01806 drd.ddsSubscriptionData.reliability = sub.qos_.reliability;
01807 drd.ddsSubscriptionData.ownership = sub.qos_.ownership;
01808 drd.ddsSubscriptionData.destination_order = sub.qos_.destination_order;
01809 drd.ddsSubscriptionData.user_data = sub.qos_.user_data;
01810 drd.ddsSubscriptionData.time_based_filter = sub.qos_.time_based_filter;
01811 drd.ddsSubscriptionData.presentation = sub.subscriber_qos_.presentation;
01812 drd.ddsSubscriptionData.partition = sub.subscriber_qos_.partition;
01813 drd.ddsSubscriptionData.topic_data = topic_details.qos_.topic_data;
01814 drd.ddsSubscriptionData.group_data = sub.subscriber_qos_.group_data;
01815 drd.readerProxy.remoteReaderGuid = subscription_id;
01816 drd.readerProxy.expectsInlineQos = false;
01817
01818
01819 drd.readerProxy.allLocators = sub.trans_info_;
01820 drd.contentFilterProperty.contentFilteredTopicName =
01821 OPENDDS_STRING(DCPS::GuidConverter(subscription_id)).c_str();
01822 drd.contentFilterProperty.relatedTopicName = topic_name.c_str();
01823 drd.contentFilterProperty.filterClassName = "";
01824 drd.contentFilterProperty.filterExpression = sub.filterProperties.filterExpression;
01825 drd.contentFilterProperty.expressionParameters = sub.filterProperties.expressionParameters;
01826 for (DCPS::RepoIdSet::const_iterator writer =
01827 sub.remote_opendds_associations_.begin();
01828 writer != sub.remote_opendds_associations_.end();
01829 ++writer)
01830 {
01831 CORBA::ULong len = drd.readerProxy.associatedWriters.length();
01832 drd.readerProxy.associatedWriters.length(len + 1);
01833 drd.readerProxy.associatedWriters[len] = *writer;
01834 }
01835 }
01836
01837 void
01838 Sedp::write_durable_publication_data(const DCPS::RepoId& reader)
01839 {
01840 LocalPublicationIter pub, end = local_publications_.end();
01841 for (pub = local_publications_.begin(); pub != end; ++pub) {
01842 write_publication_data(pub->first, pub->second, reader);
01843 }
01844 publications_writer_.end_historic_samples(reader);
01845 }
01846
01847 void
01848 Sedp::write_durable_subscription_data(const DCPS::RepoId& reader)
01849 {
01850 LocalSubscriptionIter sub, end = local_subscriptions_.end();
01851 for (sub = local_subscriptions_.begin(); sub != end; ++sub) {
01852 write_subscription_data(sub->first, sub->second, reader);
01853 }
01854 subscriptions_writer_.end_historic_samples(reader);
01855 }
01856
01857 void
01858 Sedp::write_durable_participant_message_data(const DCPS::RepoId& reader)
01859 {
01860 LocalParticipantMessageIter part, end = local_participant_messages_.end();
01861 for (part = local_participant_messages_.begin(); part != end; ++part) {
01862 write_participant_message_data(part->first, part->second, reader);
01863 }
01864 participant_message_writer_.end_historic_samples(reader);
01865 }
01866
01867 DDS::ReturnCode_t
01868 Sedp::write_publication_data(
01869 const RepoId& rid,
01870 LocalPublication& lp,
01871 const DCPS::RepoId& reader)
01872 {
01873 DDS::ReturnCode_t result = DDS::RETCODE_OK;
01874 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
01875 !associated_participants_.empty())) {
01876 OpenDDS::DCPS::DiscoveredWriterData dwd;
01877 ParameterList plist;
01878 populate_discovered_writer_msg(dwd, rid, lp);
01879
01880
01881 if (ParameterListConverter::to_param_list(dwd, plist, map_ipv4_to_ipv6())) {
01882 ACE_DEBUG((LM_ERROR,
01883 ACE_TEXT("(%P|%t) ERROR: Sedp::write_publication_data - ")
01884 ACE_TEXT("Failed to convert DiscoveredWriterData ")
01885 ACE_TEXT(" to ParameterList\n")));
01886 result = DDS::RETCODE_ERROR;
01887 }
01888 if (DDS::RETCODE_OK == result) {
01889 result = publications_writer_.write_sample(plist, reader, lp.sequence_);
01890 }
01891 } else if (DCPS::DCPS_debug_level > 3) {
01892 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_publication_data - ")
01893 ACE_TEXT("not currently associated, dropping msg.\n")));
01894 }
01895 return result;
01896 }
01897
01898 DDS::ReturnCode_t
01899 Sedp::write_subscription_data(
01900 const RepoId& rid,
01901 LocalSubscription& ls,
01902 const DCPS::RepoId& reader)
01903 {
01904 DDS::ReturnCode_t result = DDS::RETCODE_OK;
01905 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
01906 !associated_participants_.empty())) {
01907 OpenDDS::DCPS::DiscoveredReaderData drd;
01908 ParameterList plist;
01909 populate_discovered_reader_msg(drd, rid, ls);
01910
01911
01912 if (ParameterListConverter::to_param_list(drd, plist, map_ipv4_to_ipv6())) {
01913 ACE_DEBUG((LM_ERROR,
01914 ACE_TEXT("(%P|%t) ERROR: Sedp::write_subscription_data - ")
01915 ACE_TEXT("Failed to convert DiscoveredReaderData ")
01916 ACE_TEXT("to ParameterList\n")));
01917 result = DDS::RETCODE_ERROR;
01918 }
01919 if (DDS::RETCODE_OK == result) {
01920 result = subscriptions_writer_.write_sample(plist, reader, ls.sequence_);
01921 }
01922 } else if (DCPS::DCPS_debug_level > 3) {
01923 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_subscription_data - ")
01924 ACE_TEXT("not currently associated, dropping msg.\n")));
01925 }
01926 return result;
01927 }
01928
01929 DDS::ReturnCode_t
01930 Sedp::write_participant_message_data(
01931 const RepoId& rid,
01932 LocalParticipantMessage& pm,
01933 const DCPS::RepoId& reader)
01934 {
01935 DDS::ReturnCode_t result = DDS::RETCODE_OK;
01936 if (spdp_.associated() && (reader != GUID_UNKNOWN ||
01937 !associated_participants_.empty())) {
01938 ParticipantMessageData pmd;
01939 pmd.participantGuid = rid;
01940 result = participant_message_writer_.write_sample(pmd, reader, pm.sequence_);
01941 } else if (DCPS::DCPS_debug_level > 3) {
01942 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_participant_message_data - ")
01943 ACE_TEXT("not currently associated, dropping msg.\n")));
01944 }
01945 return result;
01946 }
01947
01948 void
01949 Sedp::set_inline_qos(DCPS::TransportLocatorSeq& locators)
01950 {
01951 const OPENDDS_STRING rtps_udp = "rtps_udp";
01952 for (CORBA::ULong i = 0; i < locators.length(); ++i) {
01953 if (locators[i].transport_type.in() == rtps_udp) {
01954 const CORBA::ULong len = locators[i].data.length();
01955 locators[i].data.length(len + 1);
01956 locators[i].data[len] = CORBA::Octet(1);
01957 }
01958 }
01959 }
01960
01961 void
01962 Sedp::acknowledge()
01963 {
01964 task_.acknowledge();
01965 }
01966
01967 void
01968 Sedp::Task::enqueue(const SPDPdiscoveredParticipantData* pdata)
01969 {
01970 if (spdp_->shutting_down()) { return; }
01971 putq(new Msg(Msg::MSG_PARTICIPANT, DCPS::SAMPLE_DATA, pdata));
01972 }
01973
01974 void
01975 Sedp::Task::enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata)
01976 {
01977 if (spdp_->shutting_down()) { return; }
01978 putq(new Msg(Msg::MSG_WRITER, id, wdata));
01979 }
01980
01981 void
01982 Sedp::Task::enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata)
01983 {
01984 if (spdp_->shutting_down()) { return; }
01985 putq(new Msg(Msg::MSG_READER, id, rdata));
01986 }
01987
01988 void
01989 Sedp::Task::enqueue(DCPS::MessageId id, const ParticipantMessageData* data)
01990 {
01991 if (spdp_->shutting_down()) { return; }
01992 putq(new Msg(Msg::MSG_PARTICIPANT_DATA, id, data));
01993 }
01994
01995 void
01996 Sedp::Task::enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
01997 {
01998 #ifndef DDS_HAS_MINIMUM_BIT
01999 if (spdp_->shutting_down()) { return; }
02000 putq(new Msg(which_bit, DCPS::DISPOSE_INSTANCE, bit_ih));
02001 #else
02002 ACE_UNUSED_ARG(which_bit);
02003 ACE_UNUSED_ARG(bit_ih);
02004 #endif
02005 }
02006
02007 int
02008 Sedp::Task::svc()
02009 {
02010 for (Msg* msg = 0; getq(msg) != -1; ) {
02011 if (DCPS::DCPS_debug_level > 5) {
02012 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc "
02013 "got message from queue type %d\n", msg->type_));
02014 }
02015 ACE_Auto_Basic_Ptr<Msg> delete_the_msg(msg);
02016 switch (msg->type_) {
02017 case Msg::MSG_PARTICIPANT:
02018 svc_i(msg->dpdata_);
02019 break;
02020 case Msg::MSG_WRITER:
02021 svc_i(msg->id_, msg->wdata_);
02022 break;
02023 case Msg::MSG_READER:
02024 svc_i(msg->id_, msg->rdata_);
02025 break;
02026 case Msg::MSG_PARTICIPANT_DATA:
02027 svc_i(msg->id_, msg->pmdata_);
02028 break;
02029 case Msg::MSG_REMOVE_FROM_PUB_BIT:
02030 case Msg::MSG_REMOVE_FROM_SUB_BIT:
02031 svc_i(msg->type_, msg->ih_);
02032 break;
02033 case Msg::MSG_FINI_BIT:
02034
02035
02036
02037 spdp_->wait_for_acks().ack();
02038 break;
02039 case Msg::MSG_STOP:
02040 if (DCPS::DCPS_debug_level > 3) {
02041 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
02042 ACE_TEXT("received MSG_STOP. Task exiting\n")));
02043 }
02044 return 0;
02045 }
02046 if (DCPS::DCPS_debug_level > 5) {
02047 ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc done with message\n"));
02048 }
02049 }
02050 if (DCPS::DCPS_debug_level > 3) {
02051 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
02052 ACE_TEXT("Task exiting.\n")));
02053 }
02054 return 0;
02055 }
02056
02057 Sedp::Task::~Task()
02058 {
02059 shutdown();
02060 }
02061
02062 bool
02063 Sedp::shutting_down() const
02064 {
02065 return spdp_.shutting_down();
02066 }
02067
02068 void
02069 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& rTls,
02070 DiscoveredSubscriptionIter& dsi,
02071 const DCPS::RepoId& reader)
02072 {
02073 OpenDDS::DCPS::LocatorSeq locs;
02074 bool participantExpectsInlineQos = false;
02075 RepoId remote_participant = reader;
02076 remote_participant.entityId = ENTITYID_PARTICIPANT;
02077 const bool participant_found =
02078 spdp_.get_default_locators(remote_participant, locs,
02079 participantExpectsInlineQos);
02080 if (!rTls->length()) {
02081 if (!participant_found) {
02082 defer_match_endpoints_.insert(reader);
02083 return;
02084 } else if (locs.length()) {
02085 size_t size = 0, padding = 0;
02086 DCPS::gen_find_size(locs, size, padding);
02087
02088 ACE_Message_Block mb_locator(size + 1);
02089 using DCPS::Serializer;
02090 Serializer ser_loc(&mb_locator,
02091 ACE_CDR_BYTE_ORDER,
02092 Serializer::ALIGN_CDR);
02093 ser_loc << locs;
02094 const bool readerExpectsInlineQos =
02095 dsi->second.reader_data_.readerProxy.expectsInlineQos;
02096 ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos
02097 || readerExpectsInlineQos);
02098
02099 DCPS::TransportLocator tl;
02100 tl.transport_type = "rtps_udp";
02101 message_block_to_sequence (mb_locator, tl.data);
02102 rTls->length(1);
02103 (*rTls)[0] = tl;
02104 } else {
02105 ACE_DEBUG((LM_WARNING,
02106 ACE_TEXT("(%P|%t) Sedp::match - ")
02107 ACE_TEXT("remote reader found with no locators ")
02108 ACE_TEXT("and no default locators\n")));
02109 }
02110 }
02111 }
02112
02113 void
02114 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& wTls,
02115 DiscoveredPublicationIter& ,
02116 const DCPS::RepoId& writer)
02117 {
02118 OpenDDS::DCPS::LocatorSeq locs;
02119 bool participantExpectsInlineQos = false;
02120 RepoId remote_participant = writer;
02121 remote_participant.entityId = ENTITYID_PARTICIPANT;
02122 const bool participant_found =
02123 spdp_.get_default_locators(remote_participant, locs,
02124 participantExpectsInlineQos);
02125 if (!wTls->length()) {
02126 if (!participant_found) {
02127 defer_match_endpoints_.insert(writer);
02128 return;
02129 } else if (locs.length()) {
02130 size_t size = 0, padding = 0;
02131 DCPS::gen_find_size(locs, size, padding);
02132
02133 ACE_Message_Block mb_locator(size + 1);
02134 using DCPS::Serializer;
02135 Serializer ser_loc(&mb_locator,
02136 ACE_CDR_BYTE_ORDER,
02137 Serializer::ALIGN_CDR);
02138 ser_loc << locs;
02139 ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos);
02140
02141 DCPS::TransportLocator tl;
02142 tl.transport_type = "rtps_udp";
02143 message_block_to_sequence (mb_locator, tl.data);
02144 wTls->length(1);
02145 (*wTls)[0] = tl;
02146 } else {
02147 ACE_DEBUG((LM_WARNING,
02148 ACE_TEXT("(%P|%t) Sedp::match - ")
02149 ACE_TEXT("remote writer found with no locators ")
02150 ACE_TEXT("and no default locators\n")));
02151 }
02152 }
02153 }
02154
02155 bool
02156 Sedp::defer_writer(const RepoId& writer,
02157 const RepoId& writer_participant)
02158 {
02159 if (!associated_participants_.count(writer_participant)) {
02160 if (DCPS::DCPS_debug_level > 3) {
02161 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
02162 ACE_TEXT("remote writer deferred\n")));
02163 }
02164 defer_match_endpoints_.insert(writer);
02165 return true;
02166 }
02167 return false;
02168 }
02169
02170 bool
02171 Sedp::defer_reader(const RepoId& reader,
02172 const RepoId& reader_participant)
02173 {
02174 if (!associated_participants_.count(reader_participant)) {
02175 if (DCPS::DCPS_debug_level > 3) {
02176 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
02177 ACE_TEXT("remote reader deferred\n")));
02178 }
02179 defer_match_endpoints_.insert(reader);
02180 return true;
02181 }
02182 return false;
02183 }
02184
02185 WaitForAcks::WaitForAcks()
02186 : cond_(lock_)
02187 , acks_(0)
02188 {
02189 }
02190
02191 void
02192 WaitForAcks::ack()
02193 {
02194 {
02195 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02196 ++acks_;
02197 }
02198 cond_.signal();
02199 }
02200
02201 void
02202 WaitForAcks::wait_for_acks(unsigned int num_acks)
02203 {
02204 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02205 while (num_acks > acks_) {
02206 cond_.wait();
02207 }
02208 }
02209
02210 void
02211 WaitForAcks::reset()
02212 {
02213 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02214 acks_ = 0;
02215
02216
02217 }
02218
02219 }
02220 }