Sedp.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_RTPS_SEDP_H
00009 #define OPENDDS_RTPS_SEDP_H
00010 
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00014 
00015 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00018 
00019 #include "dds/DCPS/RcHandle_T.h"
00020 #include "dds/DCPS/GuidUtils.h"
00021 #include "dds/DCPS/DataReaderCallbacks.h"
00022 #include "dds/DCPS/Definitions.h"
00023 #include "dds/DCPS/BuiltInTopicUtils.h"
00024 #include "dds/DCPS/DataSampleElement.h"
00025 #include "dds/DCPS/DataSampleHeader.h"
00026 #include "dds/DCPS/PoolAllocationBase.h"
00027 #include "dds/DCPS/DiscoveryBase.h"
00028 
00029 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00030 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00031 #include "dds/DCPS/transport/framework/TransportClient.h"
00032 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00033 
00034 #include "dds/DCPS/PoolAllocator.h"
00035 #include "dds/DdsSecurityCoreTypeSupportImpl.h"
00036 #include "dds/DCPS/RTPS/RtpsSecurityC.h"
00037 
00038 #include "ace/Task_Ex_T.h"
00039 #include "ace/Thread_Mutex.h"
00040 #include "ace/Condition_Thread_Mutex.h"
00041 
00042 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00043 #pragma once
00044 #endif /* ACE_LACKS_PRAGMA_ONCE */
00045 
00046 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00047 
00048 namespace OpenDDS {
00049 namespace RTPS {
00050 
00051 class RtpsDiscovery;
00052 class Spdp;
00053 class WaitForAcks;
00054 
00055 #if defined(OPENDDS_SECURITY)
00056 struct DiscoveredWriterData_SecurityWrapper;
00057 struct DiscoveredReaderData_SecurityWrapper;
00058 #endif
00059 
00060 class Sedp : public DCPS::EndpointManager<Security::SPDPdiscoveredParticipantData> {
00061 public:
        /// Construct the endpoint-discovery object.
        /// @param participant_id id of the participant this Sedp serves.
        /// @param owner the Spdp instance that owns this Sedp.
        /// @param lock caller-supplied mutex; NOTE(review): presumably shared
        ///        with the owning Spdp -- confirm in Sedp.cpp.
00062   Sedp(const DCPS::RepoId& participant_id,
00063        Spdp& owner,
00064        ACE_Thread_Mutex& lock);
00065 
        /// Second-phase initialization; reports success/failure through the
        /// returned DDS::ReturnCode_t. local_address() is documented below as
        /// valid only after this call.
00066   DDS::ReturnCode_t init(const DCPS::RepoId& guid,
00067                          const RtpsDiscovery& disco,
00068                          DDS::DomainId_t domainId);
00069 
00070 #if defined(OPENDDS_SECURITY)
        /// Supplies the identity, permissions and participant-crypto handles.
        /// Declared only when OPENDDS_SECURITY is defined.
00071   DDS::ReturnCode_t init_security(DDS::Security::IdentityHandle id_handle,
00072                                   DDS::Security::PermissionsHandle perm_handle,
00073                                   DDS::Security::ParticipantCryptoHandle crypto_handle);
00074 #endif
00075 
00076   /// request for acknowledgement from all Sedp threads (Task)
00077   void acknowledge();
00078 
00079   void shutdown();
        /// Output parameter: `locators` is a non-const reference on a const
        /// method, so results are returned through it.
00080   void unicast_locators(DCPS::LocatorSeq& locators) const;
00081 
00082   /// @brief Return the IP address we have bound to.
00083   /// Valid after the init() call.
00084   const ACE_INET_Addr& local_address() const;
00085   const ACE_INET_Addr& multicast_group() const;
00086   bool map_ipv4_to_ipv6() const;
00087 
        /// Associate with / disassociate from a participant described by its
        /// SPDP-discovered data. disassociate() reports a bool result whose
        /// meaning is defined in Sedp.cpp (not visible here).
00088   void associate(const Security::SPDPdiscoveredParticipantData& pdata);
00089 
00090 #if defined(OPENDDS_SECURITY)
        // Security-only association steps and crypto-token sending; all of
        // these exist only when OPENDDS_SECURITY is defined.
00091   void associate_preauth(const Security::SPDPdiscoveredParticipantData& pdata);
00092   void associate_volatile(const Security::SPDPdiscoveredParticipantData& pdata);
00093   void associate_secure_writers_to_readers(const Security::SPDPdiscoveredParticipantData& pdata);
00094   void associate_secure_readers_to_writers(const Security::SPDPdiscoveredParticipantData& pdata);
00095   void send_builtin_crypto_tokens(const Security::SPDPdiscoveredParticipantData& pdata);
00096   void send_builtin_crypto_tokens(const DCPS::RepoId& dstParticipant,
00097                                   const DCPS::EntityId_t& dstEntity, const DCPS::RepoId& src);
00098 #endif
00099 
00100   bool disassociate(const Security::SPDPdiscoveredParticipantData& pdata);
00101 
00102 #if defined(OPENDDS_SECURITY)
        // NOTE(review): the stateless/volatile overloads take `msg` by
        // non-const reference while Writer's same-named methods take it by
        // const reference -- presumably these fill in fields before sending;
        // confirm in Sedp.cpp.
00103   DDS::ReturnCode_t write_stateless_message(DDS::Security::ParticipantStatelessMessage& msg,
00104                                             const DCPS::RepoId& reader);
00105 
00106   DDS::ReturnCode_t write_volatile_message(DDS::Security::ParticipantVolatileMessageSecure& msg,
00107                                            const DCPS::RepoId& reader);
00108 
00109   DDS::ReturnCode_t write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
00110                                                   const DCPS::RepoId& reader);
00111 #endif
00112 
00113   DDS::ReturnCode_t write_dcps_participant_dispose(const DCPS::RepoId& part);
00114 
00115   // Topic
        // NOTE(review): `name` is a non-const reference; whether it is an
        // input, an output, or both is not visible from this header.
00116   bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00117                         OPENDDS_STRING& name);
00118 
00119   // Publication
00120   bool update_publication_qos(const DCPS::RepoId& publicationId,
00121                               const DDS::DataWriterQos& qos,
00122                               const DDS::PublisherQos& publisherQos);
00123 
00124   // Subscription
00125   bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00126                                const DDS::DataReaderQos& qos,
00127                                const DDS::SubscriberQos& subscriberQos);
00128   bool update_subscription_params(const DCPS::RepoId& subId,
00129                                   const DDS::StringSeq& params);
00130 
00131   // Managing reader/writer associations
00132   void association_complete(const DCPS::RepoId& localId,
00133                             const DCPS::RepoId& remoteId);
00134 
        // Liveliness signalling. The unsecure variant is always declared; the
        // secure variant exists only when OPENDDS_SECURITY is defined.
