OpenDDS::RTPS::Spdp::SpdpTransport Struct Reference

Inheritance diagram for OpenDDS::RTPS::Spdp::SpdpTransport:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Spdp::SpdpTransport:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 SpdpTransport (Spdp *outer)
 ~SpdpTransport ()
virtual int handle_timeout (const ACE_Time_Value &, const void *)
virtual int handle_input (ACE_HANDLE h)
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
void open ()
void write ()
void write_i ()
void close ()
void dispose_unregister ()
bool open_unicast_socket (u_short port_common, u_short participant_id)
void acknowledge ()
 OPENDDS_SET (ACE_INET_Addr) send_addrs_

Public Attributes

Spdp * outer_
Header hdr_
DataSubmessage data_
DCPS::SequenceNumber seq_
ACE_Time_Value lease_duration_
ACE_SOCK_Dgram unicast_socket_
ACE_SOCK_Dgram_Mcast multicast_socket_
ACE_Message_Block buff_
ACE_Message_Block wbuff_

Detailed Description

Definition at line 86 of file Spdp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport ( Spdp * outer  )  [explicit]

Definition at line 300 of file Spdp.cpp.

References OpenDDS::RTPS::RtpsDiscovery::d0(), OpenDDS::RTPS::DATA, data_, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::RtpsDiscovery::default_multicast_group(), OpenDDS::RTPS::RtpsDiscovery::dg(), OpenDDS::RTPS::Spdp::disco_, OpenDDS::RTPS::Spdp::domain_, OpenDDS::DCPS::ENABLED, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::DataSubmessage::extraFlags, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::RTPS::Spdp::guid_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Header::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, OpenDDS::RTPS::SequenceNumber_t::low, OpenDDS::RTPS::RtpsDiscovery::multicast_interface(), multicast_socket_, OpenDDS::RTPS::DataSubmessage::octetsToInlineQos, open_unicast_socket(), OPENDDS_STRING, outer_, OpenDDS::RTPS::RtpsDiscovery::pb(), OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::RTPS::DataSubmessage::readerId, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::RtpsDiscovery::spdp_send_addrs(), OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, OpenDDS::RTPS::Header::version, OpenDDS::RTPS::DataSubmessage::writerId, and OpenDDS::RTPS::DataSubmessage::writerSN.

00301   : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT)
00302   , buff_(64 * 1024)
00303   , wbuff_(64 * 1024)
00304 {
        // Builds the fixed RTPS header (hdr_) and the DATA submessage template
        // (data_) used for SPDP announcements, opens the unicast socket, joins
        // the default multicast group and fills send_addrs_ with the default
        // multicast address plus any configured spdp_send_addrs().
        // Throws std::runtime_error if the multicast address cannot be set or the
        // group cannot be joined; open_unicast_socket() may throw as well.
        // lease_duration_ is computed through outer_, so it relies on outer_
        // being initialized first.
00305   hdr_.prefix[0] = 'R';
00306   hdr_.prefix[1] = 'T';
00307   hdr_.prefix[2] = 'P';
00308   hdr_.prefix[3] = 'S';
00309   hdr_.version = PROTOCOLVERSION;
00310   hdr_.vendorId = VENDORID_OPENDDS;
00311   std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t));
00312   data_.smHeader.submessageId = DATA;
00313   data_.smHeader.flags = 1 /*FLAG_E*/ | 4 /*FLAG_D*/;
00314   data_.smHeader.submessageLength = 0; // last submessage in the Message
00315   data_.extraFlags = 0;
00316   data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS;
00317   data_.readerId = ENTITYID_UNKNOWN;
00318   data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER;
        // The sequence number is overwritten from seq_ each time a sample is sent.
00319   data_.writerSN.high = 0;
00320   data_.writerSN.low = 0;
00321 
00322   // Ports are set by the formulas in RTPS v2.1 Table 9.8
00323   const u_short port_common = outer_->disco_->pb() +
00324                               (outer_->disco_->dg() * outer_->domain_),
00325     mc_port = port_common + outer_->disco_->d0();
00326 
        // Initial participant id comes from bytes 10 and 11 of the GUID prefix.
