OpenDDS::RTPS::Spdp::SpdpTransport Struct Reference

Inheritance diagram for OpenDDS::RTPS::Spdp::SpdpTransport:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Spdp::SpdpTransport:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 SpdpTransport (Spdp *outer, bool securityGuids)
 ~SpdpTransport ()
virtual int handle_timeout (const ACE_Time_Value &, const void *)
virtual int handle_input (ACE_HANDLE h)
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
void open ()
void write ()
void write_i ()
void close ()
void dispose_unregister ()
bool open_unicast_socket (u_short port_common, u_short participant_id)
void acknowledge ()
 OPENDDS_SET (ACE_INET_Addr) send_addrs_

Public Attributes

Spdpouter_
Header hdr_
DataSubmessage data_
DCPS::SequenceNumber seq_
ACE_Time_Value lease_duration_
ACE_SOCK_Dgram unicast_socket_
ACE_SOCK_Dgram_Mcast multicast_socket_
ACE_Message_Block buff_
ACE_Message_Block wbuff_
ACE_Time_Value disco_resend_period_
ACE_Time_Value last_disco_resend_

Detailed Description

Definition at line 152 of file Spdp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport ( Spdp outer,
bool  securityGuids 
)

Definition at line 1276 of file Spdp.cpp.

References ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, OpenDDS::RTPS::RtpsDiscovery::d0(), OpenDDS::RTPS::DATA, data_, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::RtpsDiscovery::default_multicast_group(), ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, OpenDDS::RTPS::RtpsDiscovery::dg(), OpenDDS::RTPS::Spdp::disco_, OpenDDS::RTPS::Spdp::domain_, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::DataSubmessage::extraFlags, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::SubmessageHeader::flags, ACE_OS::getpid(), OpenDDS::RTPS::Spdp::guid_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Header::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, ACE_SOCK_Dgram_Mcast::join(), LM_ERROR, LM_INFO, OpenDDS::RTPS::SequenceNumber_t::low, OpenDDS::RTPS::RtpsDiscovery::multicast_interface(), multicast_socket_, OpenDDS::RTPS::DataSubmessage::octetsToInlineQos, open_unicast_socket(), OPENDDS_STRING, ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), outer_, OpenDDS::RTPS::RtpsDiscovery::pb(), OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::RTPS::DataSubmessage::readerId, ACE_INET_Addr::set(), OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::RtpsDiscovery::spdp_send_addrs(), OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, OpenDDS::RTPS::Header::version, while(), OpenDDS::RTPS::DataSubmessage::writerId, and OpenDDS::RTPS::DataSubmessage::writerSN.

