00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_RTPS_SEDP_H
00009 #define OPENDDS_RTPS_SEDP_H
00010
00011 #include "dds/DdsDcpsInfrastructureC.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00014
00015 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00018
00019 #include "dds/DCPS/RcHandle_T.h"
00020 #include "dds/DCPS/GuidUtils.h"
00021 #include "dds/DCPS/DataReaderCallbacks.h"
00022 #include "dds/DCPS/Definitions.h"
00023 #include "dds/DCPS/BuiltInTopicUtils.h"
00024 #include "dds/DCPS/DataSampleElement.h"
00025 #include "dds/DCPS/DataSampleHeader.h"
00026 #include "dds/DCPS/PoolAllocationBase.h"
00027 #include "dds/DCPS/DiscoveryBase.h"
00028
00029 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00030 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00031 #include "dds/DCPS/transport/framework/TransportClient.h"
00032 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00033
00034 #include "ace/Task_Ex_T.h"
00035 #include "ace/Condition_Thread_Mutex.h"
00036 #include "ace/Thread_Mutex.h"
00037 #include "dds/DCPS/PoolAllocator.h"
00038
00039
00040 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00041 #pragma once
00042 #endif
00043
00044 namespace OpenDDS {
00045 namespace RTPS {
00046
00047 enum RtpsFlags { FLAG_E = 1, FLAG_Q = 2, FLAG_D = 4 };
00048
00049 class RtpsDiscovery;
00050 class Spdp;
00051
00052 class WaitForAcks;
00053
00054 class Sedp : public OpenDDS::DCPS::EndpointManager<SPDPdiscoveredParticipantData> {
00055 public:
00056 Sedp(const DCPS::RepoId& participant_id,
00057 Spdp& owner,
00058 ACE_Thread_Mutex& lock);
00059
00060 DDS::ReturnCode_t init(const DCPS::RepoId& guid,
00061 const RtpsDiscovery& disco,
00062 DDS::DomainId_t domainId);
00063
00064
00065 void acknowledge();
00066
00067 void shutdown();
00068 void unicast_locators(OpenDDS::DCPS::LocatorSeq& locators) const;
00069
00070
00071
00072 const ACE_INET_Addr& local_address() const;
00073 const ACE_INET_Addr& multicast_group() const;
00074 bool map_ipv4_to_ipv6() const;
00075
00076 void associate(const SPDPdiscoveredParticipantData& pdata);
00077 bool disassociate(const SPDPdiscoveredParticipantData& pdata);
00078
00079
00080 bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00081 OPENDDS_STRING& name);
00082
00083
00084 bool update_publication_qos(const DCPS::RepoId& publicationId,
00085 const DDS::DataWriterQos& qos,
00086 const DDS::PublisherQos& publisherQos);
00087
00088
00089 bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00090 const DDS::DataReaderQos& qos,
00091 const DDS::SubscriberQos& subscriberQos);
00092 bool update_subscription_params(const DCPS::RepoId& subId,
00093 const DDS::StringSeq& params);
00094
00095
00096 void association_complete(const DCPS::RepoId& localId,
00097 const DCPS::RepoId& remoteId);
00098
00099 void signal_liveliness(DDS::LivelinessQosPolicyKind kind);
00100
00101 static const bool host_is_bigendian_;
00102 private:
00103 Spdp& spdp_;
00104
00105 struct Msg : public OpenDDS::DCPS::PoolAllocationBase {
00106 enum MsgType { MSG_PARTICIPANT, MSG_WRITER, MSG_READER, MSG_PARTICIPANT_DATA,
00107 MSG_REMOVE_FROM_PUB_BIT, MSG_REMOVE_FROM_SUB_BIT,
00108 MSG_FINI_BIT, MSG_STOP } type_;
00109 DCPS::MessageId id_;
00110 union {
00111 const SPDPdiscoveredParticipantData* dpdata_;
00112 const OpenDDS::DCPS::DiscoveredWriterData* wdata_;
00113 const OpenDDS::DCPS::DiscoveredReaderData* rdata_;
00114 const ParticipantMessageData* pmdata_;
00115 DDS::InstanceHandle_t ih_;
00116 };
00117 Msg(MsgType mt, DCPS::MessageId id, const SPDPdiscoveredParticipantData* dpdata)
00118 : type_(mt), id_(id), dpdata_(dpdata) {}
00119 Msg(MsgType mt, DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata)
00120 : type_(mt), id_(id), wdata_(wdata) {}
00121 Msg(MsgType mt, DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata)
00122 : type_(mt), id_(id), rdata_(rdata) {}
00123 Msg(MsgType mt, DCPS::MessageId id, const ParticipantMessageData* pmdata)
00124 : type_(mt), id_(id), pmdata_(pmdata) {}
00125 Msg(MsgType mt, DCPS::MessageId id, DDS::InstanceHandle_t ih)
00126 : type_(mt), id_(id), ih_(ih) {}
00127 };
00128
00129 class Endpoint : public DCPS::TransportClient {
00130 public:
00131 Endpoint(const DCPS::RepoId& repo_id, Sedp& sedp)
00132 : repo_id_(repo_id)
00133 , sedp_(sedp)
00134 {}
00135
00136 virtual ~Endpoint();
00137
00138
00139 bool check_transport_qos(const DCPS::TransportInst&)
00140 { return true; }
00141 const DCPS::RepoId& get_repo_id() const
00142 { return repo_id_; }
00143 DDS::DomainId_t domain_id() const
00144 { return 0; }
00145 CORBA::Long get_priority_value(const DCPS::AssociationData&) const
00146 { return 0; }
00147
00148 using DCPS::TransportClient::enable_transport_using_config;
00149 using DCPS::TransportClient::disassociate;
00150
00151 protected:
00152 DCPS::RepoId repo_id_;
00153 Sedp& sedp_;
00154 };
00155
00156 class Writer : public DCPS::TransportSendListener, public Endpoint {
00157 public:
00158 Writer(const DCPS::RepoId& pub_id, Sedp& sedp);
00159 virtual ~Writer();
00160
00161 bool assoc(const DCPS::AssociationData& subscription);
00162
00163
00164 void data_delivered(const DCPS::DataSampleElement*);
00165
00166 void data_dropped(const DCPS::DataSampleElement*, bool by_transport);
00167
00168 void control_delivered(ACE_Message_Block* sample);
00169
00170 void control_dropped(ACE_Message_Block* sample,
00171 bool dropped_by_transport);
00172
00173 void notify_publication_disconnected(const DCPS::ReaderIdSeq&) {}
00174 void notify_publication_reconnected(const DCPS::ReaderIdSeq&) {}
00175 void notify_publication_lost(const DCPS::ReaderIdSeq&) {}
00176 void notify_connection_deleted(const DCPS::RepoId&) {}
00177 void remove_associations(const DCPS::ReaderIdSeq&, bool) {}
00178 void retrieve_inline_qos_data(InlineQosData&) const {}
00179
00180 DDS::ReturnCode_t write_sample(const ParameterList& plist,
00181 const DCPS::RepoId& reader,
00182 DCPS::SequenceNumber& sequence);
00183 DDS::ReturnCode_t write_sample(const ParticipantMessageData& pmd,
00184 const DCPS::RepoId& reader,
00185 DCPS::SequenceNumber& sequence);
00186 DDS::ReturnCode_t write_unregister_dispose(const DCPS::RepoId& rid);
00187
00188 void end_historic_samples(const DCPS::RepoId& reader);
00189
00190 private:
00191 DCPS::TransportSendElementAllocator alloc_;
00192 Header header_;
00193 DCPS::SequenceNumber seq_;
00194
00195 void write_control_msg(ACE_Message_Block& payload,
00196 size_t size,
00197 DCPS::MessageId id,
00198 DCPS::SequenceNumber seq = DCPS::SequenceNumber());
00199
00200 void set_header_fields(DCPS::DataSampleHeader& dsh,
00201 size_t size,
00202 const DCPS::RepoId& reader,
00203 DCPS::SequenceNumber& sequence,
00204 DCPS::MessageId id = DCPS::SAMPLE_DATA);
00205
00206 } publications_writer_, subscriptions_writer_, participant_message_writer_;
00207
00208 class Reader
00209 : public DCPS::TransportReceiveListener
00210 , public Endpoint
00211 , public DCPS::RcObject<ACE_SYNCH_MUTEX>
00212 {
00213 public:
00214 Reader(const DCPS::RepoId& sub_id, Sedp& sedp)
00215 : Endpoint(sub_id, sedp)
00216 , shutting_down_(0)
00217 {}
00218
00219 virtual ~Reader();
00220
00221 bool assoc(const DCPS::AssociationData& publication);
00222
00223
00224
00225 void data_received(const DCPS::ReceivedDataSample& sample);
00226
00227 void notify_subscription_disconnected(const DCPS::WriterIdSeq&) {}
00228 void notify_subscription_reconnected(const DCPS::WriterIdSeq&) {}
00229 void notify_subscription_lost(const DCPS::WriterIdSeq&) {}
00230 void notify_connection_deleted(const DCPS::RepoId&) {}
00231 void remove_associations(const DCPS::WriterIdSeq&, bool) {}
00232
00233 void listener_add_ref() { _add_ref(); }
00234 void listener_remove_ref() { _remove_ref(); }
00235
00236 ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> shutting_down_;
00237 };
00238
00239 typedef DCPS::RcHandle<Reader> Reader_rch;
00240
00241 Reader_rch publications_reader_, subscriptions_reader_, participant_message_reader_;
00242
00243 struct Task : ACE_Task_Ex<ACE_MT_SYNCH, Msg> {
00244 explicit Task(Sedp* sedp)
00245 : spdp_(&sedp->spdp_)
00246 , sedp_(sedp)
00247 , shutting_down_(false)
00248 {
00249 activate();
00250 }
00251 ~Task();
00252
00253 void enqueue(const SPDPdiscoveredParticipantData* pdata);
00254 void enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata);
00255 void enqueue(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata);
00256 void enqueue(DCPS::MessageId id, const ParticipantMessageData* data);
00257 void enqueue(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00258
00259 void acknowledge();
00260 void shutdown();
00261
00262
00263 private:
00264 int svc();
00265
00266 void svc_i(const SPDPdiscoveredParticipantData* pdata);
00267 void svc_i(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredWriterData* wdata);
00268 void svc_i(DCPS::MessageId id, const OpenDDS::DCPS::DiscoveredReaderData* rdata);
00269 void svc_i(DCPS::MessageId id, const ParticipantMessageData* data);
00270 void svc_i(Msg::MsgType which_bit, const DDS::InstanceHandle_t bit_ih);
00271
00272 Spdp* spdp_;
00273 Sedp* sedp_;
00274 bool shutting_down_;
00275 } task_;
00276
00277
00278 DCPS::TransportInst_rch transport_inst_;
00279
00280 #ifndef DDS_HAS_MINIMUM_BIT
00281 DDS::TopicBuiltinTopicDataDataReaderImpl* topic_bit();
00282 DDS::PublicationBuiltinTopicDataDataReaderImpl* pub_bit();
00283 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit();
00284 #endif
00285
00286 void populate_discovered_writer_msg(
00287 OpenDDS::DCPS::DiscoveredWriterData& dwd,
00288 const DCPS::RepoId& publication_id,
00289 const LocalPublication& pub);
00290
00291 void populate_discovered_reader_msg(
00292 OpenDDS::DCPS::DiscoveredReaderData& drd,
00293 const DCPS::RepoId& subscription_id,
00294 const LocalSubscription& sub);
00295
00296 struct LocalParticipantMessage : LocalEndpoint {
00297 };
00298 typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalParticipantMessage,
00299 DCPS::GUID_tKeyLessThan) LocalParticipantMessageMap;
00300 typedef LocalParticipantMessageMap::iterator LocalParticipantMessageIter;
00301 typedef LocalParticipantMessageMap::const_iterator LocalParticipantMessageCIter;
00302 LocalParticipantMessageMap local_participant_messages_;
00303
00304 void data_received(DCPS::MessageId message_id,
00305 const OpenDDS::DCPS::DiscoveredWriterData& wdata);
00306 void data_received(DCPS::MessageId message_id,
00307 const OpenDDS::DCPS::DiscoveredReaderData& rdata);
00308 void data_received(DCPS::MessageId message_id,
00309 const ParticipantMessageData& data);
00310
00311 typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredWriterData> MsgIdWtrDataPair;
00312 typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdWtrDataPair,
00313 DCPS::GUID_tKeyLessThan) DeferredPublicationMap;
00314 DeferredPublicationMap deferred_publications_;
00315
00316 typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredReaderData> MsgIdRdrDataPair;
00317 typedef OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdRdrDataPair,
00318 DCPS::GUID_tKeyLessThan) DeferredSubscriptionMap;
00319 DeferredSubscriptionMap deferred_subscriptions_;
00320
00321 void assign_bit_key(DiscoveredPublication& pub);
00322 void assign_bit_key(DiscoveredSubscription& sub);
00323
00324 template<typename Map>
00325 void remove_entities_belonging_to(Map& m, DCPS::RepoId participant);
00326
00327 void remove_from_bit_i(const DiscoveredPublication& pub);
00328 void remove_from_bit_i(const DiscoveredSubscription& sub);
00329
00330 virtual DDS::ReturnCode_t remove_publication_i(const DCPS::RepoId& publicationId);
00331 virtual DDS::ReturnCode_t remove_subscription_i(const DCPS::RepoId& subscriptionId);
00332
00333
00334
00335 DCPS::RepoIdSet defer_match_endpoints_, associated_participants_;
00336
00337 void inconsistent_topic(const DCPS::RepoIdSet& endpoints) const;
00338
00339 virtual bool shutting_down() const;
00340
00341 virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00342 DiscoveredSubscriptionIter& iter,
00343 const DCPS::RepoId& reader);
00344
00345 virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00346 DiscoveredPublicationIter& iter,
00347 const DCPS::RepoId& writer);
00348
00349 virtual bool defer_writer(const DCPS::RepoId& writer,
00350 const DCPS::RepoId& writer_participant);
00351
00352 virtual bool defer_reader(const DCPS::RepoId& reader,
00353 const DCPS::RepoId& reader_participant);
00354
00355 static DCPS::RepoId make_id(const DCPS::RepoId& participant_id,
00356 const EntityId_t& entity);
00357
00358 static void set_inline_qos(DCPS::TransportLocatorSeq& locators);
00359
00360 void write_durable_publication_data(const DCPS::RepoId& reader);
00361 void write_durable_subscription_data(const DCPS::RepoId& reader);
00362 void write_durable_participant_message_data(const DCPS::RepoId& reader);
00363
00364 DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& rid,
00365 LocalPublication& pub,
00366 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00367 DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& rid,
00368 LocalSubscription& pub,
00369 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00370 DDS::ReturnCode_t write_participant_message_data(const DCPS::RepoId& rid,
00371 LocalParticipantMessage& part,
00372 const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN);
00373
00374 DCPS::SequenceNumber automatic_liveliness_seq_;
00375 DCPS::SequenceNumber manual_liveliness_seq_;
00376 };
00377
00378
00379 class WaitForAcks {
00380 public:
00381 WaitForAcks();
00382 void ack();
00383 void wait_for_acks(unsigned int num_acks);
00384 void reset();
00385 private:
00386 ACE_Thread_Mutex lock_;
00387 ACE_Condition_Thread_Mutex cond_;
00388 unsigned int acks_;
00389 };
00390
00391 }
00392 }
00393
00394 #endif // OPENDDS_RTPS_SEDP_H