00327   u_short participantId = (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11];
00328 
00329 #ifdef OPENDDS_SAFETY_PROFILE
00330   const u_short startingParticipantId = participantId;
00331 #endif
00332 
        // Try successive participant ids until a unicast port can be opened.
        // NOTE(review): no retry limit is visible here; if opening keeps failing
        // the u_short id wraps around and the loop never ends -- confirm.
00333   while (!open_unicast_socket(port_common, participantId)) {
00334     ++participantId;
00335   }
00336 
00337 #ifdef OPENDDS_SAFETY_PROFILE
00338   if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
00339     // Since pids are not available, use the fact that we had to increment
00340     // participantId to modify the GUID's pid bytes.  This avoids GUID conflicts
00341     // between processes on the same host which start at the same time
00342     // (resulting in the same seed value for the random number generator).
00343     hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
00344     hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
00345     outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
00346     outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
00347   }
00348 #endif
00349 
        // Resolve the default multicast group at the computed multicast port.
00350   OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group();
00351   ACE_INET_Addr default_multicast;
00352   if (0 != default_multicast.set(mc_port, mc_addr.c_str())) {
00353     ACE_DEBUG((
00354           LM_ERROR,
00355           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
00356           ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"),
00357           mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set")));
00358     throw std::runtime_error("failed to set default_multicast address");
00359   }
00360 
        // Optional interface name; an empty string makes join() receive 0 below.
00361   const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface();
00362 
00363   if (DCPS::DCPS_debug_level > 3) {
00364     ACE_DEBUG((LM_INFO,
00365                ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ")
00366                ACE_TEXT("joining group %C %C:%hd\n"),
00367                net_if.c_str (),
00368                mc_addr.c_str (),
00369                mc_port));
00370   }
00371 
00372 #ifdef ACE_HAS_MAC_OSX
00373   multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00374                          ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00375 #endif
00376 
00377   if (0 != multicast_socket_.join(default_multicast, 1,
00378                                   net_if.empty() ? 0 :
00379                                   ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) {
00380     ACE_DEBUG((LM_ERROR,
00381         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
00382         ACE_TEXT("failed to join multicast group %C:%hd %p\n"),
00383         mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join")));
00384     throw std::runtime_error("failed to join multicast group");
00385   }
00386 
        // Announcements always go to the default multicast address ...
00387   send_addrs_.insert(default_multicast);
00388 
        // ... and to every additional address configured on the discovery object.
00389   typedef RtpsDiscovery::AddrVec::iterator iter;
00390   for (iter it = outer_->disco_->spdp_send_addrs().begin(),
00391        end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) {
00392     send_addrs_.insert(ACE_INET_Addr(it->c_str()));
00393   }
00394 
        // Enable the event handler's reference-counting policy.
00395   reference_counting_policy().value(Reference_Counting_Policy::ENABLED);
00396 }

OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport (  ) 

Definition at line 418 of file Spdp.cpp.

References OpenDDS::DCPS::DCPS_debug_level, dispose_unregister(), OpenDDS::RTPS::Spdp::eh_shutdown_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::shutdown_cond_, and unicast_socket_.

00419 {
        // Sends a final dispose/unregister sample, marks the outer Spdp's
        // eh_shutdown_ flag under its lock, signals shutdown_cond_, and closes
        // both sockets.
00420   if (DCPS::DCPS_debug_level > 3) {
00421     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
00422   }
00423   dispose_unregister();
00424   {
00425     // Acquire lock for modification of condition variable
00426     ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00427     outer_->eh_shutdown_ = true;
00428   }
        // The signal is issued outside the guarded scope above.
00429   outer_->shutdown_cond_.signal();
00430   unicast_socket_.close();
00431   multicast_socket_.close();
00432 }


Member Function Documentation

void OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge (  ) 

Definition at line 719 of file Spdp.cpp.

References outer_, and OpenDDS::RTPS::Spdp::reactor().

Referenced by OpenDDS::RTPS::Spdp::fini_bit().

00720 {
        // Asks the outer Spdp's reactor to notify this handler.
        // NOTE(review): presumably the notification is dispatched to
        // handle_exception() (ACE's default notify mask) -- confirm.
00721   ACE_Reactor* reactor = outer_->reactor();
00722   reactor->notify(this);
00723 }

void OpenDDS::RTPS::Spdp::SpdpTransport::close (  ) 

Definition at line 476 of file Spdp.cpp.

References OpenDDS::DCPS::DCPS_debug_level, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), and unicast_socket_.

Referenced by OpenDDS::RTPS::Spdp::~Spdp().

00477 {
        // Detaches this handler from the outer Spdp's reactor: cancels its timer
        // and removes the read registrations for both sockets. The sockets
        // themselves are not closed here; ~SpdpTransport() does that.
00478   if (DCPS::DCPS_debug_level > 3) {
00479     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
00480   }
00481   ACE_Reactor* reactor = outer_->reactor();
00482   reactor->cancel_timer(this);
        // NOTE(review): DONT_CALL presumably suppresses the handle_close()
        // callback during removal -- confirm against the ACE_Reactor docs.
00483   const ACE_Reactor_Mask mask =
00484     ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
00485   reactor->remove_handler(multicast_socket_.get_handle(), mask);
00486   reactor->remove_handler(unicast_socket_.get_handle(), mask);
00487 }

void OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister (  ) 

Definition at line 435 of file Spdp.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, data_, OpenDDS::RTPS::encap_LE, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::Spdp::guid_, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, seq_, OpenDDS::RTPS::DataSubmessage::smHeader, unicast_socket_, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by ~SpdpTransport().

00436 {
        // Serializes a key-only DATA submessage carrying this participant's GUID
        // with a status-info inline QoS, and sends it to every address in
        // send_addrs_. Returns early (after logging) if serialization fails; a
        // failed send is logged and the remaining destinations are still tried.
00437   // Send the dispose/unregister SPDP sample
        // Uses the current sequence number; seq_ is not advanced here.
00438   data_.writerSN.high = seq_.getHigh();
00439   data_.writerSN.low = seq_.getLow();
00440   data_.smHeader.flags = 1 /*FLAG_E*/ | 2 /*FLAG_Q*/ | 8 /*FLAG_K*/;
00441   data_.inlineQos.length(1);
        // NOTE(review): the value 3 presumably sets both the "disposed" and
        // "unregistered" status bits -- confirm against the RTPS specification.
00442   static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
00443   data_.inlineQos[0].status_info(dispose_unregister);
00444 
        // The payload is a one-element parameter list holding the participant GUID.
00445   ParameterList plist(1);
00446   plist.length(1);
00447   plist[0].guid(outer_->guid_);
00448   plist[0]._d(PID_PARTICIPANT_GUID);
00449 
        // Message layout: RTPS header, DATA submessage, encapsulation id,
        // options, parameter list.
00450   wbuff_.reset();
00451   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
00452   CORBA::UShort options = 0;
00453   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
00454       || !(ser << plist)) {
00455     ACE_ERROR((LM_ERROR,
00456       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
00457       ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
00458     return;
00459   }
00460 
        // All destinations, multicast included, are sent through unicast_socket_.
00461   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00462   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
00463     const ssize_t res =
00464       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
00465     if (res < 0) {
00466       ACE_TCHAR addr_buff[256] = {};
00467       iter->addr_to_string(addr_buff, 256, 0);
00468       ACE_ERROR((LM_ERROR,
00469         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
00470         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
00471     }
00472   }
00473 }

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception ( ACE_HANDLE  fd = ACE_INVALID_HANDLE  )  [virtual]

Definition at line 712 of file Spdp.cpp.

References OpenDDS::RTPS::WaitForAcks::ack(), outer_, and OpenDDS::RTPS::Spdp::wait_for_acks().

00713 {
        // Records an acknowledgement on the outer Spdp's WaitForAcks object.
        // The handle argument is unused; always returns 0.
00714   outer_->wait_for_acks().ack();
00715   return 0;
00716 }

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_input ( ACE_HANDLE  h  )  [virtual]

Definition at line 596 of file Spdp.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, buff_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Spdp::data_received(), OpenDDS::RTPS::encap_BE, OpenDDS::RTPS::encap_LE, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::RTPS::SubmessageHeader::flags, header, multicast_socket_, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::DCPS::Serializer::swap_bytes(), unicast_socket_, and OpenDDS::RTPS::DataSubmessage::writerId.

00597 {
        // Read callback for both SPDP sockets. Receives one datagram into buff_,
        // deserializes the RTPS header and then walks the submessages. DATA
        // submessages written by the SPDP builtin participant writer are passed
        // to outer_->data_received(); every other submessage is skipped.
        // Returns 0 after processing (also on parse errors, which are logged)
        // and -1 when recv() returns 0 or fails.
        // NOTE(review): returning -1 normally makes an ACE reactor remove the
        // handler for this handle; confirm that is intended for an empty datagram.
00598   const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle())
00599                                  ? unicast_socket_ : multicast_socket_;
00600   ACE_INET_Addr remote;
00601   buff_.reset();
00602   const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
00603 
00604   if (bytes > 0) {
00605     buff_.wr_ptr(bytes);
00606   } else if (bytes == 0) {
00607     return -1;
00608   } else {
00609     ACE_DEBUG((
00610           LM_ERROR,
00611           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00612           ACE_TEXT("error reading from %C socket %p\n")
00613           , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
00614           ACE_TEXT("ACE_SOCK_Dgram::recv")));
00615     return -1;
00616   }
00617 
00618   // Handle some RTI protocol multicast to the same address
        // Test the number of bytes actually received (length()), not the buffer
        // capacity (size()), so a short datagram is never compared against stale
        // bytes left in buff_ by an earlier receive.
00619   if ((buff_.length() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) {
00620     return 0; // Ignore
00621   }
00622 
00623   DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR);
00624   Header header;
00625   if (!(ser >> header)) {
00626     ACE_ERROR((LM_ERROR,
00627                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00628                ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
00629     return 0;
00630   }
00631 
        // Process submessages while more than 3 unread bytes remain.
00632   while (buff_.length() > 3) {
00633     const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
          // Bit 0 of the flags (FLAG_E) selects the submessage byte order; swap
          // when it differs from the host byte order.
00634     ser.swap_bytes((flags & 1 /*FLAG_E*/) != ACE_CDR_BYTE_ORDER);
          // Remember the unread length so the bytes consumed can be computed below.
00635     const size_t start = buff_.length();
00636     CORBA::UShort submessageLength = 0;
00637     switch (subm) {
00638     case DATA: {
00639       DataSubmessage data;
00640       if (!(ser >> data)) {
00641         ACE_ERROR((
00642               LM_ERROR,
00643               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00644               ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
00645         return 0;
00646       }
00647       submessageLength = data.smHeader.submessageLength;
00648 
00649       if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
00650         // Not our message: this could be the same multicast group used
00651         // for SEDP and other traffic.
00652         break;
00653       }
00654 
00655       ParameterList plist;
            // A serialized payload follows only when FLAG_D or FLAG_K is set.
00656       if (data.smHeader.flags & (4 /*FLAG_D*/ | 8 /*FLAG_K*/)) {
00657         ser.swap_bytes(!ACE_CDR_BYTE_ORDER); // read "encap" itself in LE
00658         CORBA::UShort encap, options;
              // Read and validate the encapsulation id, then read the options
              // field; a truncated header is reported instead of being ignored.
00659         if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE) || !(ser >> options)) {
00660           ACE_ERROR((LM_ERROR,
00661             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00662             ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
00663           return 0;
00664         }
00665         // "options" was consumed above together with the encapsulation id
00666         // bit 8 in encap is on if it's PL_CDR_LE
00667         ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER);
00668         if (!(ser >> plist)) {
00669           ACE_ERROR((LM_ERROR,
00670             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00671             ACE_TEXT("failed to deserialize data payload for SPDP\n")));
00672           return 0;
00673         }
00674       } else {
              // No payload: build a one-element list holding the sender's
              // participant GUID, taken from the RTPS header's guidPrefix.
00675         plist.length(1);
00676         RepoId guid;
00677         std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t));
00678         guid.entityId = ENTITYID_PARTICIPANT;
00679         plist[0].guid(guid);
00680         plist[0]._d(PID_PARTICIPANT_GUID);
00681       }
00682 
00683       outer_->data_received(data, plist);
00684       break;
00685     }
00686     default:
            // Any other submessage: read only its header to learn its length.
00687       SubmessageHeader smHeader;
00688       if (!(ser >> smHeader)) {
00689         ACE_ERROR((
00690               LM_ERROR,
00691               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
00692               ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
00693         return 0;
00694       }
00695       submessageLength = smHeader.submessageLength;
00696       break;
00697     }
          // Skip whatever part of the submessage was not consumed above so the
          // next iteration starts at the following submessage header.
00698     if (submessageLength && buff_.length()) {
00699       const size_t read = start - buff_.length();
00700       if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
00701         ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ - read));
00702       }
00703     } else if (!submessageLength) {
00704       break; // submessageLength of 0 indicates the last submessage
00705     }
00706   }
00707 
00708   return 0;
00709 }

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout ( const ACE_Time_Value &  ,
const void *   
) [virtual]

Definition at line 588 of file Spdp.cpp.

References outer_, OpenDDS::RTPS::Spdp::remove_expired_participants(), and write().

00589 {
        // Timer callback: sends the SPDP announcement via write() and then asks
        // the outer Spdp to remove expired participants. Both arguments are
        // unused; always returns 0.
00590   write();
00591   outer_->remove_expired_participants();
00592   return 0;
00593 }

void OpenDDS::RTPS::Spdp::SpdpTransport::open (  ) 

Definition at line 399 of file Spdp.cpp.

References OpenDDS::RTPS::Spdp::disco_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), OpenDDS::RTPS::RtpsDiscovery::resend_period(), and unicast_socket_.

Referenced by OpenDDS::RTPS::Spdp::init_bit().

00400 {
        // Registers this handler with the outer Spdp's reactor for read events on
        // the unicast and multicast sockets, then schedules a timer using the
        // discovery object's resend_period().
        // Throws std::runtime_error if any registration or the scheduling fails.
00401   ACE_Reactor* reactor = outer_->reactor();
00402   if (reactor->register_handler(unicast_socket_.get_handle(),
00403                                 this, ACE_Event_Handler::READ_MASK) != 0) {
00404     throw std::runtime_error("failed to register unicast input handler");
00405   }
00406 
00407   if (reactor->register_handler(multicast_socket_.get_handle(),
00408                                 this, ACE_Event_Handler::READ_MASK) != 0) {
00409     throw std::runtime_error("failed to register multicast input handler");
00410   }
00411 
        // NOTE(review): presumably a zero initial delay with "per" as the repeat
        // interval, per ACE_Reactor::schedule_timer -- confirm.
00412   const ACE_Time_Value per = outer_->disco_->resend_period();
00413   if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), per)) {
00414     throw std::runtime_error("failed to schedule timer with reactor");
00415   }
00416 }

