00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_RTPS_SEDP_H
00009 #define OPENDDS_RTPS_SEDP_H
00010
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00014
00015 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00018
00019 #include "dds/DCPS/RcHandle_T.h"
00020 #include "dds/DCPS/GuidUtils.h"
00021 #include "dds/DCPS/DataReaderCallbacks.h"
00022 #include "dds/DCPS/Definitions.h"
00023 #include "dds/DCPS/BuiltInTopicUtils.h"
00024 #include "dds/DCPS/DataSampleElement.h"
00025 #include "dds/DCPS/DataSampleHeader.h"
00026 #include "dds/DCPS/PoolAllocationBase.h"
00027 #include "dds/DCPS/DiscoveryBase.h"
00028
00029 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00030 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00031 #include "dds/DCPS/transport/framework/TransportClient.h"
00032 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00033
00034 #include "dds/DCPS/PoolAllocator.h"
00035 #include "dds/DdsSecurityCoreTypeSupportImpl.h"
00036 #include "dds/DCPS/RTPS/RtpsSecurityC.h"
00037
00038 #include "ace/Task_Ex_T.h"
00039 #include "ace/Thread_Mutex.h"
00040 #include "ace/Condition_Thread_Mutex.h"
00041
00042 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00043 #pragma once
00044 #endif
00045
00046 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00047
00048 namespace OpenDDS {
00049 namespace RTPS {
00050
00051 class RtpsDiscovery;
00052 class Spdp;
00053 class WaitForAcks;
00054
00055 #if defined(OPENDDS_SECURITY)
00056 struct DiscoveredWriterData_SecurityWrapper;
00057 struct DiscoveredReaderData_SecurityWrapper;
00058 #endif
00059
00060 class Sedp : public DCPS::EndpointManager<Security::SPDPdiscoveredParticipantData> {
00061 public:
00062 Sedp(const DCPS::RepoId& participant_id,
00063 Spdp& owner,
00064 ACE_Thread_Mutex& lock);
00065
00066 DDS::ReturnCode_t init(const DCPS::RepoId& guid,
00067 const RtpsDiscovery& disco,
00068 DDS::DomainId_t domainId);
00069
00070 #if defined(OPENDDS_SECURITY)
00071 DDS::ReturnCode_t init_security(DDS::Security::IdentityHandle id_handle,
00072 DDS::Security::PermissionsHandle perm_handle,
00073 DDS::Security::ParticipantCryptoHandle crypto_handle);
00074 #endif
00075
00076
00077 void acknowledge();
00078
00079 void shutdown();
00080 void unicast_locators(DCPS::LocatorSeq& locators) const;
00081
00082
00083
00084 const ACE_INET_Addr& local_address() const;
00085 const ACE_INET_Addr& multicast_group() const;
00086 bool map_ipv4_to_ipv6() const;
00087
00088 void associate(const Security::SPDPdiscoveredParticipantData& pdata);
00089
00090 #if defined(OPENDDS_SECURITY)
00091 void associate_preauth(const Security::SPDPdiscoveredParticipantData& pdata);
00092 void associate_volatile(const Security::SPDPdiscoveredParticipantData& pdata);
00093 void associate_secure_writers_to_readers(const Security::SPDPdiscoveredParticipantData& pdata);
00094 void associate_secure_readers_to_writers(const Security::SPDPdiscoveredParticipantData& pdata);
00095 void send_builtin_crypto_tokens(const Security::SPDPdiscoveredParticipantData& pdata);
00096 void send_builtin_crypto_tokens(const DCPS::RepoId& dstParticipant,
00097 const DCPS::EntityId_t& dstEntity, const DCPS::RepoId& src);
00098 #endif
00099
00100 bool disassociate(const Security::SPDPdiscoveredParticipantData& pdata);
00101
00102 #if defined(OPENDDS_SECURITY)
00103 DDS::ReturnCode_t write_stateless_message(DDS::Security::ParticipantStatelessMessage& msg,
00104 const DCPS::RepoId& reader);
00105
00106 DDS::ReturnCode_t write_volatile_message(DDS::Security::ParticipantVolatileMessageSecure& msg,
00107 const DCPS::RepoId& reader);
00108
00109 DDS::ReturnCode_t write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
00110 const DCPS::RepoId& reader);
00111 #endif
00112
00113 DDS::ReturnCode_t write_dcps_participant_dispose(const DCPS::RepoId& part);
00114
00115
00116 bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00117 OPENDDS_STRING& name);
00118
00119
00120 bool update_publication_qos(const DCPS::RepoId& publicationId,
00121 const DDS::DataWriterQos& qos,
00122 const DDS::PublisherQos& publisherQos);
00123
00124
00125 bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00126 const DDS::DataReaderQos& qos,
00127 const DDS::SubscriberQos& subscriberQos);
00128 bool update_subscription_params(const DCPS::RepoId& subId,
00129 const DDS::StringSeq& params);
00130
00131
00132 void association_complete(const DCPS::RepoId& localId,
00133 const DCPS::RepoId& remoteId);
00134
00135 void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00136 void signal_liveliness_unsecure(DDS::LivelinessQosPolicyKind kind);
00137
00138 #if defined(OPENDDS_SECURITY)
00139 void signal_liveliness_secure(DDS::LivelinessQosPolicyKind kind);
00140 #endif
00141
00142 static const bool host_is_bigendian_;
00143 private:
00144 Spdp& spdp_;
00145 DDS::Security::ParticipantSecurityAttributes participant_sec_attr_;
00146
00147 struct Msg : public DCPS::PoolAllocationBase {
00148 enum MsgType {
00149 MSG_PARTICIPANT,
00150 MSG_WRITER,
00151 MSG_READER,
00152 MSG_PARTICIPANT_DATA,
00153 MSG_REMOVE_FROM_PUB_BIT,
00154 MSG_REMOVE_FROM_SUB_BIT,
00155 MSG_FINI_BIT,
00156 MSG_STOP,
00157
00158 #if defined(OPENDDS_SECURITY)
00159 MSG_PARTICIPANT_STATELESS_DATA,
00160 MSG_PARTICIPANT_VOLATILE_SECURE,
00161 MSG_PARTICIPANT_DATA_SECURE,
00162 MSG_WRITER_SECURE,
00163 MSG_READER_SECURE,
00164 MSG_DCPS_PARTICIPANT_SECURE
00165 #endif
00166
00167 } type_;
00168
00169 DCPS::MessageId id_;
00170
00171 union {
00172 const Security::SPDPdiscoveredParticipantData* dpdata_;
00173
00174 const DCPS::DiscoveredWriterData* wdata_;
00175
00176 #if defined(OPENDDS_SECURITY)
00177 const DiscoveredWriterData_SecurityWrapper* wdata_secure_;
00178 #endif
00179
00180 const DCPS::DiscoveredReaderData* rdata_;
00181
00182 #if defined(OPENDDS_SECURITY)
00183 const DiscoveredReaderData_SecurityWrapper* rdata_secure_;
00184 #endif
00185
00186 const ParticipantMessageData* pmdata_;
00187 DDS::InstanceHandle_t ih_;
00188 const DDS::Security::ParticipantGenericMessage* pgmdata_;
00189 };
00190
00191 Msg(MsgType mt, DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData* dpdata)
00192 : type_(mt), id_(id), dpdata_(dpdata) {}
00193
00194 Msg(MsgType mt, DCPS::MessageId id, const DCPS::DiscoveredWriterData* wdata)
00195 : type_(mt), id_(id), wdata_(wdata) {}
00196
00197 #if defined(OPENDDS_SECURITY)
00198 Msg(MsgType mt, DCPS::MessageId id, const DiscoveredWriterData_SecurityWrapper* wdata)
00199 : type_(mt), id_(id), wdata_secure_(wdata) {}
00200 #endif
00201
00202 Msg(MsgType mt, DCPS::MessageId id, const DCPS::DiscoveredReaderData* rdata)
00203 : type_(mt), id_(id), rdata_(rdata) {}
00204
00205 #if defined(OPENDDS_SECURITY)
00206 Msg(MsgType mt, DCPS::MessageId id, const DiscoveredReaderData_SecurityWrapper* rdata)
00207 : type_(mt), id_(id), rdata_secure_(rdata) {}
00208 #endif
00209
00210 Msg(MsgType mt, DCPS::MessageId id, const ParticipantMessageData* pmdata)
00211 : type_(mt), id_(id), pmdata_(pmdata) {}
00212
00213 Msg(MsgType mt, DCPS::MessageId id, DDS::InstanceHandle_t ih)
00214 : type_(mt), id_(id), ih_(ih) {}
00215
00216 Msg(MsgType mt, DCPS::MessageId id, const DDS::Security::ParticipantGenericMessage* data)
00217 : type_(mt), id_(id), pgmdata_(data) {}
00218
00219 };
00220
00221
00222 class Endpoint : public DCPS::TransportClient {
00223 public:
00224 Endpoint(const DCPS::RepoId& repo_id, Sedp& sedp)
00225 : repo_id_(repo_id)
00226 , sedp_(sedp)
00227 , participant_crypto_handle_(DDS::HANDLE_NIL)
00228 , endpoint_crypto_handle_(DDS::HANDLE_NIL)
00229 {}
00230
00231 virtual ~Endpoint();
00232
00233
00234 bool check_transport_qos(const DCPS::TransportInst&)
00235 { return true; }
00236 const DCPS::RepoId& get_repo_id() const
00237 { return repo_id_; }
00238 DDS::DomainId_t domain_id() const
00239 { return 0; }
00240 CORBA::Long get_priority_value(const DCPS::AssociationData&) const
00241 { return 0; }
00242
00243 using DCPS::TransportClient::enable_transport_using_config;
00244 using DCPS::TransportClient::disassociate;
00245
00246 void set_crypto_handles(DDS::Security::ParticipantCryptoHandle p,
00247 DDS::Security::NativeCryptoHandle e = DDS::HANDLE_NIL)
00248 {
00249 participant_crypto_handle_ = p;
00250 endpoint_crypto_handle_ = e;
00251 }
00252
00253 DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
00254 {
00255 return participant_crypto_handle_;
00256 }
00257
00258 DDS::Security::NativeCryptoHandle get_endpoint_crypto_handle() const
00259 {
00260 return endpoint_crypto_handle_;
00261 }
00262
00263 protected:
00264 DCPS::RepoId repo_id_;
00265 Sedp& sedp_;
00266 DDS::Security::ParticipantCryptoHandle participant_crypto_handle_;
00267 DDS::Security::NativeCryptoHandle endpoint_crypto_handle_;
00268 };
00269
00270 class Writer : public DCPS::TransportSendListener, public Endpoint {
00271 public:
00272 Writer(const DCPS::RepoId& pub_id, Sedp& sedp);
00273 virtual ~Writer();
00274
00275 bool assoc(const DCPS::AssociationData& subscription);
00276
00277
00278 void data_delivered(const DCPS::DataSampleElement*);
00279
00280 void data_dropped(const DCPS::DataSampleElement*, bool by_transport);
00281
00282 void control_delivered(const DCPS::Message_Block_Ptr& sample);
00283
00284 void control_dropped(const DCPS::Message_Block_Ptr& sample,
00285 bool dropped_by_transport);
00286
00287 void notify_publication_disconnected(const DCPS::ReaderIdSeq&) {}
00288 void notify_publication_reconnected(const DCPS::ReaderIdSeq&) {}
00289 void notify_publication_lost(const DCPS::ReaderIdSeq&) {}
00290 void remove_associations(const DCPS::ReaderIdSeq&, bool) {}
00291 void retrieve_inline_qos_data(InlineQosData&) const {}
00292
00293 void send_sample(const ACE_Message_Block& data,
00294 size_t size,
00295 const DCPS::RepoId& reader,
00296 DCPS::SequenceNumber& sequence,
00297 bool historic = false);
00298
00299 DDS::ReturnCode_t write_parameter_list(const ParameterList& plist,
00300 const DCPS::RepoId& reader,
00301 DCPS::SequenceNumber& sequence);
00302
00303 DDS::ReturnCode_t write_participant_message(const ParticipantMessageData& pmd,
00304 const DCPS::RepoId& reader,
00305 DCPS::SequenceNumber& sequence);
00306
00307 DDS::ReturnCode_t write_stateless_message(const DDS::Security::ParticipantStatelessMessage& msg,
00308 const DCPS::RepoId& reader,
00309 DCPS::SequenceNumber& sequence);
00310
00311 DDS::ReturnCode_t write_volatile_message_secure(const DDS::Security::ParticipantVolatileMessageSecure& msg,
00312 const DCPS::RepoId& reader,
00313 DCPS::SequenceNumber& sequence);
00314
00315 DDS::ReturnCode_t write_dcps_participant_secure(const Security::SPDPdiscoveredParticipantData& msg,
00316 const DCPS::RepoId& reader,
00317 DCPS::SequenceNumber& sequence);
00318
00319 DDS::ReturnCode_t write_unregister_dispose(const DCPS::RepoId& rid, CORBA::UShort pid = PID_ENDPOINT_GUID);
00320
00321 void end_historic_samples(const DCPS::RepoId& reader);
00322
00323 private:
00324 Header header_;
00325 DCPS::SequenceNumber seq_;
00326
00327 void write_control_msg(DCPS::Message_Block_Ptr payload,
00328 size_t size,
00329 DCPS::MessageId id,
00330 DCPS::SequenceNumber seq = DCPS::SequenceNumber());
00331
00332 void set_header_fields(DCPS::DataSampleHeader& dsh,
00333 size_t size,
00334 const DCPS::RepoId& reader,
00335 DCPS::SequenceNumber& sequence,
00336 bool historic_sample = false,
00337 DCPS::MessageId id = DCPS::SAMPLE_DATA);
00338
00339 };
00340
00341 Writer publications_writer_;
00342
00343 #if defined(OPENDDS_SECURITY)
00344 Writer publications_secure_writer_;
00345 #endif
00346
00347 Writer subscriptions_writer_;
00348
00349 #if defined(OPENDDS_SECURITY)
00350 Writer subscriptions_secure_writer_;
00351 #endif
00352
00353 Writer participant_message_writer_;
00354
00355 #if defined(OPENDDS_SECURITY)
00356 Writer participant_message_secure_writer_;
00357 Writer participant_stateless_message_writer_;
00358 Writer participant_volatile_message_secure_writer_;
00359 Writer dcps_participant_secure_writer_;
00360 #endif
00361
00362 class Reader
00363 : public DCPS::TransportReceiveListener
00364 , public Endpoint
00365 {
00366 public:
00367 Reader(const DCPS::RepoId& sub_id, Sedp& sedp)
00368 : Endpoint(sub_id, sedp)
00369 , shutting_down_(false)
00370 {}
00371
00372 virtual ~Reader();
00373
00374 bool assoc(const DCPS::AssociationData& publication);
00375
00376
00377
00378 void data_received(const DCPS::ReceivedDataSample& sample);
00379
00380 void notify_subscription_disconnected(const DCPS::WriterIdSeq&) {}
00381 void notify_subscription_reconnected(const DCPS::WriterIdSeq&) {}
00382 void notify_subscription_lost(const DCPS::WriterIdSeq&) {}
00383 void remove_associations(const DCPS::WriterIdSeq&, bool) {}
00384
00385 ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> shutting_down_;
00386 };
00387
00388 typedef DCPS::RcHandle<Reader> Reader_rch;
00389
00390 Reader_rch publications_reader_;
00391
00392 #if defined(OPENDDS_SECURITY)
00393 Reader_rch publications_secure_reader_;
00394 #endif
00395
00396 Reader_rch subscriptions_reader_;
00397
00398 #if defined(OPENDDS_SECURITY)
00399 Reader_rch subscriptions_secure_reader_;
00400 #endif
00401
00402 Reader_rch participant_message_reader_;
00403
00404 #if defined(OPENDDS_SECURITY)
00405 Reader_rch participant_message_secure_reader_;
00406 Reader_rch participant_stateless_message_reader_;
00407 Reader_rch participant_volatile_message_secure_reader_;
00408 Reader_rch dcps_participant_secure_reader_;
00409 #endif
00410
00411 struct Task : ACE_Task_Ex<ACE_MT_SYNCH, Msg> {
00412 explicit Task(Sedp* sedp)
00413 : spdp_(&sedp->spdp_)
00414 , sedp_(sedp)
00415 , shutting_down_(false)
00416 {
00417 activate();
00418 }
00419 ~Task();
00420
00421 void enqueue(DCPS::MessageId id, DCPS::unique_ptr<Security::SPDPdiscoveredParticipantData> pdata);
00422
00423 void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredWriterData> wdata);
00424 void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DCPS::DiscoveredReaderData> rdata);
00425
00426 #if defined(OPENDDS_SECURITY)
00427 void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredWriterData_SecurityWrapper> wrapper);
00428 void enqueue(DCPS::MessageId id, DCPS::unique_ptr<DiscoveredReaderData_SecurityWrapper> wrapper);
00429 #endif
00430
00431 void enqueue(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data);
00432 void enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00433
00434 #if defined(OPENDDS_SECURITY)
00435 void enqueue_participant_message_secure(DCPS::MessageId id, DCPS::unique_ptr<ParticipantMessageData> data);
00436 void enqueue_stateless_message(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantStatelessMessage> data);
00437 void enqueue_volatile_message_secure(DCPS::MessageId id, DCPS::unique_ptr<DDS::Security::ParticipantVolatileMessageSecure> data);
00438 #endif
00439
00440 void acknowledge();
00441 void shutdown();
00442
00443 private:
00444 int svc();
00445
00446 void svc_i(const Security::SPDPdiscoveredParticipantData* pdata);
00447 void svc_secure_i(DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData* pdata);
00448
00449 void svc_i(DCPS::MessageId id, const DCPS::DiscoveredWriterData* wdata);
00450 void svc_i(DCPS::MessageId id, const DCPS::DiscoveredReaderData* rdata);
00451
00452 #if defined(OPENDDS_SECURITY)
00453 void svc_i(DCPS::MessageId id, const DiscoveredWriterData_SecurityWrapper* wrapper);
00454 void svc_i(DCPS::MessageId id, const DiscoveredReaderData_SecurityWrapper* wrapper);
00455 #endif
00456
00457 void svc_i(DCPS::MessageId id, const ParticipantMessageData* data);
00458 void svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00459
00460 #if defined(OPENDDS_SECURITY)
00461 void svc_participant_message_data_secure(DCPS::MessageId id, const ParticipantMessageData* data);
00462 void svc_stateless_message(DCPS::MessageId id, const DDS::Security::ParticipantStatelessMessage* data);
00463 void svc_volatile_message_secure(DCPS::MessageId id, const DDS::Security::ParticipantVolatileMessageSecure* data);
00464 #endif
00465
00466 Spdp* spdp_;
00467 Sedp* sedp_;
00468 bool shutting_down_;
00469 } task_;
00470
00471
00472 DCPS::TransportInst_rch transport_inst_;
00473
00474 #ifndef DDS_HAS_MINIMUM_BIT
00475 DCPS::TopicBuiltinTopicDataDataReaderImpl* topic_bit();
00476 DCPS::PublicationBuiltinTopicDataDataReaderImpl* pub_bit();
00477 DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit();
00478 #endif
00479
00480 void populate_discovered_writer_msg(
00481 DCPS::DiscoveredWriterData& dwd,
00482 const DCPS::RepoId& publication_id,
00483 const LocalPublication& pub);
00484
00485 void populate_discovered_reader_msg(
00486 DCPS::DiscoveredReaderData& drd,
00487 const DCPS::RepoId& subscription_id,
00488 const LocalSubscription& sub);
00489
00490 struct LocalParticipantMessage : LocalEndpoint {
00491 };
00492 typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalParticipantMessage,
00493 DCPS::GUID_tKeyLessThan) LocalParticipantMessageMap;
00494 typedef LocalParticipantMessageMap::iterator LocalParticipantMessageIter;
00495 typedef LocalParticipantMessageMap::const_iterator LocalParticipantMessageCIter;
00496 LocalParticipantMessageMap local_participant_messages_;
00497
00498 void process_discovered_writer_data(DCPS::MessageId message_id,
00499 const DCPS::DiscoveredWriterData& wdata,
00500 const DCPS::RepoId& guid,
00501 const DDS::Security::EndpointSecurityInfo* security_info = NULL);
00502
00503 void data_received(DCPS::MessageId message_id,
00504 const DCPS::DiscoveredWriterData& wdata);
00505
00506 #if defined(OPENDDS_SECURITY)
00507 void data_received(DCPS::MessageId message_id,
00508 const DiscoveredWriterData_SecurityWrapper& wrapper);
00509 #endif
00510
00511 void process_discovered_reader_data(DCPS::MessageId message_id,
00512 const DCPS::DiscoveredReaderData& rdata,
00513 const DCPS::RepoId& guid,
00514 const DDS::Security::EndpointSecurityInfo* security_info = NULL);
00515
00516 void data_received(DCPS::MessageId message_id,
00517 const DCPS::DiscoveredReaderData& rdata);
00518
00519 #if defined(OPENDDS_SECURITY)
00520 void data_received(DCPS::MessageId message_id,
00521 const DiscoveredReaderData_SecurityWrapper& wrapper);
00522 #endif
00523
00524 void data_received(DCPS::MessageId message_id,
00525 const ParticipantMessageData& data);
00526
00527 #if defined(OPENDDS_SECURITY)
00528 void received_participant_message_data_secure(DCPS::MessageId message_id,
00529 const ParticipantMessageData& data);
00530
00531 bool should_drop_stateless_message(const DDS::Security::ParticipantGenericMessage& msg);
00532 bool should_drop_volatile_message(const DDS::Security::ParticipantGenericMessage& msg);
00533 bool should_drop_message(const char* unsecure_topic_name);
00534
00535 void received_stateless_message(DCPS::MessageId message_id,
00536 const DDS::Security::ParticipantStatelessMessage& data);
00537
00538 void received_volatile_message_secure(DCPS::MessageId message_id,
00539 const DDS::Security::ParticipantVolatileMessageSecure& data);
00540 #endif
00541
00542 typedef std::pair<DCPS::MessageId, DCPS::DiscoveredWriterData> MsgIdWtrDataPair;
00543 typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdWtrDataPair,
00544 DCPS::GUID_tKeyLessThan) DeferredPublicationMap;
00545 DeferredPublicationMap deferred_publications_;
00546
00547 typedef std::pair<DCPS::MessageId, DCPS::DiscoveredReaderData> MsgIdRdrDataPair;
00548 typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdRdrDataPair,
00549 DCPS::GUID_tKeyLessThan) DeferredSubscriptionMap;
00550 DeferredSubscriptionMap deferred_subscriptions_;
00551
00552 void assign_bit_key(DiscoveredPublication& pub);
00553 void assign_bit_key(DiscoveredSubscription& sub);
00554
00555 template<typename Map>
00556 void remove_entities_belonging_to(Map& m, DCPS::RepoId participant);
00557
00558 void remove_from_bit_i(const DiscoveredPublication& pub);
00559 void remove_from_bit_i(const DiscoveredSubscription& sub);
00560
00561 virtual DDS::ReturnCode_t remove_publication_i(const DCPS::RepoId& publicationId);
00562 virtual DDS::ReturnCode_t remove_subscription_i(const DCPS::RepoId& subscriptionId);
00563
00564
00565
00566 DCPS::RepoIdSet defer_match_endpoints_, associated_participants_;
00567
00568 void inconsistent_topic(const DCPS::RepoIdSet& endpoints) const;
00569
00570 virtual bool shutting_down() const;
00571
00572 virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00573 DiscoveredSubscriptionIter& iter,
00574 const DCPS::RepoId& reader);
00575
00576 virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00577 DiscoveredPublicationIter& iter,
00578 const DCPS::RepoId& writer);
00579
00580 #if defined(OPENDDS_SECURITY)
00581 DCPS::TransportLocatorSeq
00582 add_security_info(const DCPS::TransportLocatorSeq& locators,
00583 const DCPS::RepoId& writer, const DCPS::RepoId& reader);
00584 #endif
00585
00586 virtual bool defer_writer(const DCPS::RepoId& writer,
00587 const DCPS::RepoId& writer_participant);
00588
00589 virtual bool defer_reader(const DCPS::RepoId& reader,
00590 const DCPS::RepoId& reader_participant);
00591
00592 static DCPS::RepoId make_id(const DCPS::RepoId& participant_id,
00593 const EntityId_t& entity);
00594
00595 static void set_inline_qos(DCPS::TransportLocatorSeq& locators);
00596
00597 void write_durable_publication_data(const DCPS::RepoId& reader);
00598 void write_durable_subscription_data(const DCPS::RepoId& reader);
00599
00600 #if defined(OPENDDS_SECURITY)
00601 void write_durable_publication_data_secure(const DCPS::RepoId& reader);
00602 void write_durable_subscription_data_secure(const DCPS::RepoId& reader);
00603 #endif
00604
00605 void write_durable_participant_message_data(const DCPS::RepoId& reader);
00606
00607 DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& rid,
00608 LocalPublication& pub,
00609 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00610
00611 #if defined(OPENDDS_SECURITY)
00612 DDS::ReturnCode_t write_publication_data_secure(const DCPS::RepoId& rid,
00613 LocalPublication& pub,
00614 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00615 #endif
00616
00617 DDS::ReturnCode_t write_publication_data_unsecure(const DCPS::RepoId& rid,
00618 LocalPublication& pub,
00619 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00620
00621 DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& rid,
00622 LocalSubscription& pub,
00623 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00624
00625 #if defined(OPENDDS_SECURITY)
00626 DDS::ReturnCode_t write_subscription_data_secure(const DCPS::RepoId& rid,
00627 LocalSubscription& pub,
00628 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00629 #endif
00630
00631 DDS::ReturnCode_t write_subscription_data_unsecure(const DCPS::RepoId& rid,
00632 LocalSubscription& pub,
00633 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00634
00635 DDS::ReturnCode_t write_participant_message_data(const DCPS::RepoId& rid,
00636 LocalParticipantMessage& part,
00637 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00638
00639 bool is_opendds(const GUID_t& endpoint) const;
00640
00641 #if defined(OPENDDS_SECURITY)
00642 DCPS::SequenceNumber secure_automatic_liveliness_seq_;
00643 DCPS::SequenceNumber secure_manual_liveliness_seq_;
00644 #endif
00645
00646 DCPS::SequenceNumber automatic_liveliness_seq_;
00647 DCPS::SequenceNumber manual_liveliness_seq_;
00648
00649 protected:
00650
00651 #if defined(OPENDDS_SECURITY)
00652 DDS::Security::DatawriterCryptoHandle generate_remote_matched_writer_crypto_handle(const DCPS::RepoId& writer_part, const DDS::Security::DatareaderCryptoHandle& drch);
00653 DDS::Security::DatareaderCryptoHandle generate_remote_matched_reader_crypto_handle(const DCPS::RepoId& reader_part, const DDS::Security::DatawriterCryptoHandle& dwch, bool relay_only);
00654 void create_and_send_datareader_crypto_tokens(const DDS::Security::DatareaderCryptoHandle& drch, const DCPS::RepoId& local_reader, const DDS::Security::DatawriterCryptoHandle& dwch, const DCPS::RepoId& remote_writer);
00655 void create_and_send_datawriter_crypto_tokens(const DDS::Security::DatawriterCryptoHandle& dwch, const DCPS::RepoId& local_writer, const DDS::Security::DatareaderCryptoHandle& drch, const DCPS::RepoId& remote_reader);
00656 void handle_datareader_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg);
00657 void handle_datawriter_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure& msg);
00658
00659 DDS::DomainId_t get_domain_id() const;
00660 #endif
00661
00662 };
00663
00664
00665 class WaitForAcks {
00666 public:
00667 WaitForAcks();
00668 void ack();
00669 void wait_for_acks(unsigned int num_acks);
00670 void reset();
00671 private:
00672 ACE_Thread_Mutex lock_;
00673 ACE_Condition_Thread_Mutex cond_;
00674 unsigned int acks_;
00675 };
00676
00677 }
00678 }
00679
00680 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00681
00682 #endif // OPENDDS_RTPS_SEDP_H