00135   void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00136   void signal_liveliness_unsecure(DDS::LivelinessQosPolicyKind kind);
00137 
00138 #if defined(OPENDDS_SECURITY)
00139   void signal_liveliness_secure(DDS::LivelinessQosPolicyKind kind);
00140 #endif
00141 
        // Declared here only; the value is defined out of line.
00142   static const bool host_is_bigendian_;
00143 private:
        // Reference to the Spdp object; Task's constructor takes its address
        // via &sedp->spdp_. NOTE(review): presumably bound to the
        // constructor's `owner` argument -- confirm in Sedp.cpp.
00144   Spdp& spdp_;
        // NOTE(review): unlike most security-related members of this class,
        // this one is not wrapped in #if defined(OPENDDS_SECURITY).
00145   DDS::Security::ParticipantSecurityAttributes participant_sec_attr_;
00146 
        /// Element type of Task's message queue (Task derives from
        /// ACE_Task_Ex<ACE_MT_SYNCH, Msg>).
        ///
        /// A Msg carries a MsgType tag, a DCPS::MessageId, and one payload
        /// stored in an anonymous union. Each constructor initializes exactly
        /// one union member; nothing in this struct ties type_ to the active
        /// member, so code reading the union must use the member that matches
        /// the constructor that was called.
        ///
        /// The pointer payloads are raw const pointers and Msg declares no
        /// destructor. NOTE(review): Task::enqueue() takes the payloads as
        /// DCPS::unique_ptr, so ownership is presumably released into the Msg
        /// and reclaimed by the consumer -- confirm in Sedp.cpp.
