00001
00002
00003
00004
00005
00006
00007
00008 #include "Spdp.h"
00009 #include "BaseMessageTypes.h"
00010 #include "MessageTypes.h"
00011 #include "RtpsCoreTypeSupportImpl.h"
00012 #include "ParameterListConverter.h"
00013 #include "RtpsDiscovery.h"
00014
00015 #include "dds/DdsDcpsGuidC.h"
00016
00017 #include "dds/DCPS/Service_Participant.h"
00018 #include "dds/DCPS/BuiltInTopicUtils.h"
00019 #include "dds/DCPS/GuidConverter.h"
00020 #include "dds/DCPS/Qos_Helper.h"
00021
00022 #include "ace/Reactor.h"
00023 #include "ace/OS_NS_sys_socket.h"
00024
00025 #include <cstring>
00026 #include <stdexcept>
00027
00028 namespace OpenDDS {
00029 namespace RTPS {
00030 using DCPS::RepoId;
00031
00032 namespace {
00033
00034
00035
00036 const int LEASE_MULT = 10;
00037 const CORBA::UShort encap_LE = 0x0300;
00038 const CORBA::UShort encap_BE = 0x0200;
00039
00040 bool disposed(const ParameterList& inlineQos)
00041 {
00042 for (CORBA::ULong i = 0; i < inlineQos.length(); ++i) {
00043 if (inlineQos[i]._d() == PID_STATUS_INFO) {
00044 return inlineQos[i].status_info().value[3] & 1;
00045 }
00046 }
00047 return false;
00048 }
00049 }
00050
00051
00052 Spdp::Spdp(DDS::DomainId_t domain, RepoId& guid,
00053 const DDS::DomainParticipantQos& qos, RtpsDiscovery* disco)
00054 : OpenDDS::DCPS::LocalParticipant<Sedp>(qos)
00055 , disco_(disco), domain_(domain), guid_(guid)
00056 , tport_(new SpdpTransport(this)), eh_(tport_), eh_shutdown_(false)
00057 , shutdown_cond_(lock_), shutdown_flag_(0), sedp_(guid_, *this, lock_)
00058 {
00059 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00060 guid = guid_;
00061 sedp_.ignore(guid);
00062 sedp_.init(guid_, *disco, domain_);
00063
00064
00065 sedp_.unicast_locators(sedp_unicast_);
00066
00067 if (disco->sedp_multicast()) {
00068 const ACE_INET_Addr& mc_addr = sedp_.multicast_group();
00069 OpenDDS::DCPS::Locator_t mc_locator;
00070 mc_locator.kind = address_to_kind(mc_addr);
00071 mc_locator.port = mc_addr.get_port_number();
00072 address_to_bytes(mc_locator.address, mc_addr);
00073 sedp_multicast_.length(1);
00074 sedp_multicast_[0] = mc_locator;
00075 }
00076 }
00077
00078 Spdp::~Spdp()
00079 {
00080 shutdown_flag_ = 1;
00081 {
00082 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00083 if (DCPS::DCPS_debug_level > 3) {
00084 ACE_DEBUG((LM_INFO,
00085 ACE_TEXT("(%P|%t) Spdp::~Spdp ")
00086 ACE_TEXT("remove discovered participants\n")));
00087 }
00088
00089
00090 DCPS::RepoIdSet participant_ids;
00091 get_discovered_participant_ids(participant_ids);
00092 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00093 participant_id != participant_ids.end();
00094 ++participant_id)
00095 {
00096 DiscoveredParticipantIter part = participants_.find(*participant_id);
00097 if (part != participants_.end()) {
00098 remove_discovered_participant(part);
00099 }
00100 }
00101 }
00102
00103
00104
00105 sedp_.shutdown();
00106
00107
00108 tport_->close();
00109 eh_.reset();
00110 {
00111 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00112 while (!eh_shutdown_) {
00113 shutdown_cond_.wait();
00114 }
00115 }
00116 }
00117
00118 void
00119 Spdp::data_received(const DataSubmessage& data, const ParameterList& plist)
00120 {
00121 if (shutdown_flag_.value()) { return; }
00122
00123 const ACE_Time_Value time = ACE_OS::gettimeofday();
00124 SPDPdiscoveredParticipantData pdata;
00125 if (ParameterListConverter::from_param_list(plist, pdata) < 0) {
00126 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ")
00127 ACE_TEXT("failed to convert from ParameterList to ")
00128 ACE_TEXT("SPDPdiscoveredParticipantData\n")));
00129 return;
00130 }
00131
00132 DCPS::RepoId guid;
00133 std::memcpy(guid.guidPrefix, pdata.participantProxy.guidPrefix,
00134 sizeof(guid.guidPrefix));
00135 guid.entityId = OpenDDS::DCPS::ENTITYID_PARTICIPANT;
00136
00137 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00138 if (sedp_.ignoring(guid)) {
00139
00140
00141 return;
00142 }
00143
00144
00145 DiscoveredParticipantIter iter = participants_.find(guid);
00146
00147
00148 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00149
00150 if (iter == participants_.end()) {
00151
00152 std::memcpy(pdata.ddsParticipantData.key.value,
00153 pdata.participantProxy.guidPrefix,
00154 sizeof(pdata.ddsParticipantData.key.value));
00155
00156 if (DCPS::DCPS_debug_level) {
00157 DCPS::GuidConverter local(guid_), remote(guid);
00158 ACE_DEBUG((LM_DEBUG,
00159 ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"),
00160 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(),
00161 pdata.leaseDuration.seconds));
00162 }
00163
00164
00165 participants_[guid] = DiscoveredParticipant(pdata, time);
00166 DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00167 #ifndef DDS_HAS_MINIMUM_BIT
00168 DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00169 if (bit) {
00170 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00171 bit_instance_handle =
00172 bit->store_synthetic_data(pdata.ddsParticipantData,
00173 DDS::NEW_VIEW_STATE);
00174 }
00175 #endif
00176
00177
00178
00179
00180 sedp_.associate(pdata);
00181
00182
00183
00184 this->tport_->write_i();
00185
00186
00187 iter = participants_.find(guid);
00188 if (iter != participants_.end()) {
00189 iter->second.bit_ih_ = bit_instance_handle;
00190 }
00191
00192 } else if (data.inlineQos.length() && disposed(data.inlineQos)) {
00193 remove_discovered_participant(iter);
00194
00195 } else {
00196
00197 pdata.ddsParticipantData.key = iter->second.pdata_.ddsParticipantData.key;
00198 #ifndef OPENDDS_SAFETY_PROFILE
00199 using OpenDDS::DCPS::operator!=;
00200 #endif
00201 if (iter->second.pdata_.ddsParticipantData.user_data !=
00202 pdata.ddsParticipantData.user_data) {
00203 iter->second.pdata_.ddsParticipantData.user_data =
00204 pdata.ddsParticipantData.user_data;
00205 #ifndef DDS_HAS_MINIMUM_BIT
00206 DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00207 if (bit) {
00208 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00209 bit->store_synthetic_data(pdata.ddsParticipantData,
00210 DDS::NOT_NEW_VIEW_STATE);
00211 }
00212 #endif
00213
00214 iter = participants_.find(guid);
00215 }
00216
00217 if (iter != participants_.end()) {
00218 iter->second.pdata_ = pdata;
00219 iter->second.last_seen_ = time;
00220 }
00221 }
00222 }
00223
00224 void
00225 Spdp::remove_expired_participants()
00226 {
00227
00228 ACE_GUARD (ACE_Thread_Mutex, g, lock_);
00229
00230
00231 DCPS::RepoIdSet participant_ids;
00232 get_discovered_participant_ids(participant_ids);
00233 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00234 participant_id != participant_ids.end();
00235 ++participant_id)
00236 {
00237 DiscoveredParticipantIter part = participants_.find(*participant_id);
00238 if (part != participants_.end()) {
00239 if (part->second.last_seen_ <
00240 ACE_OS::gettimeofday() -
00241 ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) {
00242 if (DCPS::DCPS_debug_level > 1) {
00243 DCPS::GuidConverter conv(part->first);
00244 ACE_DEBUG((LM_WARNING,
00245 ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ")
00246 ACE_TEXT("participant %C exceeded lease duration, removing\n"),
00247 OPENDDS_STRING(conv).c_str()));
00248 }
00249 remove_discovered_participant(part);
00250 }
00251 }
00252 }
00253 }
00254
00255 void
00256 Spdp::init_bit(const DDS::Subscriber_var& bit_subscriber)
00257 {
00258 bit_subscriber_ = bit_subscriber;
00259 tport_->open();
00260 }
00261
00262 void
00263 Spdp::fini_bit()
00264 {
00265 bit_subscriber_ = 0;
00266 wait_for_acks_.reset();
00267
00268
00269 tport_->acknowledge();
00270 sedp_.acknowledge();
00271
00272 wait_for_acks_.wait_for_acks(2);
00273 }
00274
00275 #ifndef DDS_HAS_MINIMUM_BIT
00276 DDS::ParticipantBuiltinTopicDataDataReaderImpl*
00277 Spdp::part_bit()
00278 {
00279 if (!bit_subscriber_.in())
00280 return 0;
00281
00282 DDS::DataReader_var d =
00283 bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
00284 return dynamic_cast<DDS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
00285 }
00286 #endif
00287
00288 ACE_Reactor*
00289 Spdp::reactor() const
00290 {
00291 return disco_->reactor();
00292 }
00293
00294 WaitForAcks&
00295 Spdp::wait_for_acks()
00296 {
00297 return wait_for_acks_;
00298 }
00299
00300 Spdp::SpdpTransport::SpdpTransport(Spdp* outer)
00301 : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT)
00302 , buff_(64 * 1024)
00303 , wbuff_(64 * 1024)
00304 {
00305 hdr_.prefix[0] = 'R';
00306 hdr_.prefix[1] = 'T';
00307 hdr_.prefix[2] = 'P';
00308 hdr_.prefix[3] = 'S';
00309 hdr_.version = PROTOCOLVERSION;
00310 hdr_.vendorId = VENDORID_OPENDDS;
00311 std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t));
00312 data_.smHeader.submessageId = DATA;
00313 data_.smHeader.flags = 1 | 4 ;
00314 data_.smHeader.submessageLength = 0;
00315 data_.extraFlags = 0;
00316 data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS;
00317 data_.readerId = ENTITYID_UNKNOWN;
00318 data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER;
00319 data_.writerSN.high = 0;
00320 data_.writerSN.low = 0;
00321
00322
00323 const u_short port_common = outer_->disco_->pb() +
00324 (outer_->disco_->dg() * outer_->domain_),
00325 mc_port = port_common + outer_->disco_->d0();
00326
00327 u_short participantId = (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11];
00328
00329 #ifdef OPENDDS_SAFETY_PROFILE
00330 const u_short startingParticipantId = participantId;
00331 #endif
00332
00333 while (!open_unicast_socket(port_common, participantId)) {
00334 ++participantId;
00335 }
00336
00337 #ifdef OPENDDS_SAFETY_PROFILE
00338 if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
00339
00340
00341
00342
00343 hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
00344 hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
00345 outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
00346 outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
00347 }
00348 #endif
00349
00350 OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group();
00351 ACE_INET_Addr default_multicast;
00352 if (0 != default_multicast.set(mc_port, mc_addr.c_str())) {
00353 ACE_DEBUG((
00354 LM_ERROR,
00355 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
00356 ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"),
00357 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set")));
00358 throw std::runtime_error("failed to set default_multicast address");
00359 }
00360
00361 const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface();
00362
00363 if (DCPS::DCPS_debug_level > 3) {
00364 ACE_DEBUG((LM_INFO,
00365 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ")
00366 ACE_TEXT("joining group %C %C:%hd\n"),
00367 net_if.c_str (),
00368 mc_addr.c_str (),
00369 mc_port));
00370 }
00371
00372 #ifdef ACE_HAS_MAC_OSX
00373 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00374 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00375 #endif
00376
00377 if (0 != multicast_socket_.join(default_multicast, 1,
00378 net_if.empty() ? 0 :
00379 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) {
00380 ACE_DEBUG((LM_ERROR,
00381 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
00382 ACE_TEXT("failed to join multicast group %C:%hd %p\n"),
00383 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join")));
00384 throw std::runtime_error("failed to join multicast group");
00385 }
00386
00387 send_addrs_.insert(default_multicast);
00388
00389 typedef RtpsDiscovery::AddrVec::iterator iter;
00390 for (iter it = outer_->disco_->spdp_send_addrs().begin(),
00391 end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) {
00392 send_addrs_.insert(ACE_INET_Addr(it->c_str()));
00393 }
00394
00395 reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
00396 }
00397
00398 void
00399 Spdp::SpdpTransport::open()
00400 {
00401 ACE_Reactor* reactor = outer_->reactor();
00402 if (reactor->register_handler(unicast_socket_.get_handle(),
00403 this, ACE_Event_Handler::READ_MASK) != 0) {
00404 throw std::runtime_error("failed to register unicast input handler");
00405 }
00406
00407 if (reactor->register_handler(multicast_socket_.get_handle(),
00408 this, ACE_Event_Handler::READ_MASK) != 0) {
00409 throw std::runtime_error("failed to register multicast input handler");
00410 }
00411
00412 const ACE_Time_Value per = outer_->disco_->resend_period();
00413 if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), per)) {
00414 throw std::runtime_error("failed to schedule timer with reactor");
00415 }
00416 }
00417
00418 Spdp::SpdpTransport::~SpdpTransport()
00419 {
00420 if (DCPS::DCPS_debug_level > 3) {
00421 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
00422 }
00423 dispose_unregister();
00424 {
00425
00426 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00427 outer_->eh_shutdown_ = true;
00428 }
00429 outer_->shutdown_cond_.signal();
00430 unicast_socket_.close();
00431 multicast_socket_.close();
00432 }
00433
00434 void
00435 Spdp::SpdpTransport::dispose_unregister()
00436 {
00437
00438 data_.writerSN.high = seq_.getHigh();
00439 data_.writerSN.low = seq_.getLow();
00440 data_.smHeader.flags = 1 | 2 | 8 ;
00441 data_.inlineQos.length(1);
00442 static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
00443 data_.inlineQos[0].status_info(dispose_unregister);
00444
00445 ParameterList plist(1);
00446 plist.length(1);
00447 plist[0].guid(outer_->guid_);
00448 plist[0]._d(PID_PARTICIPANT_GUID);
00449
00450 wbuff_.reset();
00451 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
00452 CORBA::UShort options = 0;
00453 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
00454 || !(ser << plist)) {
00455 ACE_ERROR((LM_ERROR,
00456 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
00457 ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
00458 return;
00459 }
00460
00461 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00462 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
00463 const ssize_t res =
00464 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
00465 if (res < 0) {
00466 ACE_TCHAR addr_buff[256] = {};
00467 iter->addr_to_string(addr_buff, 256, 0);
00468 ACE_ERROR((LM_ERROR,
00469 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
00470 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
00471 }
00472 }
00473 }
00474
00475 void
00476 Spdp::SpdpTransport::close()
00477 {
00478 if (DCPS::DCPS_debug_level > 3) {
00479 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
00480 }
00481 ACE_Reactor* reactor = outer_->reactor();
00482 reactor->cancel_timer(this);
00483 const ACE_Reactor_Mask mask =
00484 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
00485 reactor->remove_handler(multicast_socket_.get_handle(), mask);
00486 reactor->remove_handler(unicast_socket_.get_handle(), mask);
00487 }
00488
00489 void
00490 Spdp::SpdpTransport::write()
00491 {
00492 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00493 write_i();
00494 }
00495
00496 void
00497 Spdp::SpdpTransport::write_i()
00498 {
00499 static const BuiltinEndpointSet_t availableBuiltinEndpoints =
00500 DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER |
00501 DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR |
00502 DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER |
00503 DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR |
00504 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER |
00505 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR |
00506 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER |
00507 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER
00508 ;
00509
00510
00511
00512
00513 OpenDDS::DCPS::LocatorSeq nonEmptyList(1);
00514 nonEmptyList.length(1);
00515 nonEmptyList[0].kind = LOCATOR_KIND_UDPv4;
00516 nonEmptyList[0].port = 12345;
00517 std::memset(nonEmptyList[0].address, 0, 12);
00518 nonEmptyList[0].address[12] = 127;
00519 nonEmptyList[0].address[13] = 0;
00520 nonEmptyList[0].address[14] = 0;
00521 nonEmptyList[0].address[15] = 1;
00522
00523 data_.writerSN.high = seq_.getHigh();
00524 data_.writerSN.low = seq_.getLow();
00525 ++seq_;
00526
00527 const GuidPrefix_t& gp = outer_->guid_.guidPrefix;
00528
00529 const SPDPdiscoveredParticipantData pdata = {
00530 {
00531 DDS::BuiltinTopicKey_t() ,
00532 outer_->qos_.user_data
00533 },
00534 {
00535 PROTOCOLVERSION,
00536 {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5],
00537 gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]},
00538 VENDORID_OPENDDS,
00539 false ,
00540 availableBuiltinEndpoints,
00541 outer_->sedp_unicast_,
00542 outer_->sedp_multicast_,
00543 nonEmptyList ,
00544 nonEmptyList ,
00545 {0 }
00546 },
00547 {
00548 static_cast<CORBA::Long>(lease_duration_.sec()),
00549 0
00550 }
00551 };
00552
00553 ParameterList plist;
00554 if (ParameterListConverter::to_param_list(pdata, plist) < 0) {
00555 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00556 ACE_TEXT("Spdp::SpdpTransport::write() - ")
00557 ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
00558 ACE_TEXT("to ParameterList\n")));
00559 return;
00560 }
00561
00562 wbuff_.reset();
00563 CORBA::UShort options = 0;
00564 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
00565 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
00566 || !(ser << plist)) {
00567 ACE_ERROR((LM_ERROR,
00568 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
00569 ACE_TEXT("failed to serialize headers for SPDP\n")));
00570 return;
00571 }
00572
00573 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00574 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
00575 const ssize_t res =
00576 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
00577 if (res < 0) {
00578 ACE_TCHAR addr_buff[256] = {};
00579 iter->addr_to_string(addr_buff, 256, 0);
00580 ACE_ERROR((LM_ERROR,
00581 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
00582 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
00583 }
00584 }
00585 }
00586
00587 int
00588 Spdp::SpdpTransport::handle_timeout(const ACE_Time_Value&, const void*)
00589 {
00590 write();
00591 outer_->remove_expired_participants();
00592 return 0;
00593 }
00594
00595 int
00596 Spdp::SpdpTransport::handle_input(ACE_HANDLE h)
00597 {
00598 const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle())
00599 ? unicast_socket_ : multicast_socket_;
00600 ACE_INET_Addr remote;
00601 buff_.reset();
00602 const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
00603
00604 if (bytes > 0) {
00605 buff_.wr_ptr(bytes);
00606 } else if (bytes == 0) {
00607 return -1;
00608 } else {
00609 ACE_DEBUG((
00610 LM_ERROR,
00611 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00612 ACE_TEXT("error reading from %C socket %p\n")
00613 , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
00614 ACE_TEXT("ACE_SOCK_Dgram::recv")));
00615 return -1;
00616 }
00617
00618
00619 if ((buff_.size() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) {
00620 return 0;
00621 }
00622
00623 DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR);
00624 Header header;
00625 if (!(ser >> header)) {
00626 ACE_ERROR((LM_ERROR,
00627 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00628 ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
00629 return 0;
00630 }
00631
00632 while (buff_.length() > 3) {
00633 const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
00634 ser.swap_bytes((flags & 1 ) != ACE_CDR_BYTE_ORDER);
00635 const size_t start = buff_.length();
00636 CORBA::UShort submessageLength = 0;
00637 switch (subm) {
00638 case DATA: {
00639 DataSubmessage data;
00640 if (!(ser >> data)) {
00641 ACE_ERROR((
00642 LM_ERROR,
00643 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00644 ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
00645 return 0;
00646 }
00647 submessageLength = data.smHeader.submessageLength;
00648
00649 if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
00650
00651
00652 break;
00653 }
00654
00655 ParameterList plist;
00656 if (data.smHeader.flags & (4 | 8 )) {
00657 ser.swap_bytes(!ACE_CDR_BYTE_ORDER);
00658 CORBA::UShort encap, options;
00659 if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE)) {
00660 ACE_ERROR((LM_ERROR,
00661 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00662 ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
00663 return 0;
00664 }
00665 ser >> options;
00666
00667 ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER);
00668 if (!(ser >> plist)) {
00669 ACE_ERROR((LM_ERROR,
00670 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00671 ACE_TEXT("failed to deserialize data payload for SPDP\n")));
00672 return 0;
00673 }
00674 } else {
00675 plist.length(1);
00676 RepoId guid;
00677 std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t));
00678 guid.entityId = ENTITYID_PARTICIPANT;
00679 plist[0].guid(guid);
00680 plist[0]._d(PID_PARTICIPANT_GUID);
00681 }
00682
00683 outer_->data_received(data, plist);
00684 break;
00685 }
00686 default:
00687 SubmessageHeader smHeader;
00688 if (!(ser >> smHeader)) {
00689 ACE_ERROR((
00690 LM_ERROR,
00691 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00692 ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
00693 return 0;
00694 }
00695 submessageLength = smHeader.submessageLength;
00696 break;
00697 }
00698 if (submessageLength && buff_.length()) {
00699 const size_t read = start - buff_.length();
00700 if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
00701 ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ - read));
00702 }
00703 } else if (!submessageLength) {
00704 break;
00705 }
00706 }
00707
00708 return 0;
00709 }
00710
00711 int
00712 Spdp::SpdpTransport::handle_exception(ACE_HANDLE )
00713 {
00714 outer_->wait_for_acks().ack();
00715 return 0;
00716 }
00717
00718 void
00719 Spdp::SpdpTransport::acknowledge()
00720 {
00721 ACE_Reactor* reactor = outer_->reactor();
00722 reactor->notify(this);
00723 }
00724
00725 void
00726 Spdp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
00727 {
00728 sedp_.signal_liveliness(kind);
00729 }
00730
00731 bool
00732 Spdp::SpdpTransport::open_unicast_socket(u_short port_common,
00733 u_short participant_id)
00734 {
00735 const u_short uni_port = port_common + outer_->disco_->d1() +
00736 (outer_->disco_->pg() * participant_id);
00737
00738 ACE_INET_Addr local_addr;
00739 if (0 != local_addr.set(uni_port)) {
00740 ACE_DEBUG((
00741 LM_ERROR,
00742 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00743 ACE_TEXT("failed setting unicast local_addr to port %d %p\n"),
00744 uni_port, ACE_TEXT("ACE_INET_Addr::set")));
00745 throw std::runtime_error("failed to set unicast local address");
00746 }
00747
00748 if (!OpenDDS::DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) {
00749 if (DCPS::DCPS_debug_level > 3) {
00750 ACE_DEBUG((
00751 LM_WARNING,
00752 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00753 ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p. ")
00754 ACE_TEXT("Trying next participantId...\n"),
00755 uni_port, ACE_TEXT("ACE_SOCK_Dgram::open")));
00756 }
00757 return false;
00758
00759 } else if (DCPS::DCPS_debug_level > 3) {
00760 ACE_DEBUG((
00761 LM_INFO,
00762 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00763 ACE_TEXT("opened unicast socket on port %d\n"),
00764 uni_port));
00765 }
00766
00767 if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) {
00768 ACE_DEBUG((LM_ERROR,
00769 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
00770 ACE_TEXT("failed to set TTL value to %d ")
00771 ACE_TEXT("for port:%hd %p\n"),
00772 outer_->disco_->ttl(), uni_port, ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl:")));
00773 throw std::runtime_error("failed to set TTL");
00774 }
00775 return true;
00776 }
00777
00778 bool
00779 Spdp::get_default_locators(const RepoId& part_id, OpenDDS::DCPS::LocatorSeq& target,
00780 bool& inlineQos)
00781 {
00782 DiscoveredParticipantIter part_iter = participants_.find(part_id);
00783 if (part_iter == participants_.end()) {
00784 return false;
00785 } else {
00786 inlineQos = part_iter->second.pdata_.participantProxy.expectsInlineQos;
00787 OpenDDS::DCPS::LocatorSeq& mc_source =
00788 part_iter->second.pdata_.participantProxy.defaultMulticastLocatorList;
00789 OpenDDS::DCPS::LocatorSeq& uc_source =
00790 part_iter->second.pdata_.participantProxy.defaultUnicastLocatorList;
00791 CORBA::ULong mc_source_len = mc_source.length();
00792 CORBA::ULong uc_source_len = uc_source.length();
00793 CORBA::ULong target_len = target.length();
00794 target.length(mc_source_len + uc_source_len + target_len);
00795
00796 for (CORBA::ULong mci = 0; mci < mc_source.length(); ++mci) {
00797 target[target_len + mci] = mc_source[mci];
00798 }
00799
00800 for (CORBA::ULong uci = 0; uci < uc_source.length(); ++uci) {
00801 target[target_len + mc_source_len + uci] = uc_source[uci];
00802 }
00803 }
00804 return true;
00805 }
00806
00807 bool
00808 Spdp::associated() const
00809 {
00810 return !participants_.empty();
00811 }
00812
00813 bool
00814 Spdp::has_discovered_participant(const DCPS::RepoId& guid)
00815 {
00816 return participants_.find(guid) != participants_.end();
00817 }
00818
00819
00820 void
00821 Spdp::get_discovered_participant_ids(DCPS::RepoIdSet& results) const
00822 {
00823 DiscoveredParticipantMap::const_iterator idx;
00824 for (idx = participants_.begin(); idx != participants_.end(); ++idx)
00825 {
00826 results.insert(idx->first);
00827 }
00828 }
00829
00830 }
00831 }