Sedp.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_RTPS_SEDP_H
00009 #define OPENDDS_RTPS_SEDP_H
00010 
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00014 
00015 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00018 
00019 #include "dds/DCPS/RcHandle_T.h"
00020 #include "dds/DCPS/GuidUtils.h"
00021 #include "dds/DCPS/DataReaderCallbacks.h"
00022 #include "dds/DCPS/Definitions.h"
00023 #include "dds/DCPS/BuiltInTopicUtils.h"
00024 #include "dds/DCPS/DataSampleElement.h"
00025 #include "dds/DCPS/DataSampleHeader.h"
00026 #include "dds/DCPS/PoolAllocationBase.h"
00027 #include "dds/DCPS/DiscoveryBase.h"
00028 
00029 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00030 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00031 #include "dds/DCPS/transport/framework/TransportClient.h"
00032 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00033 
00034 #include "ace/Task_Ex_T.h"
00035 #include "ace/Condition_Thread_Mutex.h"
00036 #include "ace/Thread_Mutex.h"
00037 #include "dds/DCPS/PoolAllocator.h"
00038 
00039 
00040 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00041 #pragma once
00042 #endif /* ACE_LACKS_PRAGMA_ONCE */
00043 
00044 namespace OpenDDS {
00045 namespace RTPS {
00046 
00047 enum RtpsFlags { FLAG_E = 1, FLAG_Q = 2, FLAG_D = 4 };
00048 
00049 class RtpsDiscovery;
00050 class Spdp;
00051 
00052 class WaitForAcks;
00053 
00054 class Sedp : public OpenDDS::DCPS::EndpointManager<SPDPdiscoveredParticipantData> {
00055 public:
00056   Sedp(const DCPS::RepoId& participant_id,
00057        Spdp& owner,
00058        ACE_Thread_Mutex& lock);
00059 
00060   DDS::ReturnCode_t init(const DCPS::RepoId& guid,
00061                          const RtpsDiscovery& disco,
00062                          DDS::DomainId_t domainId);
00063 
00064   /// request for acknowledgement from all Sedp threads (Task)
00065   void acknowledge();
00066 
00067   void shutdown();
00068   void unicast_locators(OpenDDS::DCPS::LocatorSeq& locators) const;
00069 
00070   // @brief return the ip address we have bound to.
00071   // Valid after init() call
00072   const ACE_INET_Addr& local_address() const;
00073   const ACE_INET_Addr& multicast_group() const;
00074   bool map_ipv4_to_ipv6() const;
00075 
00076   void associate(const SPDPdiscoveredParticipantData& pdata);
00077   bool disassociate(const SPDPdiscoveredParticipantData& pdata);
00078 
00079   // Topic
00080   bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00081                         OPENDDS_STRING& name);
00082 
00083   // Publication
00084   bool update_publication_qos(const DCPS::RepoId& publicationId,
00085                               const DDS::DataWriterQos& qos,
00086                               const DDS::PublisherQos& publisherQos);
00087 
00088   // Subscription
00089   bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00090                                const DDS::DataReaderQos& qos,
00091                                const DDS::SubscriberQos& subscriberQos);
00092   bool update_subscription_params(const DCPS::RepoId& subId,
00093                                   const DDS::StringSeq& params);
00094 
00095   // Managing reader/writer associations
00096   void association_complete(const DCPS::RepoId& localId,
00097                             const DCPS::RepoId& remoteId);
00098 
00099   void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00100 
00101   static const bool host_is_bigendian_;
00102 private:
00103   Spdp& spdp_;
00104 
00105   struct Msg : public OpenDDS::DCPS::PoolAllocationBase {
00106     enum MsgType { MSG_PARTICIPANT, MSG_WRITER, MSG_READER, MSG_PARTICIPANT_DATA,
00107                    MSG_REMOVE_FROM_PUB_BIT, MSG_REMOVE_FROM_SUB_BIT,
00108                    MSG_FINI_BIT, MSG_STOP } type_;
00109     DCPS::MessageId id_;
00110     union {
00111       const SPDPdiscoveredParticipantData* dpdata_;
00112       const OpenDDS::DCPS::DiscoveredWriterData* wdata_;
00113       const OpenDDS::DCPS::DiscoveredReaderData* rdata_;
00114       const ParticipantMessageData* pmdata_;
00115       DDS::InstanceHandle_t ih_;
00116     };
00117     Msg(MsgType mt, DCPS::MessageId id, const SPDPdiscoveredParticipantData* dpdata)
00118       : type_(mt), id_(id), dpdata_(dpdata) {}
00119     Msg(MsgType mt, DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata)
00120       : type_(mt), id_(id), wdata_(wdata) {}
00121     Msg(MsgType mt, DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata)
00122       : type_(mt), id_(id), rdata_(rdata) {}
00123     Msg(MsgType mt, DCPS::MessageId id, const ParticipantMessageData* pmdata)
00124       : type_(mt), id_(id), pmdata_(pmdata) {}
00125     Msg(MsgType mt, DCPS::MessageId id, DDS::InstanceHandle_t ih)
00126       : type_(mt), id_(id), ih_(ih) {}
00127   };
00128 
00129   class Endpoint : public DCPS::TransportClient {
00130   public:
00131     Endpoint(const DCPS::RepoId& repo_id, Sedp& sedp)
00132       : repo_id_(repo_id)
00133       , sedp_(sedp)
00134     {}
00135 
00136     virtual ~Endpoint();
00137 
00138     // Implementing TransportClient
00139     bool check_transport_qos(const DCPS::TransportInst&)
00140       { return true; }
00141     const DCPS::RepoId& get_repo_id() const
00142       { return repo_id_; }
00143     DDS::DomainId_t domain_id() const
00144       { return 0; } // not used for SEDP
00145     CORBA::Long get_priority_value(const DCPS::AssociationData&) const
00146       { return 0; }
00147 
00148     using DCPS::TransportClient::enable_transport_using_config;
00149     using DCPS::TransportClient::disassociate;
00150 
00151   protected:
00152     DCPS::RepoId repo_id_;
00153     Sedp& sedp_;
00154   };
00155 
00156   class Writer : public DCPS::TransportSendListener, public Endpoint {
00157   public:
00158     Writer(const DCPS::RepoId& pub_id, Sedp& sedp);
00159     virtual ~Writer();
00160 
00161     bool assoc(const DCPS::AssociationData& subscription);
00162 
00163     // Implementing TransportSendListener
00164     void data_delivered(const DCPS::DataSampleElement*);
00165 
00166     void data_dropped(const DCPS::DataSampleElement*, bool by_transport);
00167 
00168     void control_delivered(ACE_Message_Block* sample);
00169 
00170     void control_dropped(ACE_Message_Block* sample,
00171                          bool dropped_by_transport);
00172 
00173     void notify_publication_disconnected(const DCPS::ReaderIdSeq&) {}
00174     void notify_publication_reconnected(const DCPS::ReaderIdSeq&) {}
00175     void notify_publication_lost(const DCPS::ReaderIdSeq&) {}
00176     void notify_connection_deleted(const DCPS::RepoId&) {}
00177     void remove_associations(const DCPS::ReaderIdSeq&, bool) {}
00178     void retrieve_inline_qos_data(InlineQosData&) const {}
00179 
00180     DDS::ReturnCode_t write_sample(const ParameterList& plist,
00181                                    const DCPS::RepoId& reader,
00182                                    DCPS::SequenceNumber& sequence);
00183     DDS::ReturnCode_t write_sample(const ParticipantMessageData& pmd,
00184                                    const DCPS::RepoId& reader,
00185                                    DCPS::SequenceNumber& sequence);
00186     DDS::ReturnCode_t write_unregister_dispose(const DCPS::RepoId& rid);
00187 
00188     void end_historic_samples(const DCPS::RepoId& reader);
00189 
00190   private:
00191     DCPS::TransportSendElementAllocator alloc_;
00192     Header header_;
00193     DCPS::SequenceNumber seq_;
00194 
00195     void write_control_msg(ACE_Message_Block& payload,
00196                            size_t size,
00197                            DCPS::MessageId id,
00198                            DCPS::SequenceNumber seq = DCPS::SequenceNumber());
00199 
00200     void set_header_fields(DCPS::DataSampleHeader& dsh,
00201                            size_t size,
00202                            const DCPS::RepoId& reader,
00203                            DCPS::SequenceNumber& sequence,
00204                            DCPS::MessageId id = DCPS::SAMPLE_DATA);
00205 
00206   } publications_writer_, subscriptions_writer_, participant_message_writer_;
00207 
00208   class Reader
00209     : public DCPS::TransportReceiveListener
00210     , public Endpoint
00211     , public DCPS::RcObject<ACE_SYNCH_MUTEX>
00212   {
00213   public:
00214     Reader(const DCPS::RepoId& sub_id, Sedp& sedp)
00215       : Endpoint(sub_id, sedp)
00216       , shutting_down_(0)
00217     {}
00218 
00219     virtual ~Reader();
00220 
00221     bool assoc(const DCPS::AssociationData& publication);
00222 
00223     // Implementing TransportReceiveListener
00224 
00225     void data_received(const DCPS::ReceivedDataSample& sample);
00226 
00227     void notify_subscription_disconnected(const DCPS::WriterIdSeq&) {}
00228     void notify_subscription_reconnected(const DCPS::WriterIdSeq&) {}
00229     void notify_subscription_lost(const DCPS::WriterIdSeq&) {}
00230     void notify_connection_deleted(const DCPS::RepoId&) {}
00231     void remove_associations(const DCPS::WriterIdSeq&, bool) {}
00232 
00233     void listener_add_ref() { _add_ref(); }
00234     void listener_remove_ref() { _remove_ref(); }
00235 
00236     ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> shutting_down_;
00237   };
00238 
00239   typedef DCPS::RcHandle<Reader> Reader_rch;
00240 
00241   Reader_rch publications_reader_, subscriptions_reader_, participant_message_reader_;
00242 
00243   struct Task : ACE_Task_Ex<ACE_MT_SYNCH, Msg> {
00244     explicit Task(Sedp* sedp)
00245       : spdp_(&sedp->spdp_)
00246       , sedp_(sedp)
00247       , shutting_down_(false)
00248     {
00249       activate();
00250     }
00251     ~Task();
00252 
00253     void enqueue(const SPDPdiscoveredParticipantData* pdata);
00254     void enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata);
00255     void enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata);
00256     void enqueue(DCPS::MessageId id, const ParticipantMessageData* data);
00257     void enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00258 
00259     void acknowledge();
00260     void shutdown();
00261 
00262 
00263   private:
00264     int svc();
00265 
00266     void svc_i(const SPDPdiscoveredParticipantData* pdata);
00267     void svc_i(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata);
00268     void svc_i(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata);
00269     void svc_i(DCPS::MessageId id, const ParticipantMessageData* data);
00270     void svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00271 
00272     Spdp* spdp_;
00273     Sedp* sedp_;
00274     bool shutting_down_;
00275   } task_;
00276 
00277   // Transport
00278   DCPS::TransportInst_rch transport_inst_;
00279 
00280 #ifndef DDS_HAS_MINIMUM_BIT
00281   DDS::TopicBuiltinTopicDataDataReaderImpl* topic_bit();
00282   DDS::PublicationBuiltinTopicDataDataReaderImpl* pub_bit();
00283   DDS::SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit();
00284 #endif /* DDS_HAS_MINIMUM_BIT */
00285 
00286   void populate_discovered_writer_msg(
00287       OpenDDS::DCPS::DiscoveredWriterData& dwd,
00288       const DCPS::RepoId& publication_id,
00289       const LocalPublication& pub);
00290 
00291   void populate_discovered_reader_msg(
00292       OpenDDS::DCPS::DiscoveredReaderData& drd,
00293       const DCPS::RepoId& subscription_id,
00294       const LocalSubscription& sub);
00295 
00296   struct LocalParticipantMessage : LocalEndpoint {
00297   };
00298   typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalParticipantMessage,
00299     DCPS::GUID_tKeyLessThan) LocalParticipantMessageMap;
00300   typedef LocalParticipantMessageMap::iterator LocalParticipantMessageIter;
00301   typedef LocalParticipantMessageMap::const_iterator LocalParticipantMessageCIter;
00302   LocalParticipantMessageMap local_participant_messages_;
00303 
00304   void data_received(DCPS::MessageId message_id,
00305                      const OpenDDS::DCPS::DiscoveredWriterData& wdata);
00306   void data_received(DCPS::MessageId message_id,
00307                      const OpenDDS::DCPS::DiscoveredReaderData& rdata);
00308   void data_received(DCPS::MessageId message_id,
00309                      const ParticipantMessageData& data);
00310 
00311   typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredWriterData> MsgIdWtrDataPair;
00312   typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdWtrDataPair,
00313                    DCPS::GUID_tKeyLessThan) DeferredPublicationMap;
00314   DeferredPublicationMap deferred_publications_;  // Publications that Spdp has not discovered.
00315 
00316   typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredReaderData> MsgIdRdrDataPair;
00317   typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdRdrDataPair,
00318                    DCPS::GUID_tKeyLessThan) DeferredSubscriptionMap;
00319   DeferredSubscriptionMap deferred_subscriptions_; // Subscriptions that Sedp has not discovered.
00320 
00321   void assign_bit_key(DiscoveredPublication& pub);
00322   void assign_bit_key(DiscoveredSubscription& sub);
00323 
00324   template<typename Map>
00325   void remove_entities_belonging_to(Map& m, DCPS::RepoId participant);
00326 
00327   void remove_from_bit_i(const DiscoveredPublication& pub);
00328   void remove_from_bit_i(const DiscoveredSubscription& sub);
00329 
00330   virtual DDS::ReturnCode_t remove_publication_i(const DCPS::RepoId& publicationId);
00331   virtual DDS::ReturnCode_t remove_subscription_i(const DCPS::RepoId& subscriptionId);
00332 
00333   // Topic:
00334 
00335   DCPS::RepoIdSet defer_match_endpoints_, associated_participants_;
00336 
00337   void inconsistent_topic(const DCPS::RepoIdSet& endpoints) const;
00338 
00339   virtual bool shutting_down() const;
00340 
00341   virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00342                                                    DiscoveredSubscriptionIter& iter,
00343                                                    const DCPS::RepoId& reader);
00344 
00345   virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00346                                                    DiscoveredPublicationIter& iter,
00347                                                    const DCPS::RepoId& writer);
00348 
00349   virtual bool defer_writer(const DCPS::RepoId& writer,
00350                             const DCPS::RepoId& writer_participant);
00351 
00352   virtual bool defer_reader(const DCPS::RepoId& reader,
00353                             const DCPS::RepoId& reader_participant);
00354 
00355   static DCPS::RepoId make_id(const DCPS::RepoId& participant_id,
00356                               const EntityId_t& entity);
00357 
00358   static void set_inline_qos(DCPS::TransportLocatorSeq& locators);
00359 
00360   void write_durable_publication_data(const DCPS::RepoId& reader);
00361   void write_durable_subscription_data(const DCPS::RepoId& reader);
00362   void write_durable_participant_message_data(const DCPS::RepoId& reader);
00363 
00364   DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& rid,
00365                                            LocalPublication& pub,
00366                                            const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00367   DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& rid,
00368                                             LocalSubscription& pub,
00369                                             const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00370   DDS::ReturnCode_t write_participant_message_data(const DCPS::RepoId& rid,
00371                                                    LocalParticipantMessage& part,
00372                                                    const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00373 
00374   DCPS::SequenceNumber automatic_liveliness_seq_;
00375   DCPS::SequenceNumber manual_liveliness_seq_;
00376 };
00377 
00378 /// A class to wait on acknowledgments from other threads
00379 class WaitForAcks {
00380 public:
00381   WaitForAcks();
00382   void ack();
00383   void wait_for_acks(unsigned int num_acks);
00384   void reset();
00385 private:
00386   ACE_Thread_Mutex lock_;
00387   ACE_Condition_Thread_Mutex cond_;
00388   unsigned int acks_;
00389 };
00390 
00391 }
00392 }
00393 
00394 #endif // OPENDDS_RTPS_SEDP_H

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7