00147   struct Msg : public DCPS::PoolAllocationBase {
00148     enum MsgType {
00149       MSG_PARTICIPANT,
00150       MSG_WRITER,
00151       MSG_READER,
00152       MSG_PARTICIPANT_DATA,
00153       MSG_REMOVE_FROM_PUB_BIT,
00154       MSG_REMOVE_FROM_SUB_BIT,
00155       MSG_FINI_BIT,
00156       MSG_STOP
00157 
            // The separating comma lives inside the #if so that the
            // enumerator list has no trailing comma when OPENDDS_SECURITY is
            // undefined (a trailing comma is ill-formed before C++11).
            // Enumerator values are unchanged in both configurations.
00158 #if defined(OPENDDS_SECURITY)
00159       , MSG_PARTICIPANT_STATELESS_DATA,
00160       MSG_PARTICIPANT_VOLATILE_SECURE,
00161       MSG_PARTICIPANT_DATA_SECURE,
00162       MSG_WRITER_SECURE,
00163       MSG_READER_SECURE,
00164       MSG_DCPS_PARTICIPANT_SECURE
00165 #endif
00166 
00167     } type_;
00168 
00169     DCPS::MessageId id_;
00170 
          // Payload; exactly one member is initialized per constructor.
00171     union {
00172       const Security::SPDPdiscoveredParticipantData* dpdata_;
00173 
00174       const DCPS::DiscoveredWriterData* wdata_;
00175 
00176 #if defined(OPENDDS_SECURITY)
00177       const DiscoveredWriterData_SecurityWrapper* wdata_secure_;
00178 #endif
00179 
00180       const DCPS::DiscoveredReaderData* rdata_;
00181 
00182 #if defined(OPENDDS_SECURITY)
00183       const DiscoveredReaderData_SecurityWrapper* rdata_secure_;
00184 #endif
00185 
00186       const ParticipantMessageData* pmdata_;
00187       DDS::InstanceHandle_t ih_;
            // NOTE(review): pgmdata_ and its constructor below are not guarded
            // by OPENDDS_SECURITY even though they use a DDS::Security type.
00188       const DDS::Security::ParticipantGenericMessage* pgmdata_;
00189     };
00190 
          // One constructor per payload kind; each sets type_, id_ and the
          // matching union member.
00191     Msg(MsgType mt, DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData* dpdata)
00192       : type_(mt), id_(id), dpdata_(dpdata) {}
00193 
00194     Msg(MsgType mt, DCPS::MessageId id, const DCPS::DiscoveredWriterData* wdata)
00195       : type_(mt), id_(id), wdata_(wdata) {}
00196 
00197 #if defined(OPENDDS_SECURITY)
00198     Msg(MsgType mt, DCPS::MessageId id, const DiscoveredWriterData_SecurityWrapper* wdata)
00199       : type_(mt), id_(id), wdata_secure_(wdata) {}
00200 #endif
00201 
00202     Msg(MsgType mt, DCPS::MessageId id, const DCPS::DiscoveredReaderData* rdata)
00203       : type_(mt), id_(id), rdata_(rdata) {}
00204 
00205 #if defined(OPENDDS_SECURITY)
00206     Msg(MsgType mt, DCPS::MessageId id, const DiscoveredReaderData_SecurityWrapper* rdata)
00207       : type_(mt), id_(id), rdata_secure_(rdata) {}
00208 #endif
00209 
00210     Msg(MsgType mt, DCPS::MessageId id, const ParticipantMessageData* pmdata)
00211       : type_(mt), id_(id), pmdata_(pmdata) {}
00212 
00213     Msg(MsgType mt, DCPS::MessageId id, DDS::InstanceHandle_t ih)
00214       : type_(mt), id_(id), ih_(ih) {}
00215 
00216     Msg(MsgType mt, DCPS::MessageId id, const DDS::Security::ParticipantGenericMessage* data)
00217       : type_(mt), id_(id), pgmdata_(data) {}
00218 
00219   };
00220 
00221 
        /// Common base of Writer and Reader: a DCPS::TransportClient that
        /// remembers its own RepoId, a reference to the enclosing Sedp, and a
        /// pair of crypto handles (both start as DDS::HANDLE_NIL).
00222   class Endpoint : public DCPS::TransportClient {
00223   public:
00224     Endpoint(const DCPS::RepoId& repo_id, Sedp& sedp)
00225       : repo_id_(repo_id)
00226       , sedp_(sedp)
00227       , participant_crypto_handle_(DDS::HANDLE_NIL)
00228       , endpoint_crypto_handle_(DDS::HANDLE_NIL)
00229     {}
00230 
00231     virtual ~Endpoint();
00232 
00233     // Implementing TransportClient
          // These are fixed answers: any transport QoS is accepted, and the
          // domain id and priority are always 0.
00234     bool check_transport_qos(const DCPS::TransportInst&)
00235       { return true; }
00236     const DCPS::RepoId& get_repo_id() const
00237       { return repo_id_; }
00238     DDS::DomainId_t domain_id() const
00239       { return 0; } // not used for SEDP
00240     CORBA::Long get_priority_value(const DCPS::AssociationData&) const
00241       { return 0; }
00242 
          // Make these TransportClient members accessible through Endpoint's
          // public interface. NOTE(review): presumably they are not public in
          // TransportClient -- confirm.
00243     using DCPS::TransportClient::enable_transport_using_config;
00244     using DCPS::TransportClient::disassociate;
00245 
          /// Stores both handles. Omitting `e` resets the endpoint handle to
          /// DDS::HANDLE_NIL.
00246     void set_crypto_handles(DDS::Security::ParticipantCryptoHandle p,
00247                             DDS::Security::NativeCryptoHandle e = DDS::HANDLE_NIL)
00248     {
00249       participant_crypto_handle_ = p;
00250       endpoint_crypto_handle_ = e;
00251     }
00252 
          /// Returns the participant-level handle (not the endpoint one).
00253     DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
00254     {
00255       return participant_crypto_handle_;
00256     }
00257 
00258     DDS::Security::NativeCryptoHandle get_endpoint_crypto_handle() const
00259     {
00260       return endpoint_crypto_handle_;
00261     }
00262 
00263   protected:
00264     DCPS::RepoId repo_id_;
00265     Sedp& sedp_;
00266     DDS::Security::ParticipantCryptoHandle participant_crypto_handle_;
00267     DDS::Security::NativeCryptoHandle endpoint_crypto_handle_;
00268   };
00269 
        /// Sending endpoint: an Endpoint that is also a
        /// DCPS::TransportSendListener. Only declarations are visible here;
        /// the non-inline members are defined out of line.
