Sedp.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Sedp.h"
00009 #include "Spdp.h"
00010 #include "MessageTypes.h"
00011 #include "RtpsDiscovery.h"
00012 #include "RtpsCoreTypeSupportImpl.h"
00013 #include "ParameterListConverter.h"
00014 
00015 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00016 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst.h"
00017 #include "dds/DCPS/transport/rtps_udp/RtpsUdpInst_rch.h"
00018 
00019 #include "dds/DCPS/Serializer.h"
00020 #include "dds/DCPS/Definitions.h"
00021 #include "dds/DCPS/GuidConverter.h"
00022 #include "dds/DCPS/GuidUtils.h"
00023 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00024 #include "dds/DCPS/AssociationData.h"
00025 #include "dds/DCPS/Service_Participant.h"
00026 #include "dds/DCPS/Qos_Helper.h"
00027 #include "dds/DCPS/DataSampleHeader.h"
00028 #include "dds/DCPS/SendStateDataSampleList.h"
00029 #include "dds/DCPS/DataReaderCallbacks.h"
00030 #include "dds/DCPS/DataWriterCallbacks.h"
00031 #include "dds/DCPS/Marked_Default_Qos.h"
00032 #include "dds/DCPS/BuiltInTopicUtils.h"
00033 #include "dds/DCPS/DCPS_Utils.h"
00034 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00035 
00036 #include <ace/Reverse_Lock_T.h>
00037 #include <ace/Auto_Ptr.h>
00038 
00039 #include <cstring>
00040 
00041 namespace {
00042 bool qosChanged(DDS::PublicationBuiltinTopicData& dest,
00043                 const DDS::PublicationBuiltinTopicData& src)
00044 {
00045 #ifndef OPENDDS_SAFETY_PROFILE
00046   using OpenDDS::DCPS::operator!=;
00047 #endif
00048   bool changed = false;
00049 
00050   // check each Changeable QoS policy value in Publication BIT Data
00051 
00052   if (dest.deadline != src.deadline) {
00053     changed = true;
00054     dest.deadline = src.deadline;
00055   }
00056 
00057   if (dest.latency_budget != src.latency_budget) {
00058     changed = true;
00059     dest.latency_budget = src.latency_budget;
00060   }
00061 
00062   if (dest.lifespan != src.lifespan) {
00063     changed = true;
00064     dest.lifespan = src.lifespan;
00065   }
00066 
00067   if (dest.user_data != src.user_data) {
00068     changed = true;
00069     dest.user_data = src.user_data;
00070   }
00071 
00072   if (dest.ownership_strength != src.ownership_strength) {
00073     changed = true;
00074     dest.ownership_strength = src.ownership_strength;
00075   }
00076 
00077   if (dest.partition != src.partition) {
00078     changed = true;
00079     dest.partition = src.partition;
00080   }
00081 
00082   if (dest.topic_data != src.topic_data) {
00083     changed = true;
00084     dest.topic_data = src.topic_data;
00085   }
00086 
00087   if (dest.group_data != src.group_data) {
00088     changed = true;
00089     dest.group_data = src.group_data;
00090   }
00091 
00092   return changed;
00093 }
00094 
00095 bool qosChanged(DDS::SubscriptionBuiltinTopicData& dest,
00096                 const DDS::SubscriptionBuiltinTopicData& src)
00097 {
00098 #ifndef OPENDDS_SAFETY_PROFILE
00099   using OpenDDS::DCPS::operator!=;
00100 #endif
00101   bool changed = false;
00102 
00103   // check each Changeable QoS policy value in Subcription BIT Data
00104 
00105   if (dest.deadline != src.deadline) {
00106     changed = true;
00107     dest.deadline = src.deadline;
00108   }
00109 
00110   if (dest.latency_budget != src.latency_budget) {
00111     changed = true;
00112     dest.latency_budget = src.latency_budget;
00113   }
00114 
00115   if (dest.user_data != src.user_data) {
00116     changed = true;
00117     dest.user_data = src.user_data;
00118   }
00119 
00120   if (dest.time_based_filter != src.time_based_filter) {
00121     changed = true;
00122     dest.time_based_filter = src.time_based_filter;
00123   }
00124 
00125   if (dest.partition != src.partition) {
00126     changed = true;
00127     dest.partition = src.partition;
00128   }
00129 
00130   if (dest.topic_data != src.topic_data) {
00131     changed = true;
00132     dest.topic_data = src.topic_data;
00133   }
00134 
00135   if (dest.group_data != src.group_data) {
00136     changed = true;
00137     dest.group_data = src.group_data;
00138   }
00139 
00140   return changed;
00141 }
00142 
00143 bool paramsChanged(OpenDDS::DCPS::ContentFilterProperty_t& dest,
00144                    const OpenDDS::DCPS::ContentFilterProperty_t& src)
00145 {
00146   if (dest.expressionParameters.length() != src.expressionParameters.length()) {
00147     dest.expressionParameters = src.expressionParameters;
00148     return true;
00149   }
00150   for (CORBA::ULong i = 0; i < src.expressionParameters.length(); ++i) {
00151     if (0 != std::strcmp(dest.expressionParameters[i],
00152                          src.expressionParameters[i])) {
00153       dest.expressionParameters = src.expressionParameters;
00154       return true;
00155     }
00156   }
00157   return false;
00158 }
00159 
00160 }
00161 
00162 namespace OpenDDS {
00163 namespace RTPS {
00164 using DCPS::RepoId;
00165 
00166 const bool Sedp::host_is_bigendian_(!ACE_CDR_BYTE_ORDER);
00167 
00168 Sedp::Sedp(const RepoId& participant_id, Spdp& owner, ACE_Thread_Mutex& lock)
00169   : OpenDDS::DCPS::EndpointManager<SPDPdiscoveredParticipantData>(participant_id, lock)
00170   , spdp_(owner)
00171   , publications_writer_(make_id(participant_id,
00172                                  ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER),
00173                          *this)
00174   , subscriptions_writer_(make_id(participant_id,
00175                                   ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER),
00176                           *this)
00177   , participant_message_writer_(make_id(participant_id,
00178                                         ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER),
00179                         *this)
00180   , publications_reader_(new Reader(make_id(participant_id,
00181                                             ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER),
00182                                     *this))
00183   , subscriptions_reader_(new Reader(make_id(participant_id,
00184                                              ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER),
00185                                      *this))
00186   , participant_message_reader_(new Reader(make_id(participant_id,
00187                                                    ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER),
00188                                            *this))
00189   , task_(this)
00190   , automatic_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00191   , manual_liveliness_seq_ (DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00192 {
00193   pub_bit_key_.value[0] = pub_bit_key_.value[1] = pub_bit_key_.value[2] = 0;
00194   sub_bit_key_.value[0] = sub_bit_key_.value[1] = sub_bit_key_.value[2] = 0;
00195 }
00196 
00197 RepoId
00198 Sedp::make_id(const RepoId& participant_id, const EntityId_t& entity)
00199 {
00200   RepoId id = participant_id;
00201   id.entityId = entity;
00202   return id;
00203 }
00204 
00205 DDS::ReturnCode_t
00206 Sedp::init(const RepoId& guid, const RtpsDiscovery& disco,
00207            DDS::DomainId_t domainId)
00208 {
00209   char domainStr[16];
00210   ACE_OS::snprintf(domainStr, 16, "%d", domainId);
00211 
00212   OPENDDS_STRING key = OpenDDS::DCPS::GuidConverter(guid).uniqueId();
00213 
00214   // configure one transport
00215   transport_inst_ = TheTransportRegistry->create_inst(
00216                        DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00217                        OPENDDS_STRING("_SEDPTransportInst_") + key.c_str() + domainStr,
00218                        "rtps_udp");
00219   // Use a static cast to avoid dependency on the RtpsUdp library
00220   DCPS::RtpsUdpInst_rch rtps_inst =
00221       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00222   // The SEDP endpoints may need to wait at least one resend period before
00223   // the handshake completes (allows time for our SPDP multicast to be
00224   // received by the other side).  Arbitrary constant of 5 to account for
00225   // possible network lossiness.
00226   static const double HANDSHAKE_MULTIPLIER = 5;
00227   rtps_inst->handshake_timeout_ = disco.resend_period() * HANDSHAKE_MULTIPLIER;
00228 
00229   if (disco.sedp_multicast()) {
00230     // Bind to a specific multicast group
00231     const u_short mc_port = disco.pb() + disco.dg() * domainId + disco.dx();
00232 
00233     OPENDDS_STRING mc_addr = disco.default_multicast_group();
00234     if (rtps_inst->multicast_group_address_.set(mc_port, mc_addr.c_str())) {
00235       ACE_DEBUG((LM_ERROR,
00236                  ACE_TEXT("(%P|%t) ERROR: Sedp::init - ")
00237                  ACE_TEXT("failed setting multicast local_addr to port %hd\n"),
00238                           mc_port));
00239       return DDS::RETCODE_ERROR;
00240     }
00241 
00242     rtps_inst->ttl_ = disco.ttl();
00243     rtps_inst->multicast_interface_ = disco.multicast_interface();
00244 
00245   } else {
00246     rtps_inst->use_multicast_ = false;
00247   }
00248 
00249   const OPENDDS_STRING sedp_addr = disco.sedp_local_address();
00250   if (!sedp_addr.empty()) {
00251     rtps_inst->local_address_config_str_ = sedp_addr;
00252     rtps_inst->local_address_.set(sedp_addr.c_str());
00253   }
00254 
00255   // Create a config
00256   OPENDDS_STRING config_name = DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
00257                             OPENDDS_STRING("_SEDP_TransportCfg_") + key +
00258                             domainStr;
00259   DCPS::TransportConfig_rch transport_cfg =
00260     TheTransportRegistry->create_config(config_name.c_str());
00261   transport_cfg->instances_.push_back(transport_inst_);
00262 
00263   // Configure and enable each reader/writer
00264   rtps_inst->opendds_discovery_default_listener_ = publications_reader_.in();
00265   rtps_inst->opendds_discovery_guid_ = guid;
00266   const bool reliability = true, durability = true;
00267   publications_writer_.enable_transport_using_config(reliability, durability,
00268                                                      transport_cfg);
00269   publications_reader_->enable_transport_using_config(reliability, durability,
00270                                                       transport_cfg);
00271   subscriptions_writer_.enable_transport_using_config(reliability, durability,
00272                                                       transport_cfg);
00273   subscriptions_reader_->enable_transport_using_config(reliability, durability,
00274                                                        transport_cfg);
00275   participant_message_writer_.enable_transport_using_config(reliability, durability,
00276                                                             transport_cfg);
00277   participant_message_reader_->enable_transport_using_config(reliability, durability,
00278                                                              transport_cfg);
00279   return DDS::RETCODE_OK;
00280 }
00281 
00282 void
00283 Sedp::unicast_locators(OpenDDS::DCPS::LocatorSeq& locators) const
00284 {
00285   DCPS::RtpsUdpInst_rch rtps_inst =
00286     DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00287   using namespace OpenDDS::RTPS;
00288 
00289   CORBA::ULong idx = 0;
00290 
00291   // multicast first so it's preferred by remote peers
00292   if (rtps_inst->use_multicast_ && rtps_inst->multicast_group_address_ != ACE_INET_Addr()) {
00293     idx = locators.length();
00294     locators.length(idx + 1);
00295     locators[idx].kind = address_to_kind(rtps_inst->multicast_group_address_);
00296     locators[idx].port = rtps_inst->multicast_group_address_.get_port_number();
00297     RTPS::address_to_bytes(locators[idx].address,
00298       rtps_inst->multicast_group_address_);
00299   }
00300 
00301   //if local_address_string is empty, or only the port has been set
00302   //need to get interface addresses to populate into the locator
00303   if (rtps_inst->local_address_config_str_.empty() ||
00304       rtps_inst->local_address_config_str_.rfind(':') == 0) {
00305     typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector;
00306     AddrVector addrs;
00307     if (TheServiceParticipant->default_address ().empty ()) {
00308       OpenDDS::DCPS::get_interface_addrs(addrs);
00309     } else {
00310       addrs.push_back (ACE_INET_Addr (static_cast<u_short> (0), TheServiceParticipant->default_address ().c_str ()));
00311     }
00312     for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) {
00313       idx = locators.length();
00314       locators.length(idx + 1);
00315       locators[idx].kind = address_to_kind(*adr_it);
00316       locators[idx].port = rtps_inst->local_address_.get_port_number();
00317       RTPS::address_to_bytes(locators[idx].address,
00318         *adr_it);
00319     }
00320   } else {
00321     idx = locators.length();
00322     locators.length(idx + 1);
00323     locators[idx].kind = address_to_kind(rtps_inst->local_address_);
00324     locators[idx].port = rtps_inst->local_address_.get_port_number();
00325     RTPS::address_to_bytes(locators[idx].address,
00326       rtps_inst->local_address_);
00327   }
00328 }
00329 
00330 const ACE_INET_Addr&
00331 Sedp::local_address() const
00332 {
00333   DCPS::RtpsUdpInst_rch rtps_inst =
00334       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00335   return rtps_inst->local_address_;
00336 }
00337 
00338 const ACE_INET_Addr&
00339 Sedp::multicast_group() const
00340 {
00341   DCPS::RtpsUdpInst_rch rtps_inst =
00342       DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_);
00343   return rtps_inst->multicast_group_address_;
00344 }
00345 bool
00346 Sedp::map_ipv4_to_ipv6() const
00347 {
00348   bool map = false;
00349   if (local_address().get_type() != AF_INET) {
00350     map = true;
00351   }
00352   return map;
00353 }
00354 
00355 void
00356 Sedp::assign_bit_key(DiscoveredPublication& pub)
00357 {
00358   increment_key(pub_bit_key_);
00359   pub_key_to_id_[pub_bit_key_] = pub.writer_data_.writerProxy.remoteWriterGuid;
00360   pub.writer_data_.ddsPublicationData.key = pub_bit_key_;
00361 }
00362 
00363 void
00364 Sedp::assign_bit_key(DiscoveredSubscription& sub)
00365 {
00366   increment_key(sub_bit_key_);
00367   sub_key_to_id_[sub_bit_key_] = sub.reader_data_.readerProxy.remoteReaderGuid;
00368   sub.reader_data_.ddsSubscriptionData.key = sub_bit_key_;
00369 }
00370 
00371 void
00372 create_association_data_proto(DCPS::AssociationData& proto,
00373                               const SPDPdiscoveredParticipantData& pdata) {
00374   proto.publication_transport_priority_ = 0;
00375   proto.remote_reliable_ = true;
00376   proto.remote_durable_ = true;
00377   std::memcpy(proto.remote_id_.guidPrefix, pdata.participantProxy.guidPrefix,
00378               sizeof(GuidPrefix_t));
00379 
00380   const OpenDDS::DCPS::LocatorSeq& mll =
00381     pdata.participantProxy.metatrafficMulticastLocatorList;
00382   const OpenDDS::DCPS::LocatorSeq& ull =
00383     pdata.participantProxy.metatrafficUnicastLocatorList;
00384   const CORBA::ULong locator_count = mll.length() + ull.length();
00385 
00386   ACE_Message_Block mb_locator(4 + locator_count * sizeof(OpenDDS::DCPS::Locator_t) + 1);
00387   using DCPS::Serializer;
00388   Serializer ser_loc(&mb_locator, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00389   ser_loc << locator_count;
00390 
00391   for (CORBA::ULong i = 0; i < mll.length(); ++i) {
00392     ser_loc << mll[i];
00393   }
00394   for (CORBA::ULong i = 0; i < ull.length(); ++i) {
00395     ser_loc << ull[i];
00396   }
00397   ser_loc << ACE_OutputCDR::from_boolean(false); // requires_inline_qos
00398 
00399   proto.remote_data_.length(1);
00400   proto.remote_data_[0].transport_type = "rtps_udp";
00401   message_block_to_sequence (mb_locator, proto.remote_data_[0].data);
00402 }
00403 
00404 void
00405 Sedp::associate(const SPDPdiscoveredParticipantData& pdata)
00406 {
00407   // First create a 'prototypical' instance of AssociationData.  It will
00408   // be copied and modified for each of the (up to) four SEDP Endpoints.
00409   DCPS::AssociationData proto;
00410   create_association_data_proto(proto, pdata);
00411 
00412   const BuiltinEndpointSet_t& avail =
00413     pdata.participantProxy.availableBuiltinEndpoints;
00414 
00415   // See RTPS v2.1 section 8.5.5.1
00416   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00417     DCPS::AssociationData peer = proto;
00418     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00419     publications_reader_->assoc(peer);
00420   }
00421   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00422     DCPS::AssociationData peer = proto;
00423     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00424     subscriptions_reader_->assoc(peer);
00425   }
00426   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00427     DCPS::AssociationData peer = proto;
00428     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00429     participant_message_reader_->assoc(peer);
00430   }
00431 
00432   SPDPdiscoveredParticipantData* dpd =
00433     new SPDPdiscoveredParticipantData(pdata);
00434   task_.enqueue(dpd);
00435 }
00436 
00437 void
00438 Sedp::Task::svc_i(const SPDPdiscoveredParticipantData* ppdata)
00439 {
00440   ACE_Auto_Basic_Ptr<const SPDPdiscoveredParticipantData> delete_the_data(ppdata);
00441   const SPDPdiscoveredParticipantData& pdata = *ppdata;
00442   // First create a 'prototypical' instance of AssociationData.  It will
00443   // be copied and modified for each of the (up to) four SEDP Endpoints.
00444   DCPS::AssociationData proto;
00445   create_association_data_proto(proto, pdata);
00446 
00447   const BuiltinEndpointSet_t& avail =
00448     pdata.participantProxy.availableBuiltinEndpoints;
00449 
00450   // See RTPS v2.1 section 8.5.5.1
00451   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00452     DCPS::AssociationData peer = proto;
00453     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00454     sedp_->publications_writer_.assoc(peer);
00455   }
00456   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00457     DCPS::AssociationData peer = proto;
00458     peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00459     sedp_->subscriptions_writer_.assoc(peer);
00460   }
00461   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00462     DCPS::AssociationData peer = proto;
00463     peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00464     sedp_->participant_message_writer_.assoc(peer);
00465   }
00466 
00467   //FUTURE: if/when topic propagation is supported, add it here
00468 
00469   // Process deferred publications and subscriptions.
00470   for (DeferredSubscriptionMap::iterator pos = sedp_->deferred_subscriptions_.lower_bound (proto.remote_id_),
00471          limit = sedp_->deferred_subscriptions_.upper_bound (proto.remote_id_);
00472        pos != limit;
00473        /* Increment in body. */) {
00474     sedp_->data_received (pos->second.first, pos->second.second);
00475     sedp_->deferred_subscriptions_.erase (pos++);
00476   }
00477   for (DeferredPublicationMap::iterator pos = sedp_->deferred_publications_.lower_bound (proto.remote_id_),
00478          limit = sedp_->deferred_publications_.upper_bound (proto.remote_id_);
00479        pos != limit;
00480        /* Increment in body. */) {
00481     sedp_->data_received (pos->second.first, pos->second.second);
00482     sedp_->deferred_publications_.erase (pos++);
00483   }
00484 
00485   ACE_GUARD(ACE_Thread_Mutex, g, sedp_->lock_);
00486   if (spdp_->shutting_down()) { return; }
00487 
00488   proto.remote_id_.entityId = ENTITYID_PARTICIPANT;
00489   sedp_->associated_participants_.insert(proto.remote_id_);
00490 
00491   // Write durable data
00492   if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00493     proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00494     sedp_->write_durable_publication_data(proto.remote_id_);
00495   }
00496   if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00497     proto.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00498     sedp_->write_durable_subscription_data(proto.remote_id_);
00499   }
00500   if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00501     proto.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00502     sedp_->write_durable_participant_message_data(proto.remote_id_);
00503   }
00504 
00505   for (DCPS::RepoIdSet::iterator it = sedp_->defer_match_endpoints_.begin();
00506        it != sedp_->defer_match_endpoints_.end(); /*incremented in body*/) {
00507     if (0 == std::memcmp(it->guidPrefix, proto.remote_id_.guidPrefix,
00508                          sizeof(GuidPrefix_t))) {
00509       OPENDDS_STRING topic;
00510       if (it->entityId.entityKind & 4) {
00511         DiscoveredSubscriptionIter dsi =
00512           sedp_->discovered_subscriptions_.find(*it);
00513         if (dsi != sedp_->discovered_subscriptions_.end()) {
00514           topic = dsi->second.reader_data_.ddsSubscriptionData.topic_name;
00515         }
00516       } else {
00517         DiscoveredPublicationIter dpi =
00518           sedp_->discovered_publications_.find(*it);
00519         if (dpi != sedp_->discovered_publications_.end()) {
00520           topic = dpi->second.writer_data_.ddsPublicationData.topic_name;
00521         }
00522       }
00523       if (DCPS::DCPS_debug_level > 3) {
00524         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
00525           ACE_TEXT("processing deferred endpoints for topic %C\n"),
00526           topic.c_str()));
00527       }
00528       if (!topic.empty()) {
00529         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator ti =
00530           sedp_->topics_.find(topic);
00531         if (ti != sedp_->topics_.end()) {
00532           if (DCPS::DCPS_debug_level > 3) {
00533             DCPS::GuidConverter conv(*it);
00534             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::AssociateTask::svc - ")
00535               ACE_TEXT("calling match_endpoints %C\n"),
00536               OPENDDS_STRING(conv).c_str()));
00537           }
00538           sedp_->match_endpoints(*it, ti->second);
00539           if (spdp_->shutting_down()) { return; }
00540         }
00541       }
00542       sedp_->defer_match_endpoints_.erase(it++);
00543     } else {
00544       ++it;
00545     }
00546   }
00547 }
00548 
00549 bool
00550 Sedp::disassociate(const SPDPdiscoveredParticipantData& pdata)
00551 {
00552   RepoId part;
00553   std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix,
00554               sizeof(GuidPrefix_t));
00555   part.entityId = ENTITYID_PARTICIPANT;
00556   associated_participants_.erase(part);
00557   const BuiltinEndpointSet_t avail =
00558     pdata.participantProxy.availableBuiltinEndpoints;
00559 
00560   { // Release lock, so we can call into transport
00561     ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00562     ACE_GUARD_RETURN(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock, false);
00563 
00564     // See RTPS v2.1 section 8.5.5.2
00565     if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) {
00566       RepoId id = part;
00567       id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER;
00568       publications_writer_.disassociate(id);
00569     }
00570     if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) {
00571       RepoId id = part;
00572       id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER;
00573       publications_reader_->disassociate(id);
00574     }
00575     if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) {
00576       RepoId id = part;
00577       id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER;
00578       subscriptions_writer_.disassociate(id);
00579     }
00580     if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) {
00581       RepoId id = part;
00582       id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER;
00583       subscriptions_reader_->disassociate(id);
00584     }
00585     if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) {
00586       RepoId id = part;
00587       id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER;
00588       participant_message_writer_.disassociate(id);
00589     }
00590     if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) {
00591       RepoId id = part;
00592       id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER;
00593       participant_message_reader_->disassociate(id);
00594     }
00595     //FUTURE: if/when topic propagation is supported, add it here
00596   }
00597   if (spdp_.has_discovered_participant(part)) {
00598     remove_entities_belonging_to(discovered_publications_, part);
00599     remove_entities_belonging_to(discovered_subscriptions_, part);
00600     return true;
00601   } else {
00602     return false;
00603   }
00604 }
00605 
00606 template<typename Map>
00607 void
00608 Sedp::remove_entities_belonging_to(Map& m, RepoId participant)
00609 {
00610   participant.entityId.entityKey[0] = 0;
00611   participant.entityId.entityKey[1] = 0;
00612   participant.entityId.entityKey[2] = 0;
00613   participant.entityId.entityKind = 0;
00614   for (typename Map::iterator i = m.lower_bound(participant);
00615        i != m.end() && 0 == std::memcmp(i->first.guidPrefix,
00616                                         participant.guidPrefix,
00617                                         sizeof(GuidPrefix_t));) {
00618     OPENDDS_STRING topic_name = get_topic_name(i->second);
00619     OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00620       topics_.find(topic_name);
00621     if (top_it != topics_.end()) {
00622       top_it->second.endpoints_.erase(i->first);
00623       if (DCPS::DCPS_debug_level > 3) {
00624         ACE_DEBUG((LM_DEBUG,
00625                    ACE_TEXT("(%P|%t) Sedp::remove_entities_belonging_to - ")
00626                    ACE_TEXT("calling match_endpoints remove\n")));
00627       }
00628       match_endpoints(i->first, top_it->second, true /*remove*/);
00629       if (spdp_.shutting_down()) { return; }
00630     }
00631     remove_from_bit(i->second);
00632     m.erase(i++);
00633   }
00634 }
00635 
00636 void
00637 Sedp::remove_from_bit_i(const DiscoveredPublication& pub)
00638 {
00639 #ifndef DDS_HAS_MINIMUM_BIT
00640   task_.enqueue(Msg::MSG_REMOVE_FROM_PUB_BIT, pub.bit_ih_);
00641 #else
00642   ACE_UNUSED_ARG(pub);
00643 #endif /* DDS_HAS_MINIMUM_BIT */
00644 }
00645 
00646 void
00647 Sedp::remove_from_bit_i(const DiscoveredSubscription& sub)
00648 {
00649 #ifndef DDS_HAS_MINIMUM_BIT
00650   task_.enqueue(Msg::MSG_REMOVE_FROM_SUB_BIT, sub.bit_ih_);
00651 #else
00652   ACE_UNUSED_ARG(sub);
00653 #endif /* DDS_HAS_MINIMUM_BIT */
00654 }
00655 
00656 void
00657 Sedp::Task::svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
00658 {
00659 #ifndef DDS_HAS_MINIMUM_BIT
00660   switch (which_bit) {
00661   case Msg::MSG_REMOVE_FROM_PUB_BIT: {
00662     DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = sedp_->pub_bit();
00663     // bit may be null if the DomainParticipant is shutting down
00664     if (bit && bit_ih != DDS::HANDLE_NIL) {
00665       bit->set_instance_state(bit_ih,
00666                               DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
00667     }
00668     break;
00669   }
00670   case Msg::MSG_REMOVE_FROM_SUB_BIT: {
00671     DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sedp_->sub_bit();
00672     // bit may be null if the DomainParticipant is shutting down
00673     if (bit && bit_ih != DDS::HANDLE_NIL) {
00674       bit->set_instance_state(bit_ih,
00675                               DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
00676     }
00677     break;
00678   }
00679   default:
00680     break;
00681   }
00682 #else
00683   ACE_UNUSED_ARG(which_bit);
00684   ACE_UNUSED_ARG(bit_ih);
00685 #endif /* DDS_HAS_MINIMUM_BIT */
00686 }
00687 
00688 #ifndef DDS_HAS_MINIMUM_BIT
00689 DDS::TopicBuiltinTopicDataDataReaderImpl*
00690 Sedp::topic_bit()
00691 {
00692   DDS::Subscriber_var sub = spdp_.bit_subscriber();
00693   if (!sub.in())
00694     return 0;
00695 
00696   DDS::DataReader_var d =
00697     sub->lookup_datareader(DCPS::BUILT_IN_TOPIC_TOPIC);
00698   return dynamic_cast<DDS::TopicBuiltinTopicDataDataReaderImpl*>(d.in());
00699 }
00700 
00701 DDS::PublicationBuiltinTopicDataDataReaderImpl*
00702 Sedp::pub_bit()
00703 {
00704   DDS::Subscriber_var sub = spdp_.bit_subscriber();
00705   if (!sub.in())
00706     return 0;
00707 
00708   DDS::DataReader_var d =
00709     sub->lookup_datareader(DCPS::BUILT_IN_PUBLICATION_TOPIC);
00710   return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
00711 }
00712 
00713 DDS::SubscriptionBuiltinTopicDataDataReaderImpl*
00714 Sedp::sub_bit()
00715 {
00716   DDS::Subscriber_var sub = spdp_.bit_subscriber();
00717   if (!sub.in())
00718     return 0;
00719 
00720   DDS::DataReader_var d =
00721     sub->lookup_datareader(DCPS::BUILT_IN_SUBSCRIPTION_TOPIC);
00722   return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
00723 }
00724 #endif /* DDS_HAS_MINIMUM_BIT */
00725 
00726 bool
00727 Sedp::update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
00728                        OPENDDS_STRING& name)
00729 {
00730   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00731   OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator iter =
00732     topic_names_.find(topicId);
00733   if (iter == topic_names_.end()) {
00734     return false;
00735   }
00736   name = iter->second;
00737   TopicDetails& topic = topics_[name];
00738   using namespace DCPS;
00739   // If the TOPIC_DATA QoS changed our local endpoints must be resent
00740   // with new QoS
00741   if (qos.topic_data != topic.qos_.topic_data) {
00742     topic.qos_ = qos;
00743     // For each endpoint associated on this topic
00744     for (RepoIdSet::iterator topic_endpoints = topic.endpoints_.begin();
00745          topic_endpoints != topic.endpoints_.end(); ++topic_endpoints) {
00746 
00747       const RepoId& rid = *topic_endpoints;
00748       EntityKind kind = GuidConverter(rid).entityKind();
00749       if (KIND_WRITER == kind) {
00750         // This may be our local publication, verify
00751         LocalPublicationIter lp = local_publications_.find(rid);
00752         if (lp != local_publications_.end()) {
00753           write_publication_data(rid, lp->second);
00754         }
00755       } else if (KIND_READER == kind) {
00756         // This may be our local subscription, verify
00757         LocalSubscriptionIter ls = local_subscriptions_.find(rid);
00758         if (ls != local_subscriptions_.end()) {
00759           write_subscription_data(rid, ls->second);
00760         }
00761       }
00762     }
00763   }
00764 
00765   return true;
00766 }
00767 
00768 void
00769 Sedp::inconsistent_topic(const DCPS::RepoIdSet& eps) const
00770 {
00771   using DCPS::RepoIdSet;
00772   for (RepoIdSet::const_iterator iter(eps.begin()); iter != eps.end(); ++iter) {
00773     if (0 == std::memcmp(participant_id_.guidPrefix, iter->guidPrefix,
00774                          sizeof(GuidPrefix_t))) {
00775       const bool reader = iter->entityId.entityKind & 4;
00776       if (reader) {
00777         const LocalSubscriptionCIter lsi = local_subscriptions_.find(*iter);
00778         if (lsi != local_subscriptions_.end()) {
00779           lsi->second.subscription_->inconsistent_topic();
00780           // Only make one callback per inconsistent topic, even if we have
00781           // more than one reader/writer on the topic -- it's the Topic object
00782           // that will actually see the InconsistentTopicStatus change.
00783           return;
00784         }
00785       } else {
00786         const LocalPublicationCIter lpi = local_publications_.find(*iter);
00787         if (lpi != local_publications_.end()) {
00788           lpi->second.publication_->inconsistent_topic();
00789           return; // see comment above
00790         }
00791       }
00792     }
00793   }
00794 }
00795 
00796 DDS::ReturnCode_t
00797 Sedp::remove_publication_i(const RepoId& publicationId)
00798 {
00799   return publications_writer_.write_unregister_dispose(publicationId);
00800 }
00801 
00802 bool
00803 Sedp::update_publication_qos(const RepoId& publicationId,
00804                              const DDS::DataWriterQos& qos,
00805                              const DDS::PublisherQos& publisherQos)
00806 {
00807   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00808   LocalPublicationIter iter = local_publications_.find(publicationId);
00809   if (iter != local_publications_.end()) {
00810     LocalPublication& pb = iter->second;
00811     pb.qos_ = qos;
00812     pb.publisher_qos_ = publisherQos;
00813 
00814     if (DDS::RETCODE_OK != write_publication_data(publicationId, pb)) {
00815       return false;
00816     }
00817     // Match/unmatch with subscriptions
00818     OPENDDS_STRING topic_name = topic_names_[pb.topic_id_];
00819     OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00820           topics_.find(topic_name);
00821     if (top_it != topics_.end()) {
00822       match_endpoints(publicationId, top_it->second);
00823     }
00824     return true;
00825   }
00826   return false;
00827 }
00828 
00829 DDS::ReturnCode_t
00830 Sedp::remove_subscription_i(const RepoId& subscriptionId)
00831 {
00832   return subscriptions_writer_.write_unregister_dispose(subscriptionId);
00833 }
00834 
00835 bool
00836 Sedp::update_subscription_qos(const RepoId& subscriptionId,
00837                               const DDS::DataReaderQos& qos,
00838                               const DDS::SubscriberQos& subscriberQos)
00839 {
00840   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00841   LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
00842   if (iter != local_subscriptions_.end()) {
00843     LocalSubscription& sb = iter->second;
00844     sb.qos_ = qos;
00845     sb.subscriber_qos_ = subscriberQos;
00846 
00847     if (DDS::RETCODE_OK != write_subscription_data(subscriptionId, sb)) {
00848       return false;
00849     }
00850     // Match/unmatch with subscriptions
00851     OPENDDS_STRING topic_name = topic_names_[sb.topic_id_];
00852     OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00853           topics_.find(topic_name);
00854     if (top_it != topics_.end()) {
00855       match_endpoints(subscriptionId, top_it->second);
00856     }
00857     return true;
00858   }
00859   return false;
00860 }
00861 
00862 bool
00863 Sedp::update_subscription_params(const RepoId& subId,
00864                                  const DDS::StringSeq& params)
00865 {
00866   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00867   const LocalSubscriptionIter iter = local_subscriptions_.find(subId);
00868   if (iter != local_subscriptions_.end()) {
00869     LocalSubscription& sb = iter->second;
00870     sb.filterProperties.expressionParameters = params;
00871 
00872     if (DDS::RETCODE_OK != write_subscription_data(subId, sb)) {
00873       return false;
00874     }
00875 
00876     // Let any associated local publications know about the change
00877     for (DCPS::RepoIdSet::iterator i = iter->second.matched_endpoints_.begin();
00878          i != iter->second.matched_endpoints_.end(); ++i) {
00879       const LocalPublicationIter lpi = local_publications_.find(*i);
00880       if (lpi != local_publications_.end()) {
00881         lpi->second.publication_->update_subscription_params(subId, params);
00882       }
00883     }
00884 
00885     return true;
00886   }
00887   return false;
00888 }
00889 
00890 void
00891 Sedp::shutdown()
00892 {
00893   task_.shutdown();
00894   publications_reader_->shutting_down_ = 1;
00895   subscriptions_reader_->shutting_down_ = 1;
00896   participant_message_reader_->shutting_down_ = 1;
00897 }
00898 
00899 void
00900 Sedp::Task::acknowledge()
00901 {
00902   // id is really a don't care, but just set to REQUEST_ACK
00903   putq(new Msg(Msg::MSG_FINI_BIT, DCPS::REQUEST_ACK, 0));
00904 }
00905 
00906 void
00907 Sedp::Task::shutdown()
00908 {
00909   if (!shutting_down_) {
00910     shutting_down_ = true;
00911     putq(new Msg(Msg::MSG_STOP, DCPS::GRACEFUL_DISCONNECT, 0));
00912     wait();
00913   }
00914 }
00915 
00916 void
00917 Sedp::Task::svc_i(DCPS::MessageId message_id,
00918                   const OpenDDS::DCPS::DiscoveredWriterData* pwdata)
00919 {
00920   ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredWriterData> delete_the_data(pwdata);
00921   sedp_->data_received(message_id, *pwdata);
00922 }
00923 
// Handle a DiscoveredWriterData sample (a new or updated remote publication,
// or its removal) passed in from Sedp::Task::svc_i.
//
// message_id selects the action: SAMPLE_DATA adds or updates the entry in
// discovered_publications_; UNREGISTER_INSTANCE, DISPOSE_INSTANCE and
// DISPOSE_UNREGISTER_INSTANCE remove it. Any other id does nothing.
// wdata describes the writer; wdata.writerProxy.remoteWriterGuid is the key.
//
// The sample is dropped while spdp_ is shutting down, or when the writer,
// its participant or its topic name is being ignored. If the writer's
// participant has not been discovered yet, the sample is stored in
// deferred_publications_ instead of being processed.
00924 void
00925 Sedp::data_received(DCPS::MessageId message_id,
00926                     const OpenDDS::DCPS::DiscoveredWriterData& wdata)
00927 {
00928   if (spdp_.shutting_down()) { return; }
00929 
00930   const RepoId& guid = wdata.writerProxy.remoteWriterGuid;
        // Same GUID prefix as the writer, with the participant entity id.
00931   RepoId guid_participant = guid;
00932   guid_participant.entityId = ENTITYID_PARTICIPANT;
00933 
00934   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00935   if (ignoring(guid)
00936       || ignoring(guid_participant)
00937       || ignoring(wdata.ddsPublicationData.topic_name)) {
00938     return;
00939   }
00940 
        // Participant not known yet: keep the sample (and its message id)
        // for later, keyed by the writer GUID.
00941   if (!spdp_.has_discovered_participant (guid_participant)) {
00942     deferred_publications_[guid] = std::make_pair (message_id, wdata);
00943     return;
00944   }
00945 
00946   OPENDDS_STRING topic_name;
00947   // Find the publication  - iterator valid only as long as we hold the lock
00948   DiscoveredPublicationIter iter = discovered_publications_.find(guid);
00949 
00950   if (message_id == DCPS::SAMPLE_DATA) {
00951     OpenDDS::DCPS::DiscoveredWriterData wdata_copy;
00952 
00953     if (iter == discovered_publications_.end()) { // add new
00954       // Must unlock when calling into pub_bit() as it may call back into us
00955       ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00956 
00957       { // Reduce scope of pub and td
00958         DiscoveredPublication& pub =
00959             discovered_publications_[guid] = DiscoveredPublication(wdata);
00960 
00961         topic_name = get_topic_name(pub);
00962         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00963           topics_.find(topic_name);
00964         if (top_it == topics_.end()) {
                // First time this topic is seen: create its TopicDetails
                // from the discovered data and give it a new topic guid.
00965           top_it =
00966             topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
00967           top_it->second.data_type_ = wdata.ddsPublicationData.type_name;
00968           top_it->second.qos_.topic_data = wdata.ddsPublicationData.topic_data;
00969           top_it->second.repo_id_ = make_topic_guid();
00970 
00971         } else if (top_it->second.data_type_ !=
00972                    wdata.ddsPublicationData.type_name.in()) {
                // Known topic name but different type name: report it to a
                // local endpoint on the topic and stop processing.
00973           inconsistent_topic(top_it->second.endpoints_);
00974           if (DCPS::DCPS_debug_level) {
00975             ACE_DEBUG((LM_WARNING,
00976                        ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - WARNING ")
00977                        ACE_TEXT("topic %C discovered data type %C doesn't ")
00978                        ACE_TEXT("match known data type %C, ignoring ")
00979                        ACE_TEXT("discovered publication.\n"),
00980                        topic_name.c_str(),
00981                        wdata.ddsPublicationData.type_name.in(),
00982                        top_it->second.data_type_.c_str()));
00983           }
                // NOTE(review): the entry inserted into
                // discovered_publications_ above is not erased before this
                // return -- confirm that is intended.
00984           return;
00985         }
00986 
00987         TopicDetails& td = top_it->second;
00988         topic_names_[td.repo_id_] = topic_name;
00989         td.endpoints_.insert(guid);
00990 
              // The participant key of the BIT data is filled from the
              // writer's GUID prefix bytes.
00991         std::memcpy(pub.writer_data_.ddsPublicationData.participant_key.value,
00992                     guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
00993         assign_bit_key(pub);
              // Copy taken so the BIT call below does not reference map
              // contents while the lock is released.
00994         wdata_copy = pub.writer_data_;
00995       }
00996 
00997       // Iter no longer valid once lock released
00998       iter = discovered_publications_.end();
00999 
01000       DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01001 #ifndef DDS_HAS_MINIMUM_BIT
01002       {
01003         // Release lock for call into pub_bit
01004         ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01005         DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01006         if (bit) { // bit may be null if the DomainParticipant is shutting down
01007           instance_handle =
01008             bit->store_synthetic_data(wdata_copy.ddsPublicationData,
01009                                       DDS::NEW_VIEW_STATE);
01010         }
01011       }
01012 #endif /* DDS_HAS_MINIMUM_BIT */
01013 
01014       if (spdp_.shutting_down()) { return; }
01015       // Publication may have been removed while lock released
01016       iter = discovered_publications_.find(guid);
01017       if (iter != discovered_publications_.end()) {
01018         iter->second.bit_ih_ = instance_handle;
01019         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01020             topics_.find(topic_name);
01021         if (top_it != topics_.end()) {
01022           if (DCPS::DCPS_debug_level > 3) {
01023             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01024                                  ACE_TEXT("calling match_endpoints new\n")));
01025           }
01026           match_endpoints(guid, top_it->second);
01027         }
01028       }
01029 
          // qosChanged() also copies the changed policy values from wdata
          // into the stored publication data (its first argument).
01030     } else if (qosChanged(iter->second.writer_data_.ddsPublicationData,
01031                           wdata.ddsPublicationData)) { // update existing
01032 #ifndef DDS_HAS_MINIMUM_BIT
            // NOTE(review): unlike the add-new branch, pub_bit() is called
            // here without releasing lock_ -- confirm this is safe.
01033       DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit();
01034       if (bit) { // bit may be null if the DomainParticipant is shutting down
01035         bit->store_synthetic_data(iter->second.writer_data_.ddsPublicationData,
01036                                   DDS::NOT_NEW_VIEW_STATE);
01037       }
01038 #endif /* DDS_HAS_MINIMUM_BIT */
01039 
01040       // Match/unmatch local subscription(s)
01041       topic_name = get_topic_name(iter->second);
01042       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01043           topics_.find(topic_name);
01044       if (top_it != topics_.end()) {
01045         if (DCPS::DCPS_debug_level > 3) {
01046           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01047                                ACE_TEXT("calling match_endpoints update\n")));
01048         }
01049         match_endpoints(guid, top_it->second);
01050       }
01051     }
01052 
01053   } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01054              message_id == DCPS::DISPOSE_INSTANCE ||
01055              message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01056     if (iter != discovered_publications_.end()) {
01057       // Unmatch local subscription(s)
01058       topic_name = get_topic_name(iter->second);
01059       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01060           topics_.find(topic_name);
01061       if (top_it != topics_.end()) {
01062         top_it->second.endpoints_.erase(guid);
01063         match_endpoints(guid, top_it->second, true /*remove*/);
01064         if (spdp_.shutting_down()) { return; }
01065       }
01066       remove_from_bit(iter->second);
            // Note: this message is logged after match_endpoints() above has
            // already been called.
01067       if (DCPS::DCPS_debug_level > 3) {
01068         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ")
01069                              ACE_TEXT("calling match_endpoints disp/unreg\n")));
01070       }
01071       discovered_publications_.erase(iter);
01072     }
01073   }
01074 }
01075 
01076 void
01077 Sedp::Task::svc_i(DCPS::MessageId message_id,
01078                   const OpenDDS::DCPS::DiscoveredReaderData* prdata)
01079 {
01080   ACE_Auto_Basic_Ptr<const OpenDDS::DCPS::DiscoveredReaderData> delete_the_data(prdata);
01081   sedp_->data_received(message_id, *prdata);
01082 }
01083 
// Handle a DiscoveredReaderData sample (a new or updated remote
// subscription, or its removal) passed in from Sedp::Task::svc_i.
//
// message_id selects the action: SAMPLE_DATA adds or updates the entry in
// discovered_subscriptions_ and then processes the reader's
// associatedWriters list; UNREGISTER_INSTANCE, DISPOSE_INSTANCE and
// DISPOSE_UNREGISTER_INSTANCE remove the entry. Any other id does nothing.
// rdata describes the reader; rdata.readerProxy.remoteReaderGuid is the key.
//
// The sample is dropped while spdp_ is shutting down, or when the reader,
// its participant or its topic name is being ignored. If the reader's
// participant has not been discovered yet, the sample is stored in
// deferred_subscriptions_ instead of being processed.
01084 void
01085 Sedp::data_received(DCPS::MessageId message_id,
01086                     const OpenDDS::DCPS::DiscoveredReaderData& rdata)
01087 {
01088   if (spdp_.shutting_down()) { return; }
01089 
01090   const RepoId& guid = rdata.readerProxy.remoteReaderGuid;
        // Same GUID prefix as the reader, with the participant entity id.
01091   RepoId guid_participant = guid;
01092   guid_participant.entityId = ENTITYID_PARTICIPANT;
01093 
01094   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01095 
01096   if (ignoring(guid)
01097       || ignoring(guid_participant)
01098       || ignoring(rdata.ddsSubscriptionData.topic_name)) {
01099     return;
01100   }
01101 
        // Participant not known yet: keep the sample (and its message id)
        // for later, keyed by the reader GUID.
01102   if (!spdp_.has_discovered_participant (guid_participant)) {
01103     deferred_subscriptions_[guid] = std::make_pair (message_id, rdata);
01104     return;
01105   }
01106 
01107   OPENDDS_STRING topic_name;
01108   // Find the subscription  - iterator valid only as long as we hold the lock
01109   DiscoveredSubscriptionIter iter = discovered_subscriptions_.find(guid);
01110 
01111   // Must unlock when calling into sub_bit() as it may call back into us
01112   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
01113 
01114   if (message_id == DCPS::SAMPLE_DATA) {
01115     OpenDDS::DCPS::DiscoveredReaderData rdata_copy;
01116 
01117     if (iter == discovered_subscriptions_.end()) { // add new
01118       { // Reduce scope of sub and td
01119         DiscoveredSubscription& sub =
01120           discovered_subscriptions_[guid] = DiscoveredSubscription(rdata);
01121 
01122         topic_name = get_topic_name(sub);
01123         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01124           topics_.find(topic_name);
01125         if (top_it == topics_.end()) {
                // First time this topic is seen: create its TopicDetails
                // from the discovered data and give it a new topic guid.
01126           top_it =
01127             topics_.insert(std::make_pair(topic_name, TopicDetails())).first;
01128           top_it->second.data_type_ = rdata.ddsSubscriptionData.type_name;
01129           top_it->second.qos_.topic_data = rdata.ddsSubscriptionData.topic_data;
01130           top_it->second.repo_id_ = make_topic_guid();
01131 
01132         } else if (top_it->second.data_type_ !=
01133                    rdata.ddsSubscriptionData.type_name.in()) {
                // Known topic name but different type name: report it to a
                // local endpoint on the topic and stop processing.
01134           inconsistent_topic(top_it->second.endpoints_);
01135           if (DCPS::DCPS_debug_level) {
01136             ACE_DEBUG((LM_WARNING,
01137                        ACE_TEXT("(%P|%t) Sedp::data_received(drd) - WARNING ")
01138                        ACE_TEXT("topic %C discovered data type %C doesn't ")
01139                        ACE_TEXT("match known data type %C, ignoring ")
01140                        ACE_TEXT("discovered subcription.\n"),
01141                        topic_name.c_str(),
01142                        rdata.ddsSubscriptionData.type_name.in(),
01143                        top_it->second.data_type_.c_str()));
01144           }
                // NOTE(review): the entry inserted into
                // discovered_subscriptions_ above is not erased before this
                // return -- confirm that is intended.
01145           return;
01146         }
01147 
01148         TopicDetails& td = top_it->second;
01149         topic_names_[td.repo_id_] = topic_name;
01150         td.endpoints_.insert(guid);
01151 
              // The participant key of the BIT data is filled from the
              // reader's GUID prefix bytes.
01152         std::memcpy(sub.reader_data_.ddsSubscriptionData.participant_key.value,
01153                     guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
01154         assign_bit_key(sub);
              // Copy taken so the BIT call below does not reference map
              // contents while the lock is released.
01155         rdata_copy = sub.reader_data_;
01156       }
01157 
01158       // Iter no longer valid once lock released
01159       iter = discovered_subscriptions_.end();
01160 
01161       DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL;
01162 #ifndef DDS_HAS_MINIMUM_BIT
01163       {
01164         // Release lock for call into sub_bit
01165         ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01166         DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01167         if (bit) { // bit may be null if the DomainParticipant is shutting down
01168           instance_handle =
01169             bit->store_synthetic_data(rdata_copy.ddsSubscriptionData,
01170                                       DDS::NEW_VIEW_STATE);
01171         }
01172       }
01173 #endif /* DDS_HAS_MINIMUM_BIT */
01174 
01175       if (spdp_.shutting_down()) { return; }
01176       // Subscription may have been removed while lock released
01177       iter = discovered_subscriptions_.find(guid);
01178       if (iter != discovered_subscriptions_.end()) {
01179         iter->second.bit_ih_ = instance_handle;
01180         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01181             topics_.find(topic_name);
01182         if (top_it != topics_.end()) {
01183           if (DCPS::DCPS_debug_level > 3) {
01184             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01185                                  ACE_TEXT("calling match_endpoints new\n")));
01186           }
01187           match_endpoints(guid, top_it->second);
01188         }
01189       }
01190 
01191     } else { // update existing
01192       if (qosChanged(iter->second.reader_data_.ddsSubscriptionData,
01193                      rdata.ddsSubscriptionData)) {
01194 #ifndef DDS_HAS_MINIMUM_BIT
              // NOTE(review): unlike the add-new branch, sub_bit() is called
              // here without releasing lock_ -- confirm this is safe.
01195         DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit();
01196         if (bit) { // bit may be null if the DomainParticipant is shutting down
01197           bit->store_synthetic_data(
01198                 iter->second.reader_data_.ddsSubscriptionData,
01199                 DDS::NOT_NEW_VIEW_STATE);
01200         }
01201 #endif /* DDS_HAS_MINIMUM_BIT */
01202 
01203         // Match/unmatch local publication(s)
01204         topic_name = get_topic_name(iter->second);
01205         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01206             topics_.find(topic_name);
01207         if (top_it != topics_.end()) {
01208           if (DCPS::DCPS_debug_level > 3) {
01209             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01210                                  ACE_TEXT("calling match_endpoints update\n")));
01211           }
01212           match_endpoints(guid, top_it->second);
01213         }
01214       }
01215 
01216       if (paramsChanged(iter->second.reader_data_.contentFilterProperty,
01217                         rdata.contentFilterProperty)) {
01218         // Let any associated local publications know about the change
01219         topic_name = get_topic_name(iter->second);
01220         OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01221             topics_.find(topic_name);
01222         using DCPS::RepoIdSet;
              // An empty set is used when the topic is unknown, so the loop
              // below simply does nothing in that case.
01223         const RepoIdSet& assoc =
01224           (top_it == topics_.end()) ? RepoIdSet() : top_it->second.endpoints_;
01225         for (RepoIdSet::const_iterator i = assoc.begin(); i != assoc.end(); ++i) {
01226           if (i->entityId.entityKind & 4) continue; // subscription
01227           const LocalPublicationIter lpi = local_publications_.find(*i);
01228           if (lpi != local_publications_.end()) {
01229             lpi->second.publication_->update_subscription_params(guid,
01230               rdata.contentFilterProperty.expressionParameters);
01231           }
01232         }
01233       }
01234     }
01235     // For each associated opendds writer to this reader
01236     CORBA::ULong len = rdata.readerProxy.associatedWriters.length();
01237     for (CORBA::ULong writerIndex = 0; writerIndex < len; ++writerIndex)
01238     {
01239       GUID_t writerGuid = rdata.readerProxy.associatedWriters[writerIndex];
01240 
01241       // If the associated writer is in this participant
01242       LocalPublicationIter lp = local_publications_.find(writerGuid);
01243       if (lp != local_publications_.end()) {
01244         // If the local writer is not fully associated with the reader
              // (insert().second is true only when guid was not yet in the set)
01245         if (lp->second.remote_opendds_associations_.insert(guid).second) {
01246           // This is a new association
01247           lp->second.publication_->association_complete(guid);
01248         }
01249       }
01250     }
01251 
01252   } else if (message_id == DCPS::UNREGISTER_INSTANCE ||
01253              message_id == DCPS::DISPOSE_INSTANCE ||
01254              message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
01255     if (iter != discovered_subscriptions_.end()) {
01256       // Unmatch local publication(s)
01257       topic_name = get_topic_name(iter->second);
01258       OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
01259           topics_.find(topic_name);
01260       if (top_it != topics_.end()) {
01261         top_it->second.endpoints_.erase(guid);
01262         if (DCPS::DCPS_debug_level > 3) {
01263           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ")
01264                                ACE_TEXT("calling match_endpoints disp/unreg\n")));
01265         }
01266         match_endpoints(guid, top_it->second, true /*remove*/);
01267         if (spdp_.shutting_down()) { return; }
01268       }
01269       remove_from_bit(iter->second);
01270       discovered_subscriptions_.erase(iter);
01271     }
01272   }
01273 }
01274 
01275 void
01276 Sedp::Task::svc_i(DCPS::MessageId message_id,
01277                   const ParticipantMessageData* data)
01278 {
01279   ACE_Auto_Basic_Ptr<const ParticipantMessageData> delete_the_data(data);
01280   sedp_->data_received(message_id, *data);
01281 }
01282 
01283 void
01284 Sedp::data_received(DCPS::MessageId /*message_id*/,
01285                     const ParticipantMessageData& data)
01286 {
01287   if (spdp_.shutting_down()) { return; }
01288 
01289   const RepoId& guid = data.participantGuid;
01290   RepoId guid_participant = guid;
01291   guid_participant.entityId = ENTITYID_PARTICIPANT;
01292   RepoId prefix = data.participantGuid;
01293   prefix.entityId = EntityId_t(); // Clear the entityId so lower bound will work.
01294 
01295   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01296 
01297   if (ignoring(guid)
01298       || ignoring(guid_participant)) {
01299     return;
01300   }
01301 
01302   if (!spdp_.has_discovered_participant (guid_participant)) {
01303     return;
01304   }
01305 
01306   for (LocalSubscriptionMap::const_iterator sub_pos = local_subscriptions_.begin(),
01307          sub_limit = local_subscriptions_.end();
01308        sub_pos != sub_limit; ++sub_pos) {
01309     const DCPS::RepoIdSet::const_iterator pos =
01310       sub_pos->second.matched_endpoints_.lower_bound(prefix);
01311     if (pos != sub_pos->second.matched_endpoints_.end() &&
01312         OpenDDS::DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) {
01313       sub_pos->second.subscription_->signal_liveliness(guid_participant);
01314     }
01315   }
01316 }
01317 
01318 // helper for match(), below
01319 
01320 void
01321 Sedp::association_complete(const RepoId& localId,
01322                            const RepoId& remoteId)
01323 {
01324   // If the remote endpoint is an opendds endpoint
01325   if (is_opendds(remoteId)) {
01326     LocalSubscriptionIter sub = local_subscriptions_.find(localId);
01327     // If the local endpoint is a reader
01328     if (sub != local_subscriptions_.end()) {
01329       std::pair<DCPS::RepoIdSet::iterator, bool> result =
01330           sub->second.remote_opendds_associations_.insert(remoteId);
01331       // If this is a new association for the local reader
01332       if (result.second) {
01333         // Tell other participants
01334         write_subscription_data(localId, sub->second);
01335       }
01336     }
01337   }
01338 }
01339 
01340 void
01341 Sedp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
01342 {
01343   ParticipantMessageData pmd;
01344   pmd.participantGuid = participant_id_;
01345   switch (kind) {
01346   case DDS::AUTOMATIC_LIVELINESS_QOS:
01347     pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
01348     participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, automatic_liveliness_seq_);
01349     break;
01350   case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS:
01351     pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
01352     participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, manual_liveliness_seq_);
01353     break;
01354   case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS:
01355     // Do nothing.
01356     break;
01357   }
01358 }
01359 
01360 Sedp::Endpoint::~Endpoint()
01361 {
01362 }
01363 
01364 //---------------------------------------------------------------
01365 Sedp::Writer::Writer(const RepoId& pub_id, Sedp& sedp)
01366   : Endpoint(pub_id, sedp),
01367     alloc_(2, sizeof(DCPS::TransportSendElementAllocator))
01368 {
01369   header_.prefix[0] = 'R';
01370   header_.prefix[1] = 'T';
01371   header_.prefix[2] = 'P';
01372   header_.prefix[3] = 'S';
01373   header_.version = PROTOCOLVERSION;
01374   header_.vendorId = VENDORID_OPENDDS;
01375   header_.guidPrefix[0] = pub_id.guidPrefix[0];
01376   header_.guidPrefix[1] = pub_id.guidPrefix[1],
01377   header_.guidPrefix[2] = pub_id.guidPrefix[2];
01378   header_.guidPrefix[3] = pub_id.guidPrefix[3];
01379   header_.guidPrefix[4] = pub_id.guidPrefix[4];
01380   header_.guidPrefix[5] = pub_id.guidPrefix[5];
01381   header_.guidPrefix[6] = pub_id.guidPrefix[6];
01382   header_.guidPrefix[7] = pub_id.guidPrefix[7];
01383   header_.guidPrefix[8] = pub_id.guidPrefix[8];
01384   header_.guidPrefix[9] = pub_id.guidPrefix[9];
01385   header_.guidPrefix[10] = pub_id.guidPrefix[10];
01386   header_.guidPrefix[11] = pub_id.guidPrefix[11];
01387 }
01388 
01389 Sedp::Writer::~Writer()
01390 {
01391 }
01392 
01393 bool
01394 Sedp::Writer::assoc(const DCPS::AssociationData& subscription)
01395 {
01396   return associate(subscription, true);
01397 }
01398 
01399 void
01400 Sedp::Writer::data_delivered(const DCPS::DataSampleElement* dsle)
01401 {
01402   delete dsle;
01403 }
01404 
01405 void
01406 Sedp::Writer::data_dropped(const DCPS::DataSampleElement* dsle, bool)
01407 {
01408   delete dsle;
01409 }
01410 
01411 void
01412 Sedp::Writer::control_delivered(ACE_Message_Block* mb)
01413 {
01414   if (mb->flags() == ACE_Message_Block::DONT_DELETE) {
01415     // We allocated mb on stack, its continuation block on heap
01416     delete mb->cont();
01417   } else {
01418     mb->release();
01419   }
01420 }
01421 
01422 void
01423 Sedp::Writer::control_dropped(ACE_Message_Block* mb, bool)
01424 {
01425   if (mb->flags() == ACE_Message_Block::DONT_DELETE) {
01426     // We allocated mb on stack, its continuation block on heap
01427     delete mb->cont();
01428   } else {
01429     mb->release();
01430   }
01431 }
01432 
01433 DDS::ReturnCode_t
01434 Sedp::Writer::write_sample(const ParameterList& plist,
01435                            const DCPS::RepoId& reader,
01436                            DCPS::SequenceNumber& sequence)
01437 {
01438   DDS::ReturnCode_t result = DDS::RETCODE_OK;
01439 
01440   // Determine message length
01441   size_t size = 0, padding = 0;
01442   DCPS::find_size_ulong(size, padding);
01443   DCPS::gen_find_size(plist, size, padding);
01444 
01445   // Build RTPS message
01446   ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
01447                             ACE_Message_Block::MB_DATA,
01448                             new ACE_Message_Block(size));
01449   using DCPS::Serializer;
01450   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01451   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // PL_CDR_LE = 0x0003
01452             (ser << ACE_OutputCDR::from_octet(3)) &&
01453             (ser << ACE_OutputCDR::from_octet(0)) &&
01454             (ser << ACE_OutputCDR::from_octet(0)) &&
01455             (ser << plist);
01456   if (!ok) {
01457     result = DDS::RETCODE_ERROR;
01458   }
01459 
01460   if (result == DDS::RETCODE_OK) {
01461     // Send sample
01462     DCPS::DataSampleElement* list_el =
01463       new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0);
01464     set_header_fields(list_el->get_header(), size, reader, sequence);
01465 
01466     list_el->set_sample(new ACE_Message_Block(size));
01467     *list_el->get_sample() << list_el->get_header();
01468     list_el->get_sample()->cont(payload.duplicate());
01469 
01470     if (reader != GUID_UNKNOWN) {
01471       list_el->set_sub_id(0, reader);
01472       list_el->set_num_subs(1);
01473     }
01474 
01475     DCPS::SendStateDataSampleList list;
01476     list.enqueue_tail(list_el);
01477 
01478     send(list);
01479   }
01480   delete payload.cont();
01481   return result;
01482 }
01483 
01484 DDS::ReturnCode_t
01485 Sedp::Writer::write_sample(const ParticipantMessageData& pmd,
01486                            const DCPS::RepoId& reader,
01487                            DCPS::SequenceNumber& sequence)
01488 {
01489   DDS::ReturnCode_t result = DDS::RETCODE_OK;
01490 
01491   // Determine message length
01492   size_t size = 0, padding = 0;
01493   DCPS::find_size_ulong(size, padding);
01494   DCPS::gen_find_size(pmd, size, padding);
01495 
01496   // Build RTPS message
01497   ACE_Message_Block payload(DCPS::DataSampleHeader::max_marshaled_size(),
01498                             ACE_Message_Block::MB_DATA,
01499                             new ACE_Message_Block(size));
01500   using DCPS::Serializer;
01501   Serializer ser(payload.cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01502   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // CDR_LE = 0x0001
01503             (ser << ACE_OutputCDR::from_octet(1)) &&
01504             (ser << ACE_OutputCDR::from_octet(0)) &&
01505             (ser << ACE_OutputCDR::from_octet(0)) &&
01506             (ser << pmd);
01507   if (!ok) {
01508     result = DDS::RETCODE_ERROR;
01509   }
01510 
01511   if (result == DDS::RETCODE_OK) {
01512     // Send sample
01513     DCPS::DataSampleElement* list_el =
01514       new DCPS::DataSampleElement(repo_id_, this, 0, &alloc_, 0);
01515     set_header_fields(list_el->get_header(), size, reader, sequence);
01516 
01517     list_el->set_sample(new ACE_Message_Block(size));
01518     *list_el->get_sample() << list_el->get_header();
01519     list_el->get_sample()->cont(payload.duplicate());
01520 
01521     if (reader != GUID_UNKNOWN) {
01522       list_el->set_sub_id(0, reader);
01523       list_el->set_num_subs(1);
01524     }
01525 
01526     DCPS::SendStateDataSampleList list;
01527     list.enqueue_tail(list_el);
01528 
01529     send(list);
01530   }
01531   delete payload.cont();
01532   return result;
01533 }
01534 
01535 DDS::ReturnCode_t
01536 Sedp::Writer::write_unregister_dispose(const RepoId& rid)
01537 {
01538   // Build param list for message
01539   Parameter param;
01540   param.guid(rid);
01541   param._d(PID_ENDPOINT_GUID);
01542   ParameterList plist;
01543   plist.length(1);
01544   plist[0] = param;
01545 
01546   // Determine message length
01547   size_t size = 0, padding = 0;
01548   DCPS::find_size_ulong(size, padding);
01549   DCPS::gen_find_size(plist, size, padding);
01550 
01551   ACE_Message_Block* payload = new ACE_Message_Block(DCPS::DataSampleHeader::max_marshaled_size(),
01552                             ACE_Message_Block::MB_DATA,
01553                             new ACE_Message_Block(size));
01554 
01555   if (!payload) {
01556     ACE_DEBUG((LM_ERROR,
01557                ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
01558                ACE_TEXT(" - Failed to allocate message block message\n")));
01559     return DDS::RETCODE_ERROR;
01560   }
01561 
01562   using DCPS::Serializer;
01563   Serializer ser(payload->cont(), host_is_bigendian_, Serializer::ALIGN_CDR);
01564   bool ok = (ser << ACE_OutputCDR::from_octet(0)) &&  // PL_CDR_LE = 0x0003
01565             (ser << ACE_OutputCDR::from_octet(3)) &&
01566             (ser << ACE_OutputCDR::from_octet(0)) &&
01567             (ser << ACE_OutputCDR::from_octet(0)) &&
01568             (ser << plist);
01569 
01570   if (ok) {
01571     // Send
01572     write_control_msg(*payload, size, DCPS::DISPOSE_UNREGISTER_INSTANCE);
01573     return DDS::RETCODE_OK;
01574   } else {
01575     // Error
01576     ACE_DEBUG((LM_ERROR,
01577                ACE_TEXT("(%P|%t) ERROR: Sedp::Writer::write_unregister_dispose")
01578                ACE_TEXT(" - Failed to serialize RTPS control message\n")));
01579     return DDS::RETCODE_ERROR;
01580   }
01581 }
01582 
01583 void
01584 Sedp::Writer::end_historic_samples(const DCPS::RepoId& reader)
01585 {
01586   const void* pReader = static_cast<const void*>(&reader);
01587   ACE_Message_Block mb(DCPS::DataSampleHeader::max_marshaled_size(),
01588                        ACE_Message_Block::MB_DATA,
01589                        new ACE_Message_Block(static_cast<const char*>(pReader),
01590                                              sizeof(reader)));
01591   mb.set_flags(ACE_Message_Block::DONT_DELETE);
01592   mb.cont()->wr_ptr(sizeof(reader));
01593   // 'mb' would contain the DSHeader, but we skip it. mb.cont() has the data
01594   write_control_msg(mb, sizeof(reader), DCPS::END_HISTORIC_SAMPLES,
01595                     DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN());
01596 }
01597 
01598 void
01599 Sedp::Writer::write_control_msg(ACE_Message_Block& payload,
01600                                 size_t size,
01601                                 DCPS::MessageId id,
01602                                 DCPS::SequenceNumber seq)
01603 {
01604   DCPS::DataSampleHeader header;
01605   set_header_fields(header, size, GUID_UNKNOWN, seq, id);
01606   // no need to serialize header since rtps_udp transport ignores it
01607   send_control(header, &payload);
01608 }
01609 
01610 void
01611 Sedp::Writer::set_header_fields(DCPS::DataSampleHeader& dsh,
01612                                 size_t size,
01613                                 const DCPS::RepoId& reader,
01614                                 DCPS::SequenceNumber& sequence,
01615                                 DCPS::MessageId id)
01616 {
01617   dsh.message_id_ = id;
01618   dsh.byte_order_ = ACE_CDR_BYTE_ORDER;
01619   dsh.message_length_ = static_cast<ACE_UINT32>(size);
01620   dsh.publication_id_ = repo_id_;
01621 
01622   if (reader == GUID_UNKNOWN ||
01623       sequence == DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01624     sequence = seq_++;
01625   }
01626 
01627   if (reader != GUID_UNKNOWN) {
01628     // retransmit with same seq# for durability
01629     dsh.historic_sample_ = true;
01630   }
01631 
01632   dsh.sequence_ = sequence;
01633 
01634   const ACE_Time_Value now = ACE_OS::gettimeofday();
01635   dsh.source_timestamp_sec_ = static_cast<ACE_INT32>(now.sec());
01636   dsh.source_timestamp_nanosec_ = now.usec() * 1000;
01637 }
01638 
01639 //-------------------------------------------------------------------------
01640 
01641 Sedp::Reader::~Reader()
01642 {}
01643 
01644 bool
01645 Sedp::Reader::assoc(const DCPS::AssociationData& publication)
01646 {
01647   return associate(publication, false);
01648 }
01649 
01650 
01651 // Implementing TransportReceiveListener
01652 
01653 static bool
01654 decode_parameter_list(const DCPS::ReceivedDataSample& sample,
01655                       DCPS::Serializer& ser,
01656                       const ACE_CDR::Octet& encap,
01657                       ParameterList& data)
01658 {
01659   bool ok = true;
01660   if (ok && sample.header_.key_fields_only_ && encap < 2) {
01661     GUID_t guid;
01662     ok &= (ser >> guid);
01663     data.length(1);
01664     data[0].guid(guid);
01665     data[0]._d(PID_ENDPOINT_GUID);
01666   } else {
01667     ok &= (ser >> data);
01668   }
01669   return ok;
01670 }
01671 
01672 void
01673 Sedp::Reader::data_received(const DCPS::ReceivedDataSample& sample)
01674 {
01675   if (shutting_down_.value()) return;
01676 
01677   switch (sample.header_.message_id_) {
01678   case DCPS::SAMPLE_DATA:
01679   case DCPS::DISPOSE_INSTANCE:
01680   case DCPS::UNREGISTER_INSTANCE:
01681   case DCPS::DISPOSE_UNREGISTER_INSTANCE: {
01682     const DCPS::MessageId id =
01683       static_cast<DCPS::MessageId>(sample.header_.message_id_);
01684 
01685     DCPS::Serializer ser(sample.sample_,
01686                          sample.header_.byte_order_ != ACE_CDR_BYTE_ORDER,
01687                          DCPS::Serializer::ALIGN_CDR);
01688     bool ok = true;
01689     ACE_CDR::Octet encap, dummy;
01690     ACE_CDR::UShort options;
01691     ok &= (ser >> ACE_InputCDR::to_octet(dummy))
01692       && (ser >> ACE_InputCDR::to_octet(encap))
01693       && (ser >> options);
01694 
01695     // Ignore the 'encap' byte order since we use sample.header_.byte_order_
01696     // to determine whether or not to swap bytes.
01697 
01698     if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER) {
01699       ParameterList data;
01700       if (!decode_parameter_list(sample, ser, encap, data)) {
01701         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01702                    ACE_TEXT("failed to deserialize data\n")));
01703         return;
01704       }
01705 
01706       ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredWriterData> wdata(new OpenDDS::DCPS::DiscoveredWriterData);
01707       if (ParameterListConverter::from_param_list(data, *wdata) < 0) {
01708         ACE_DEBUG((LM_ERROR,
01709                    ACE_TEXT("(%P|%t) ERROR: Sedp::Reader::data_received - ")
01710                    ACE_TEXT("failed to convert from ParameterList ")
01711                    ACE_TEXT("to DiscoveredWriterData\n")));
01712         return;
01713       }
01714       sedp_.task_.enqueue(id, wdata.release());
01715 
01716     } else if (sample.header_.publication_id_.entityId == ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER) {
01717       ParameterList data;
01718       if (!decode_parameter_list(sample, ser, encap, data)) {
01719         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01720                    ACE_TEXT("failed to deserialize data\n")));
01721         return;
01722       }
01723 
01724       ACE_Auto_Ptr<OpenDDS::DCPS::DiscoveredReaderData> rdata(new OpenDDS::DCPS::DiscoveredReaderData);
01725       if (ParameterListConverter::from_param_list(data, *rdata) < 0) {
01726         ACE_DEBUG((LM_ERROR,
01727                    ACE_TEXT("(%P|%t) ERROR Sedp::Reader::data_received - ")
01728                    ACE_TEXT("failed to convert from ParameterList ")
01729                    ACE_TEXT("to DiscoveredReaderData\n")));
01730         return;
01731       }
01732       if (rdata->readerProxy.expectsInlineQos) {
01733         set_inline_qos(rdata->readerProxy.allLocators);
01734       }
01735       sedp_.task_.enqueue(id, rdata.release());
01736 
01737     } else if (sample.header_.publication_id_.entityId == ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
01738                && !sample.header_.key_fields_only_) {
01739       ACE_Auto_Ptr<ParticipantMessageData> data(new ParticipantMessageData);
01740       if (!(ser >> *data)) {
01741         ACE_DEBUG((LM_ERROR, ACE_TEXT("ERROR: Sedp::Reader::data_received - ")
01742                    ACE_TEXT("failed to deserialize data\n")));
01743         return;
01744       }
01745 
01746       sedp_.task_.enqueue(id, data.release());
01747 
01748     }
01749   }
01750     break;
01751 
01752   default:
01753     break;
01754   }
01755 }
01756 
01757 void
01758 Sedp::populate_discovered_writer_msg(
01759     OpenDDS::DCPS::DiscoveredWriterData& dwd,
01760     const RepoId& publication_id,
01761     const LocalPublication& pub)
01762 {
01763   // Ignored on the wire dwd.ddsPublicationData.key
01764   // Ignored on the wire dwd.ddsPublicationData.participant_key
01765   OPENDDS_STRING topic_name = topic_names_[pub.topic_id_];
01766   dwd.ddsPublicationData.topic_name = topic_name.c_str();
01767   TopicDetails& topic_details = topics_[topic_name];
01768   dwd.ddsPublicationData.type_name = topic_details.data_type_.c_str();
01769   dwd.ddsPublicationData.durability = pub.qos_.durability;
01770   dwd.ddsPublicationData.durability_service = pub.qos_.durability_service;
01771   dwd.ddsPublicationData.deadline = pub.qos_.deadline;
01772   dwd.ddsPublicationData.latency_budget = pub.qos_.latency_budget;
01773   dwd.ddsPublicationData.liveliness = pub.qos_.liveliness;
01774   dwd.ddsPublicationData.reliability = pub.qos_.reliability;
01775   dwd.ddsPublicationData.lifespan = pub.qos_.lifespan;
01776   dwd.ddsPublicationData.user_data = pub.qos_.user_data;
01777   dwd.ddsPublicationData.ownership = pub.qos_.ownership;
01778   dwd.ddsPublicationData.ownership_strength = pub.qos_.ownership_strength;
01779   dwd.ddsPublicationData.destination_order = pub.qos_.destination_order;
01780   dwd.ddsPublicationData.presentation = pub.publisher_qos_.presentation;
01781   dwd.ddsPublicationData.partition = pub.publisher_qos_.partition;
01782   dwd.ddsPublicationData.topic_data = topic_details.qos_.topic_data;
01783   dwd.ddsPublicationData.group_data = pub.publisher_qos_.group_data;
01784   dwd.writerProxy.remoteWriterGuid = publication_id;
01785   // Ignore dwd.writerProxy.unicastLocatorList;
01786   // Ignore dwd.writerProxy.multicastLocatorList;
01787   dwd.writerProxy.allLocators = pub.trans_info_;
01788 }
01789 
01790 void
01791 Sedp::populate_discovered_reader_msg(
01792     OpenDDS::DCPS::DiscoveredReaderData& drd,
01793     const RepoId& subscription_id,
01794     const LocalSubscription& sub)
01795 {
01796   // Ignored on the wire drd.ddsSubscription.key
01797   // Ignored on the wire drd.ddsSubscription.participant_key
01798   OPENDDS_STRING topic_name = topic_names_[sub.topic_id_];
01799   drd.ddsSubscriptionData.topic_name = topic_name.c_str();
01800   TopicDetails& topic_details = topics_[topic_name];
01801   drd.ddsSubscriptionData.type_name = topic_details.data_type_.c_str();
01802   drd.ddsSubscriptionData.durability = sub.qos_.durability;
01803   drd.ddsSubscriptionData.deadline = sub.qos_.deadline;
01804   drd.ddsSubscriptionData.latency_budget = sub.qos_.latency_budget;
01805   drd.ddsSubscriptionData.liveliness = sub.qos_.liveliness;
01806   drd.ddsSubscriptionData.reliability = sub.qos_.reliability;
01807   drd.ddsSubscriptionData.ownership = sub.qos_.ownership;
01808   drd.ddsSubscriptionData.destination_order = sub.qos_.destination_order;
01809   drd.ddsSubscriptionData.user_data = sub.qos_.user_data;
01810   drd.ddsSubscriptionData.time_based_filter = sub.qos_.time_based_filter;
01811   drd.ddsSubscriptionData.presentation = sub.subscriber_qos_.presentation;
01812   drd.ddsSubscriptionData.partition = sub.subscriber_qos_.partition;
01813   drd.ddsSubscriptionData.topic_data = topic_details.qos_.topic_data;
01814   drd.ddsSubscriptionData.group_data = sub.subscriber_qos_.group_data;
01815   drd.readerProxy.remoteReaderGuid = subscription_id;
01816   drd.readerProxy.expectsInlineQos = false;  // We never expect inline qos
01817   // Ignore drd.readerProxy.unicastLocatorList;
01818   // Ignore drd.readerProxy.multicastLocatorList;
01819   drd.readerProxy.allLocators = sub.trans_info_;
01820   drd.contentFilterProperty.contentFilteredTopicName =
01821     OPENDDS_STRING(DCPS::GuidConverter(subscription_id)).c_str();
01822   drd.contentFilterProperty.relatedTopicName = topic_name.c_str();
01823   drd.contentFilterProperty.filterClassName = ""; // PLConverter adds default
01824   drd.contentFilterProperty.filterExpression = sub.filterProperties.filterExpression;
01825   drd.contentFilterProperty.expressionParameters = sub.filterProperties.expressionParameters;
01826   for (DCPS::RepoIdSet::const_iterator writer =
01827         sub.remote_opendds_associations_.begin();
01828        writer != sub.remote_opendds_associations_.end();
01829        ++writer)
01830   {
01831     CORBA::ULong len = drd.readerProxy.associatedWriters.length();
01832     drd.readerProxy.associatedWriters.length(len + 1);
01833     drd.readerProxy.associatedWriters[len] = *writer;
01834   }
01835 }
01836 
01837 void
01838 Sedp::write_durable_publication_data(const DCPS::RepoId& reader)
01839 {
01840   LocalPublicationIter pub, end = local_publications_.end();
01841   for (pub = local_publications_.begin(); pub != end; ++pub) {
01842     write_publication_data(pub->first, pub->second, reader);
01843   }
01844   publications_writer_.end_historic_samples(reader);
01845 }
01846 
01847 void
01848 Sedp::write_durable_subscription_data(const DCPS::RepoId& reader)
01849 {
01850   LocalSubscriptionIter sub, end = local_subscriptions_.end();
01851   for (sub = local_subscriptions_.begin(); sub != end; ++sub) {
01852     write_subscription_data(sub->first, sub->second, reader);
01853   }
01854   subscriptions_writer_.end_historic_samples(reader);
01855 }
01856 
01857 void
01858 Sedp::write_durable_participant_message_data(const DCPS::RepoId& reader)
01859 {
01860   LocalParticipantMessageIter part, end = local_participant_messages_.end();
01861   for (part = local_participant_messages_.begin(); part != end; ++part) {
01862     write_participant_message_data(part->first, part->second, reader);
01863   }
01864   participant_message_writer_.end_historic_samples(reader);
01865 }
01866 
01867 DDS::ReturnCode_t
01868 Sedp::write_publication_data(
01869     const RepoId& rid,
01870     LocalPublication& lp,
01871     const DCPS::RepoId& reader)
01872 {
01873   DDS::ReturnCode_t result = DDS::RETCODE_OK;
01874   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
01875                              !associated_participants_.empty())) {
01876     OpenDDS::DCPS::DiscoveredWriterData dwd;
01877     ParameterList plist;
01878     populate_discovered_writer_msg(dwd, rid, lp);
01879 
01880     // Convert to parameter list
01881     if (ParameterListConverter::to_param_list(dwd, plist, map_ipv4_to_ipv6())) {
01882       ACE_DEBUG((LM_ERROR,
01883                  ACE_TEXT("(%P|%t) ERROR: Sedp::write_publication_data - ")
01884                  ACE_TEXT("Failed to convert DiscoveredWriterData ")
01885                  ACE_TEXT(" to ParameterList\n")));
01886       result = DDS::RETCODE_ERROR;
01887     }
01888     if (DDS::RETCODE_OK == result) {
01889       result = publications_writer_.write_sample(plist, reader, lp.sequence_);
01890     }
01891   } else if (DCPS::DCPS_debug_level > 3) {
01892     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_publication_data - ")
01893                         ACE_TEXT("not currently associated, dropping msg.\n")));
01894   }
01895   return result;
01896 }
01897 
01898 DDS::ReturnCode_t
01899 Sedp::write_subscription_data(
01900     const RepoId& rid,
01901     LocalSubscription& ls,
01902     const DCPS::RepoId& reader)
01903 {
01904   DDS::ReturnCode_t result = DDS::RETCODE_OK;
01905   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
01906                              !associated_participants_.empty())) {
01907     OpenDDS::DCPS::DiscoveredReaderData drd;
01908     ParameterList plist;
01909     populate_discovered_reader_msg(drd, rid, ls);
01910 
01911     // Convert to parameter list
01912     if (ParameterListConverter::to_param_list(drd, plist, map_ipv4_to_ipv6())) {
01913       ACE_DEBUG((LM_ERROR,
01914                  ACE_TEXT("(%P|%t) ERROR: Sedp::write_subscription_data - ")
01915                  ACE_TEXT("Failed to convert DiscoveredReaderData ")
01916                  ACE_TEXT("to ParameterList\n")));
01917       result = DDS::RETCODE_ERROR;
01918     }
01919     if (DDS::RETCODE_OK == result) {
01920       result = subscriptions_writer_.write_sample(plist, reader, ls.sequence_);
01921     }
01922   } else if (DCPS::DCPS_debug_level > 3) {
01923     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_subscription_data - ")
01924                         ACE_TEXT("not currently associated, dropping msg.\n")));
01925   }
01926   return result;
01927 }
01928 
01929 DDS::ReturnCode_t
01930 Sedp::write_participant_message_data(
01931     const RepoId& rid,
01932     LocalParticipantMessage& pm,
01933     const DCPS::RepoId& reader)
01934 {
01935   DDS::ReturnCode_t result = DDS::RETCODE_OK;
01936   if (spdp_.associated() && (reader != GUID_UNKNOWN ||
01937                              !associated_participants_.empty())) {
01938     ParticipantMessageData pmd;
01939     pmd.participantGuid = rid;
01940     result = participant_message_writer_.write_sample(pmd, reader, pm.sequence_);
01941   } else if (DCPS::DCPS_debug_level > 3) {
01942     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::write_participant_message_data - ")
01943                ACE_TEXT("not currently associated, dropping msg.\n")));
01944   }
01945   return result;
01946 }
01947 
01948 void
01949 Sedp::set_inline_qos(DCPS::TransportLocatorSeq& locators)
01950 {
01951   const OPENDDS_STRING rtps_udp = "rtps_udp";
01952   for (CORBA::ULong i = 0; i < locators.length(); ++i) {
01953     if (locators[i].transport_type.in() == rtps_udp) {
01954       const CORBA::ULong len = locators[i].data.length();
01955       locators[i].data.length(len + 1);
01956       locators[i].data[len] = CORBA::Octet(1);
01957     }
01958   }
01959 }
01960 
01961 void
01962 Sedp::acknowledge()
01963 {
01964   task_.acknowledge();
01965 }
01966 
01967 void
01968 Sedp::Task::enqueue(const SPDPdiscoveredParticipantData* pdata)
01969 {
01970   if (spdp_->shutting_down()) { return; }
01971   putq(new Msg(Msg::MSG_PARTICIPANT, DCPS::SAMPLE_DATA, pdata));
01972 }
01973 
01974 void
01975 Sedp::Task::enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata)
01976 {
01977   if (spdp_->shutting_down()) { return; }
01978   putq(new Msg(Msg::MSG_WRITER, id, wdata));
01979 }
01980 
01981 void
01982 Sedp::Task::enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata)
01983 {
01984   if (spdp_->shutting_down()) { return; }
01985   putq(new Msg(Msg::MSG_READER, id, rdata));
01986 }
01987 
01988 void
01989 Sedp::Task::enqueue(DCPS::MessageId id, const ParticipantMessageData* data)
01990 {
01991   if (spdp_->shutting_down()) { return; }
01992   putq(new Msg(Msg::MSG_PARTICIPANT_DATA, id, data));
01993 }
01994 
01995 void
01996 Sedp::Task::enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih)
01997 {
01998 #ifndef DDS_HAS_MINIMUM_BIT
01999   if (spdp_->shutting_down()) { return; }
02000   putq(new Msg(which_bit, DCPS::DISPOSE_INSTANCE, bit_ih));
02001 #else
02002   ACE_UNUSED_ARG(which_bit);
02003   ACE_UNUSED_ARG(bit_ih);
02004 #endif /* DDS_HAS_MINIMUM_BIT */
02005 }
02006 
02007 int
02008 Sedp::Task::svc()
02009 {
02010   for (Msg* msg = 0; getq(msg) != -1; /*no increment*/) {
02011     if (DCPS::DCPS_debug_level > 5) {
02012       ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc "
02013         "got message from queue type %d\n", msg->type_));
02014     }
02015     ACE_Auto_Basic_Ptr<Msg> delete_the_msg(msg);
02016     switch (msg->type_) {
02017     case Msg::MSG_PARTICIPANT:
02018       svc_i(msg->dpdata_);
02019       break;
02020     case Msg::MSG_WRITER:
02021       svc_i(msg->id_, msg->wdata_);
02022       break;
02023     case Msg::MSG_READER:
02024       svc_i(msg->id_, msg->rdata_);
02025       break;
02026     case Msg::MSG_PARTICIPANT_DATA:
02027       svc_i(msg->id_, msg->pmdata_);
02028       break;
02029     case Msg::MSG_REMOVE_FROM_PUB_BIT:
02030     case Msg::MSG_REMOVE_FROM_SUB_BIT:
02031       svc_i(msg->type_, msg->ih_);
02032       break;
02033     case Msg::MSG_FINI_BIT:
02034       // acknowledge that fini_bit has been called (this just ensures that
02035       // this task is not in the act of using one of BIT Subscriber's Data
02036       // Readers while it is being deleted
02037       spdp_->wait_for_acks().ack();
02038       break;
02039     case Msg::MSG_STOP:
02040       if (DCPS::DCPS_debug_level > 3) {
02041         ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
02042                             ACE_TEXT("received MSG_STOP. Task exiting\n")));
02043       }
02044       return 0;
02045     }
02046     if (DCPS::DCPS_debug_level > 5) {
02047       ACE_DEBUG((LM_DEBUG, "(%P|%t) Sedp::Task::svc done with message\n"));
02048     }
02049   }
02050   if (DCPS::DCPS_debug_level > 3) {
02051     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) Sedp::Task::svc - ")
02052                         ACE_TEXT("Task exiting.\n")));
02053   }
02054   return 0;
02055 }
02056 
02057 Sedp::Task::~Task()
02058 {
02059   shutdown();
02060 }
02061 
02062 bool
02063 Sedp::shutting_down() const
02064 {
02065   return spdp_.shutting_down();
02066 }
02067 
02068 void
02069 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& rTls,
02070                                           DiscoveredSubscriptionIter& dsi,
02071                                           const DCPS::RepoId& reader)
02072 {
02073   OpenDDS::DCPS::LocatorSeq locs;
02074   bool participantExpectsInlineQos = false;
02075   RepoId remote_participant = reader;
02076   remote_participant.entityId = ENTITYID_PARTICIPANT;
02077   const bool participant_found =
02078     spdp_.get_default_locators(remote_participant, locs,
02079                                participantExpectsInlineQos);
02080   if (!rTls->length()) {     // if no locators provided, add the default
02081     if (!participant_found) {
02082       defer_match_endpoints_.insert(reader);
02083       return;
02084     } else if (locs.length()) {
02085       size_t size = 0, padding = 0;
02086       DCPS::gen_find_size(locs, size, padding);
02087 
02088       ACE_Message_Block mb_locator(size + 1);   // Add space for boolean
02089       using DCPS::Serializer;
02090       Serializer ser_loc(&mb_locator,
02091                          ACE_CDR_BYTE_ORDER,
02092                          Serializer::ALIGN_CDR);
02093       ser_loc << locs;
02094       const bool readerExpectsInlineQos =
02095         dsi->second.reader_data_.readerProxy.expectsInlineQos;
02096       ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos
02097                                              || readerExpectsInlineQos);
02098 
02099       DCPS::TransportLocator tl;
02100       tl.transport_type = "rtps_udp";
02101       message_block_to_sequence (mb_locator, tl.data);
02102       rTls->length(1);
02103       (*rTls)[0] = tl;
02104     } else {
02105       ACE_DEBUG((LM_WARNING,
02106                  ACE_TEXT("(%P|%t) Sedp::match - ")
02107                  ACE_TEXT("remote reader found with no locators ")
02108                  ACE_TEXT("and no default locators\n")));
02109     }
02110   }
02111 }
02112 
02113 void
02114 Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& wTls,
02115                                           DiscoveredPublicationIter& /*dpi*/,
02116                                           const DCPS::RepoId& writer)
02117 {
02118   OpenDDS::DCPS::LocatorSeq locs;
02119   bool participantExpectsInlineQos = false;
02120   RepoId remote_participant = writer;
02121   remote_participant.entityId = ENTITYID_PARTICIPANT;
02122   const bool participant_found =
02123     spdp_.get_default_locators(remote_participant, locs,
02124                                participantExpectsInlineQos);
02125   if (!wTls->length()) {     // if no locators provided, add the default
02126     if (!participant_found) {
02127       defer_match_endpoints_.insert(writer);
02128       return;
02129     } else if (locs.length()) {
02130       size_t size = 0, padding = 0;
02131       DCPS::gen_find_size(locs, size, padding);
02132 
02133       ACE_Message_Block mb_locator(size + 1);   // Add space for boolean
02134       using DCPS::Serializer;
02135       Serializer ser_loc(&mb_locator,
02136                          ACE_CDR_BYTE_ORDER,
02137                          Serializer::ALIGN_CDR);
02138       ser_loc << locs;
02139       ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos);
02140 
02141       DCPS::TransportLocator tl;
02142       tl.transport_type = "rtps_udp";
02143       message_block_to_sequence (mb_locator, tl.data);
02144       wTls->length(1);
02145       (*wTls)[0] = tl;
02146     } else {
02147       ACE_DEBUG((LM_WARNING,
02148                  ACE_TEXT("(%P|%t) Sedp::match - ")
02149                  ACE_TEXT("remote writer found with no locators ")
02150                  ACE_TEXT("and no default locators\n")));
02151     }
02152   }
02153 }
02154 
02155 bool
02156 Sedp::defer_writer(const RepoId& writer,
02157                    const RepoId& writer_participant)
02158 {
02159   if (!associated_participants_.count(writer_participant)) {
02160     if (DCPS::DCPS_debug_level > 3) {
02161       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
02162                  ACE_TEXT("remote writer deferred\n")));
02163     }
02164     defer_match_endpoints_.insert(writer);
02165     return true;
02166   }
02167   return false;
02168 }
02169 
02170 bool
02171 Sedp::defer_reader(const RepoId& reader,
02172                    const RepoId& reader_participant)
02173 {
02174   if (!associated_participants_.count(reader_participant)) {
02175     if (DCPS::DCPS_debug_level > 3) {
02176       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::match - ")
02177                  ACE_TEXT("remote reader deferred\n")));
02178     }
02179     defer_match_endpoints_.insert(reader);
02180     return true;
02181   }
02182   return false;
02183 }
02184 
02185 WaitForAcks::WaitForAcks()
02186 : cond_(lock_)
02187 , acks_(0)
02188 {
02189 }
02190 
02191 void
02192 WaitForAcks::ack()
02193 {
02194   {
02195     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02196     ++acks_;
02197   }
02198   cond_.signal();
02199 }
02200 
02201 void
02202 WaitForAcks::wait_for_acks(unsigned int num_acks)
02203 {
02204   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02205   while (num_acks > acks_) {
02206     cond_.wait();
02207   }
02208 }
02209 
02210 void
02211 WaitForAcks::reset()
02212 {
02213   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02214   acks_ = 0;
02215   // no need to signal, going back to zero won't ever
02216   // cause wait_for_acks() to exit it's loop
02217 }
02218 
02219 }
02220 }

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7