bool OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket ( u_short  port_common,
u_short  participant_id 
)

Definition at line 732 of file Spdp.cpp.

References OpenDDS::RTPS::RtpsDiscovery::d1(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Spdp::disco_, OpenDDS::DCPS::open_appropriate_socket_type(), outer_, OpenDDS::RTPS::RtpsDiscovery::pg(), OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::RTPS::RtpsDiscovery::ttl(), and unicast_socket_.

Referenced by SpdpTransport().

00734 {
        // Tries to open unicast_socket_ on the port derived from port_common and
        // participant_id, then sets its multicast TTL from the discovery object.
        // Returns true on success and false when the socket could not be opened
        // (the constructor then retries with the next participant id).
        // Throws std::runtime_error if the local address or the TTL cannot be set.
        // The port is port_common + d1 + pg * participant_id, stored as u_short.
00735   const u_short uni_port = port_common + outer_->disco_->d1() +
00736                            (outer_->disco_->pg() * participant_id);
00737 
00738   ACE_INET_Addr local_addr;
00739   if (0 != local_addr.set(uni_port)) {
00740     ACE_DEBUG((
00741           LM_ERROR,
00742           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00743           ACE_TEXT("failed setting unicast local_addr to port %d %p\n"),
00744           uni_port, ACE_TEXT("ACE_INET_Addr::set")));
00745     throw std::runtime_error("failed to set unicast local address");
00746   }
00747 
        // An open failure is not fatal: it is only logged at debug level > 3 and
        // reported to the caller through the false return value.
00748   if (!OpenDDS::DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) {
00749     if (DCPS::DCPS_debug_level > 3) {
00750       ACE_DEBUG((
00751             LM_WARNING,
00752             ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00753             ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p.  ")
00754             ACE_TEXT("Trying next participantId...\n"),
00755             uni_port, ACE_TEXT("ACE_SOCK_Dgram::open")));
00756     }
00757     return false;
00758 
00759   } else if (DCPS::DCPS_debug_level > 3) {
00760     ACE_DEBUG((
00761           LM_INFO,
00762           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
00763           ACE_TEXT("opened unicast socket on port %d\n"),
00764           uni_port));
00765   }
00766 
        // The unicast socket is also used to send to the multicast group (see
        // write_i() and dispose_unregister()), hence the multicast TTL.
00767   if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) {
00768     ACE_DEBUG((LM_ERROR,
00769                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
00770                ACE_TEXT("failed to set TTL value to %d ")
00771                ACE_TEXT("for port:%hd %p\n"),
00772                outer_->disco_->ttl(), uni_port, ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl:")));
00773     throw std::runtime_error("failed to set TTL");
00774   }
00775   return true;
00776 }

OPENDDS_SET(ACE_INET_Addr) OpenDDS::RTPS::Spdp::SpdpTransport::send_addrs_   [data member; Doxygen parsed the OPENDDS_SET macro as a function]

Referenced by dispose_unregister(), and write_i().

void OpenDDS::RTPS::Spdp::SpdpTransport::write (  ) 

Definition at line 490 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, outer_, and write_i().

Referenced by handle_timeout().

00491 {
        // Locking wrapper around write_i(): guards the call with the outer
        // Spdp's lock_.
00492   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00493   write_i();
00494 }

void OpenDDS::RTPS::Spdp::SpdpTransport::write_i (  ) 

Definition at line 497 of file Spdp.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, data_, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, OpenDDS::RTPS::encap_LE, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::Spdp::guid_, OpenDDS::DCPS::GUID_t::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, lease_duration_, OpenDDS::RTPS::LOCATOR_KIND_UDPv4, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::qos_, OpenDDS::RTPS::Spdp::sedp_multicast_, OpenDDS::RTPS::Spdp::sedp_unicast_, seq_, OpenDDS::RTPS::ParameterListConverter::to_param_list(), unicast_socket_, DDS::DomainParticipantQos::user_data, OpenDDS::RTPS::VENDORID_OPENDDS, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by OpenDDS::RTPS::Spdp::data_received(), and write().

00498 {
        // Builds this participant's SPDPdiscoveredParticipantData, converts it to
        // a ParameterList, serializes the SPDP message into wbuff_ and sends it
        // to every address in send_addrs_. Returns early (after logging) if the
        // conversion or serialization fails; a failed send is logged and the
        // remaining destinations are still tried.
        // Takes no lock itself; write() calls it with outer_->lock_ held.
        // NOTE(review): it is also referenced from Spdp::data_received() --
        // confirm that caller holds the lock as well.
00499   static const BuiltinEndpointSet_t availableBuiltinEndpoints =
00500     DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER |
00501     DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR |
00502     DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER |
00503     DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR |
00504     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER |
00505     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR |
00506     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER |
00507     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER
00508     ;
00509   // The RTPS spec has no constants for the builtinTopics{Writer,Reader}
00510 
00511   // This locator list should not be empty, but we won't actually be using it.
00512   // The OpenDDS publication/subscription data will have locators included.
        // Placeholder locator: UDPv4, port 12345, address bytes 12..15 set to
        // 127.0.0.1 with the first 12 bytes zeroed.
00513   OpenDDS::DCPS::LocatorSeq nonEmptyList(1);
00514   nonEmptyList.length(1);
00515   nonEmptyList[0].kind = LOCATOR_KIND_UDPv4;
00516   nonEmptyList[0].port = 12345;
00517   std::memset(nonEmptyList[0].address, 0, 12);
00518   nonEmptyList[0].address[12] = 127;
00519   nonEmptyList[0].address[13] = 0;
00520   nonEmptyList[0].address[14] = 0;
00521   nonEmptyList[0].address[15] = 1;
00522 
        // Stamp the sample with the current sequence number, then advance seq_
        // for the next announcement.
00523   data_.writerSN.high = seq_.getHigh();
00524   data_.writerSN.low = seq_.getLow();
00525   ++seq_;
00526 
00527   const GuidPrefix_t& gp = outer_->guid_.guidPrefix;
00528 
00529   const SPDPdiscoveredParticipantData pdata = {
00530     { // ParticipantBuiltinTopicData
00531       DDS::BuiltinTopicKey_t() /*ignored*/,
00532       outer_->qos_.user_data
00533     },
00534     { // ParticipantProxy_t
00535       PROTOCOLVERSION,
00536       {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5],
00537        gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]},
00538       VENDORID_OPENDDS,
00539       false /*expectsIQoS*/,
00540       availableBuiltinEndpoints,
00541       outer_->sedp_unicast_,
00542       outer_->sedp_multicast_,
00543       nonEmptyList /*defaultMulticastLocatorList*/,
00544       nonEmptyList /*defaultUnicastLocatorList*/,
00545       {0 /*manualLivelinessCount*/}   //FUTURE: implement manual liveliness
00546     },
00547     { // Duration_t (leaseDuration)
00548       static_cast<CORBA::Long>(lease_duration_.sec()),
00549       0 // we are not supporting fractional seconds in the lease duration
00550     }
00551   };
00552 
00553   ParameterList plist;
00554   if (ParameterListConverter::to_param_list(pdata, plist) < 0) {
00555     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00556       ACE_TEXT("Spdp::SpdpTransport::write() - ")
00557       ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
00558       ACE_TEXT("to ParameterList\n")));
00559     return;
00560   }
00561 
        // Message layout: RTPS header, DATA submessage, encapsulation id,
        // options, parameter list.