00270   class Writer : public DCPS::TransportSendListener, public Endpoint {
00271   public:
00272     Writer(const DCPS::RepoId& pub_id, Sedp& sedp);
00273     virtual ~Writer();
00274 
00275     bool assoc(const DCPS::AssociationData& subscription);
00276 
00277     // Implementing TransportSendListener
00278     void data_delivered(const DCPS::DataSampleElement*);
00279 
00280     void data_dropped(const DCPS::DataSampleElement*, bool by_transport);
00281 
00282     void control_delivered(const DCPS::Message_Block_Ptr& sample);
00283 
00284     void control_dropped(const DCPS::Message_Block_Ptr& sample,
00285                          bool dropped_by_transport);
00286 
          // The following listener callbacks are intentionally empty.
00287     void notify_publication_disconnected(const DCPS::ReaderIdSeq&) {}
00288     void notify_publication_reconnected(const DCPS::ReaderIdSeq&) {}
00289     void notify_publication_lost(const DCPS::ReaderIdSeq&) {}
00290     void remove_associations(const DCPS::ReaderIdSeq&, bool) {}
00291     void retrieve_inline_qos_data(InlineQosData&) const {}
00292 
          // `sequence` is a non-const reference in send_sample() and in every
          // write_*() below that takes it. NOTE(review): presumably the
          // sequence number used is reported back through it -- confirm.
00293     void send_sample(const ACE_Message_Block& data,
00294                      size_t size,
00295                      const DCPS::RepoId& reader,
00296                      DCPS::SequenceNumber& sequence,
00297                      bool historic = false);
00298 
00299     DDS::ReturnCode_t write_parameter_list(const ParameterList& plist,
00300                                            const DCPS::RepoId& reader,
00301                                            DCPS::SequenceNumber& sequence);
00302 
00303     DDS::ReturnCode_t write_participant_message(const ParticipantMessageData& pmd,
00304                                                 const DCPS::RepoId& reader,
00305                                                 DCPS::SequenceNumber& sequence);
00306 
          // NOTE(review): the three security-typed writers below are not
          // guarded by OPENDDS_SECURITY, unlike Sedp's own
          // write_stateless_message()/write_volatile_message().
00307     DDS::ReturnCode_t write_stateless_message(const DDS::Security::ParticipantStatelessMessage& msg,
00308                                               const DCPS::RepoId& reader,
00309                                               DCPS::SequenceNumber& sequence);
00310 
00311     DDS::ReturnCode_t write_volatile_message_secure(const DDS::Security::ParticipantVolatileMessageSecure& msg,
00312                                                     const DCPS::RepoId& reader,
00313                                                     DCPS::SequenceNumber& sequence);
00314 
00315     DDS::ReturnCode_t write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
00316                                                     const DCPS::RepoId& reader,
00317                                                     DCPS::SequenceNumber& sequence);
00318 
          // `pid` defaults to PID_ENDPOINT_GUID when the caller omits it.
00319     DDS::ReturnCode_t write_unregister_dispose(const DCPS::RepoId& rid, CORBA::UShort pid = PID_ENDPOINT_GUID);
00320 
00321     void end_historic_samples(const DCPS::RepoId& reader);
00322 
00323   private:
00324     Header header_;
00325     DCPS::SequenceNumber seq_;
00326 
          // `seq` defaults to a default-constructed SequenceNumber.
00327     void write_control_msg(DCPS::Message_Block_Ptr payload,
00328                            size_t size,
00329                            DCPS::MessageId id,
00330                            DCPS::SequenceNumber seq = DCPS::SequenceNumber());
00331 
          // Defaults: not a historic sample, message id DCPS::SAMPLE_DATA.
00332     void set_header_fields(DCPS::DataSampleHeader& dsh,
00333                            size_t size,
00334                            const DCPS::RepoId& reader,
00335                            DCPS::SequenceNumber& sequence,
00336                            bool historic_sample = false,
00337                            DCPS::MessageId id = DCPS::SAMPLE_DATA);
00338 
00339   };
00340 
        // Writer instances, held by value as direct members. The members
        // inside #if defined(OPENDDS_SECURITY) exist only in security builds.
00341   Writer publications_writer_;
00342 
00343 #if defined(OPENDDS_SECURITY)
00344   Writer publications_secure_writer_;
00345 #endif
00346 
00347   Writer subscriptions_writer_;
00348 
00349 #if defined(OPENDDS_SECURITY)
00350   Writer subscriptions_secure_writer_;
00351 #endif
00352 
00353   Writer participant_message_writer_;
00354 
00355 #if defined(OPENDDS_SECURITY)
00356   Writer participant_message_secure_writer_;
00357   Writer participant_stateless_message_writer_;
00358   Writer participant_volatile_message_secure_writer_;
00359   Writer dcps_participant_secure_writer_;
00360 #endif
00361 
        /// Receiving endpoint: an Endpoint that is also a
        /// DCPS::TransportReceiveListener. Instances are referenced through
        /// DCPS::RcHandle<Reader> (see the Reader_rch typedef that follows).
