Public Member Functions | |
SpdpTransport (Spdp *outer) | |
~SpdpTransport () | |
virtual int | handle_timeout (const ACE_Time_Value &, const void *) |
virtual int | handle_input (ACE_HANDLE h) |
virtual int | handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE) |
void | open () |
void | write () |
void | write_i () |
void | close () |
void | dispose_unregister () |
bool | open_unicast_socket (u_short port_common, u_short participant_id) |
void | acknowledge () |
OPENDDS_SET (ACE_INET_Addr) send_addrs_ | |
Public Attributes | |
Spdp * | outer_ |
Header | hdr_ |
DataSubmessage | data_ |
DCPS::SequenceNumber | seq_ |
ACE_Time_Value | lease_duration_ |
ACE_SOCK_Dgram | unicast_socket_ |
ACE_SOCK_Dgram_Mcast | multicast_socket_ |
ACE_Message_Block | buff_ |
ACE_Message_Block | wbuff_ |
Definition at line 86 of file Spdp.h.
OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport | ( | Spdp * | outer | ) | [explicit] |
Definition at line 300 of file Spdp.cpp.
References OpenDDS::RTPS::RtpsDiscovery::d0(), OpenDDS::RTPS::DATA, data_, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::RtpsDiscovery::default_multicast_group(), OpenDDS::RTPS::RtpsDiscovery::dg(), OpenDDS::RTPS::Spdp::disco_, OpenDDS::RTPS::Spdp::domain_, OpenDDS::DCPS::ENABLED, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::DataSubmessage::extraFlags, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::RTPS::Spdp::guid_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Header::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, OpenDDS::RTPS::SequenceNumber_t::low, OpenDDS::RTPS::RtpsDiscovery::multicast_interface(), multicast_socket_, OpenDDS::RTPS::DataSubmessage::octetsToInlineQos, open_unicast_socket(), OPENDDS_STRING, outer_, OpenDDS::RTPS::RtpsDiscovery::pb(), OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::RTPS::DataSubmessage::readerId, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::RtpsDiscovery::spdp_send_addrs(), OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, OpenDDS::RTPS::Header::version, OpenDDS::RTPS::DataSubmessage::writerId, and OpenDDS::RTPS::DataSubmessage::writerSN.
00301 : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT) 00302 , buff_(64 * 1024) 00303 , wbuff_(64 * 1024) 00304 { 00305 hdr_.prefix[0] = 'R'; 00306 hdr_.prefix[1] = 'T'; 00307 hdr_.prefix[2] = 'P'; 00308 hdr_.prefix[3] = 'S'; 00309 hdr_.version = PROTOCOLVERSION; 00310 hdr_.vendorId = VENDORID_OPENDDS; 00311 std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t)); 00312 data_.smHeader.submessageId = DATA; 00313 data_.smHeader.flags = 1 /*FLAG_E*/ | 4 /*FLAG_D*/; 00314 data_.smHeader.submessageLength = 0; // last submessage in the Message 00315 data_.extraFlags = 0; 00316 data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS; 00317 data_.readerId = ENTITYID_UNKNOWN; 00318 data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER; 00319 data_.writerSN.high = 0; 00320 data_.writerSN.low = 0; 00321 00322 // Ports are set by the formulas in RTPS v2.1 Table 9.8 00323 const u_short port_common = outer_->disco_->pb() + 00324 (outer_->disco_->dg() * outer_->domain_), 00325 mc_port = port_common + outer_->disco_->d0(); 00326 00327 u_short participantId = (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11]; 00328 00329 #ifdef OPENDDS_SAFETY_PROFILE 00330 const u_short startingParticipantId = participantId; 00331 #endif 00332 00333 while (!open_unicast_socket(port_common, participantId)) { 00334 ++participantId; 00335 } 00336 00337 #ifdef OPENDDS_SAFETY_PROFILE 00338 if (participantId > startingParticipantId && ACE_OS::getpid() == -1) { 00339 // Since pids are not available, use the fact that we had to increment 00340 // participantId to modify the GUID's pid bytes. This avoids GUID conflicts 00341 // between processes on the same host which start at the same time 00342 // (resulting in the same seed value for the random number generator). 
00343 hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8); 00344 hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF); 00345 outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8]; 00346 outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9]; 00347 } 00348 #endif 00349 00350 OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group(); 00351 ACE_INET_Addr default_multicast; 00352 if (0 != default_multicast.set(mc_port, mc_addr.c_str())) { 00353 ACE_DEBUG(( 00354 LM_ERROR, 00355 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ") 00356 ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"), 00357 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set"))); 00358 throw std::runtime_error("failed to set default_multicast address"); 00359 } 00360 00361 const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface(); 00362 00363 if (DCPS::DCPS_debug_level > 3) { 00364 ACE_DEBUG((LM_INFO, 00365 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ") 00366 ACE_TEXT("joining group %C %C:%hd\n"), 00367 net_if.c_str (), 00368 mc_addr.c_str (), 00369 mc_port)); 00370 } 00371 00372 #ifdef ACE_HAS_MAC_OSX 00373 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | 00374 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE); 00375 #endif 00376 00377 if (0 != multicast_socket_.join(default_multicast, 1, 00378 net_if.empty() ? 
0 : 00379 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) { 00380 ACE_DEBUG((LM_ERROR, 00381 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ") 00382 ACE_TEXT("failed to join multicast group %C:%hd %p\n"), 00383 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join"))); 00384 throw std::runtime_error("failed to join multicast group"); 00385 } 00386 00387 send_addrs_.insert(default_multicast); 00388 00389 typedef RtpsDiscovery::AddrVec::iterator iter; 00390 for (iter it = outer_->disco_->spdp_send_addrs().begin(), 00391 end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) { 00392 send_addrs_.insert(ACE_INET_Addr(it->c_str())); 00393 } 00394 00395 reference_counting_policy().value(Reference_Counting_Policy::ENABLED); 00396 }
OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport | ( | ) |
Definition at line 418 of file Spdp.cpp.
References OpenDDS::DCPS::DCPS_debug_level, dispose_unregister(), OpenDDS::RTPS::Spdp::eh_shutdown_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::shutdown_cond_, and unicast_socket_.
00419 { 00420 if (DCPS::DCPS_debug_level > 3) { 00421 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n"))); 00422 } 00423 dispose_unregister(); 00424 { 00425 // Acquire lock for modification of condition variable 00426 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_); 00427 outer_->eh_shutdown_ = true; 00428 } 00429 outer_->shutdown_cond_.signal(); 00430 unicast_socket_.close(); 00431 multicast_socket_.close(); 00432 }
void OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge | ( | ) |
Definition at line 719 of file Spdp.cpp.
References outer_, and OpenDDS::RTPS::Spdp::reactor().
Referenced by OpenDDS::RTPS::Spdp::fini_bit().
void OpenDDS::RTPS::Spdp::SpdpTransport::close | ( | ) |
Definition at line 476 of file Spdp.cpp.
References OpenDDS::DCPS::DCPS_debug_level, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), and unicast_socket_.
Referenced by OpenDDS::RTPS::Spdp::~Spdp().
00477 { 00478 if (DCPS::DCPS_debug_level > 3) { 00479 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n"))); 00480 } 00481 ACE_Reactor* reactor = outer_->reactor(); 00482 reactor->cancel_timer(this); 00483 const ACE_Reactor_Mask mask = 00484 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL; 00485 reactor->remove_handler(multicast_socket_.get_handle(), mask); 00486 reactor->remove_handler(unicast_socket_.get_handle(), mask); 00487 }
void OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister | ( | ) |
Definition at line 435 of file Spdp.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, data_, OpenDDS::RTPS::encap_LE, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::Spdp::guid_, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, seq_, OpenDDS::RTPS::DataSubmessage::smHeader, unicast_socket_, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.
Referenced by ~SpdpTransport().
00436 { 00437 // Send the dispose/unregister SPDP sample 00438 data_.writerSN.high = seq_.getHigh(); 00439 data_.writerSN.low = seq_.getLow(); 00440 data_.smHeader.flags = 1 /*FLAG_E*/ | 2 /*FLAG_Q*/ | 8 /*FLAG_K*/; 00441 data_.inlineQos.length(1); 00442 static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} }; 00443 data_.inlineQos[0].status_info(dispose_unregister); 00444 00445 ParameterList plist(1); 00446 plist.length(1); 00447 plist[0].guid(outer_->guid_); 00448 plist[0]._d(PID_PARTICIPANT_GUID); 00449 00450 wbuff_.reset(); 00451 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR); 00452 CORBA::UShort options = 0; 00453 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options) 00454 || !(ser << plist)) { 00455 ACE_ERROR((LM_ERROR, 00456 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ") 00457 ACE_TEXT("failed to serialize headers for dispose/unregister\n"))); 00458 return; 00459 } 00460 00461 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t; 00462 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) { 00463 const ssize_t res = 00464 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter); 00465 if (res < 0) { 00466 ACE_TCHAR addr_buff[256] = {}; 00467 iter->addr_to_string(addr_buff, 256, 0); 00468 ACE_ERROR((LM_ERROR, 00469 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ") 00470 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send"))); 00471 } 00472 } 00473 }
int OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception | ( | ACE_HANDLE | fd = ACE_INVALID_HANDLE |
) | [virtual] |
Definition at line 712 of file Spdp.cpp.
References OpenDDS::RTPS::WaitForAcks::ack(), outer_, and OpenDDS::RTPS::Spdp::wait_for_acks().
00713 { 00714 outer_->wait_for_acks().ack(); 00715 return 0; 00716 }
int OpenDDS::RTPS::Spdp::SpdpTransport::handle_input | ( | ACE_HANDLE | h | ) | [virtual] |
Definition at line 596 of file Spdp.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, buff_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Spdp::data_received(), OpenDDS::RTPS::encap_BE, OpenDDS::RTPS::encap_LE, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::RTPS::SubmessageHeader::flags, header, multicast_socket_, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::DCPS::Serializer::swap_bytes(), unicast_socket_, and OpenDDS::RTPS::DataSubmessage::writerId.
00597 { 00598 const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle()) 00599 ? unicast_socket_ : multicast_socket_; 00600 ACE_INET_Addr remote; 00601 buff_.reset(); 00602 const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote); 00603 00604 if (bytes > 0) { 00605 buff_.wr_ptr(bytes); 00606 } else if (bytes == 0) { 00607 return -1; 00608 } else { 00609 ACE_DEBUG(( 00610 LM_ERROR, 00611 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 00612 ACE_TEXT("error reading from %C socket %p\n") 00613 , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast", 00614 ACE_TEXT("ACE_SOCK_Dgram::recv"))); 00615 return -1; 00616 } 00617 00618 // Handle some RTI protocol multicast to the same address 00619 if ((buff_.size() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) { 00620 return 0; // Ignore 00621 } 00622 00623 DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR); 00624 Header header; 00625 if (!(ser >> header)) { 00626 ACE_ERROR((LM_ERROR, 00627 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 00628 ACE_TEXT("failed to deserialize RTPS header for SPDP\n"))); 00629 return 0; 00630 } 00631 00632 while (buff_.length() > 3) { 00633 const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1]; 00634 ser.swap_bytes((flags & 1 /*FLAG_E*/) != ACE_CDR_BYTE_ORDER); 00635 const size_t start = buff_.length(); 00636 CORBA::UShort submessageLength = 0; 00637 switch (subm) { 00638 case DATA: { 00639 DataSubmessage data; 00640 if (!(ser >> data)) { 00641 ACE_ERROR(( 00642 LM_ERROR, 00643 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 00644 ACE_TEXT("failed to deserialize DATA header for SPDP\n"))); 00645 return 0; 00646 } 00647 submessageLength = data.smHeader.submessageLength; 00648 00649 if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) { 00650 // Not our message: this could be the same multicast group used 00651 // for SEDP and other traffic. 
00652 break; 00653 } 00654 00655 ParameterList plist; 00656 if (data.smHeader.flags & (4 /*FLAG_D*/ | 8 /*FLAG_K*/)) { 00657 ser.swap_bytes(!ACE_CDR_BYTE_ORDER); // read "encap" itself in LE 00658 CORBA::UShort encap, options; 00659 if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE)) { 00660 ACE_ERROR((LM_ERROR, 00661 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 00662 ACE_TEXT("failed to deserialize encapsulation header for SPDP\n"))); 00663 return 0; 00664 } 00665 ser >> options; 00666 // bit 8 in encap is on if it's PL_CDR_LE 00667 ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER); 00668 if (!(ser >> plist)) { 00669 ACE_ERROR((LM_ERROR, 00670 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 00671 ACE_TEXT("failed to deserialize data payload for SPDP\n"))); 00672 return 0; 00673 } 00674 } else { 00675 plist.length(1); 00676 RepoId guid; 00677 std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t)); 00678 guid.entityId = ENTITYID_PARTICIPANT; 00679 plist[0].guid(guid); 00680 plist[0]._d(PID_PARTICIPANT_GUID); 00681 } 00682 00683 outer_->data_received(data, plist); 00684 break; 00685 } 00686 default: 00687 SubmessageHeader smHeader; 00688 if (!(ser >> smHeader)) { 00689 ACE_ERROR(( 00690 LM_ERROR, 00691 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 00692 ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n"))); 00693 return 0; 00694 } 00695 submessageLength = smHeader.submessageLength; 00696 break; 00697 } 00698 if (submessageLength && buff_.length()) { 00699 const size_t read = start - buff_.length(); 00700 if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) { 00701 ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ - read)); 00702 } 00703 } else if (!submessageLength) { 00704 break; // submessageLength of 0 indicates the last submessage 00705 } 00706 } 00707 00708 return 0; 00709 }
int OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout | ( | const ACE_Time_Value & | , | |
const void * | ||||
) | [virtual] |
void OpenDDS::RTPS::Spdp::SpdpTransport::open | ( | ) |
Definition at line 399 of file Spdp.cpp.
References OpenDDS::RTPS::Spdp::disco_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), OpenDDS::RTPS::RtpsDiscovery::resend_period(), and unicast_socket_.
Referenced by OpenDDS::RTPS::Spdp::init_bit().
00400 { 00401 ACE_Reactor* reactor = outer_->reactor(); 00402 if (reactor->register_handler(unicast_socket_.get_handle(), 00403 this, ACE_Event_Handler::READ_MASK) != 0) { 00404 throw std::runtime_error("failed to register unicast input handler"); 00405 } 00406 00407 if (reactor->register_handler(multicast_socket_.get_handle(), 00408 this, ACE_Event_Handler::READ_MASK) != 0) { 00409 throw std::runtime_error("failed to register multicast input handler"); 00410 } 00411 00412 const ACE_Time_Value per = outer_->disco_->resend_period(); 00413 if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), per)) { 00414 throw std::runtime_error("failed to schedule timer with reactor"); 00415 } 00416 }
bool OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket | ( | u_short | port_common, | |
u_short | participant_id | |||
) |
Definition at line 732 of file Spdp.cpp.
References OpenDDS::RTPS::RtpsDiscovery::d1(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Spdp::disco_, OpenDDS::DCPS::open_appropriate_socket_type(), outer_, OpenDDS::RTPS::RtpsDiscovery::pg(), OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::RTPS::RtpsDiscovery::ttl(), and unicast_socket_.
Referenced by SpdpTransport().
00734 { 00735 const u_short uni_port = port_common + outer_->disco_->d1() + 00736 (outer_->disco_->pg() * participant_id); 00737 00738 ACE_INET_Addr local_addr; 00739 if (0 != local_addr.set(uni_port)) { 00740 ACE_DEBUG(( 00741 LM_ERROR, 00742 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ") 00743 ACE_TEXT("failed setting unicast local_addr to port %d %p\n"), 00744 uni_port, ACE_TEXT("ACE_INET_Addr::set"))); 00745 throw std::runtime_error("failed to set unicast local address"); 00746 } 00747 00748 if (!OpenDDS::DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) { 00749 if (DCPS::DCPS_debug_level > 3) { 00750 ACE_DEBUG(( 00751 LM_WARNING, 00752 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ") 00753 ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p. ") 00754 ACE_TEXT("Trying next participantId...\n"), 00755 uni_port, ACE_TEXT("ACE_SOCK_Dgram::open"))); 00756 } 00757 return false; 00758 00759 } else if (DCPS::DCPS_debug_level > 3) { 00760 ACE_DEBUG(( 00761 LM_INFO, 00762 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ") 00763 ACE_TEXT("opened unicast socket on port %d\n"), 00764 uni_port)); 00765 } 00766 00767 if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) { 00768 ACE_DEBUG((LM_ERROR, 00769 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ") 00770 ACE_TEXT("failed to set TTL value to %d ") 00771 ACE_TEXT("for port:%hd %p\n"), 00772 outer_->disco_->ttl(), uni_port, ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl:"))); 00773 throw std::runtime_error("failed to set TTL"); 00774 } 00775 return true; 00776 }
OpenDDS::RTPS::Spdp::SpdpTransport::OPENDDS_SET | ( | ACE_INET_Addr | ) |
Referenced by dispose_unregister(), and write_i().
void OpenDDS::RTPS::Spdp::SpdpTransport::write | ( | ) |
Definition at line 490 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, outer_, and write_i().
Referenced by handle_timeout().
void OpenDDS::RTPS::Spdp::SpdpTransport::write_i | ( | ) |
Definition at line 497 of file Spdp.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, data_, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, OpenDDS::RTPS::encap_LE, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::Spdp::guid_, OpenDDS::DCPS::GUID_t::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, lease_duration_, OpenDDS::RTPS::LOCATOR_KIND_UDPv4, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::qos_, OpenDDS::RTPS::Spdp::sedp_multicast_, OpenDDS::RTPS::Spdp::sedp_unicast_, seq_, OpenDDS::RTPS::ParameterListConverter::to_param_list(), unicast_socket_, DDS::DomainParticipantQos::user_data, OpenDDS::RTPS::VENDORID_OPENDDS, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.
Referenced by OpenDDS::RTPS::Spdp::data_received(), and write().
00498 { 00499 static const BuiltinEndpointSet_t availableBuiltinEndpoints = 00500 DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER | 00501 DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR | 00502 DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER | 00503 DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR | 00504 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER | 00505 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR | 00506 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER | 00507 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER 00508 ; 00509 // The RTPS spec has no constants for the builtinTopics{Writer,Reader} 00510 00511 // This locator list should not be empty, but we won't actually be using it. 00512 // The OpenDDS publication/subscription data will have locators included. 00513 OpenDDS::DCPS::LocatorSeq nonEmptyList(1); 00514 nonEmptyList.length(1); 00515 nonEmptyList[0].kind = LOCATOR_KIND_UDPv4; 00516 nonEmptyList[0].port = 12345; 00517 std::memset(nonEmptyList[0].address, 0, 12); 00518 nonEmptyList[0].address[12] = 127; 00519 nonEmptyList[0].address[13] = 0; 00520 nonEmptyList[0].address[14] = 0; 00521 nonEmptyList[0].address[15] = 1; 00522 00523 data_.writerSN.high = seq_.getHigh(); 00524 data_.writerSN.low = seq_.getLow(); 00525 ++seq_; 00526 00527 const GuidPrefix_t& gp = outer_->guid_.guidPrefix; 00528 00529 const SPDPdiscoveredParticipantData pdata = { 00530 { // ParticipantBuiltinTopicData 00531 DDS::BuiltinTopicKey_t() /*ignored*/, 00532 outer_->qos_.user_data 00533 }, 00534 { // ParticipantProxy_t 00535 PROTOCOLVERSION, 00536 {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5], 00537 gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]}, 00538 VENDORID_OPENDDS, 00539 false /*expectsIQoS*/, 00540 availableBuiltinEndpoints, 00541 outer_->sedp_unicast_, 00542 outer_->sedp_multicast_, 00543 nonEmptyList /*defaultMulticastLocatorList*/, 00544 nonEmptyList /*defaultUnicastLocatorList*/, 00545 {0 /*manualLivelinessCount*/} //FUTURE: implement manual liveliness 00546 }, 00547 { // Duration_t (leaseDuration) 
00548 static_cast<CORBA::Long>(lease_duration_.sec()), 00549 0 // we are not supporting fractional seconds in the lease duration 00550 } 00551 }; 00552 00553 ParameterList plist; 00554 if (ParameterListConverter::to_param_list(pdata, plist) < 0) { 00555 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00556 ACE_TEXT("Spdp::SpdpTransport::write() - ") 00557 ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ") 00558 ACE_TEXT("to ParameterList\n"))); 00559 return; 00560 } 00561 00562 wbuff_.reset(); 00563 CORBA::UShort options = 0; 00564 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR); 00565 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options) 00566 || !(ser << plist)) { 00567 ACE_ERROR((LM_ERROR, 00568 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ") 00569 ACE_TEXT("failed to serialize headers for SPDP\n"))); 00570 return; 00571 } 00572 00573 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t; 00574 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) { 00575 const ssize_t res = 00576 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter); 00577 if (res < 0) { 00578 ACE_TCHAR addr_buff[256] = {}; 00579 iter->addr_to_string(addr_buff, 256, 0); 00580 ACE_ERROR((LM_ERROR, 00581 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ") 00582 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send"))); 00583 } 00584 } 00585 }
ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::buff_ |
DataSubmessage OpenDDS::RTPS::Spdp::SpdpTransport::data_ |
Definition at line 104 of file Spdp.h.
Referenced by dispose_unregister(), SpdpTransport(), and write_i().
Header OpenDDS::RTPS::Spdp::SpdpTransport::hdr_ |
Definition at line 103 of file Spdp.h.
Referenced by dispose_unregister(), SpdpTransport(), and write_i().
ACE_Time_Value OpenDDS::RTPS::Spdp::SpdpTransport::lease_duration_ |
ACE_SOCK_Dgram_Mcast OpenDDS::RTPS::Spdp::SpdpTransport::multicast_socket_ |
Definition at line 108 of file Spdp.h.
Referenced by close(), handle_input(), open(), SpdpTransport(), and ~SpdpTransport().
Spdp * OpenDDS::RTPS::Spdp::SpdpTransport::outer_ |
Definition at line 102 of file Spdp.h.
Referenced by acknowledge(), close(), dispose_unregister(), handle_exception(), handle_input(), handle_timeout(), open(), open_unicast_socket(), SpdpTransport(), write(), write_i(), and ~SpdpTransport().
ACE_SOCK_Dgram OpenDDS::RTPS::Spdp::SpdpTransport::unicast_socket_ |
Definition at line 107 of file Spdp.h.
Referenced by close(), dispose_unregister(), handle_input(), open(), open_unicast_socket(), write_i(), and ~SpdpTransport().
ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::wbuff_ |