Spdp.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_RTPS_SPDP_H
00009 #define OPENDDS_RTPS_SPDP_H
00010 
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00014 
00015 #include "dds/DCPS/RcObject_T.h"
00016 #include "dds/DCPS/GuidUtils.h"
00017 #include "dds/DCPS/Definitions.h"
00018 
00019 #include "RtpsCoreC.h"
00020 #include "Sedp.h"
00021 #include "rtps_export.h"
00022 
00023 #include "ace/Atomic_Op.h"
00024 #include "ace/SOCK_Dgram.h"
00025 #include "ace/SOCK_Dgram_Mcast.h"
00026 #include "ace/Condition_Thread_Mutex.h"
00027 
00028 #include "dds/DCPS/PoolAllocator.h"
00029 #include "dds/DCPS/PoolAllocationBase.h"
00030 
00031 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00032 #pragma once
00033 #endif /* ACE_LACKS_PRAGMA_ONCE */
00034 
00035 namespace OpenDDS {
00036 namespace RTPS {
00037 
00038 class RtpsDiscovery;
00039 
00040 /// Each instance of class Spdp represents the implementation of the RTPS
00041 /// Simple Participant Discovery Protocol for a single local DomainParticipant.
00042 class OpenDDS_Rtps_Export Spdp : public OpenDDS::DCPS::LocalParticipant<Sedp> {
00043 public:
00044   Spdp(DDS::DomainId_t domain, DCPS::RepoId& guid,
00045        const DDS::DomainParticipantQos& qos, RtpsDiscovery* disco);
00046   ~Spdp();
00047 
00048   // Participant
00049   void init_bit(const DDS::Subscriber_var& bit_subscriber);
00050   void fini_bit();
00051 
00052   bool get_default_locators(const DCPS::RepoId& part_id,
00053                             OpenDDS::DCPS::LocatorSeq& target,
00054                             bool& inlineQos);
00055 
00056   // Managing reader/writer associations
00057   void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00058 
00059   // Is Spdp shutting down?
00060   bool shutting_down() { return shutdown_flag_.value(); }
00061 
00062   bool associated() const;
00063   bool has_discovered_participant(const DCPS::RepoId& guid);
00064 
00065   WaitForAcks& wait_for_acks();
00066 
00067 protected:
00068   Sedp& endpoint_manager() { return sedp_; }
00069 
00070 private:
00071   ACE_Reactor* reactor() const;
00072 
00073   RtpsDiscovery* disco_;
00074 
00075   // Participant:
00076   const DDS::DomainId_t domain_;
00077   DCPS::RepoId guid_;
00078   OpenDDS::DCPS::LocatorSeq sedp_unicast_, sedp_multicast_;
00079 
00080   void data_received(const DataSubmessage& data, const ParameterList& plist);
00081 
00082 #ifndef DDS_HAS_MINIMUM_BIT
00083   DDS::ParticipantBuiltinTopicDataDataReaderImpl* part_bit();
00084 #endif /* DDS_HAS_MINIMUM_BIT */
00085 
00086   struct SpdpTransport : ACE_Event_Handler, public OpenDDS::DCPS::PoolAllocationBase {
00087     explicit SpdpTransport(Spdp* outer);
00088     ~SpdpTransport();
00089 
00090     virtual int handle_timeout(const ACE_Time_Value&, const void*);
00091     virtual int handle_input(ACE_HANDLE h);
00092     virtual int handle_exception(ACE_HANDLE fd = ACE_INVALID_HANDLE);
00093 
00094     void open();
00095     void write();
00096     void write_i();
00097     void close();
00098     void dispose_unregister();
00099     bool open_unicast_socket(u_short port_common, u_short participant_id);
00100     void acknowledge();
00101 
00102     Spdp* outer_;
00103     Header hdr_;
00104     DataSubmessage data_;
00105     DCPS::SequenceNumber seq_;
00106     ACE_Time_Value lease_duration_;
00107     ACE_SOCK_Dgram unicast_socket_;
00108     ACE_SOCK_Dgram_Mcast multicast_socket_;
00109     OPENDDS_SET(ACE_INET_Addr) send_addrs_;
00110     ACE_Message_Block buff_, wbuff_;
00111 
00112   } *tport_;
00113 
00114   ACE_Event_Handler_var eh_; // manages our refcount on tport_
00115   bool eh_shutdown_;
00116   ACE_Condition_Thread_Mutex shutdown_cond_;
00117   ACE_Atomic_Op<ACE_Thread_Mutex, long> shutdown_flag_; // Spdp shutting down
00118 
00119   void remove_expired_participants();
00120   void get_discovered_participant_ids(DCPS::RepoIdSet& results) const;
00121 
00122   Sedp sedp_;
00123   // wait for acknowledgments from SpdpTransport and Sedp::Task
00124   // when BIT is being removed (fini_bit)
00125   WaitForAcks wait_for_acks_;
00126 };
00127 
00128 }
00129 }
00130 
00131 #endif // OPENDDS_RTPS_SPDP_H

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7