00362   class Reader
00363     : public DCPS::TransportReceiveListener
00364     , public Endpoint
00365   {
00366   public:
          // Forwards id and owner to Endpoint; shutting_down_ starts false.
00367     Reader(const DCPS::RepoId& sub_id, Sedp& sedp)
00368       : Endpoint(sub_id, sedp)
00369       , shutting_down_(false)
00370     {}
00371 
00372     virtual ~Reader();
00373 
00374     bool assoc(const DCPS::AssociationData& publication);
00375 
00376     // Implementing TransportReceiveListener
00377 
00378     void data_received(const DCPS::ReceivedDataSample& sample);
00379 
          // The following listener callbacks are intentionally empty.
00380     void notify_subscription_disconnected(const DCPS::WriterIdSeq&) {}
00381     void notify_subscription_reconnected(const DCPS::WriterIdSeq&) {}
00382     void notify_subscription_lost(const DCPS::WriterIdSeq&) {}
00383     void remove_associations(const DCPS::WriterIdSeq&, bool) {}
00384 
          // Public atomic flag. NOTE(review): presumably set during shutdown
          // and checked in data_received() -- confirm in Sedp.cpp.
00385     ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> shutting_down_;
00386   };
00387 
        // Reader instances are held through reference-counted handles
        // (DCPS::RcHandle), whereas the Writer members are held by value.
        // The handles inside #if defined(OPENDDS_SECURITY) exist only in
        // security builds.
00388   typedef DCPS::RcHandle<Reader> Reader_rch;
00389 
00390   Reader_rch publications_reader_;
00391 
00392 #if defined(OPENDDS_SECURITY)
00393   Reader_rch publications_secure_reader_;
00394 #endif
00395 
00396   Reader_rch subscriptions_reader_;
00397 
00398 #if defined(OPENDDS_SECURITY)
00399   Reader_rch subscriptions_secure_reader_;
00400 #endif
00401 
00402   Reader_rch participant_message_reader_;
00403 
00404 #if defined(OPENDDS_SECURITY)
00405   Reader_rch participant_message_secure_reader_;
00406   Reader_rch participant_stateless_message_reader_;
00407   Reader_rch participant_volatile_message_secure_reader_;
00408   Reader_rch dcps_participant_secure_reader_;
00409 #endif
00410 
        /// ACE task whose message queue carries Msg objects. The public
        /// enqueue*() overloads accept payloads; the private svc*() members
        /// take the same payloads as raw const pointers.
        ///
        /// NOTE(review): the constructor calls activate() (inherited from the
        /// ACE task base, which is expected to start the thread running
        /// svc()). task_ is declared before transport_inst_ and the other
        /// Sedp members that follow it, so that call happens before those
        /// members are constructed -- confirm svc() cannot touch them early.
00411   struct Task : ACE_Task_Ex<ACE_MT_SYNCH, Msg> {
00412     explicit Task(Sedp* sedp)
00413       : spdp_(&sedp->spdp_)
00414       , sedp_(sedp)
00415       , shutting_down_(false)
00416     {
00417       activate();
00418     }
00419     ~Task();
00420 
          // Payloads are passed as DCPS::unique_ptr by value, i.e. the caller
          // hands over ownership.
00421     void enqueue(DCPS::MessageId id, DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata);
00422 
00423     void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata);
00424     void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata);
00425 
00426 #if defined(OPENDDS_SECURITY)
00427     void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wrapper);
00428     void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> wrapper);
00429 #endif
00430 
00431     void enqueue(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data);
          // This overload takes a Msg::MsgType and an instance handle instead
          // of a MessageId and an owned payload.
00432     void enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00433 
00434 #if defined(OPENDDS_SECURITY)
00435     void enqueue_participant_message_secure(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data);
00436     void enqueue_stateless_message(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data);
00437     void enqueue_volatile_message_secure(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data);
00438 #endif
00439 
00440     void acknowledge();
00441     void shutdown();
00442 
00443   private:
00444     int svc();
00445 
          // NOTE(review): svc_secure_i() takes security-wrapped participant
          // data but is not guarded by OPENDDS_SECURITY.
