00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_RTPSUDPDATALINK_H
00009 #define DCPS_RTPSUDPDATALINK_H
00010
00011 #include "Rtps_Udp_Export.h"
00012
00013 #include "RtpsUdpSendStrategy.h"
00014 #include "RtpsUdpSendStrategy_rch.h"
00015 #include "RtpsUdpReceiveStrategy.h"
00016 #include "RtpsUdpReceiveStrategy_rch.h"
00017 #include "RtpsCustomizedElement.h"
00018
00019 #include "ace/Basic_Types.h"
00020 #include "ace/SOCK_Dgram.h"
00021 #include "ace/SOCK_Dgram_Mcast.h"
00022
00023 #include "dds/DCPS/transport/framework/DataLink.h"
00024 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00025 #include "dds/DCPS/transport/framework/TransportReactorTask_rch.h"
00026 #include "dds/DCPS/transport/framework/TransportSendBuffer.h"
00027
00028 #include "dds/DCPS/DataSampleElement.h"
00029 #include "dds/DCPS/DisjointSequence.h"
00030 #include "dds/DCPS/GuidConverter.h"
00031 #include "dds/DCPS/PoolAllocator.h"
00032 #include "dds/DCPS/DiscoveryListener.h"
00033 #include "dds/DCPS/ReactorInterceptor.h"
00034
00035 class DDS_TEST;
00036
00037 namespace OpenDDS {
00038 namespace DCPS {
00039
00040 class RtpsUdpInst;
00041 class RtpsUdpTransport;
00042 class ReceivedDataSample;
00043
// RTPS-over-UDP specialization of DataLink, exported through
// OpenDDS_Rtps_Udp_Export.
// NOTE(review): only declarations are visible in this header; any behavioral
// remark below that does not follow from a signature is marked as an
// assumption to be confirmed against the implementation file.
00044 class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink : public DataLink {
00045 public:
00046
// Constructor: takes the owning transport, the local GUID prefix, the
// transport configuration and the reactor task.
// Presumably these seed local_prefix_, config_ and reactor_task_ -- TODO confirm.
00047 RtpsUdpDataLink(RtpsUdpTransport* transport,
00048 const GuidPrefix_t& local_prefix,
00049 RtpsUdpInst* config,
00050 TransportReactorTask* reactor_task);
00051
// Strategy hooks; presumably stored in send_strategy_ / recv_strategy_ -- TODO confirm.
00052 void send_strategy(RtpsUdpSendStrategy* send_strategy);
00053 void receive_strategy(RtpsUdpReceiveStrategy* recv_strategy);
// Queue-element bookkeeping; the meaning of the bool result is not visible here.
00054 bool add_delayed_notification(TransportQueueElement* element);
00055 void do_remove_sample(const RepoId& pub_id, const TransportQueueElement::MatchCriteria& criteria);
// Accessor for the transport configuration (see config_).
00056 RtpsUdpInst* config();
00057
// Reactor access; HeartBeat::reactor_is_shut_down() forwards to the
// reactor_is_shut_down() declared here.
00058 ACE_Reactor* get_reactor();
00059 bool reactor_is_shut_down();
00060
// Reference accessors for the two sockets (see unicast_socket_ / multicast_socket_).
00061 ACE_SOCK_Dgram& unicast_socket();
00062 ACE_SOCK_Dgram_Mcast& multicast_socket();
00063
// Opens the link given a unicast socket; returns a success flag
// (NOTE(review): true == success is assumed -- confirm).
00064 bool open(const ACE_SOCK_Dgram& unicast_socket);
00065
// received(): one overload per RTPS submessage type. src_prefix is the GUID
// prefix supplied alongside the submessage.
// NOTE(review): presumably invoked by the receive strategy -- confirm.
00066 void received(const RTPS::DataSubmessage& data,
00067 const GuidPrefix_t& src_prefix);
00068
00069 void received(const RTPS::GapSubmessage& gap, const GuidPrefix_t& src_prefix);
00070
00071 void received(const RTPS::HeartBeatSubmessage& heartbeat,
00072 const GuidPrefix_t& src_prefix);
00073
00074 void received(const RTPS::HeartBeatFragSubmessage& hb_frag,
00075 const GuidPrefix_t& src_prefix);
00076
00077 void received(const RTPS::AckNackSubmessage& acknack,
00078 const GuidPrefix_t& src_prefix);
00079
00080 void received(const RTPS::NackFragSubmessage& nackfrag,
00081 const GuidPrefix_t& src_prefix);
00082
// Returns the GUID prefix held in local_prefix_.
00083 const GuidPrefix_t& local_prefix() const { return local_prefix_; }
00084
// Records an address and inline-QoS requirement for a remote entity
// (presumably into locators_, whose mapped type has the same two fields).
00085 void add_locator(const RepoId& remote_id, const ACE_INET_Addr& address,
00086 bool requires_inline_qos);
00087
00088
00089
00090
// Fills the caller-supplied set `addrs`; which addresses are selected for
// local_id is not visible in this header.
00091 void get_locators(const RepoId& local_id,
00092 OPENDDS_SET(ACE_INET_Addr)& addrs) const;
00093
// Returns, by value, the address associated with remote_id.
00094 ACE_INET_Addr get_locator(const RepoId& remote_id) const;
00095
// Notifies the link of a local/remote association together with the
// reliability and durability flags of each side.
00096 void associated(const RepoId& local, const RepoId& remote,
00097 bool local_reliable, bool remote_reliable,
00098 bool local_durable, bool remote_durable);
00099
00100 bool check_handshake_complete(const RepoId& local, const RepoId& remote);
00101
// register_for_* / unregister_for_*: pair a local entity with a remote one,
// an address and a DiscoveryListener.
// NOTE(review): presumably these maintain interesting_readers_ /
// interesting_writers_ -- confirm. The listener is a raw pointer; its
// lifetime requirements are not visible here.
00102 void register_for_reader(const RepoId& writerid,
00103 const RepoId& readerid,
00104 const ACE_INET_Addr& address,
00105 OpenDDS::DCPS::DiscoveryListener* listener);
00106
00107 void unregister_for_reader(const RepoId& writerid,
00108 const RepoId& readerid);
00109
00110 void register_for_writer(const RepoId& readerid,
00111 const RepoId& writerid,
00112 const ACE_INET_Addr& address,
00113 OpenDDS::DCPS::DiscoveryListener* listener);
00114
00115 void unregister_for_writer(const RepoId& readerid,
00116 const RepoId& writerid);
00117
// Public virtual hooks (presumably overriding DataLink -- base not visible here).
00118 virtual void pre_stop_i();
00119
00120 virtual void send_final_acks (const RepoId& readerid);
00121
00122 private:
// Private virtual hooks (presumably DataLink overrides -- the base class is
// not visible in this file).
00123 virtual void stop_i();
00124 virtual void send_i(TransportQueueElement* element, bool relink = true);
00125
// Takes a queue element and returns a queue element pointer; whether the
// result is the same object or a replacement is not visible here.
00126 virtual TransportQueueElement* customize_queue_element(
00127 TransportQueueElement* element);
00128
00129 virtual void release_remote_i(const RepoId& remote_id);
00130 virtual void release_reservations_i(const RepoId& remote_id,
00131 const RepoId& local_id);
00132
// Grants the global-namespace DDS_TEST class access to private members.
00133 friend class ::DDS_TEST;
00134
// Class-wide flag (static) plus a per-publication query.
// NOTE(review): presumably the flag forces requires_inline_qos() to report
// true -- confirm in the implementation.
00135 static bool force_inline_qos_;
00136 bool requires_inline_qos(const PublicationId& pub_id);
00137
// Maps a RepoId to a vector of RepoIds, ordered by GUID_tKeyLessThan.
00138 typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_VECTOR(RepoId),GUID_tKeyLessThan) DestToEntityMap;
// Appends to `msg` (non-const reference); inputs are a queue element and a
// DestToEntityMap.
00139 void add_gap_submsg(RTPS::SubmessageSeq& msg,
00140 const TransportQueueElement& tqe,
00141 const DestToEntityMap& dtem);
00142
// Raw (non-owning, as far as this header shows) configuration pointer and a
// reference-counted handle to the reactor task.
00143 RtpsUdpInst* config_;
00144 TransportReactorTask_rch reactor_task_;
00145
// Reference-counted handles to the send and receive strategies.
00146 RtpsUdpSendStrategy_rch send_strategy_;
00147 RtpsUdpReceiveStrategy_rch recv_strategy_;
00148
// GUID prefix returned by local_prefix() and used by datareader_dispatch()
// to build the local reader's RepoId.
00149 GuidPrefix_t local_prefix_;
00150
00151 struct RemoteInfo {
00152 RemoteInfo() : addr_(), requires_inline_qos_(false) {}
00153 RemoteInfo(const ACE_INET_Addr& addr, bool iqos)
00154 : addr_(addr), requires_inline_qos_(iqos) {}
00155 ACE_INET_Addr addr_;
00156 bool requires_inline_qos_;
00157 };
// Remote entity id -> RemoteInfo (address + inline-QoS flag), ordered by
// GUID_tKeyLessThan.
00158 OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan) locators_;
00159
// Sockets owned by value; exposed by reference through unicast_socket() and
// multicast_socket().
00160 ACE_SOCK_Dgram unicast_socket_;
00161 ACE_SOCK_Dgram_Mcast multicast_socket_;
00162
// Allocator for RtpsCustomizedElement objects
// (NOTE(review): presumably used by customize_queue_element() -- confirm).
00163 RtpsCustomizedElementAllocator rtps_customized_element_allocator_;
00164
00165 struct MultiSendBuffer : TransportSendBuffer {
00166
00167 MultiSendBuffer(RtpsUdpDataLink* outer, size_t capacity)
00168 : TransportSendBuffer(capacity)
00169 , outer_(outer)
00170 {}
00171
00172 void retain_all(RepoId pub_id);
00173 void insert(SequenceNumber sequence,
00174 TransportSendStrategy::QueueType* queue,
00175 ACE_Message_Block* chain);
00176
00177 RtpsUdpDataLink* outer_;
00178
00179 } multi_buff_;
00180
00181
00182
00183
00184 struct ReaderInfo {
00185 CORBA::Long acknack_recvd_count_, nackfrag_recvd_count_;
00186 OPENDDS_VECTOR(RTPS::SequenceNumberSet) requested_changes_;
00187 OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet) requested_frags_;
00188 SequenceNumber cur_cumulative_ack_;
00189 bool handshake_done_, durable_;
00190 OPENDDS_MAP(SequenceNumber, TransportQueueElement*) durable_data_;
00191 ACE_Time_Value durable_timestamp_;
00192
00193 ReaderInfo()
00194 : acknack_recvd_count_(0)
00195 , nackfrag_recvd_count_(0)
00196 , handshake_done_(false)
00197 , durable_(false)
00198 {}
00199 ~ReaderInfo();
00200 void expire_durable_data();
00201 bool expecting_durable_data() const;
00202 };
00203
00204 typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) ReaderInfoMap;
00205
00206 struct RtpsWriter {
00207 ReaderInfoMap remote_readers_;
00208 RcHandle<SingleSendBuffer> send_buff_;
00209 SequenceNumber expected_;
00210 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*) SnToTqeMap;
00211 SnToTqeMap elems_not_acked_;
00212
00213 SnToTqeMap to_deliver_;
00214 bool durable_;
00215
00216 RtpsWriter() : durable_(false) {}
00217 ~RtpsWriter();
00218 SequenceNumber heartbeat_high(const ReaderInfo&) const;
00219 void add_elem_awaiting_ack(TransportQueueElement* element);
00220 };
00221
00222 typedef OPENDDS_MAP_CMP(RepoId, RtpsWriter, GUID_tKeyLessThan) RtpsWriterMap;
00223 RtpsWriterMap writers_;
00224
00225 void end_historic_samples(RtpsWriterMap::iterator writer,
00226 const DataSampleHeader& header,
00227 ACE_Message_Block* body);
00228
00229
00230
00231
00232 struct WriterInfo {
00233 DisjointSequence recvd_;
00234 OPENDDS_MAP(SequenceNumber, ReceivedDataSample) held_;
00235 SequenceRange hb_range_;
00236 OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t) frags_;
00237 bool ack_pending_, initial_hb_;
00238 CORBA::Long heartbeat_recvd_count_, hb_frag_recvd_count_,
00239 acknack_count_, nackfrag_count_;
00240
00241 WriterInfo()
00242 : ack_pending_(false), initial_hb_(true), heartbeat_recvd_count_(0),
00243 hb_frag_recvd_count_(0), acknack_count_(0), nackfrag_count_(0) {}
00244
00245 bool should_nack() const;
00246 };
00247
00248 typedef OPENDDS_MAP_CMP(RepoId, WriterInfo, GUID_tKeyLessThan) WriterInfoMap;
00249
00250 struct RtpsReader {
00251 WriterInfoMap remote_writers_;
00252 bool durable_;
00253 bool nack_durable(const WriterInfo& info);
00254 };
00255
// Local reader id -> RtpsReader, ordered by GUID_tKeyLessThan.
// datareader_dispatch() looks readers up here by their full RepoId.
00256 typedef OPENDDS_MAP_CMP(RepoId, RtpsReader, GUID_tKeyLessThan) RtpsReaderMap;
00257 RtpsReaderMap readers_;
00258
// Multimap from a RepoId to iterators into readers_. datareader_dispatch()
// queries it with the *source writer* id to reach every associated reader.
// NOTE(review): stored iterators must stay valid; erasure from readers_
// presumably keeps this index in sync -- confirm.
00259 typedef OPENDDS_MULTIMAP_CMP(RepoId, RtpsReaderMap::iterator, GUID_tKeyLessThan)
00260 RtpsReaderIndex;
00261 RtpsReaderIndex reader_index_;
00262
00263 void deliver_held_data(const RepoId& readerId, WriterInfo& info, bool durable);
00264
00265
00266
00267
// Mutex taken by datareader_dispatch() via ACE_GUARD; declared mutable so it
// can also be acquired inside const member functions.
00268 mutable ACE_Thread_Mutex lock_;
00269
// Appends to the caller-supplied nack_frags vector and returns a size_t
// (NOTE(review): presumably the number of submessages added -- confirm).
00270 size_t generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nack_frags,
00271 WriterInfo& wi, const RepoId& pub_id);
00272
00273
00274
00275
00276
00277
00278
// Static helper that modifies fnSet in place according to `extent`.
00279 static void extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
00280 CORBA::ULong extent);
00281
// process_*_i: per-reader handlers with the signature expected by
// datareader_dispatch(): (submessage, source id, readers_ entry) -> bool.
// The bool result is accumulated by datareader_dispatch(), which schedules
// heartbeat_reply_ when any handler returned true.
00282 bool process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
00283 const RepoId& src, RtpsReaderMap::value_type& rr);
00284
00285 bool process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
00286 const RepoId& src, RtpsReaderMap::value_type& rr);
00287
00288 bool process_gap_i(const RTPS::GapSubmessage& gap, const RepoId& src,
00289 RtpsReaderMap::value_type& rr);
00290
00291 bool process_data_i(const RTPS::DataSubmessage& data, const RepoId& src,
00292 RtpsReaderMap::value_type& rr);
00293
// Durability / gap helpers (definitions are out of line).
00294 void durability_resend(TransportQueueElement* element);
00295 void send_durability_gaps(const RepoId& writer, const RepoId& reader,
00296 const DisjointSequence& gaps);
// Returns a raw ACE_Message_Block pointer
// (NOTE(review): ownership presumably passes to the caller -- confirm).
00297 ACE_Message_Block* marshal_gaps(const RepoId& writer, const RepoId& reader,
00298 const DisjointSequence& gaps,
00299 bool durable = false);
00300
// `gaps` and `gap_recipients` are non-const references, i.e. in/out parameters.
00301 void send_nackfrag_replies(RtpsWriter& writer, DisjointSequence& gaps,
00302 OPENDDS_SET(ACE_INET_Addr)& gap_recipients);
00303
00304 template<typename T, typename FN>
00305 void datareader_dispatch(const T& submessage, const GuidPrefix_t& src_prefix,
00306 const FN& func)
00307 {
00308 using std::pair;
00309 RepoId local;
00310 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
00311 local.entityId = submessage.readerId;
00312
00313 RepoId src;
00314 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
00315 src.entityId = submessage.writerId;
00316
00317 bool schedule_timer = false;
00318 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00319 if (local.entityId == ENTITYID_UNKNOWN) {
00320 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00321 reader_index_.equal_range(src);
00322 iters.first != iters.second; ++iters.first) {
00323 schedule_timer |= (this->*func)(submessage, src, *iters.first->second);
00324 }
00325
00326 } else {
00327 const RtpsReaderMap::iterator rr = readers_.find(local);
00328 if (rr == readers_.end()) {
00329 return;
00330 }
00331 schedule_timer = (this->*func)(submessage, src, *rr);
00332 }
00333 g.release();
00334 if (schedule_timer) {
00335 heartbeat_reply_.schedule();
00336 }
00337 }
00338
00339
00340
// No-argument member functions; those matching the PMF signature below are
// usable as TimedDelay / HeartBeat callbacks.
// NOTE(review): which function is bound to which timer is decided where the
// timer members are constructed, which is not visible in this header.
00341 void send_nack_replies();
// Takes the caller's guard by reference
// (NOTE(review): presumably so the lock can be released during processing -- confirm).
00342 void process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id);
00343 void send_heartbeats();
00344 void check_heartbeats();
00345 void send_heartbeats_manual(const TransportSendControlElement* tsce);
00346 void send_heartbeat_replies();
00347
00348 CORBA::Long best_effort_heartbeat_count_;
00349
// Pointer to a void-returning, no-argument member function of
// RtpsUdpDataLink; the callback type stored by TimedDelay and HeartBeat.
00350 typedef void (RtpsUdpDataLink::*PMF)();
00351
00352 struct TimedDelay : ACE_Event_Handler {
00353
00354 TimedDelay(RtpsUdpDataLink* outer, PMF function,
00355 const ACE_Time_Value& timeout)
00356 : outer_(outer), function_(function), timeout_(timeout), scheduled_(false)
00357 {}
00358
00359 void schedule();
00360 void cancel();
00361
00362 int handle_timeout(const ACE_Time_Value&, const void*)
00363 {
00364 scheduled_ = false;
00365 (outer_->*function_)();
00366 return 0;
00367 }
00368
00369 RtpsUdpDataLink* outer_;
00370 PMF function_;
00371 ACE_Time_Value timeout_;
00372 bool scheduled_;
00373
00374 } nack_reply_, heartbeat_reply_;
00375
00376 struct HeartBeat : ReactorInterceptor {
00377
00378 explicit HeartBeat(ACE_Reactor* reactor, ACE_thread_t owner, RtpsUdpDataLink* outer, PMF function)
00379 : ReactorInterceptor(reactor, owner)
00380 , outer_(outer)
00381 , function_(function)
00382 , enabled_(false) {}
00383
00384 void schedule_enable()
00385 {
00386 ScheduleEnableCommand c(this);
00387 execute_or_enqueue(c);
00388 }
00389
00390 int handle_timeout(const ACE_Time_Value&, const void*)
00391 {
00392 (outer_->*function_)();
00393 return 0;
00394 }
00395
00396 bool reactor_is_shut_down() const
00397 {
00398 return outer_->reactor_is_shut_down();
00399 }
00400
00401 void enable();
00402 void disable();
00403
00404 RtpsUdpDataLink* outer_;
00405 PMF function_;
00406 bool enabled_;
00407
00408 struct ScheduleEnableCommand : public Command {
00409 ScheduleEnableCommand(HeartBeat* hb)
00410 : heartbeat_(hb)
00411 { }
00412
00413 virtual void execute()
00414 {
00415 heartbeat_->enable();
00416 }
00417
00418 HeartBeat* heartbeat_;
00419 };
00420
00421 } heartbeat_, heartbeatchecker_;
00422
00423
00424 struct InterestingRemote {
00425
00426 RepoId localid;
00427
00428 ACE_INET_Addr address;
00429
00430 DiscoveryListener* listener;
00431
00432 ACE_Time_Value last_activity;
00433
00434 enum { DOES_NOT_EXIST, EXISTS } status;
00435
00436 InterestingRemote() { }
00437 InterestingRemote(const RepoId& w, const ACE_INET_Addr& a, DiscoveryListener* l)
00438 : localid(w)
00439 , address(a)
00440 , listener(l)
00441
00442 , status(DOES_NOT_EXIST)
00443 { }
00444 };
// Multimap RepoId -> InterestingRemote (several records may share a key),
// ordered by GUID_tKeyLessThan.
// NOTE(review): the key is presumably the *remote* entity id, since the
// record itself stores the local id -- confirm.
00445 typedef OPENDDS_MULTIMAP_CMP(RepoId, InterestingRemote, DCPS::GUID_tKeyLessThan) InterestingRemoteMapType;
00446 InterestingRemoteMapType interesting_readers_;
00447 InterestingRemoteMapType interesting_writers_;
00448
// Two separate mutexes, mutable so they can be taken in const members.
// NOTE(review): presumably they guard the two callback vectors below -- confirm.
00449 mutable ACE_Thread_Mutex writer_no_longer_exists_lock_,
00450 reader_no_longer_exists_lock_;
00451
// (RepoId, InterestingRemote) pairs queued in per-direction vectors.
00452 typedef std::pair<RepoId, InterestingRemote> CallbackType;
00453 OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks_;
00454 OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks_;
00455
00456
// RepoId -> CORBA::Long counter, ordered by GUID_tKeyLessThan.
00457 typedef OPENDDS_MAP_CMP(RepoId, CORBA::Long, DCPS::GUID_tKeyLessThan) HeartBeatCountMapType;
00458 HeartBeatCountMapType heartbeat_counts_;
00459
00460 struct InterestingAckNack {
00461 RepoId writerid;
00462 RepoId readerid;
00463 ACE_INET_Addr writer_address;
00464
00465 InterestingAckNack() { }
00466 InterestingAckNack(const RepoId& w, const RepoId& r, const ACE_INET_Addr& wa)
00467 : writerid(w)
00468 , readerid(r)
00469 , writer_address(wa)
00470 { }
00471
00472 bool operator<(const InterestingAckNack& other) const {
00473 if (writerid != other.writerid) {
00474 return DCPS::GUID_tKeyLessThan() (writerid, other.writerid);
00475 }
00476 return DCPS::GUID_tKeyLessThan() (readerid, other.readerid);
00477 }
00478 };
00479
// Set of InterestingAckNack elements, ordered by InterestingAckNack::operator<
// (writer id first, then reader id).
00480 typedef OPENDDS_SET(InterestingAckNack) InterestingAckNackSetType;
00481 InterestingAckNackSetType interesting_ack_nacks_;
00482
// Operates on one entry of readers_; finalFlag defaults to false
// (NOTE(review): presumably sets the RTPS Final flag on the AckNack -- confirm).
00483 void send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag = false);
00484 };
00485
00486 }
00487 }
00488
00489 #ifdef __ACE_INLINE__
00490 # include "RtpsUdpDataLink.inl"
00491 #endif
00492
00493 #endif