00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_RTPSUDPDATALINK_H
00009 #define DCPS_RTPSUDPDATALINK_H
00010
00011 #include "Rtps_Udp_Export.h"
00012
00013 #include "RtpsUdpSendStrategy.h"
00014 #include "RtpsUdpSendStrategy_rch.h"
00015 #include "RtpsUdpReceiveStrategy.h"
00016 #include "RtpsUdpReceiveStrategy_rch.h"
00017 #include "RtpsCustomizedElement.h"
00018
00019 #include "ace/Basic_Types.h"
00020 #include "ace/SOCK_Dgram.h"
00021 #include "ace/SOCK_Dgram_Mcast.h"
00022
00023 #include "dds/DCPS/transport/framework/DataLink.h"
00024 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00025 #include "dds/DCPS/transport/framework/TransportReactorTask_rch.h"
00026 #include "dds/DCPS/transport/framework/TransportSendBuffer.h"
00027
00028 #include "dds/DCPS/DataSampleElement.h"
00029 #include "dds/DCPS/DisjointSequence.h"
00030 #include "dds/DCPS/GuidConverter.h"
00031 #include "dds/DCPS/PoolAllocator.h"
00032 #include "dds/DCPS/DiscoveryListener.h"
00033 #include "dds/DCPS/ReactorInterceptor.h"
00034 #include "dds/DCPS/RcEventHandler.h"
00035
00036 #if defined(OPENDDS_SECURITY)
00037 #include "dds/DdsSecurityCoreC.h"
00038 #include "dds/DCPS/security/framework/SecurityConfig_rch.h"
00039 #endif
00040
00041 class DDS_TEST;
00042
00043 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00044
00045 namespace OpenDDS {
00046 namespace DCPS {
00047
00048 class RtpsUdpInst;
00049 class RtpsUdpTransport;
00050 class ReceivedDataSample;
00051 typedef RcHandle<RtpsUdpInst> RtpsUdpInst_rch;
00052 typedef RcHandle<RtpsUdpTransport> RtpsUdpTransport_rch;
00053
00054 class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink : public DataLink {
00055 public:
00056
00057 RtpsUdpDataLink(RtpsUdpTransport& transport,
00058 const GuidPrefix_t& local_prefix,
00059 const RtpsUdpInst& config,
00060 const TransportReactorTask_rch& reactor_task);
00061
00062 bool add_delayed_notification(TransportQueueElement* element);
00063
00064 void do_remove_sample(const RepoId& pub_id,
00065 const TransportQueueElement::MatchCriteria& criteria,
00066 ACE_Guard<ACE_Thread_Mutex>& guard);
00067
00068 RtpsUdpInst& config() const;
00069
00070 ACE_Reactor* get_reactor();
00071 bool reactor_is_shut_down();
00072
00073 ACE_SOCK_Dgram& unicast_socket();
00074 ACE_SOCK_Dgram_Mcast& multicast_socket();
00075
00076 bool open(const ACE_SOCK_Dgram& unicast_socket);
00077
00078 void received(const RTPS::DataSubmessage& data,
00079 const GuidPrefix_t& src_prefix);
00080
00081 void received(const RTPS::GapSubmessage& gap, const GuidPrefix_t& src_prefix);
00082
00083 void received(const RTPS::HeartBeatSubmessage& heartbeat,
00084 const GuidPrefix_t& src_prefix);
00085
00086 void received(const RTPS::HeartBeatFragSubmessage& hb_frag,
00087 const GuidPrefix_t& src_prefix);
00088
00089 void received(const RTPS::AckNackSubmessage& acknack,
00090 const GuidPrefix_t& src_prefix);
00091
00092 void received(const RTPS::NackFragSubmessage& nackfrag,
00093 const GuidPrefix_t& src_prefix);
00094
00095 const GuidPrefix_t& local_prefix() const { return local_prefix_; }
00096
00097 void add_locator(const RepoId& remote_id, const ACE_INET_Addr& address,
00098 bool requires_inline_qos);
00099
00100
00101
00102
00103 void get_locators(const RepoId& local_id,
00104 OPENDDS_SET(ACE_INET_Addr)& addrs) const;
00105
00106 ACE_INET_Addr get_locator(const RepoId& remote_id) const;
00107
00108 void associated(const RepoId& local, const RepoId& remote,
00109 bool local_reliable, bool remote_reliable,
00110 bool local_durable, bool remote_durable);
00111
00112 bool check_handshake_complete(const RepoId& local, const RepoId& remote);
00113
00114 void register_for_reader(const RepoId& writerid,
00115 const RepoId& readerid,
00116 const ACE_INET_Addr& address,
00117 OpenDDS::DCPS::DiscoveryListener* listener);
00118
00119 void unregister_for_reader(const RepoId& writerid,
00120 const RepoId& readerid);
00121
00122 void register_for_writer(const RepoId& readerid,
00123 const RepoId& writerid,
00124 const ACE_INET_Addr& address,
00125 OpenDDS::DCPS::DiscoveryListener* listener);
00126
00127 void unregister_for_writer(const RepoId& readerid,
00128 const RepoId& writerid);
00129
00130 virtual void pre_stop_i();
00131
00132 virtual void send_final_acks (const RepoId& readerid);
00133
00134 #if defined(OPENDDS_SECURITY)
00135 Security::SecurityConfig_rch security_config() const
00136 { return security_config_; }
00137
00138 DDS::Security::ParticipantCryptoHandle local_crypto_handle() const;
00139 void local_crypto_handle(DDS::Security::ParticipantCryptoHandle pch);
00140
00141 DDS::Security::ParticipantCryptoHandle peer_crypto_handle(const RepoId& peer) const;
00142 DDS::Security::DatawriterCryptoHandle writer_crypto_handle(const RepoId& writer) const;
00143 DDS::Security::DatareaderCryptoHandle reader_crypto_handle(const RepoId& reader) const;
00144
00145 void populate_security_handles(const RepoId& local_id, const RepoId& remote_id,
00146 const unsigned char* buffer,
00147 unsigned int buffer_size);
00148 #endif
00149
00150 private:
00151 virtual void stop_i();
00152 virtual void send_i(TransportQueueElement* element, bool relink = true);
00153 RemoveResult remove_sample(const DataSampleElement* sample, void* context);
00154
00155 virtual TransportQueueElement* customize_queue_element(
00156 TransportQueueElement* element);
00157
00158 virtual void release_remote_i(const RepoId& remote_id);
00159 virtual void release_reservations_i(const RepoId& remote_id,
00160 const RepoId& local_id);
00161
00162 friend class ::DDS_TEST;
00163
00164 static bool force_inline_qos_;
00165 bool requires_inline_qos(const GUIDSeq_var & peers);
00166
00167 typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_VECTOR(RepoId),GUID_tKeyLessThan) DestToEntityMap;
00168 void add_gap_submsg(RTPS::SubmessageSeq& msg,
00169 const TransportQueueElement& tqe,
00170 const DestToEntityMap& dtem);
00171
00172 TransportReactorTask_rch reactor_task_;
00173
00174 RtpsUdpSendStrategy* send_strategy();
00175 RtpsUdpReceiveStrategy* receive_strategy();
00176
00177 GuidPrefix_t local_prefix_;
00178
00179 struct RemoteInfo {
00180 RemoteInfo() : addr_(), requires_inline_qos_(false) {}
00181 RemoteInfo(const ACE_INET_Addr& addr, bool iqos)
00182 : addr_(addr), requires_inline_qos_(iqos) {}
00183 ACE_INET_Addr addr_;
00184 bool requires_inline_qos_;
00185 };
00186 OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan) locators_;
00187
00188 ACE_SOCK_Dgram unicast_socket_;
00189 ACE_SOCK_Dgram_Mcast multicast_socket_;
00190
00191 struct MultiSendBuffer : TransportSendBuffer {
00192
00193 MultiSendBuffer(RtpsUdpDataLink* outer, size_t capacity)
00194 : TransportSendBuffer(capacity)
00195 , outer_(outer)
00196 {}
00197
00198 void retain_all(RepoId pub_id);
00199 void insert(SequenceNumber sequence,
00200 TransportSendStrategy::QueueType* queue,
00201 ACE_Message_Block* chain);
00202
00203 RtpsUdpDataLink* outer_;
00204
00205 } multi_buff_;
00206
00207
00208
00209
00210 struct ReaderInfo {
00211 CORBA::Long acknack_recvd_count_, nackfrag_recvd_count_;
00212 OPENDDS_VECTOR(RTPS::SequenceNumberSet) requested_changes_;
00213 OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet) requested_frags_;
00214 SequenceNumber cur_cumulative_ack_;
00215 bool handshake_done_, durable_;
00216 OPENDDS_MAP(SequenceNumber, TransportQueueElement*) durable_data_;
00217 ACE_Time_Value durable_timestamp_;
00218
00219 ReaderInfo()
00220 : acknack_recvd_count_(0)
00221 , nackfrag_recvd_count_(0)
00222 , handshake_done_(false)
00223 , durable_(false)
00224 {}
00225 ~ReaderInfo();
00226 void expire_durable_data();
00227 bool expecting_durable_data() const;
00228 };
00229
00230 typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) ReaderInfoMap;
00231
00232 struct RtpsWriter {
00233 ReaderInfoMap remote_readers_;
00234 RcHandle<SingleSendBuffer> send_buff_;
00235 SequenceNumber expected_;
00236 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*) SnToTqeMap;
00237 SnToTqeMap elems_not_acked_;
00238
00239 SnToTqeMap to_deliver_;
00240 bool durable_;
00241
00242 RtpsWriter() : durable_(false) {}
00243 ~RtpsWriter();
00244 SequenceNumber heartbeat_high(const ReaderInfo&) const;
00245 void add_elem_awaiting_ack(TransportQueueElement* element);
00246 };
00247
00248 typedef OPENDDS_MAP_CMP(RepoId, RtpsWriter, GUID_tKeyLessThan) RtpsWriterMap;
00249 RtpsWriterMap writers_;
00250
00251 void end_historic_samples(RtpsWriterMap::iterator writer,
00252 const DataSampleHeader& header,
00253 ACE_Message_Block* body);
00254
00255
00256
00257
00258 struct WriterInfo {
00259 DisjointSequence recvd_;
00260 OPENDDS_MAP(SequenceNumber, ReceivedDataSample) held_;
00261 SequenceRange hb_range_;
00262 OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t) frags_;
00263 bool ack_pending_, initial_hb_;
00264 CORBA::Long heartbeat_recvd_count_, hb_frag_recvd_count_,
00265 acknack_count_, nackfrag_count_;
00266
00267 WriterInfo()
00268 : ack_pending_(false), initial_hb_(true), heartbeat_recvd_count_(0),
00269 hb_frag_recvd_count_(0), acknack_count_(0), nackfrag_count_(0) {}
00270
00271 bool should_nack() const;
00272 };
00273
00274 typedef OPENDDS_MAP_CMP(RepoId, WriterInfo, GUID_tKeyLessThan) WriterInfoMap;
00275
00276 struct RtpsReader {
00277 WriterInfoMap remote_writers_;
00278 bool durable_;
00279 bool nack_durable(const WriterInfo& info);
00280 };
00281
00282 typedef OPENDDS_MAP_CMP(RepoId, RtpsReader, GUID_tKeyLessThan) RtpsReaderMap;
00283 RtpsReaderMap readers_;
00284
00285 typedef OPENDDS_MULTIMAP_CMP(RepoId, RtpsReaderMap::iterator, GUID_tKeyLessThan)
00286 RtpsReaderIndex;
00287 RtpsReaderIndex reader_index_;
00288
00289 void deliver_held_data(const RepoId& readerId, WriterInfo& info, bool durable);
00290
00291
00292
00293
00294 mutable ACE_Thread_Mutex lock_;
00295
00296 size_t generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nack_frags,
00297 WriterInfo& wi, const RepoId& pub_id);
00298
00299
00300
00301
00302
00303
00304
00305 static void extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
00306 CORBA::ULong extent);
00307
00308 bool process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
00309 const RepoId& src, RtpsReaderMap::value_type& rr);
00310
00311 bool process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
00312 const RepoId& src, RtpsReaderMap::value_type& rr);
00313
00314 bool process_gap_i(const RTPS::GapSubmessage& gap, const RepoId& src,
00315 RtpsReaderMap::value_type& rr);
00316
00317 bool process_data_i(const RTPS::DataSubmessage& data, const RepoId& src,
00318 RtpsReaderMap::value_type& rr);
00319
00320 void durability_resend(TransportQueueElement* element);
00321 void send_durability_gaps(const RepoId& writer, const RepoId& reader,
00322 const DisjointSequence& gaps);
00323 ACE_Message_Block* marshal_gaps(const RepoId& writer, const RepoId& reader,
00324 const DisjointSequence& gaps,
00325 bool durable = false);
00326
00327 void send_nackfrag_replies(RtpsWriter& writer, DisjointSequence& gaps,
00328 OPENDDS_SET(ACE_INET_Addr)& gap_recipients);
00329
00330 template<typename T, typename FN>
00331 void datareader_dispatch(const T& submessage, const GuidPrefix_t& src_prefix,
00332 const FN& func)
00333 {
00334 using std::pair;
00335 RepoId local;
00336 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
00337 local.entityId = submessage.readerId;
00338
00339 RepoId src;
00340 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
00341 src.entityId = submessage.writerId;
00342
00343 bool schedule_timer = false;
00344 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00345 if (local.entityId == ENTITYID_UNKNOWN) {
00346 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00347 reader_index_.equal_range(src);
00348 iters.first != iters.second; ++iters.first) {
00349 schedule_timer |= (this->*func)(submessage, src, *iters.first->second);
00350 }
00351
00352 } else {
00353 const RtpsReaderMap::iterator rr = readers_.find(local);
00354 if (rr == readers_.end()) {
00355 return;
00356 }
00357 schedule_timer = (this->*func)(submessage, src, *rr);
00358 }
00359 g.release();
00360 if (schedule_timer) {
00361 heartbeat_reply_.schedule();
00362 }
00363 }
00364
00365 void send_nack_replies();
00366 void send_directed_nack_replies(const RepoId& writerId, RtpsWriter& writer,
00367 const RepoId& readerId, ReaderInfo& reader);
00368 void process_requested_changes(DisjointSequence& requests,
00369 const RtpsWriter& writer,
00370 const ReaderInfo& reader);
00371 void process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id);
00372 void send_heartbeats();
00373 void send_directed_heartbeats(OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)& hbs);
00374 void check_heartbeats();
00375 void send_heartbeats_manual(const TransportSendControlElement* tsce);
00376 void send_heartbeat_replies();
00377
00378 CORBA::Long best_effort_heartbeat_count_;
00379
00380 typedef void (RtpsUdpDataLink::*PMF)();
00381
00382 struct TimedDelay : ACE_Event_Handler {
00383
00384 TimedDelay(RtpsUdpDataLink* outer, PMF function,
00385 const ACE_Time_Value& timeout)
00386 : outer_(outer), function_(function), timeout_(timeout), scheduled_(false)
00387 {}
00388
00389 void schedule();
00390 void cancel();
00391
00392 int handle_timeout(const ACE_Time_Value&, const void*)
00393 {
00394 scheduled_ = false;
00395 (outer_->*function_)();
00396 return 0;
00397 }
00398
00399 RtpsUdpDataLink* outer_;
00400 PMF function_;
00401 ACE_Time_Value timeout_;
00402 bool scheduled_;
00403
00404 } nack_reply_, heartbeat_reply_;
00405
00406 struct HeartBeat : ReactorInterceptor {
00407
00408 explicit HeartBeat(ACE_Reactor* reactor, ACE_thread_t owner, RtpsUdpDataLink* outer, PMF function)
00409 : ReactorInterceptor(reactor, owner)
00410 , outer_(outer)
00411 , function_(function)
00412 , enabled_(false) {}
00413
00414 void schedule_enable()
00415 {
00416 ScheduleEnableCommand c(this);
00417 execute_or_enqueue(c);
00418 }
00419
00420 int handle_timeout(const ACE_Time_Value&, const void*)
00421 {
00422 (outer_->*function_)();
00423 return 0;
00424 }
00425
00426 bool reactor_is_shut_down() const
00427 {
00428 return outer_->reactor_is_shut_down();
00429 }
00430
00431 void enable();
00432 void disable();
00433
00434 RtpsUdpDataLink* outer_;
00435 PMF function_;
00436 bool enabled_;
00437
00438 struct ScheduleEnableCommand : public Command {
00439 ScheduleEnableCommand(HeartBeat* hb)
00440 : heartbeat_(hb)
00441 { }
00442
00443 virtual void execute()
00444 {
00445 heartbeat_->enable();
00446 }
00447
00448 HeartBeat* heartbeat_;
00449 };
00450
00451 };
00452
00453 RcHandle<HeartBeat> heartbeat_, heartbeatchecker_;
00454
00455
00456 struct InterestingRemote {
00457
00458 RepoId localid;
00459
00460 ACE_INET_Addr address;
00461
00462 DiscoveryListener* listener;
00463
00464 ACE_Time_Value last_activity;
00465
00466 enum { DOES_NOT_EXIST, EXISTS } status;
00467
00468 InterestingRemote() { }
00469 InterestingRemote(const RepoId& w, const ACE_INET_Addr& a, DiscoveryListener* l)
00470 : localid(w)
00471 , address(a)
00472 , listener(l)
00473
00474 , status(DOES_NOT_EXIST)
00475 { }
00476 };
00477 typedef OPENDDS_MULTIMAP_CMP(RepoId, InterestingRemote, DCPS::GUID_tKeyLessThan) InterestingRemoteMapType;
00478 InterestingRemoteMapType interesting_readers_;
00479 InterestingRemoteMapType interesting_writers_;
00480
00481 typedef std::pair<RepoId, InterestingRemote> CallbackType;
00482
00483
00484 typedef OPENDDS_MAP_CMP(RepoId, CORBA::Long, DCPS::GUID_tKeyLessThan) HeartBeatCountMapType;
00485 HeartBeatCountMapType heartbeat_counts_;
00486
00487 struct InterestingAckNack {
00488 RepoId writerid;
00489 RepoId readerid;
00490 ACE_INET_Addr writer_address;
00491
00492 InterestingAckNack() { }
00493 InterestingAckNack(const RepoId& w, const RepoId& r, const ACE_INET_Addr& wa)
00494 : writerid(w)
00495 , readerid(r)
00496 , writer_address(wa)
00497 { }
00498
00499 bool operator<(const InterestingAckNack& other) const {
00500 if (writerid != other.writerid) {
00501 return DCPS::GUID_tKeyLessThan() (writerid, other.writerid);
00502 }
00503 return DCPS::GUID_tKeyLessThan() (readerid, other.readerid);
00504 }
00505 };
00506
00507 typedef OPENDDS_SET(InterestingAckNack) InterestingAckNackSetType;
00508 InterestingAckNackSetType interesting_ack_nacks_;
00509
00510 void send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag = false);
00511
00512 class HeldDataDeliveryHandler : public RcEventHandler {
00513 public:
00514 HeldDataDeliveryHandler(RtpsUdpDataLink* link)
00515 : link_(link) {
00516 }
00517
00518
00519 int handle_exception(ACE_HANDLE );
00520
00521 void notify_delivery(const RepoId& readerId, WriterInfo& info);
00522
00523 virtual ACE_Event_Handler::Reference_Count add_reference();
00524 virtual ACE_Event_Handler::Reference_Count remove_reference();
00525 private:
00526 RtpsUdpDataLink* link_;
00527 typedef std::pair<ReceivedDataSample, RepoId> HeldDataEntry;
00528 typedef OPENDDS_VECTOR(HeldDataEntry) HeldData;
00529 HeldData held_data_;
00530 };
00531 HeldDataDeliveryHandler held_data_delivery_handler_;
00532
00533 #if defined(OPENDDS_SECURITY)
00534 Security::SecurityConfig_rch security_config_;
00535 DDS::Security::ParticipantCryptoHandle local_crypto_handle_;
00536
00537 typedef OPENDDS_MAP_CMP(RepoId, DDS::Security::NativeCryptoHandle,
00538 GUID_tKeyLessThan) PeerHandlesMap;
00539 PeerHandlesMap peer_crypto_handles_;
00540
00541 typedef OPENDDS_MAP_CMP(RepoId, DDS::Security::NativeCryptoHandle,
00542 GUID_tKeyLessThan)::const_iterator PeerHandlesCIter;
00543 #endif
00544
00545 };
00546
00547 }
00548 }
00549
00550 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00551
00552 #ifdef __ACE_INLINE__
00553 # include "RtpsUdpDataLink.inl"
00554 #endif
00555
00556 #endif