00562   wbuff_.reset();
00563   CORBA::UShort options = 0;
00564   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
00565   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
00566       || !(ser << plist)) {
00567     ACE_ERROR((LM_ERROR,
00568       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
00569       ACE_TEXT("failed to serialize headers for SPDP\n")));
00570     return;
00571   }
00572 
        // All destinations, multicast included, are sent through unicast_socket_.
00573   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00574   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
00575     const ssize_t res =
00576       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
00577     if (res < 0) {
00578       ACE_TCHAR addr_buff[256] = {};
00579       iter->addr_to_string(addr_buff, 256, 0);
00580       ACE_ERROR((LM_ERROR,
00581         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
00582         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
00583     }
00584   }
00585 }


Member Data Documentation

ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::buff_

Definition at line 110 of file Spdp.h.

Referenced by handle_input().

DataSubmessage OpenDDS::RTPS::Spdp::SpdpTransport::data_

Definition at line 104 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

Header OpenDDS::RTPS::Spdp::SpdpTransport::hdr_

Definition at line 103 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

ACE_Time_Value OpenDDS::RTPS::Spdp::SpdpTransport::lease_duration_

Definition at line 106 of file Spdp.h.

Referenced by write_i().

ACE_SOCK_Dgram_Mcast OpenDDS::RTPS::Spdp::SpdpTransport::multicast_socket_

Definition at line 108 of file Spdp.h.

Referenced by close(), handle_input(), open(), SpdpTransport(), and ~SpdpTransport().

Spdp* OpenDDS::RTPS::Spdp::SpdpTransport::outer_

Definition at line 102 of file Spdp.h.

Referenced by acknowledge(), close(), dispose_unregister(), handle_exception(), handle_input(), handle_timeout(), open(), open_unicast_socket(), SpdpTransport(), write(), write_i(), and ~SpdpTransport().

DCPS::SequenceNumber OpenDDS::RTPS::Spdp::SpdpTransport::seq_

Definition at line 105 of file Spdp.h.

Referenced by dispose_unregister(), and write_i().

ACE_SOCK_Dgram OpenDDS::RTPS::Spdp::SpdpTransport::unicast_socket_

Definition at line 107 of file Spdp.h.

Referenced by close(), dispose_unregister(), handle_input(), open(), open_unicast_socket(), write_i(), and ~SpdpTransport().

ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::wbuff_

Definition at line 110 of file Spdp.h.

Referenced by dispose_unregister(), and write_i().


The documentation for this struct was generated from the following files:
Spdp.h
Spdp.cpp
Generated on Fri Feb 12 20:06:51 2016 for OpenDDS by  doxygen 1.4.7