Public Member Functions | |
SpdpTransport (Spdp *outer, bool securityGuids) | |
~SpdpTransport () | |
virtual int | handle_timeout (const ACE_Time_Value &, const void *) |
virtual int | handle_input (ACE_HANDLE h) |
virtual int | handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE) |
void | open () |
void | write () |
void | write_i () |
void | close () |
void | dispose_unregister () |
bool | open_unicast_socket (u_short port_common, u_short participant_id) |
void | acknowledge () |
OPENDDS_SET (ACE_INET_Addr) send_addrs_ | |
Public Attributes | |
Spdp * | outer_ |
Header | hdr_ |
DataSubmessage | data_ |
DCPS::SequenceNumber | seq_ |
ACE_Time_Value | lease_duration_ |
ACE_SOCK_Dgram | unicast_socket_ |
ACE_SOCK_Dgram_Mcast | multicast_socket_ |
ACE_Message_Block | buff_ |
ACE_Message_Block | wbuff_ |
ACE_Time_Value | disco_resend_period_ |
ACE_Time_Value | last_disco_resend_ |
Definition at line 152 of file Spdp.h.
OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport | ( | Spdp * | outer, | |
bool | securityGuids | |||
) |
Definition at line 1276 of file Spdp.cpp.
References ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, OpenDDS::RTPS::RtpsDiscovery::d0(), OpenDDS::RTPS::DATA, data_, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::RtpsDiscovery::default_multicast_group(), ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, OpenDDS::RTPS::RtpsDiscovery::dg(), OpenDDS::RTPS::Spdp::disco_, OpenDDS::RTPS::Spdp::domain_, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::DataSubmessage::extraFlags, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::SubmessageHeader::flags, ACE_OS::getpid(), OpenDDS::RTPS::Spdp::guid_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Header::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, ACE_SOCK_Dgram_Mcast::join(), LM_ERROR, LM_INFO, OpenDDS::RTPS::SequenceNumber_t::low, OpenDDS::RTPS::RtpsDiscovery::multicast_interface(), multicast_socket_, OpenDDS::RTPS::DataSubmessage::octetsToInlineQos, open_unicast_socket(), OPENDDS_STRING, ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), outer_, OpenDDS::RTPS::RtpsDiscovery::pb(), OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::RTPS::DataSubmessage::readerId, ACE_INET_Addr::set(), OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::RtpsDiscovery::spdp_send_addrs(), OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, OpenDDS::RTPS::Header::version, while(), OpenDDS::RTPS::DataSubmessage::writerId, and OpenDDS::RTPS::DataSubmessage::writerSN.
01277 : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT) 01278 , buff_(64 * 1024) 01279 , wbuff_(64 * 1024) 01280 { 01281 hdr_.prefix[0] = 'R'; 01282 hdr_.prefix[1] = 'T'; 01283 hdr_.prefix[2] = 'P'; 01284 hdr_.prefix[3] = 'S'; 01285 hdr_.version = PROTOCOLVERSION; 01286 hdr_.vendorId = VENDORID_OPENDDS; 01287 std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t)); 01288 data_.smHeader.submessageId = DATA; 01289 data_.smHeader.flags = FLAG_E | FLAG_D; 01290 data_.smHeader.submessageLength = 0; // last submessage in the Message 01291 data_.extraFlags = 0; 01292 data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS; 01293 data_.readerId = ENTITYID_UNKNOWN; 01294 data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER; 01295 data_.writerSN.high = 0; 01296 data_.writerSN.low = 0; 01297 01298 // Ports are set by the formulas in RTPS v2.1 Table 9.8 01299 const u_short port_common = outer_->disco_->pb() + 01300 (outer_->disco_->dg() * outer_->domain_), 01301 mc_port = port_common + outer_->disco_->d0(); 01302 01303 // with security enabled the meaning of the bytes in guidPrefix changes 01304 u_short participantId = securityGuids ? 0 01305 : (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11]; 01306 01307 #ifdef OPENDDS_SAFETY_PROFILE 01308 const u_short startingParticipantId = participantId; 01309 #endif 01310 01311 while (!open_unicast_socket(port_common, participantId)) { 01312 ++participantId; 01313 } 01314 01315 #ifdef OPENDDS_SAFETY_PROFILE 01316 if (participantId > startingParticipantId && ACE_OS::getpid() == -1) { 01317 // Since pids are not available, use the fact that we had to increment 01318 // participantId to modify the GUID's pid bytes. This avoids GUID conflicts 01319 // between processes on the same host which start at the same time 01320 // (resulting in the same seed value for the random number generator). 01321 hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8); 01322 hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF); 01323 outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8]; 01324 outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9]; 01325 } 01326 #endif 01327 01328 OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group(); 01329 ACE_INET_Addr default_multicast; 01330 if (0 != default_multicast.set(mc_port, mc_addr.c_str())) { 01331 ACE_DEBUG(( 01332 LM_ERROR, 01333 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ") 01334 ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"), 01335 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set"))); 01336 throw std::runtime_error("failed to set default_multicast address"); 01337 } 01338 01339 const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface(); 01340 01341 if (DCPS::DCPS_debug_level > 3) { 01342 ACE_DEBUG((LM_INFO, 01343 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ") 01344 ACE_TEXT("joining group %C %C:%hd\n"), 01345 net_if.c_str (), 01346 mc_addr.c_str (), 01347 mc_port)); 01348 } 01349 01350 #ifdef ACE_HAS_MAC_OSX 01351 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | 01352 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE); 01353 #endif 01354 01355 if (0 != multicast_socket_.join(default_multicast, 1, 01356 net_if.empty() ? 0 : 01357 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) { 01358 ACE_ERROR((LM_ERROR, 01359 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ") 01360 ACE_TEXT("failed to join multicast group %C:%hd %p\n"), 01361 mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join"))); 01362 throw std::runtime_error("failed to join multicast group"); 01363 } 01364 01365 send_addrs_.insert(default_multicast); 01366 01367 typedef RtpsDiscovery::AddrVec::iterator iter; 01368 for (iter it = outer_->disco_->spdp_send_addrs().begin(), 01369 end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) { 01370 send_addrs_.insert(ACE_INET_Addr(it->c_str())); 01371 } 01372 }
OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport | ( | ) |
Definition at line 1398 of file Spdp.cpp.
References CORBA::Exception::_info(), ACE_TEXT(), ACE_String_Base< ACE_CHAR_T >::c_str(), ACE_SOCK::close(), OpenDDS::DCPS::DCPS_debug_level, dispose_unregister(), OpenDDS::RTPS::Spdp::eh_shutdown_, LM_INFO, LM_WARNING, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::shutdown_cond_, ACE_Condition< ACE_Thread_Mutex >::signal(), and unicast_socket_.
01399 { 01400 if (DCPS::DCPS_debug_level > 3) { 01401 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n"))); 01402 } 01403 try { 01404 dispose_unregister(); 01405 } 01406 catch (const CORBA::Exception& ex) { 01407 if (DCPS::DCPS_debug_level > 0) { 01408 ACE_DEBUG((LM_WARNING, 01409 ACE_TEXT("(%P|%t) WARNING: Exception caught in ") 01410 ACE_TEXT("SpdpTransport::~SpdpTransport: %C\n"), 01411 ex._info().c_str())); 01412 } 01413 } 01414 { 01415 // Acquire lock for modification of condition variable 01416 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_); 01417 outer_->eh_shutdown_ = true; 01418 } 01419 outer_->shutdown_cond_.signal(); 01420 unicast_socket_.close(); 01421 multicast_socket_.close(); 01422 }
void OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge | ( | ) |
Definition at line 1683 of file Spdp.cpp.
References ACE_Reactor::notify(), outer_, OpenDDS::RTPS::Spdp::reactor(), and ACE_Event_Handler::reactor().
Referenced by OpenDDS::RTPS::Spdp::fini_bit().
01684 { 01685 ACE_Reactor* reactor = outer_->reactor(); 01686 reactor->notify(this); 01687 }
void OpenDDS::RTPS::Spdp::SpdpTransport::close | ( | void | ) |
Definition at line 1466 of file Spdp.cpp.
References ACE_TEXT(), ACE_Reactor::cancel_timer(), OpenDDS::DCPS::DCPS_debug_level, ACE_Event_Handler::DONT_CALL, ACE_IPC_SAP::get_handle(), LM_INFO, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::remove_handler(), and unicast_socket_.
Referenced by OpenDDS::RTPS::Spdp::~Spdp().
01467 { 01468 if (DCPS::DCPS_debug_level > 3) { 01469 ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n"))); 01470 } 01471 ACE_Reactor* reactor = outer_->reactor(); 01472 reactor->cancel_timer(this); 01473 const ACE_Reactor_Mask mask = 01474 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL; 01475 reactor->remove_handler(multicast_socket_.get_handle(), mask); 01476 reactor->remove_handler(unicast_socket_.get_handle(), mask); 01477 }
void OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister | ( | ) |
Definition at line 1425 of file Spdp.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, data_, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::Spdp::guid_, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, OpenDDS::RTPS::DataSubmessage::inlineQos, ACE_Message_Block::length(), LM_ERROR, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, ACE_Message_Block::rd_ptr(), ACE_Message_Block::reset(), ACE_SOCK_Dgram::send(), seq_, OpenDDS::RTPS::DataSubmessage::smHeader, unicast_socket_, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.
Referenced by ~SpdpTransport().
01426 { 01427 // Send the dispose/unregister SPDP sample 01428 data_.writerSN.high = seq_.getHigh(); 01429 data_.writerSN.low = seq_.getLow(); 01430 data_.smHeader.flags = FLAG_E | FLAG_Q | FLAG_K_IN_DATA; 01431 data_.inlineQos.length(1); 01432 static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} }; 01433 data_.inlineQos[0].status_info(dispose_unregister); 01434 01435 ParameterList plist(1); 01436 plist.length(1); 01437 plist[0].guid(outer_->guid_); 01438 plist[0]._d(PID_PARTICIPANT_GUID); 01439 01440 wbuff_.reset(); 01441 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR); 01442 CORBA::UShort options = 0; 01443 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options) 01444 || !(ser << plist)) { 01445 ACE_ERROR((LM_ERROR, 01446 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ") 01447 ACE_TEXT("failed to serialize headers for dispose/unregister\n"))); 01448 return; 01449 } 01450 01451 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t; 01452 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) { 01453 const ssize_t res = 01454 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter); 01455 if (res < 0) { 01456 ACE_TCHAR addr_buff[256] = {}; 01457 iter->addr_to_string(addr_buff, 256, 0); 01458 ACE_ERROR((LM_ERROR, 01459 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ") 01460 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send"))); 01461 } 01462 } 01463 }
int OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception | ( | ACE_HANDLE | fd = ACE_INVALID_HANDLE |
) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 1676 of file Spdp.cpp.
References OpenDDS::RTPS::WaitForAcks::ack(), outer_, and OpenDDS::RTPS::Spdp::wait_for_acks().
01677 { 01678 outer_->wait_for_acks().ack(); 01679 return 0; 01680 }
int OpenDDS::RTPS::Spdp::SpdpTransport::handle_input | ( | ACE_HANDLE | h | ) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 1554 of file Spdp.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, buff_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Spdp::data_received(), OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::SubmessageHeader::flags, ACE_IPC_SAP::get_handle(), OpenDDS::RTPS::Header::guidPrefix, header, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcmp(), multicast_socket_, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, ACE_Message_Block::rd_ptr(), read(), ACE_SOCK_Dgram::recv(), ACE_Message_Block::reset(), ACE_Message_Block::size(), OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::DataSubmessage::smHeader, socket(), ACE_Message_Block::space(), OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::DCPS::Serializer::swap_bytes(), unicast_socket_, ACE_Message_Block::wr_ptr(), and OpenDDS::RTPS::DataSubmessage::writerId.
01555 { 01556 const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle()) 01557 ? unicast_socket_ : multicast_socket_; 01558 ACE_INET_Addr remote; 01559 buff_.reset(); 01560 const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote); 01561 01562 if (bytes > 0) { 01563 buff_.wr_ptr(bytes); 01564 } else if (bytes == 0) { 01565 return -1; 01566 } else { 01567 ACE_DEBUG(( 01568 LM_ERROR, 01569 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 01570 ACE_TEXT("error reading from %C socket %p\n") 01571 , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast", 01572 ACE_TEXT("ACE_SOCK_Dgram::recv"))); 01573 return -1; 01574 } 01575 01576 // Handle some RTI protocol multicast to the same address 01577 if ((buff_.size() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) { 01578 return 0; // Ignore 01579 } 01580 01581 DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR); 01582 Header header; 01583 if (!(ser >> header)) { 01584 ACE_ERROR((LM_ERROR, 01585 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 01586 ACE_TEXT("failed to deserialize RTPS header for SPDP\n"))); 01587 return 0; 01588 } 01589 01590 while (buff_.length() > 3) { 01591 const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1]; 01592 ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER); 01593 const size_t start = buff_.length(); 01594 CORBA::UShort submessageLength = 0; 01595 switch (subm) { 01596 case DATA: { 01597 DataSubmessage data; 01598 if (!(ser >> data)) { 01599 ACE_ERROR(( 01600 LM_ERROR, 01601 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 01602 ACE_TEXT("failed to deserialize DATA header for SPDP\n"))); 01603 return 0; 01604 } 01605 submessageLength = data.smHeader.submessageLength; 01606 01607 if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) { 01608 // Not our message: this could be the same multicast group used 01609 // for SEDP and other traffic. 01610 break; 01611 } 01612 01613 ParameterList plist; 01614 if (data.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) { 01615 ser.swap_bytes(!ACE_CDR_BYTE_ORDER); // read "encap" itself in LE 01616 CORBA::UShort encap, options; 01617 if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE)) { 01618 ACE_ERROR((LM_ERROR, 01619 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 01620 ACE_TEXT("failed to deserialize encapsulation header for SPDP\n"))); 01621 return 0; 01622 } 01623 ser >> options; 01624 // bit 8 in encap is on if it's PL_CDR_LE 01625 ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER); 01626 if (!(ser >> plist)) { 01627 ACE_ERROR((LM_ERROR, 01628 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 01629 ACE_TEXT("failed to deserialize data payload for SPDP\n"))); 01630 return 0; 01631 } 01632 } else { 01633 plist.length(1); 01634 RepoId guid; 01635 std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t)); 01636 guid.entityId = ENTITYID_PARTICIPANT; 01637 plist[0].guid(guid); 01638 plist[0]._d(PID_PARTICIPANT_GUID); 01639 } 01640 01641 outer_->data_received(data, plist); 01642 break; 01643 } 01644 default: 01645 SubmessageHeader smHeader; 01646 if (!(ser >> smHeader)) { 01647 ACE_ERROR(( 01648 LM_ERROR, 01649 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 01650 ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n"))); 01651 return 0; 01652 } 01653 submessageLength = smHeader.submessageLength; 01654 break; 01655 } 01656 if (submessageLength && buff_.length()) { 01657 const size_t read = start - buff_.length(); 01658 if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) { 01659 if (!ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ 01660 - read))) { 01661 ACE_ERROR((LM_ERROR, 01662 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") 01663 ACE_TEXT("failed to skip sub message length\n"))); 01664 return 0; 01665 } 01666 } 01667 } else if (!submessageLength) { 01668 break; // submessageLength of 0 indicates the last submessage 01669 } 01670 } 01671 01672 return 0; 01673 }
int OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | ||||
) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 1538 of file Spdp.cpp.
References disco_resend_period_, last_disco_resend_, outer_, OpenDDS::RTPS::Spdp::remove_expired_participants(), and write().
01539 { 01540 if (tv > last_disco_resend_ + disco_resend_period_) { 01541 write(); 01542 outer_->remove_expired_participants(); 01543 last_disco_resend_ = tv; 01544 } 01545 01546 #if defined(OPENDDS_SECURITY) 01547 outer_->check_auth_states(tv); 01548 #endif 01549 01550 return 0; 01551 }
void OpenDDS::RTPS::Spdp::SpdpTransport::open | ( | void | ) |
Definition at line 1375 of file Spdp.cpp.
References OpenDDS::RTPS::Spdp::disco_, disco_resend_period_, ACE_IPC_SAP::get_handle(), last_disco_resend_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), OpenDDS::RTPS::RtpsDiscovery::resend_period(), ACE_Reactor::schedule_timer(), and unicast_socket_.
Referenced by OpenDDS::RTPS::Spdp::init_bit().
01376 { 01377 ACE_Reactor* reactor = outer_->reactor(); 01378 if (reactor->register_handler(unicast_socket_.get_handle(), 01379 this, ACE_Event_Handler::READ_MASK) != 0) { 01380 throw std::runtime_error("failed to register unicast input handler"); 01381 } 01382 01383 if (reactor->register_handler(multicast_socket_.get_handle(), 01384 this, ACE_Event_Handler::READ_MASK) != 0) { 01385 throw std::runtime_error("failed to register multicast input handler"); 01386 } 01387 01388 disco_resend_period_ = outer_->disco_->resend_period(); 01389 last_disco_resend_ = 0; 01390 01391 ACE_Time_Value timer_period = disco_resend_period_ < MAX_SPDP_TIMER_PERIOD ? disco_resend_period_ : MAX_SPDP_TIMER_PERIOD; 01392 01393 if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), timer_period)) { 01394 throw std::runtime_error("failed to schedule timer with reactor"); 01395 } 01396 }
bool OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket | ( | u_short | port_common, | |
u_short | participant_id | |||
) |
Definition at line 1696 of file Spdp.cpp.
References ACE_TEXT(), OpenDDS::RTPS::RtpsDiscovery::d1(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Spdp::disco_, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::open_appropriate_socket_type(), OPENDDS_STRING, outer_, OpenDDS::RTPS::RtpsDiscovery::pg(), ACE_INET_Addr::set(), OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::RTPS::RtpsDiscovery::spdp_local_address(), OpenDDS::RTPS::RtpsDiscovery::ttl(), and unicast_socket_.
Referenced by SpdpTransport().
01698 { 01699 const u_short uni_port = port_common + outer_->disco_->d1() + 01700 (outer_->disco_->pg() * participant_id); 01701 01702 ACE_INET_Addr local_addr; 01703 OPENDDS_STRING spdpaddr = outer_->disco_->spdp_local_address().c_str(); 01704 01705 if (spdpaddr.empty()) { 01706 spdpaddr = "0.0.0.0"; 01707 } 01708 01709 if (0 != local_addr.set(uni_port, spdpaddr.c_str())) { 01710 ACE_DEBUG(( 01711 LM_ERROR, 01712 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ") 01713 ACE_TEXT("failed setting unicast local_addr to port %d %p\n"), 01714 uni_port, ACE_TEXT("ACE_INET_Addr::set"))); 01715 throw std::runtime_error("failed to set unicast local address"); 01716 } 01717 01718 if (!DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) { 01719 if (DCPS::DCPS_debug_level > 3) { 01720 ACE_DEBUG(( 01721 LM_WARNING, 01722 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ") 01723 ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p. ") 01724 ACE_TEXT("Trying next participantId...\n"), 01725 uni_port, ACE_TEXT("ACE_SOCK_Dgram::open"))); 01726 } 01727 return false; 01728 01729 } else if (DCPS::DCPS_debug_level > 3) { 01730 ACE_DEBUG(( 01731 LM_INFO, 01732 ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ") 01733 ACE_TEXT("opened unicast socket on port %d\n"), 01734 uni_port)); 01735 } 01736 01737 if (!DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) { 01738 ACE_ERROR((LM_ERROR, 01739 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ") 01740 ACE_TEXT("failed to set TTL value to %d ") 01741 ACE_TEXT("for port:%hd %p\n"), 01742 outer_->disco_->ttl(), uni_port, ACE_TEXT("DCPS::set_socket_multicast_ttl:"))); 01743 throw std::runtime_error("failed to set TTL"); 01744 } 01745 return true; 01746 }
OpenDDS::RTPS::Spdp::SpdpTransport::OPENDDS_SET | ( | ACE_INET_Addr | ) |
void OpenDDS::RTPS::Spdp::SpdpTransport::write | ( | ) |
Definition at line 1480 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, outer_, and write_i().
Referenced by handle_timeout().
01481 { 01482 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_); 01483 write_i(); 01484 }
void OpenDDS::RTPS::Spdp::SpdpTransport::write_i | ( | ) |
Definition at line 1487 of file Spdp.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::Spdp::build_local_pdata(), data_, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_ORIGINAL, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), hdr_, OpenDDS::RTPS::SequenceNumber_t::high, ACE_Message_Block::length(), LM_ERROR, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::reset(), ACE_SOCK_Dgram::send(), seq_, OpenDDS::RTPS::ParameterListConverter::to_param_list(), unicast_socket_, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.
Referenced by OpenDDS::RTPS::Spdp::handle_participant_data(), and write().
01488 { 01489 #if defined(OPENDDS_SECURITY) 01490 const Security::SPDPdiscoveredParticipantData& pdata = 01491 outer_->build_local_pdata(outer_->is_security_enabled() ? 01492 Security::DPDK_ENHANCED : 01493 Security::DPDK_ORIGINAL); 01494 #else 01495 const Security::SPDPdiscoveredParticipantData& pdata = 01496 outer_->build_local_pdata(Security::DPDK_ORIGINAL); 01497 #endif 01498 01499 data_.writerSN.high = seq_.getHigh(); 01500 data_.writerSN.low = seq_.getLow(); 01501 ++seq_; 01502 01503 ParameterList plist; 01504 if (ParameterListConverter::to_param_list(pdata, plist) < 0) { 01505 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 01506 ACE_TEXT("Spdp::SpdpTransport::write() - ") 01507 ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ") 01508 ACE_TEXT("to ParameterList\n"))); 01509 return; 01510 } 01511 01512 wbuff_.reset(); 01513 CORBA::UShort options = 0; 01514 DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR); 01515 if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options) 01516 || !(ser << plist)) { 01517 ACE_ERROR((LM_ERROR, 01518 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ") 01519 ACE_TEXT("failed to serialize headers for SPDP\n"))); 01520 return; 01521 } 01522 01523 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t; 01524 for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) { 01525 const ssize_t res = 01526 unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter); 01527 if (res < 0) { 01528 ACE_TCHAR addr_buff[256] = {}; 01529 iter->addr_to_string(addr_buff, 256, 0); 01530 ACE_ERROR((LM_ERROR, 01531 ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ") 01532 ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send"))); 01533 } 01534 } 01535 }
Definition at line 176 of file Spdp.h.
Referenced by handle_input().
Definition at line 170 of file Spdp.h.
Referenced by dispose_unregister(), SpdpTransport(), and write_i().
Definition at line 177 of file Spdp.h.
Referenced by handle_timeout(), and open().
Definition at line 169 of file Spdp.h.
Referenced by dispose_unregister(), SpdpTransport(), and write_i().
Definition at line 178 of file Spdp.h.
Referenced by handle_timeout(), and open().
Definition at line 174 of file Spdp.h.
Referenced by close(), handle_input(), open(), SpdpTransport(), and ~SpdpTransport().
Definition at line 168 of file Spdp.h.
Referenced by acknowledge(), close(), dispose_unregister(), handle_exception(), handle_input(), handle_timeout(), open(), open_unicast_socket(), SpdpTransport(), write(), write_i(), and ~SpdpTransport().
Definition at line 171 of file Spdp.h.
Referenced by dispose_unregister(), and write_i().
Definition at line 173 of file Spdp.h.
Referenced by close(), dispose_unregister(), handle_input(), open(), open_unicast_socket(), write_i(), and ~SpdpTransport().
Definition at line 176 of file Spdp.h.
Referenced by dispose_unregister(), and write_i().