01277   : outer_(outer), lease_duration_(outer_->disco_->resend_period() * LEASE_MULT)
01278   , buff_(64 * 1024)
01279   , wbuff_(64 * 1024)
01280 {
01281   hdr_.prefix[0] = 'R';
01282   hdr_.prefix[1] = 'T';
01283   hdr_.prefix[2] = 'P';
01284   hdr_.prefix[3] = 'S';
01285   hdr_.version = PROTOCOLVERSION;
01286   hdr_.vendorId = VENDORID_OPENDDS;
01287   std::memcpy(hdr_.guidPrefix, outer_->guid_.guidPrefix, sizeof(GuidPrefix_t));
01288   data_.smHeader.submessageId = DATA;
01289   data_.smHeader.flags = FLAG_E | FLAG_D;
01290   data_.smHeader.submessageLength = 0; // last submessage in the Message
01291   data_.extraFlags = 0;
01292   data_.octetsToInlineQos = DATA_OCTETS_TO_IQOS;
01293   data_.readerId = ENTITYID_UNKNOWN;
01294   data_.writerId = ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER;
01295   data_.writerSN.high = 0;
01296   data_.writerSN.low = 0;
01297 
01298   // Ports are set by the formulas in RTPS v2.1 Table 9.8
01299   const u_short port_common = outer_->disco_->pb() +
01300                               (outer_->disco_->dg() * outer_->domain_),
01301     mc_port = port_common + outer_->disco_->d0();
01302 
01303   // with security enabled the meaning of the bytes in guidPrefix changes
01304   u_short participantId = securityGuids ? 0
01305     : (hdr_.guidPrefix[10] << 8) | hdr_.guidPrefix[11];
01306 
01307 #ifdef OPENDDS_SAFETY_PROFILE
01308   const u_short startingParticipantId = participantId;
01309 #endif
01310 
01311   while (!open_unicast_socket(port_common, participantId)) {
01312     ++participantId;
01313   }
01314 
01315 #ifdef OPENDDS_SAFETY_PROFILE
01316   if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
01317     // Since pids are not available, use the fact that we had to increment
01318     // participantId to modify the GUID's pid bytes.  This avoids GUID conflicts
01319     // between processes on the same host which start at the same time
01320     // (resulting in the same seed value for the random number generator).
01321     hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
01322     hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
01323     outer_->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
01324     outer_->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
01325   }
01326 #endif
01327 
01328   OPENDDS_STRING mc_addr = outer_->disco_->default_multicast_group();
01329   ACE_INET_Addr default_multicast;
01330   if (0 != default_multicast.set(mc_port, mc_addr.c_str())) {
01331     ACE_DEBUG((
01332           LM_ERROR,
01333           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
01334           ACE_TEXT("failed setting default_multicast address %C:%hd %p\n"),
01335           mc_addr.c_str(), mc_port, ACE_TEXT("ACE_INET_Addr::set")));
01336     throw std::runtime_error("failed to set default_multicast address");
01337   }
01338 
01339   const OPENDDS_STRING& net_if = outer_->disco_->multicast_interface();
01340 
01341   if (DCPS::DCPS_debug_level > 3) {
01342     ACE_DEBUG((LM_INFO,
01343                ACE_TEXT("(%P|%t) Spdp::SpdpTransport::SpdpTransport ")
01344                ACE_TEXT("joining group %C %C:%hd\n"),
01345                net_if.c_str (),
01346                mc_addr.c_str (),
01347                mc_port));
01348   }
01349 
01350 #ifdef ACE_HAS_MAC_OSX
01351   multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
01352                          ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
01353 #endif
01354 
01355   if (0 != multicast_socket_.join(default_multicast, 1,
01356                                   net_if.empty() ? 0 :
01357                                   ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str()))) {
01358     ACE_ERROR((LM_ERROR,
01359         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::SpdpTransport() - ")
01360         ACE_TEXT("failed to join multicast group %C:%hd %p\n"),
01361         mc_addr.c_str(), mc_port, ACE_TEXT("ACE_SOCK_Dgram_Mcast::join")));
01362     throw std::runtime_error("failed to join multicast group");
01363   }
01364 
01365   send_addrs_.insert(default_multicast);
01366 
01367   typedef RtpsDiscovery::AddrVec::iterator iter;
01368   for (iter it = outer_->disco_->spdp_send_addrs().begin(),
01369        end = outer_->disco_->spdp_send_addrs().end(); it != end; ++it) {
01370     send_addrs_.insert(ACE_INET_Addr(it->c_str()));
01371   }
01372 }

Here is the call graph for this function:

OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport (  ) 

Definition at line 1398 of file Spdp.cpp.

References CORBA::Exception::_info(), ACE_TEXT(), ACE_String_Base< ACE_CHAR_T >::c_str(), ACE_SOCK::close(), OpenDDS::DCPS::DCPS_debug_level, dispose_unregister(), OpenDDS::RTPS::Spdp::eh_shutdown_, LM_INFO, LM_WARNING, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::shutdown_cond_, ACE_Condition< ACE_Thread_Mutex >::signal(), and unicast_socket_.

01399 {
01400   if (DCPS::DCPS_debug_level > 3) {
01401     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
01402   }
01403   try {
01404     dispose_unregister();
01405   }
01406   catch (const CORBA::Exception& ex) {
01407     if (DCPS::DCPS_debug_level > 0) {
01408       ACE_DEBUG((LM_WARNING,
01409         ACE_TEXT("(%P|%t) WARNING: Exception caught in ")
01410         ACE_TEXT("SpdpTransport::~SpdpTransport: %C\n"),
01411         ex._info().c_str()));
01412     }
01413   }
01414   {
01415     // Acquire lock for modification of condition variable
01416     ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
01417     outer_->eh_shutdown_ = true;
01418   }
01419   outer_->shutdown_cond_.signal();
01420   unicast_socket_.close();
01421   multicast_socket_.close();
01422 }

