Spdp.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Spdp.h"
00009 #include "BaseMessageTypes.h"
00010 #include "MessageTypes.h"
00011 #include "RtpsCoreTypeSupportImpl.h"
00012 #include "ParameterListConverter.h"
00013 #include "RtpsDiscovery.h"
00014 
00015 #include "dds/DdsDcpsGuidC.h"
00016 
00017 #include "dds/DCPS/Service_Participant.h"
00018 #include "dds/DCPS/BuiltInTopicUtils.h"
00019 #include "dds/DCPS/GuidConverter.h"
00020 #include "dds/DCPS/Qos_Helper.h"
00021 
00022 #include "ace/Reactor.h"
00023 #include "ace/OS_NS_sys_socket.h" // For setsockopt()
00024 
00025 #include <cstring>
00026 #include <stdexcept>
00027 
00028 namespace OpenDDS {
00029 namespace RTPS {
00030 using DCPS::RepoId;
00031 
00032 namespace {
00033   // Multiplier for resend period -> lease duration conversion,
00034   // if a remote discovery misses this many resends from us it will consider
00035   // us offline / unreachable.
00036   const int LEASE_MULT = 10;
00037   const CORBA::UShort encap_LE = 0x0300; // {PL_CDR_LE} in LE
00038   const CORBA::UShort encap_BE = 0x0200; // {PL_CDR_BE} in LE
00039 
00040   bool disposed(const ParameterList& inlineQos)
00041   {
00042     for (CORBA::ULong i = 0; i < inlineQos.length(); ++i) {
00043       if (inlineQos[i]._d() == PID_STATUS_INFO) {
00044         return inlineQos[i].status_info().value[3] & 1;
00045       }
00046     }
00047     return false;
00048   }
00049 }
00050 
00051 
00052 Spdp::Spdp(DDS::DomainId_t domain, RepoId& guid,
00053            const DDS::DomainParticipantQos& qos, RtpsDiscovery* disco)
00054   : OpenDDS::DCPS::LocalParticipant<Sedp>(qos)
00055   , disco_(disco), domain_(domain), guid_(guid)
00056   , tport_(new SpdpTransport(this)), eh_(tport_), eh_shutdown_(false)
00057   , shutdown_cond_(lock_), shutdown_flag_(0), sedp_(guid_, *this, lock_)
00058 {
00059   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00060   guid = guid_; // may have changed in SpdpTransport constructor
00061   sedp_.ignore(guid);
00062   sedp_.init(guid_, *disco, domain_);
00063 
00064   // Append metatraffic unicast locator
00065   sedp_.unicast_locators(sedp_unicast_);
00066 
00067   if (disco->sedp_multicast()) { // Append metatraffic multicast locator
00068     const ACE_INET_Addr& mc_addr = sedp_.multicast_group();
00069     OpenDDS::DCPS::Locator_t mc_locator;
00070     mc_locator.kind = address_to_kind(mc_addr);
00071     mc_locator.port = mc_addr.get_port_number();
00072     address_to_bytes(mc_locator.address, mc_addr);
00073     sedp_multicast_.length(1);
00074     sedp_multicast_[0] = mc_locator;
00075   }
00076 }
00077 
00078 Spdp::~Spdp()
00079 {
00080   shutdown_flag_ = 1;
00081   {
00082     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00083     if (DCPS::DCPS_debug_level > 3) {
00084       ACE_DEBUG((LM_INFO,
00085                  ACE_TEXT("(%P|%t) Spdp::~Spdp ")
00086                  ACE_TEXT("remove discovered participants\n")));
00087     }
00088     // Iterate through a copy of the repo Ids, rather than the map
00089     //   as it gets unlocked in remove_discovered_participant()
00090     DCPS::RepoIdSet participant_ids;
00091     get_discovered_participant_ids(participant_ids);
00092     for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00093          participant_id != participant_ids.end();
00094          ++participant_id)
00095     {
00096       DiscoveredParticipantIter part = participants_.find(*participant_id);
00097       if (part != participants_.end()) {
00098         remove_discovered_participant(part);
00099       }
00100     }
00101   }
00102 
00103   // ensure sedp's task queue is drained before data members are being
00104   // deleted
00105   sedp_.shutdown();
00106 
00107   // release lock for reset of event handler, which may delete transport
00108   tport_->close();
00109   eh_.reset();
00110   {
00111     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00112     while (!eh_shutdown_) {
00113       shutdown_cond_.wait();
00114     }
00115   }
00116 }
00117 
00118 void
00119 Spdp::data_received(const DataSubmessage& data, const ParameterList& plist)
00120 {
00121   if (shutdown_flag_.value()) { return; }
00122 
00123   const ACE_Time_Value time = ACE_OS::gettimeofday();
00124   SPDPdiscoveredParticipantData pdata;
00125   if (ParameterListConverter::from_param_list(plist, pdata) < 0) {
00126     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ")
00127       ACE_TEXT("failed to convert from ParameterList to ")
00128       ACE_TEXT("SPDPdiscoveredParticipantData\n")));
00129     return;
00130   }
00131 
00132   DCPS::RepoId guid;
00133   std::memcpy(guid.guidPrefix, pdata.participantProxy.guidPrefix,
00134               sizeof(guid.guidPrefix));
00135   guid.entityId = OpenDDS::DCPS::ENTITYID_PARTICIPANT;
00136 
00137   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00138   if (sedp_.ignoring(guid)) {
00139     // Ignore, this is our domain participant or one that the user has
00140     // asked us to ignore.
00141     return;
00142   }
00143 
00144   // Find the participant - iterator valid only as long as we hold the lock
00145   DiscoveredParticipantIter iter = participants_.find(guid);
00146 
00147   // Must unlock when calling into part_bit() as it may call back into us
00148   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00149 
00150   if (iter == participants_.end()) {
00151     // copy guid prefix (octet[12]) into BIT key (long[3])
00152     std::memcpy(pdata.ddsParticipantData.key.value,
00153                 pdata.participantProxy.guidPrefix,
00154                 sizeof(pdata.ddsParticipantData.key.value));
00155 
00156     if (DCPS::DCPS_debug_level) {
00157       DCPS::GuidConverter local(guid_), remote(guid);
00158       ACE_DEBUG((LM_DEBUG,
00159         ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"),
00160         OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(),
00161         pdata.leaseDuration.seconds));
00162     }
00163 
00164     // add a new participant
00165     participants_[guid] = DiscoveredParticipant(pdata, time);
00166     DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00167 #ifndef DDS_HAS_MINIMUM_BIT
00168     DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00169     if (bit) {
00170       ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00171       bit_instance_handle =
00172         bit->store_synthetic_data(pdata.ddsParticipantData,
00173                                   DDS::NEW_VIEW_STATE);
00174     }
00175 #endif /* DDS_HAS_MINIMUM_BIT */
00176 
00177     // notify Sedp of association
00178     // Sedp may call has_discovered_participant.
00179     // This is what the participant must be added before this call to associate.
00180     sedp_.associate(pdata);
00181 
00182     // Since we've just seen a new participant, let's send out our
00183     // own announcement, so they don't have to wait.
00184     this->tport_->write_i();
00185 
00186     // Iterator is no longer valid
00187     iter = participants_.find(guid);
00188     if (iter != participants_.end()) {
00189       iter->second.bit_ih_ = bit_instance_handle;
00190     }
00191 
00192   } else if (data.inlineQos.length() && disposed(data.inlineQos)) {
00193     remove_discovered_participant(iter);
00194 
00195   } else {
00196     // update an existing participant
00197     pdata.ddsParticipantData.key = iter->second.pdata_.ddsParticipantData.key;
00198 #ifndef OPENDDS_SAFETY_PROFILE
00199     using OpenDDS::DCPS::operator!=;
00200 #endif
00201     if (iter->second.pdata_.ddsParticipantData.user_data !=
00202         pdata.ddsParticipantData.user_data) {
00203       iter->second.pdata_.ddsParticipantData.user_data =
00204         pdata.ddsParticipantData.user_data;
00205 #ifndef DDS_HAS_MINIMUM_BIT
00206       DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00207       if (bit) {
00208         ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00209         bit->store_synthetic_data(pdata.ddsParticipantData,
00210                                   DDS::NOT_NEW_VIEW_STATE);
00211       }
00212 #endif /* DDS_HAS_MINIMUM_BIT */
00213       // Perform search again, so iterator becomes valid
00214       iter = participants_.find(guid);
00215     }
00216     // Participant may have been removed while lock released
00217     if (iter != participants_.end()) {
00218       iter->second.pdata_ = pdata;
00219       iter->second.last_seen_ = time;
00220     }
00221   }
00222 }
00223 
00224 void
00225 Spdp::remove_expired_participants()
00226 {
00227   // Find and remove any expired discovered participant
00228   ACE_GUARD (ACE_Thread_Mutex, g, lock_);
00229   // Iterate through a copy of the repo Ids, rather than the map
00230   //   as it gets unlocked in remove_discovered_participant()
00231   DCPS::RepoIdSet participant_ids;
00232   get_discovered_participant_ids(participant_ids);
00233   for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00234        participant_id != participant_ids.end();
00235        ++participant_id)
00236   {
00237     DiscoveredParticipantIter part = participants_.find(*participant_id);
00238     if (part != participants_.end()) {
00239       if (part->second.last_seen_ <
00240           ACE_OS::gettimeofday() -
00241           ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) {
00242         if (DCPS::DCPS_debug_level > 1) {
00243           DCPS::GuidConverter conv(part->first);
00244           ACE_DEBUG((LM_WARNING,
00245             ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ")
00246             ACE_TEXT("participant %C exceeded lease duration, removing\n"),
00247             OPENDDS_STRING(conv).c_str()));
00248         }
00249         remove_discovered_participant(part);
00250       }
00251     }
00252   }
00253 }
00254 
00255 void
00256 Spdp::init_bit(const DDS::Subscriber_var& bit_subscriber)
00257 {
00258   bit_subscriber_ = bit_subscriber;
00259   tport_->open();
00260 }
00261 
00262 void
00263 Spdp::fini_bit()
00264 {
00265   bit_subscriber_ = 0;
00266   wait_for_acks_.reset();
00267   // request for SpdpTransport(actually Reactor) thread and Sedp::Task
00268   // to acknowledge
00269   tport_->acknowledge();
00270   sedp_.acknowledge();
00271   // wait for the 2 acknowledgements
00272   wait_for_acks_.wait_for_acks(2);
00273 }
00274 
00275 #ifndef DDS_HAS_MINIMUM_BIT
00276 DDS::ParticipantBuiltinTopicDataDataReaderImpl*
00277 Spdp::part_bit()
00278 {
00279   if (!bit_subscriber_.in())
00280     return 0;
00281 
00282   DDS::DataReader_var d =
00283     bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
00284   return dynamic_cast<DDS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
00285 }
00286 #endif /* DDS_HAS_MINIMUM_BIT */
00287 
00288 ACE_Reactor*
00289 Spdp::reactor() const
00290 {
00291   return disco_->reactor();
00292 }
00293 
00294 WaitForAcks&
00295 Spdp::wait_for_acks()
00296 {
00297   return wait_for_acks_;
00298 }
00299 
00300 Spdp::SpdpTransport::SpdpTransport(Spdp* outer)
00301   : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT)
00302   , buff_(64 * 1024)
00303   , wbuff_(64 * 1024)
00304 {
00305   hdr_.prefix[0] = 'R';
00306   hdr_.prefix[1] = 'T';
00307   hdr_.prefix[2] = 'P';
00308   hdr_.prefix[3] = 'S';
00309   hdr_.version = PROTOCOLVERSION;
00310   hdr_.vendorId = VENDORID_OPENDDS;
00311   std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t));
00312   data_.smHeader.submessageId = DATA;
00313   data_.smHeader.flags = 1 /*FLAG_E*/ | 4 /*FLAG_D*/;
00314   data_.smHeader.submessageLength = 0; // last submessage in the Message
00315   data_.extraFlags = 0;
00316   data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS;
00317   data_.readerId = ENTITYID_UNKNOWN;
00318   data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER;
00319   data_.writerSN.high = 0;
00320   data_.writerSN.low = 0;
00321 
00322   // Ports are set by the formulas in RTPS v2.1 Table 9.8
00323   const u_short port_common = outer_->disco_->pb() +
00324                               (outer_->disco_->dg() * outer_->domain_),
00325     mc_port = port_common + outer_->disco_->d0();
00326 
00327   u_short participantId = (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11];
00328 
00329 #ifdef OPENDDS_SAFETY_PROFILE
00330   const u_short startingParticipantId = participantId;
00331 #endif
00332 
00333   while (!open_unicast_socket(port_common, participantId)) {
00334     ++participantId;
00335   }
00336 
00337 #ifdef OPENDDS_SAFETY_PROFILE
00338   if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
00339     // Since pids are not available, use the fact that we had to increment
00340     // participantId to modify the GUID's pid bytes.  This avoids GUID conflicts
00341     // between processes on the same host which start at the same time
00342     // (resulting in the same seed value for the random number generator).
00343     hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
00344     hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
00345     outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
00346     outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
00347   }
00348 #endif
00349 
00350   OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group();
00351   ACE_INET_Addr default_multicast;
00352   if (0 != default_multicast.set(mc_port, mc_addr.c_str())) {
00353     ACE_DEBUG((
00354           LM_ERROR,
00355           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
00356           ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"),
00357           mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set")));
00358     throw std::runtime_error("failed to set default_multicast address");
00359   }
00360 
00361   const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface();
00362 
00363   if (DCPS::DCPS_debug_level > 3) {
00364     ACE_DEBUG((LM_INFO,
00365                ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ")
00366                ACE_TEXT("joining group %C %C:%hd\n"),
00367                net_if.c_str (),
00368                mc_addr.c_str (),
00369                mc_port));
00370   }
00371 
00372 #ifdef ACE_HAS_MAC_OSX
00373   multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00374                          ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00375 #endif
00376 
00377   if (0 != multicast_socket_.join(default_multicast, 1,
00378                                   net_if.empty() ? 0 :
00379                                   ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) {
00380     ACE_DEBUG((LM_ERROR,
00381         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
00382         ACE_TEXT("failed to join multicast group %C:%hd %p\n"),
00383         mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join")));
00384     throw std::runtime_error("failed to join multicast group");
00385   }
00386 
00387   send_addrs_.insert(default_multicast);
00388 
00389   typedef RtpsDiscovery::AddrVec::iterator iter;
00390   for (iter it = outer_->disco_->spdp_send_addrs().begin(),
00391        end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) {
00392     send_addrs_.insert(ACE_INET_Addr(it->c_str()));
00393   }
00394 
00395   reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
00396 }
00397 
00398 void
00399 Spdp::SpdpTransport::open()
00400 {
00401   ACE_Reactor* reactor = outer_->reactor();
00402   if (reactor->register_handler(unicast_socket_.get_handle(),
00403                                 this, ACE_Event_Handler::READ_MASK) != 0) {
00404     throw std::runtime_error("failed to register unicast input handler");
00405   }
00406 
00407   if (reactor->register_handler(multicast_socket_.get_handle(),
00408                                 this, ACE_Event_Handler::READ_MASK) != 0) {
00409     throw std::runtime_error("failed to register multicast input handler");
00410   }
00411 
00412   const ACE_Time_Value per = outer_->disco_->resend_period();
00413   if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), per)) {
00414     throw std::runtime_error("failed to schedule timer with reactor");
00415   }
00416 }
00417 
00418 Spdp::SpdpTransport::~SpdpTransport()
00419 {
00420   if (DCPS::DCPS_debug_level > 3) {
00421     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
00422   }
00423   dispose_unregister();
00424   {
00425     // Acquire lock for modification of condition variable
00426     ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00427     outer_->eh_shutdown_ = true;
00428   }
00429   outer_->shutdown_cond_.signal();
00430   unicast_socket_.close();
00431   multicast_socket_.close();
00432 }
00433 
00434 void
00435 Spdp::SpdpTransport::dispose_unregister()
00436 {
00437   // Send the dispose/unregister SPDP sample
00438   data_.writerSN.high = seq_.getHigh();
00439   data_.writerSN.low = seq_.getLow();
00440   data_.smHeader.flags = 1 /*FLAG_E*/ | 2 /*FLAG_Q*/ | 8 /*FLAG_K*/;
00441   data_.inlineQos.length(1);
00442   static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
00443   data_.inlineQos[0].status_info(dispose_unregister);
00444 
00445   ParameterList plist(1);
00446   plist.length(1);
00447   plist[0].guid(outer_->guid_);
00448   plist[0]._d(PID_PARTICIPANT_GUID);
00449 
00450   wbuff_.reset();
00451   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
00452   CORBA::UShort options = 0;
00453   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
00454       || !(ser << plist)) {
00455     ACE_ERROR((LM_ERROR,
00456       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
00457       ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
00458     return;
00459   }
00460 
00461   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00462   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
00463     const ssize_t res =
00464       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
00465     if (res < 0) {
00466       ACE_TCHAR addr_buff[256] = {};
00467       iter->addr_to_string(addr_buff, 256, 0);
00468       ACE_ERROR((LM_ERROR,
00469         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
00470         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
00471     }
00472   }
00473 }
00474 
00475 void
00476 Spdp::SpdpTransport::close()
00477 {
00478   if (DCPS::DCPS_debug_level > 3) {
00479     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
00480   }
00481   ACE_Reactor* reactor = outer_->reactor();
00482   reactor->cancel_timer(this);
00483   const ACE_Reactor_Mask mask =
00484     ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
00485   reactor->remove_handler(multicast_socket_.get_handle(), mask);
00486   reactor->remove_handler(unicast_socket_.get_handle(), mask);
00487 }
00488 
00489 void
00490 Spdp::SpdpTransport::write()
00491 {
00492   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00493   write_i();
00494 }
00495 
00496 void
00497 Spdp::SpdpTransport::write_i()
00498 {
00499   static const BuiltinEndpointSet_t availableBuiltinEndpoints =
00500     DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER |
00501     DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR |
00502     DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER |
00503     DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR |
00504     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER |
00505     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR |
00506     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER |
00507     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER
00508     ;
00509   // The RTPS spec has no constants for the builtinTopics{Writer,Reader}
00510 
00511   // This locator list should not be empty, but we won't actually be using it.
00512   // The OpenDDS publication/subscription data will have locators included.
00513   OpenDDS::DCPS::LocatorSeq nonEmptyList(1);
00514   nonEmptyList.length(1);
00515   nonEmptyList[0].kind = LOCATOR_KIND_UDPv4;
00516   nonEmptyList[0].port = 12345;
00517   std::memset(nonEmptyList[0].address, 0, 12);
00518   nonEmptyList[0].address[12] = 127;
00519   nonEmptyList[0].address[13] = 0;
00520   nonEmptyList[0].address[14] = 0;
00521   nonEmptyList[0].address[15] = 1;
00522 
00523   data_.writerSN.high = seq_.getHigh();
00524   data_.writerSN.low = seq_.getLow();
00525   ++seq_;
00526 
00527   const GuidPrefix_t& gp = outer_->guid_.guidPrefix;
00528 
00529   const SPDPdiscoveredParticipantData pdata = {
00530     { // ParticipantBuiltinTopicData
00531       DDS::BuiltinTopicKey_t() /*ignored*/,
00532       outer_->qos_.user_data
00533     },
00534     { // ParticipantProxy_t
00535       PROTOCOLVERSION,
00536       {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5],
00537        gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]},
00538       VENDORID_OPENDDS,
00539       false /*expectsIQoS*/,
00540       availableBuiltinEndpoints,
00541       outer_->sedp_unicast_,
00542       outer_->sedp_multicast_,
00543       nonEmptyList /*defaultMulticastLocatorList*/,
00544       nonEmptyList /*defaultUnicastLocatorList*/,
00545       {0 /*manualLivelinessCount*/}   //FUTURE: implement manual liveliness
00546     },
00547     { // Duration_t (leaseDuration)
00548       static_cast<CORBA::Long>(lease_duration_.sec()),
00549       0 // we are not supporting fractional seconds in the lease duration
00550     }
00551   };
00552 
00553   ParameterList plist;
00554   if (ParameterListConverter::to_param_list(pdata, plist) < 0) {
00555     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00556       ACE_TEXT("Spdp::SpdpTransport::write() - ")
00557       ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
00558       ACE_TEXT("to ParameterList\n")));
00559     return;
00560   }
00561 
00562   wbuff_.reset();
00563   CORBA::UShort options = 0;
00564   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
00565   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
00566       || !(ser << plist)) {
00567     ACE_ERROR((LM_ERROR,
00568       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
00569       ACE_TEXT("failed to serialize headers for SPDP\n")));
00570     return;
00571   }
00572 
00573   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00574   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
00575     const ssize_t res =
00576       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
00577     if (res < 0) {
00578       ACE_TCHAR addr_buff[256] = {};
00579       iter->addr_to_string(addr_buff, 256, 0);
00580       ACE_ERROR((LM_ERROR,
00581         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
00582         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
00583     }
00584   }
00585 }
00586 
00587 int
00588 Spdp::SpdpTransport::handle_timeout(const ACE_Time_Value&, const void*)
00589 {
00590   write();
00591   outer_->remove_expired_participants();
00592   return 0;
00593 }
00594 
00595 int
00596 Spdp::SpdpTransport::handle_input(ACE_HANDLE h)
00597 {
00598   const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle())
00599                                  ? unicast_socket_ : multicast_socket_;
00600   ACE_INET_Addr remote;
00601   buff_.reset();
00602   const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
00603 
00604   if (bytes > 0) {
00605     buff_.wr_ptr(bytes);
00606   } else if (bytes == 0) {
00607     return -1;
00608   } else {
00609     ACE_DEBUG((
00610           LM_ERROR,
00611           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00612           ACE_TEXT("error reading from %C socket %p\n")
00613           , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
00614           ACE_TEXT("ACE_SOCK_Dgram::recv")));
00615     return -1;
00616   }
00617 
00618   // Handle some RTI protocol multicast to the same address
00619   if ((buff_.size() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) {
00620     return 0; // Ignore
00621   }
00622 
00623   DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR);
00624   Header header;
00625   if (!(ser >> header)) {
00626     ACE_ERROR((LM_ERROR,
00627                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00628                ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
00629     return 0;
00630   }
00631 
00632   while (buff_.length() > 3) {
00633     const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
00634     ser.swap_bytes((flags & 1 /*FLAG_E*/) != ACE_CDR_BYTE_ORDER);
00635     const size_t start = buff_.length();
00636     CORBA::UShort submessageLength = 0;
00637     switch (subm) {
00638     case DATA: {
00639       DataSubmessage data;
00640       if (!(ser >> data)) {
00641         ACE_ERROR((
00642               LM_ERROR,
00643               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00644               ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
00645         return 0;
00646       }
00647       submessageLength = data.smHeader.submessageLength;
00648 
00649       if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
00650         // Not our message: this could be the same multicast group used
00651         // for SEDP and other traffic.
00652         break;
00653       }
00654 
00655       ParameterList plist;
00656       if (data.smHeader.flags & (4 /*FLAG_D*/ | 8 /*FLAG_K*/)) {
00657         ser.swap_bytes(!ACE_CDR_BYTE_ORDER); // read "encap" itself in LE
00658         CORBA::UShort encap, options;
00659         if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE)) {
00660           ACE_ERROR((LM_ERROR,
00661             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00662             ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
00663           return 0;
00664         }
00665         ser >> options;
00666         // bit 8 in encap is on if it's PL_CDR_LE
00667         ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER);
00668         if (!(ser >> plist)) {
00669           ACE_ERROR((LM_ERROR,
00670             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00671             ACE_TEXT("failed to deserialize data payload for SPDP\n")));
00672           return 0;
00673         }
00674       } else {
00675         plist.length(1);
00676         RepoId guid;
00677         std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t));
00678         guid.entityId = ENTITYID_PARTICIPANT;
00679         plist[0].guid(guid);
00680         plist[0]._d(PID_PARTICIPANT_GUID);
00681       }
00682 
00683       outer_->data_received(data, plist);
00684       break;
00685     }
00686     default:
00687       SubmessageHeader smHeader;
00688       if (!(ser >> smHeader)) {
00689         ACE_ERROR((
00690               LM_ERROR,
00691               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00692               ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
00693         return 0;
00694       }
00695       submessageLength = smHeader.submessageLength;
00696       break;
00697     }
00698     if (submessageLength && buff_.length()) {
00699       const size_t read = start - buff_.length();
00700       if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
00701         ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ - read));
00702       }
00703     } else if (!submessageLength) {
00704       break; // submessageLength of 0 indicates the last submessage
00705     }
00706   }
00707 
00708   return 0;
00709 }
00710 
00711 int
00712 Spdp::SpdpTransport::handle_exception(ACE_HANDLE )
00713 {
00714   outer_->wait_for_acks().ack();
00715   return 0;
00716 }
00717 
00718 void
00719 Spdp::SpdpTransport::acknowledge()
00720 {
00721   ACE_Reactor* reactor = outer_->reactor();
00722   reactor->notify(this);
00723 }
00724 
00725 void
00726 Spdp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
00727 {
00728   sedp_.signal_liveliness(kind);
00729 }
00730 
00731 bool
00732 Spdp::SpdpTransport::open_unicast_socket(u_short port_common,
00733                                          u_short participant_id)
00734 {
00735   const u_short uni_port = port_common + outer_->disco_->d1() +
00736                            (outer_->disco_->pg() * participant_id);
00737 
00738   ACE_INET_Addr local_addr;
00739   if (0 != local_addr.set(uni_port)) {
00740     ACE_DEBUG((
00741           LM_ERROR,
00742           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00743           ACE_TEXT("failed setting unicast local_addr to port %d %p\n"),
00744           uni_port, ACE_TEXT("ACE_INET_Addr::set")));
00745     throw std::runtime_error("failed to set unicast local address");
00746   }
00747 
00748   if (!OpenDDS::DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) {
00749     if (DCPS::DCPS_debug_level > 3) {
00750       ACE_DEBUG((
00751             LM_WARNING,
00752             ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00753             ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p.  ")
00754             ACE_TEXT("Trying next participantId...\n"),
00755             uni_port, ACE_TEXT("ACE_SOCK_Dgram::open")));
00756     }
00757     return false;
00758 
00759   } else if (DCPS::DCPS_debug_level > 3) {
00760     ACE_DEBUG((
00761           LM_INFO,
00762           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00763           ACE_TEXT("opened unicast socket on port %d\n"),
00764           uni_port));
00765   }
00766 
00767   if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) {
00768     ACE_DEBUG((LM_ERROR,
00769                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
00770                ACE_TEXT("failed to set TTL value to %d ")
00771                ACE_TEXT("for port:%hd %p\n"),
00772                outer_->disco_->ttl(), uni_port, ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl:")));
00773     throw std::runtime_error("failed to set TTL");
00774   }
00775   return true;
00776 }
00777 
00778 bool
00779 Spdp::get_default_locators(const RepoId& part_id, OpenDDS::DCPS::LocatorSeq& target,
00780                            bool& inlineQos)
00781 {
00782   DiscoveredParticipantIter part_iter = participants_.find(part_id);
00783   if (part_iter == participants_.end()) {
00784     return false;
00785   } else {
00786     inlineQos = part_iter->second.pdata_.participantProxy.expectsInlineQos;
00787     OpenDDS::DCPS::LocatorSeq& mc_source =
00788           part_iter->second.pdata_.participantProxy.defaultMulticastLocatorList;
00789     OpenDDS::DCPS::LocatorSeq& uc_source =
00790           part_iter->second.pdata_.participantProxy.defaultUnicastLocatorList;
00791     CORBA::ULong mc_source_len = mc_source.length();
00792     CORBA::ULong uc_source_len = uc_source.length();
00793     CORBA::ULong target_len = target.length();
00794     target.length(mc_source_len + uc_source_len + target_len);
00795     // Copy multicast
00796     for (CORBA::ULong mci = 0; mci < mc_source.length(); ++mci) {
00797       target[target_len + mci] = mc_source[mci];
00798     }
00799     // Copy unicast
00800     for (CORBA::ULong uci = 0; uci < uc_source.length(); ++uci) {
00801       target[target_len + mc_source_len + uci] = uc_source[uci];
00802     }
00803   }
00804   return true;
00805 }
00806 
00807 bool
00808 Spdp::associated() const
00809 {
00810   return !participants_.empty();
00811 }
00812 
00813 bool
00814 Spdp::has_discovered_participant(const DCPS::RepoId& guid)
00815 {
00816   return participants_.find(guid) != participants_.end();
00817 }
00818 
00819 
00820 void
00821 Spdp::get_discovered_participant_ids(DCPS::RepoIdSet& results) const
00822 {
00823   DiscoveredParticipantMap::const_iterator idx;
00824   for (idx = participants_.begin(); idx != participants_.end(); ++idx)
00825   {
00826     results.insert(idx->first);
00827   }
00828 }
00829 
00830 }
00831 }

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7