00446     void svc_i(const Security::SPDPdiscoveredParticipantData* pdata);
00447     void svc_secure_i(DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData* pdata);
00448 
00449     void svc_i(DCPS::MessageId id, const DCPS::DiscoveredWriterData* wdata);
00450     void svc_i(DCPS::MessageId id, const DCPS::DiscoveredReaderData* rdata);
00451 
00452 #if defined(OPENDDS_SECURITY)
00453     void svc_i(DCPS::MessageId id, const DiscoveredWriterData_SecurityWrapper* wrapper);
00454     void svc_i(DCPS::MessageId id, const DiscoveredReaderData_SecurityWrapper* wrapper);
00455 #endif
00456 
00457     void svc_i(DCPS::MessageId id, const ParticipantMessageData* data);
00458     void svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00459 
00460 #if defined(OPENDDS_SECURITY)
00461     void svc_participant_message_data_secure(DCPS::MessageId id, const ParticipantMessageData* data);
00462     void svc_stateless_message(DCPS::MessageId id, const DDS::Security::ParticipantStatelessMessage* data);
00463     void svc_volatile_message_secure(DCPS::MessageId id, const DDS::Security::ParticipantVolatileMessageSecure* data);
00464 #endif
00465 
          // Non-owning back-pointers set by the constructor.
00466     Spdp* spdp_;
00467     Sedp* sedp_;
          // NOTE(review): plain bool, whereas Reader::shutting_down_ is an
          // ACE_Atomic_Op. If this flag is read and written from different
          // threads it needs synchronization -- confirm in Sedp.cpp.
00468     bool shutting_down_;
00469   } task_;
00470 
00471   // Transport
00472   DCPS::TransportInst_rch transport_inst_;
00473 
        // Accessors for the built-in-topic data readers; compiled out when
        // DDS_HAS_MINIMUM_BIT is defined.
00474 #ifndef DDS_HAS_MINIMUM_BIT
00475   DCPS::TopicBuiltinTopicDataDataReaderImpl* topic_bit();
00476   DCPS::PublicationBuiltinTopicDataDataReaderImpl* pub_bit();
00477   DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit();
00478 #endif /* DDS_HAS_MINIMUM_BIT */
00479 
        // Fill the discovered-data message (first argument, an output
        // reference) from a local publication / subscription record.
00480   void populate_discovered_writer_msg(
00481       DCPS::DiscoveredWriterData& dwd,
00482       const DCPS::RepoId& publication_id,
00483       const LocalPublication& pub);
00484 
00485   void populate_discovered_reader_msg(
00486       DCPS::DiscoveredReaderData& drd,
00487       const DCPS::RepoId& subscription_id,
00488       const LocalSubscription& sub);
00489 
        // LocalEndpoint with no additional members, stored in a map keyed by
        // RepoId and ordered by DCPS::GUID_tKeyLessThan.
00490   struct LocalParticipantMessage : LocalEndpoint {
00491   };
00492   typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalParticipantMessage,
00493     DCPS::GUID_tKeyLessThan) LocalParticipantMessageMap;
00494   typedef LocalParticipantMessageMap::iterator LocalParticipantMessageIter;
00495   typedef LocalParticipantMessageMap::const_iterator LocalParticipantMessageCIter;
00496   LocalParticipantMessageMap local_participant_messages_;
00497 
        // Handlers for received discovery data. The data_received() overloads
        // are distinguished by payload type; the *_SecurityWrapper overloads
        // and the received_*/should_drop_* members exist only when
        // OPENDDS_SECURITY is defined.
        //
        // `security_info` is optional and defaults to NULL.
00498   void process_discovered_writer_data(DCPS::MessageId message_id,
00499                                       const DCPS::DiscoveredWriterData& wdata,
00500                                       const DCPS::RepoId& guid,
00501                                       const DDS::Security::EndpointSecurityInfo* security_info = NULL);
00502 
00503   void data_received(DCPS::MessageId message_id,
00504                      const DCPS::DiscoveredWriterData& wdata);
00505 
00506 #if defined(OPENDDS_SECURITY)
00507   void data_received(DCPS::MessageId message_id,
00508                      const DiscoveredWriterData_SecurityWrapper& wrapper);
00509 #endif
00510 
        // Reader-side counterpart; `security_info` again defaults to NULL.
00511   void process_discovered_reader_data(DCPS::MessageId message_id,
00512                                       const DCPS::DiscoveredReaderData& rdata,
00513                                       const DCPS::RepoId& guid,
00514                                       const DDS::Security::EndpointSecurityInfo* security_info = NULL);
00515 
00516   void data_received(DCPS::MessageId message_id,
00517                      const DCPS::DiscoveredReaderData& rdata);
00518 
00519 #if defined(OPENDDS_SECURITY)
00520   void data_received(DCPS::MessageId message_id,
00521                      const DiscoveredReaderData_SecurityWrapper& wrapper);
00522 #endif
00523 
00524   void data_received(DCPS::MessageId message_id,
00525                      const ParticipantMessageData& data);
00526 
00527 #if defined(OPENDDS_SECURITY)
00528   void received_participant_message_data_secure(DCPS::MessageId message_id,
00529             const ParticipantMessageData& data);
00530 
        // Predicates returning bool; the criteria are defined in Sedp.cpp.
