00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_RTPS_SPDP_H
00009 #define OPENDDS_RTPS_SPDP_H
00010
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00014
00015 #include "dds/DCPS/RcObject_T.h"
00016 #include "dds/DCPS/GuidUtils.h"
00017 #include "dds/DCPS/Definitions.h"
00018
00019 #include "RtpsCoreC.h"
00020 #include "Sedp.h"
00021 #include "rtps_export.h"
00022
00023 #include "ace/Atomic_Op.h"
00024 #include "ace/SOCK_Dgram.h"
00025 #include "ace/SOCK_Dgram_Mcast.h"
00026 #include "ace/Condition_Thread_Mutex.h"
00027
00028 #include "dds/DCPS/PoolAllocator.h"
00029 #include "dds/DCPS/PoolAllocationBase.h"
00030
00031 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00032 #pragma once
00033 #endif
00034
00035 namespace OpenDDS {
00036 namespace RTPS {
00037
00038 class RtpsDiscovery;
00039
00040
00041
// Spdp: participant-level discovery object exported from the RTPS library.
// It derives from DCPS::LocalParticipant<Sedp> and owns a Sedp instance
// (sedp_) that is exposed to the base class through endpoint_manager().
// NOTE(review): the name presumably stands for the RTPS "Simple Participant
// Discovery Protocol" -- confirm against the implementation file.
00042 class OpenDDS_Rtps_Export Spdp : public OpenDDS::DCPS::LocalParticipant<Sedp> {
00043 public:
// Constructs the participant for the given domain.
//   domain - DDS domain id (stored in the const member domain_, presumably).
//   guid   - taken by NON-const reference, so the constructor is able to
//            write to the caller's value -- TODO confirm whether it does.
//   qos    - participant QoS, read-only.
//   disco  - owning RtpsDiscovery; raw pointer (ownership/lifetime is not
//            visible from this header -- verify in the implementation).
00044 Spdp(DDS::DomainId_t domain, DCPS::RepoId& guid,
00045 const DDS::DomainParticipantQos& qos, RtpsDiscovery* disco);
00046 ~Spdp();
00047
00048
// init_bit / fini_bit: set up and tear down state tied to the given
// subscriber. NOTE(review): "bit" presumably means built-in topics -- confirm.
00049 void init_bit(const DDS::Subscriber_var& bit_subscriber);
00050 void fini_bit();
00051
// Looks up locators for participant part_id. target and inlineQos are
// non-const references, i.e. results are written through them; the bool
// return presumably signals whether the participant was found -- confirm.
00052 bool get_default_locators(const DCPS::RepoId& part_id,
00053 OpenDDS::DCPS::LocatorSeq& target,
00054 bool& inlineQos);
00055
00056
// Liveliness notification for the given liveliness kind (behavior is defined
// in the implementation file, not visible here).
00057 void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00058
00059
// Returns true once shutdown_flag_ holds a non-zero value (the long returned
// by value() is converted to bool).
00060 bool shutting_down() { return shutdown_flag_.value(); }
00061
00062 bool associated() const;
00063 bool has_discovered_participant(const DCPS::RepoId& guid);
00064
// Accessor returning a WaitForAcks by reference (presumably the
// wait_for_acks_ member declared at the end of the class -- confirm).
00065 WaitForAcks& wait_for_acks();
00066
00067 protected:
// Gives the base class access to the owned Sedp instance.
// NOTE(review): presumably implements a hook required by
// LocalParticipant<Sedp> -- confirm against that template.
00068 Sedp& endpoint_manager() { return sedp_; }
00069
00070 private:
00071 ACE_Reactor* reactor() const;
00072
// Discovery object passed to the constructor (raw, presumably non-owning).
00073 RtpsDiscovery* disco_;
00074
00075
// Domain id is immutable for the lifetime of the object (const member).
00076 const DDS::DomainId_t domain_;
00077 DCPS::RepoId guid_;
// Locator sequences; by name these hold the SEDP unicast and multicast
// locators -- TODO confirm where they are populated.
00078 OpenDDS::DCPS::LocatorSeq sedp_unicast_, sedp_multicast_;
00079
// Processes one received data submessage together with its parameter list.
00080 void data_received(const DataSubmessage& data, const ParameterList& plist);
00081
// part_bit() is only declared when DDS_HAS_MINIMUM_BIT is NOT defined.
00082 #ifndef DDS_HAS_MINIMUM_BIT
00083 DDS::ParticipantBuiltinTopicDataDataReaderImpl* part_bit();
00084 #endif
00085
// Nested event handler bundling the sockets, buffers and message state used
// by this participant. It derives from ACE_Event_Handler (reactor callbacks
// below) and from PoolAllocationBase. The struct definition also declares
// the pointer member tport_ (see the closing brace).
00086 struct SpdpTransport : ACE_Event_Handler, public OpenDDS::DCPS::PoolAllocationBase {
// outer is the enclosing Spdp, kept in outer_ (presumably non-owning).
00087 explicit SpdpTransport(Spdp* outer);
00088 ~SpdpTransport();
00089
// ACE_Event_Handler callbacks: timer expiry, readable handle, exception.
00090 virtual int handle_timeout(const ACE_Time_Value&, const void*);
00091 virtual int handle_input(ACE_HANDLE h);
00092 virtual int handle_exception(ACE_HANDLE fd = ACE_INVALID_HANDLE);
00093
00094 void open();
// NOTE(review): write() vs write_i() presumably follows the common
// "locking wrapper / internal implementation" split -- confirm.
00095 void write();
00096 void write_i();
00097 void close();
00098 void dispose_unregister();
// Tries to open the unicast socket for the given port base and participant
// id; returns bool (presumably true on success -- confirm).
00099 bool open_unicast_socket(u_short port_common, u_short participant_id);
00100 void acknowledge();
00101
// Back-pointer to the enclosing Spdp.
00102 Spdp* outer_;
// Message header, data submessage and sequence number kept as members.
00103 Header hdr_;
00104 DataSubmessage data_;
00105 DCPS::SequenceNumber seq_;
00106 ACE_Time_Value lease_duration_;
// One plain datagram socket and one multicast datagram socket.
00107 ACE_SOCK_Dgram unicast_socket_;
00108 ACE_SOCK_Dgram_Mcast multicast_socket_;
// Set of destination addresses (by name; populated in the implementation).
00109 OPENDDS_SET(ACE_INET_Addr) send_addrs_;
// Two message blocks; by name a receive buffer and a write buffer -- confirm.
00110 ACE_Message_Block buff_, wbuff_;
00111
// tport_: pointer to the SpdpTransport instance used by this Spdp.
00112 } *tport_;
00113
// Handler reference plus the flag/condition/atomic used during shutdown.
// NOTE(review): eh_ presumably holds a reference on tport_, and
// eh_shutdown_ is presumably signalled via shutdown_cond_ -- confirm.
00114 ACE_Event_Handler_var eh_;
00115 bool eh_shutdown_;
00116 ACE_Condition_Thread_Mutex shutdown_cond_;
// Atomic long read by shutting_down(); non-zero means shutting down.
00117 ACE_Atomic_Op<ACE_Thread_Mutex, long> shutdown_flag_;
00118
00119 void remove_expired_participants();
// Writes the discovered participant ids into results (out-parameter).
00120 void get_discovered_participant_ids(DCPS::RepoIdSet& results) const;
00121
// Owned by value; returned by endpoint_manager(). Declared after the members
// above, so it is constructed after them and destroyed before them.
00122 Sedp sedp_;
00123
00124
// Declared last, so it is the last member constructed and first destroyed.
00125 WaitForAcks wait_for_acks_;
00126 };
00127
00128 }
00129 }
00130
00131 #endif // OPENDDS_RTPS_SPDP_H