Here is the call graph for this function:


Member Function Documentation

void OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge (  ) 

Definition at line 1683 of file Spdp.cpp.

References ACE_Reactor::notify(), outer_, OpenDDS::RTPS::Spdp::reactor(), and ACE_Event_Handler::reactor().

Referenced by OpenDDS::RTPS::Spdp::fini_bit().

01684 {
01685   ACE_Reactor* reactor = outer_->reactor();
01686   reactor->notify(this);
01687 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::SpdpTransport::close ( void   ) 

Definition at line 1466 of file Spdp.cpp.

References ACE_TEXT(), ACE_Reactor::cancel_timer(), OpenDDS::DCPS::DCPS_debug_level, ACE_Event_Handler::DONT_CALL, ACE_IPC_SAP::get_handle(), LM_INFO, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::remove_handler(), and unicast_socket_.

Referenced by OpenDDS::RTPS::Spdp::~Spdp().

01467 {
01468   if (DCPS::DCPS_debug_level > 3) {
01469     ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
01470   }
01471   ACE_Reactor* reactor = outer_->reactor();
01472   reactor->cancel_timer(this);
01473   const ACE_Reactor_Mask mask =
01474     ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL;
01475   reactor->remove_handler(multicast_socket_.get_handle(), mask);
01476   reactor->remove_handler(unicast_socket_.get_handle(), mask);
01477 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister (  ) 

Definition at line 1425 of file Spdp.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, data_, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::Spdp::guid_, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, OpenDDS::RTPS::DataSubmessage::inlineQos, ACE_Message_Block::length(), LM_ERROR, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, ACE_Message_Block::rd_ptr(), ACE_Message_Block::reset(), ACE_SOCK_Dgram::send(), seq_, OpenDDS::RTPS::DataSubmessage::smHeader, unicast_socket_, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by ~SpdpTransport().

01426 {
01427   // Send the dispose/unregister SPDP sample
01428   data_.writerSN.high = seq_.getHigh();
01429   data_.writerSN.low = seq_.getLow();
01430   data_.smHeader.flags = FLAG_E | FLAG_Q | FLAG_K_IN_DATA;
01431   data_.inlineQos.length(1);
01432   static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
01433   data_.inlineQos[0].status_info(dispose_unregister);
01434 
01435   ParameterList plist(1);
01436   plist.length(1);
01437   plist[0].guid(outer_->guid_);
01438   plist[0]._d(PID_PARTICIPANT_GUID);
01439 
01440   wbuff_.reset();
01441   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
01442   CORBA::UShort options = 0;
01443   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
01444       || !(ser << plist)) {
01445     ACE_ERROR((LM_ERROR,
01446       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
01447       ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
01448     return;
01449   }
01450 
01451   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
01452   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
01453     const ssize_t res =
01454       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
01455     if (res < 0) {
01456       ACE_TCHAR addr_buff[256] = {};
01457       iter->addr_to_string(addr_buff, 256, 0);
01458       ACE_ERROR((LM_ERROR,
01459         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
01460         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
01461     }
01462   }
01463 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception ( ACE_HANDLE  fd = ACE_INVALID_HANDLE  )  [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 1676 of file Spdp.cpp.

References OpenDDS::RTPS::WaitForAcks::ack(), outer_, and OpenDDS::RTPS::Spdp::wait_for_acks().

01677 {
01678   outer_->wait_for_acks().ack();
01679   return 0;
01680 }

Here is the call graph for this function:

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_input ( ACE_HANDLE  h  )  [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 1554 of file Spdp.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, buff_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Spdp::data_received(), OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::SubmessageHeader::flags, ACE_IPC_SAP::get_handle(), OpenDDS::RTPS::Header::guidPrefix, header, ACE_Message_Block::length(), LM_ERROR, ACE_OS::memcmp(), multicast_socket_, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, ACE_Message_Block::rd_ptr(), read(), ACE_SOCK_Dgram::recv(), ACE_Message_Block::reset(), ACE_Message_Block::size(), OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::DataSubmessage::smHeader, socket(), ACE_Message_Block::space(), OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::DCPS::Serializer::swap_bytes(), unicast_socket_, ACE_Message_Block::wr_ptr(), and OpenDDS::RTPS::DataSubmessage::writerId.

01555 {
01556   const ACE_SOCK_Dgram& socket = (h == unicast_socket_.get_handle())
01557                                  ? unicast_socket_ : multicast_socket_;
01558   ACE_INET_Addr remote;
01559   buff_.reset();
01560   const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
01561 
01562   if (bytes > 0) {
01563     buff_.wr_ptr(bytes);
01564   } else if (bytes == 0) {
01565     return -1;
01566   } else {
01567     ACE_DEBUG((
01568           LM_ERROR,
01569           ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01570           ACE_TEXT("error reading from %C socket %p\n")
01571           , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
01572           ACE_TEXT("ACE_SOCK_Dgram::recv")));
01573     return -1;
01574   }
01575 
01576   // Handle some RTI protocol multicast to the same address
01577   if ((buff_.size() >= 4) && (!ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4))) {
01578     return 0; // Ignore
01579   }
01580 
01581   DCPS::Serializer ser(&buff_, false, DCPS::Serializer::ALIGN_CDR);
01582   Header header;
01583   if (!(ser >> header)) {
01584     ACE_ERROR((LM_ERROR,
01585                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01586                ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
01587     return 0;
01588   }
01589 
01590   while (buff_.length() > 3) {
01591     const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
01592     ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER);
01593     const size_t start = buff_.length();
01594     CORBA::UShort submessageLength = 0;
01595     switch (subm) {
01596     case DATA: {
01597       DataSubmessage data;
01598       if (!(ser >> data)) {
01599         ACE_ERROR((
01600               LM_ERROR,
01601               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01602               ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
01603         return 0;
01604       }
01605       submessageLength = data.smHeader.submessageLength;
01606 
01607       if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
01608         // Not our message: this could be the same multicast group used
01609         // for SEDP and other traffic.
01610         break;
01611       }
01612 
01613       ParameterList plist;
01614       if (data.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
01615         ser.swap_bytes(!ACE_CDR_BYTE_ORDER); // read "encap" itself in LE
01616         CORBA::UShort encap, options;
01617         if (!(ser >> encap) || (encap != encap_LE && encap != encap_BE)) {
01618           ACE_ERROR((LM_ERROR,
01619             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01620             ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
01621           return 0;
01622         }
01623         ser >> options;
01624         // bit 8 in encap is on if it's PL_CDR_LE
01625         ser.swap_bytes(((encap & 0x100) >> 8) != ACE_CDR_BYTE_ORDER);
01626         if (!(ser >> plist)) {
01627           ACE_ERROR((LM_ERROR,
01628             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01629             ACE_TEXT("failed to deserialize data payload for SPDP\n")));
01630           return 0;
01631         }
01632       } else {
01633         plist.length(1);
01634         RepoId guid;
01635         std::memcpy(guid.guidPrefix, header.guidPrefix, sizeof(GuidPrefix_t));
01636         guid.entityId = ENTITYID_PARTICIPANT;
01637         plist[0].guid(guid);
01638         plist[0]._d(PID_PARTICIPANT_GUID);
01639       }
01640 
01641       outer_->data_received(data, plist);
01642       break;
01643     }
01644     default:
01645       SubmessageHeader smHeader;
01646       if (!(ser >> smHeader)) {
01647         ACE_ERROR((
01648               LM_ERROR,
01649               ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01650               ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
01651         return 0;
01652       }
01653       submessageLength = smHeader.submessageLength;
01654       break;
01655     }
01656     if (submessageLength && buff_.length()) {
01657       const size_t read = start - buff_.length();
01658       if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
01659         if (!ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ
01660                                                  - read))) {
01661           ACE_ERROR((LM_ERROR,
01662             ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
01663             ACE_TEXT("failed to skip sub message length\n")));
01664           return 0;
01665         }
01666       }
01667     } else if (!submessageLength) {
01668       break; // submessageLength of 0 indicates the last submessage
01669     }
01670   }
01671 
01672   return 0;
01673 }

Here is the call graph for this function:

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout ( const ACE_Time_Value tv,
const void *   
) [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 1538 of file Spdp.cpp.

References disco_resend_period_, last_disco_resend_, outer_, OpenDDS::RTPS::Spdp::remove_expired_participants(), and write().

01539 {
01540   if (tv > last_disco_resend_ + disco_resend_period_) {
01541     write();
01542     outer_->remove_expired_participants();
01543     last_disco_resend_ = tv;
01544   }
01545 
01546 #if defined(OPENDDS_SECURITY)
01547   outer_->check_auth_states(tv);
01548 #endif
01549 
01550   return 0;
01551 }

Here is the call graph for this function:

void OpenDDS::RTPS::Spdp::SpdpTransport::open ( void   ) 

Definition at line 1375 of file Spdp.cpp.

References OpenDDS::RTPS::Spdp::disco_, disco_resend_period_, ACE_IPC_SAP::get_handle(), last_disco_resend_, multicast_socket_, outer_, OpenDDS::RTPS::Spdp::reactor(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), OpenDDS::RTPS::RtpsDiscovery::resend_period(), ACE_Reactor::schedule_timer(), and unicast_socket_.

Referenced by OpenDDS::RTPS::Spdp::init_bit().

01376 {
01377   ACE_Reactor* reactor = outer_->reactor();
01378   if (reactor->register_handler(unicast_socket_.get_handle(),
01379                                 this, ACE_Event_Handler::READ_MASK) != 0) {
01380     throw std::runtime_error("failed to register unicast input handler");
01381   }
01382 
01383   if (reactor->register_handler(multicast_socket_.get_handle(),
01384                                 this, ACE_Event_Handler::READ_MASK) != 0) {
01385     throw std::runtime_error("failed to register multicast input handler");
01386   }
01387 
01388   disco_resend_period_ = outer_->disco_->resend_period();
01389   last_disco_resend_ = 0;
01390 
01391   ACE_Time_Value timer_period = disco_resend_period_ < MAX_SPDP_TIMER_PERIOD ? disco_resend_period_ : MAX_SPDP_TIMER_PERIOD;
01392 
01393   if (-1 == reactor->schedule_timer(this, 0, ACE_Time_Value(0), timer_period)) {
01394     throw std::runtime_error("failed to schedule timer with reactor");
01395   }
01396 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket ( u_short  port_common,
u_short  participant_id 
)

Definition at line 1696 of file Spdp.cpp.

References ACE_TEXT(), OpenDDS::RTPS::RtpsDiscovery::d1(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::Spdp::disco_, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::open_appropriate_socket_type(), OPENDDS_STRING, outer_, OpenDDS::RTPS::RtpsDiscovery::pg(), ACE_INET_Addr::set(), OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::RTPS::RtpsDiscovery::spdp_local_address(), OpenDDS::RTPS::RtpsDiscovery::ttl(), and unicast_socket_.

Referenced by SpdpTransport().

01698 {
01699   const u_short uni_port = port_common + outer_->disco_->d1() +
01700                            (outer_->disco_->pg() * participant_id);
01701 
01702   ACE_INET_Addr local_addr;
01703   OPENDDS_STRING spdpaddr = outer_->disco_->spdp_local_address().c_str();
01704 
01705   if (spdpaddr.empty()) {
01706     spdpaddr = "0.0.0.0";
01707   }
01708 
01709   if (0 != local_addr.set(uni_port, spdpaddr.c_str())) {
01710     ACE_DEBUG((
01711           LM_ERROR,
01712           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01713           ACE_TEXT("failed setting unicast local_addr to port %d %p\n"),
01714           uni_port, ACE_TEXT("ACE_INET_Addr::set")));
01715     throw std::runtime_error("failed to set unicast local address");
01716   }
01717 
01718   if (!DCPS::open_appropriate_socket_type(unicast_socket_, local_addr)) {
01719     if (DCPS::DCPS_debug_level > 3) {
01720       ACE_DEBUG((
01721             LM_WARNING,
01722             ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01723             ACE_TEXT("failed to open_appropriate_socket_type unicast socket on port %d %p.  ")
01724             ACE_TEXT("Trying next participantId...\n"),
01725             uni_port, ACE_TEXT("ACE_SOCK_Dgram::open")));
01726     }
01727     return false;
01728 
01729   } else if (DCPS::DCPS_debug_level > 3) {
01730     ACE_DEBUG((
01731           LM_INFO,
01732           ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
01733           ACE_TEXT("opened unicast socket on port %d\n"),
01734           uni_port));
01735   }
01736 
01737   if (!DCPS::set_socket_multicast_ttl(unicast_socket_, outer_->disco_->ttl())) {
01738     ACE_ERROR((LM_ERROR,
01739                ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
01740                ACE_TEXT("failed to set TTL value to %d ")
01741                ACE_TEXT("for port:%hd %p\n"),
01742                outer_->disco_->ttl(), uni_port, ACE_TEXT("DCPS::set_socket_multicast_ttl:")));
01743     throw std::runtime_error("failed to set TTL");
01744   }
01745   return true;
01746 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::RTPS::Spdp::SpdpTransport::OPENDDS_SET ( ACE_INET_Addr   ) 

Referenced by dispose_unregister(), and write_i().

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::SpdpTransport::write (  ) 

Definition at line 1480 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, outer_, and write_i().

Referenced by handle_timeout().

01481 {
01482   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
01483   write_i();
01484 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::SpdpTransport::write_i (  ) 

Definition at line 1487 of file Spdp.cpp.

References ACE_TEXT(), OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::Spdp::build_local_pdata(), data_, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_ORIGINAL, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), hdr_, OpenDDS::RTPS::SequenceNumber_t::high, ACE_Message_Block::length(), LM_ERROR, OpenDDS::RTPS::SequenceNumber_t::low, OPENDDS_SET(), outer_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::reset(), ACE_SOCK_Dgram::send(), seq_, OpenDDS::RTPS::ParameterListConverter::to_param_list(), unicast_socket_, wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by OpenDDS::RTPS::Spdp::handle_participant_data(), and write().

01488 {
01489 #if defined(OPENDDS_SECURITY)
01490   const Security::SPDPdiscoveredParticipantData& pdata =
01491     outer_->build_local_pdata(outer_->is_security_enabled() ?
01492                               Security::DPDK_ENHANCED :
01493                               Security::DPDK_ORIGINAL);
01494 #else
01495     const Security::SPDPdiscoveredParticipantData& pdata =
01496       outer_->build_local_pdata(Security::DPDK_ORIGINAL);
01497 #endif
01498 
01499   data_.writerSN.high = seq_.getHigh();
01500   data_.writerSN.low = seq_.getLow();
01501   ++seq_;
01502 
01503   ParameterList plist;
01504   if (ParameterListConverter::to_param_list(pdata, plist) < 0) {
01505     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
01506       ACE_TEXT("Spdp::SpdpTransport::write() - ")
01507       ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
01508       ACE_TEXT("to ParameterList\n")));
01509     return;
01510   }
01511 
01512   wbuff_.reset();
01513   CORBA::UShort options = 0;
01514   DCPS::Serializer ser(&wbuff_, false, DCPS::Serializer::ALIGN_CDR);
01515   if (!(ser << hdr_) || !(ser << data_) || !(ser << encap_LE) || !(ser << options)
01516       || !(ser << plist)) {
01517     ACE_ERROR((LM_ERROR,
01518       ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
01519       ACE_TEXT("failed to serialize headers for SPDP\n")));
01520     return;
01521   }
01522 
01523   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
01524   for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
01525     const ssize_t res =
01526       unicast_socket_.send(wbuff_.rd_ptr(), wbuff_.length(), *iter);
01527     if (res < 0) {
01528       ACE_TCHAR addr_buff[256] = {};
01529       iter->addr_to_string(addr_buff, 256, 0);
01530       ACE_ERROR((LM_ERROR,
01531         ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
01532         ACE_TEXT("destination %s failed %p\n"), addr_buff, ACE_TEXT("send")));
01533     }
01534   }
01535 }

Here is the call graph for this function:

Here is the caller graph for this function:


Member Data Documentation

Definition at line 176 of file Spdp.h.

Referenced by handle_input().

Definition at line 170 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

Definition at line 177 of file Spdp.h.

Referenced by handle_timeout(), and open().

Definition at line 169 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

Definition at line 178 of file Spdp.h.

Referenced by handle_timeout(), and open().

Definition at line 172 of file Spdp.h.

Definition at line 174 of file Spdp.h.

Referenced by close(), handle_input(), open(), SpdpTransport(), and ~SpdpTransport().

Definition at line 171 of file Spdp.h.

Referenced by dispose_unregister(), and write_i().

Definition at line 176 of file Spdp.h.

Referenced by dispose_unregister(), and write_i().


The documentation for this struct was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1