00531   bool should_drop_stateless_message(const DDS::Security::ParticipantGenericMessage& msg);
00532   bool should_drop_volatile_message(const DDS::Security::ParticipantGenericMessage& msg);
00533   bool should_drop_message(const char* unsecure_topic_name);
00534 
00535   void received_stateless_message(DCPS::MessageId message_id,
00536                      const DDS::Security::ParticipantStatelessMessage& data);
00537 
00538   void received_volatile_message_secure(DCPS::MessageId message_id,
00539                      const DDS::Security::ParticipantVolatileMessageSecure& data);
00540 #endif
00541 
        // Each deferred map is keyed by RepoId (ordered by
        // DCPS::GUID_tKeyLessThan) and stores the MessageId together with a
        // copy of the discovered data.
00542   typedef std::pair<DCPS::MessageId, DCPS::DiscoveredWriterData> MsgIdWtrDataPair;
00543   typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdWtrDataPair,
00544                    DCPS::GUID_tKeyLessThan) DeferredPublicationMap;
00545   DeferredPublicationMap deferred_publications_;  // Publications that Spdp has not discovered.
00546 
00547   typedef std::pair<DCPS::MessageId, DCPS::DiscoveredReaderData> MsgIdRdrDataPair;
00548   typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdRdrDataPair,
00549                    DCPS::GUID_tKeyLessThan) DeferredSubscriptionMap;
00550   DeferredSubscriptionMap deferred_subscriptions_; // Subscriptions that Sedp has not discovered. NOTE(review): the publication map says "Spdp"; presumably "Spdp" is meant here too -- confirm.
00551 
        // Overloaded for discovered publications and subscriptions; the
        // argument is a non-const reference, so the record may be modified.
00552   void assign_bit_key(DiscoveredPublication& pub);
00553   void assign_bit_key(DiscoveredSubscription& sub);
00554 
        // Member template declared here and defined elsewhere. Note that
        // `participant` is taken by value, not by const reference.
00555   template<typename Map>
00556   void remove_entities_belonging_to(Map& m, DCPS::RepoId participant);
00557 
00558   void remove_from_bit_i(const DiscoveredPublication& pub);
00559   void remove_from_bit_i(const DiscoveredSubscription& sub);
00560 
        // The private virtual functions in this section are presumably
        // overrides of hooks declared by the DCPS::EndpointManager base --
        // TODO confirm against DiscoveryBase.h.
00561   virtual DDS::ReturnCode_t remove_publication_i(const DCPS::RepoId& publicationId);
00562   virtual DDS::ReturnCode_t remove_subscription_i(const DCPS::RepoId& subscriptionId);
00563 
00564   // Topic:
00565 
00566   DCPS::RepoIdSet defer_match_endpoints_, associated_participants_;
00567 
00568   void inconsistent_topic(const DCPS::RepoIdSet& endpoints) const;
00569 
00570   virtual bool shutting_down() const;
00571 
        // `tls` is a reference to a pointer and `iter` a non-const reference,
        // so both may be updated by the call.
00572   virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00573                                                    DiscoveredSubscriptionIter& iter,
00574                                                    const DCPS::RepoId& reader);
00575 
00576   virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00577                                                    DiscoveredPublicationIter& iter,
00578                                                    const DCPS::RepoId& writer);
00579 
00580 #if defined(OPENDDS_SECURITY)
        // Returns a new locator sequence by value; the input is const.
00581   DCPS::TransportLocatorSeq
00582   add_security_info(const DCPS::TransportLocatorSeq& locators,
00583                     const DCPS::RepoId& writer, const DCPS::RepoId& reader);
00584 #endif
00585 
00586   virtual bool defer_writer(const DCPS::RepoId& writer,
00587                             const DCPS::RepoId& writer_participant);
00588 
00589   virtual bool defer_reader(const DCPS::RepoId& reader,
00590                             const DCPS::RepoId& reader_participant);
00591 
        // Static helpers: they do not use any Sedp instance state.
00592   static DCPS::RepoId make_id(const DCPS::RepoId& participant_id,
00593                               const EntityId_t& entity);
00594 
00595   static void set_inline_qos(DCPS::TransportLocatorSeq& locators);
00596 
        // write_durable_*(): each takes the RepoId of a single reader. The
        // *_secure variants exist only when OPENDDS_SECURITY is defined.
00597   void write_durable_publication_data(const DCPS::RepoId& reader);
00598   void write_durable_subscription_data(const DCPS::RepoId& reader);
00599 
00600 #if defined(OPENDDS_SECURITY)
00601   void write_durable_publication_data_secure(const DCPS::RepoId& reader);
00602   void write_durable_subscription_data_secure(const DCPS::RepoId& reader);
00603 #endif
00604 
00605   void write_durable_participant_message_data(const DCPS::RepoId& reader);
00606 
        // write_publication_data*(): `reader` defaults to DCPS::GUID_UNKNOWN
        // when omitted. `pub` is a non-const reference, so the local
        // publication record may be modified.
        // NOTE(review): presumably the unsuffixed function dispatches to the
        // _secure/_unsecure variants -- confirm in Sedp.cpp.
00607   DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& rid,
00608                                            LocalPublication& pub,
00609                                            const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00610 
00611 #if defined(OPENDDS_SECURITY)
00612   DDS::ReturnCode_t write_publication_data_secure(const DCPS::RepoId& rid,
00613                                                   LocalPublication& pub,
00614                                                   const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00615 #endif
00616 
00617   DDS::ReturnCode_t write_publication_data_unsecure(const DCPS::RepoId& rid,
00618                                                     LocalPublication& pub,
00619                                                     const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00620 
        // write_subscription_data*(): subscription counterparts of the
        // write_publication_data*() declarations. `reader` defaults to
        // DCPS::GUID_UNKNOWN when omitted. `sub` is a non-const reference, so
        // the local subscription record may be modified. The _secure variant
        // exists only when OPENDDS_SECURITY is defined.
        // The LocalSubscription parameter is named `sub` (it was `pub`, a
        // leftover from the publication overloads) to match
        // populate_discovered_reader_msg(); declaration parameter names do
        // not affect callers or the out-of-line definitions.
00621   DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& rid,
00622                                             LocalSubscription& sub,
00623                                             const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00624 
00625 #if defined(OPENDDS_SECURITY)
00626   DDS::ReturnCode_t write_subscription_data_secure(const DCPS::RepoId& rid,
00627                                                    LocalSubscription& sub,
00628                                                    const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00629 #endif
00630 
00631   DDS::ReturnCode_t write_subscription_data_unsecure(const DCPS::RepoId& rid,
00632                                                      LocalSubscription& sub,
00633                                                      const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00634 
        // `reader` defaults to DCPS::GUID_UNKNOWN when omitted.
00635   DDS::ReturnCode_t write_participant_message_data(const DCPS::RepoId& rid,
00636                                                    LocalParticipantMessage& part,
00637                                                    const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00638 
00639   bool is_opendds(const GUID_t& endpoint) const;
00640 
        // Sequence-number counters, one pair for automatic/manual liveliness;
        // the secure pair exists only when OPENDDS_SECURITY is defined.
00641 #if defined(OPENDDS_SECURITY)
00642   DCPS::SequenceNumber secure_automatic_liveliness_seq_;
00643   DCPS::SequenceNumber secure_manual_liveliness_seq_;
00644 #endif
00645 
00646   DCPS::SequenceNumber automatic_liveliness_seq_;
00647   DCPS::SequenceNumber manual_liveliness_seq_;
00648 
00649 protected:
00650 
        // Crypto-handle and crypto-token helpers, available to derived
        // classes and declared only when OPENDDS_SECURITY is defined.
00651 #if defined(OPENDDS_SECURITY)
00652   DDS::Security::DatawriterCryptoHandle generate_remote_matched_writer_crypto_handle(const DCPS::RepoId& writer_part, const DDS::Security::DatareaderCryptoHandle& drch);
00653   DDS::Security::DatareaderCryptoHandle generate_remote_matched_reader_crypto_handle(const DCPS::RepoId& reader_part, const DDS::Security::DatawriterCryptoHandle& dwch, bool relay_only);
00654   void create_and_send_datareader_crypto_tokens(const DDS::Security::DatareaderCryptoHandle& drch, const DCPS::RepoId& local_reader, const DDS::Security::DatawriterCryptoHandle& dwch, const DCPS::RepoId& remote_writer);
00655   void create_and_send_datawriter_crypto_tokens(const DDS::Security::DatawriterCryptoHandle& dwch, const DCPS::RepoId& local_writer, const DDS::Security::DatareaderCryptoHandle& drch, const DCPS::RepoId& remote_reader);
00656   void handle_datareader_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg);
00657   void handle_datawriter_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg);
00658 
00659   DDS::DomainId_t get_domain_id() const;
00660 #endif
00661 
00662 };
00663 
00664 /// A class to wait on acknowledgments from other threads.
      ///
      /// Holds a mutex, a condition variable and an unsigned acknowledgment
      /// counter. NOTE(review): presumably ack() increments the counter and
      /// signals, wait_for_acks() blocks until the counter reaches num_acks,
      /// and reset() clears it -- the definitions are not in this header;
      /// confirm in Sedp.cpp.
00665 class WaitForAcks {
00666 public:
00667   WaitForAcks();
00668   void ack();
00669   void wait_for_acks(unsigned int num_acks);
00670   void reset();
00671 private:
        // lock_ is declared before cond_, so it is constructed first.
00672   ACE_Thread_Mutex lock_;
00673   ACE_Condition_Thread_Mutex cond_;
00674   unsigned int acks_;
00675 };
00676 
00677 }
00678 }
00679 
00680 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00681 
00682 #endif // OPENDDS_RTPS_SEDP_H
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1