RtpsUdpDataLink.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_RTPSUDPDATALINK_H
00009 #define DCPS_RTPSUDPDATALINK_H
00010 
00011 #include "Rtps_Udp_Export.h"
00012 
00013 #include "RtpsUdpSendStrategy.h"
00014 #include "RtpsUdpSendStrategy_rch.h"
00015 #include "RtpsUdpReceiveStrategy.h"
00016 #include "RtpsUdpReceiveStrategy_rch.h"
00017 #include "RtpsCustomizedElement.h"
00018 
00019 #include "ace/Basic_Types.h"
00020 #include "ace/SOCK_Dgram.h"
00021 #include "ace/SOCK_Dgram_Mcast.h"
00022 
00023 #include "dds/DCPS/transport/framework/DataLink.h"
00024 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00025 #include "dds/DCPS/transport/framework/TransportReactorTask_rch.h"
00026 #include "dds/DCPS/transport/framework/TransportSendBuffer.h"
00027 
00028 #include "dds/DCPS/DataSampleElement.h"
00029 #include "dds/DCPS/DisjointSequence.h"
00030 #include "dds/DCPS/GuidConverter.h"
00031 #include "dds/DCPS/PoolAllocator.h"
00032 #include "dds/DCPS/DiscoveryListener.h"
00033 #include "dds/DCPS/ReactorInterceptor.h"
00034 
00035 class DDS_TEST;
00036 
00037 namespace OpenDDS {
00038 namespace DCPS {
00039 
00040 class RtpsUdpInst;
00041 class RtpsUdpTransport;
00042 class ReceivedDataSample;
00043 
00044 class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink : public DataLink {
00045 public:
00046 
00047   RtpsUdpDataLink(RtpsUdpTransport* transport,
00048                   const GuidPrefix_t& local_prefix,
00049                   RtpsUdpInst* config,
00050                   TransportReactorTask* reactor_task);
00051 
00052   void send_strategy(RtpsUdpSendStrategy* send_strategy);
00053   void receive_strategy(RtpsUdpReceiveStrategy* recv_strategy);
00054   bool add_delayed_notification(TransportQueueElement* element);
00055   void do_remove_sample(const RepoId& pub_id, const TransportQueueElement::MatchCriteria& criteria);
00056   RtpsUdpInst* config();
00057 
00058   ACE_Reactor* get_reactor();
00059   bool reactor_is_shut_down();
00060 
00061   ACE_SOCK_Dgram& unicast_socket();
00062   ACE_SOCK_Dgram_Mcast& multicast_socket();
00063 
00064   bool open(const ACE_SOCK_Dgram& unicast_socket);
00065 
00066   void received(const RTPS::DataSubmessage& data,
00067                 const GuidPrefix_t& src_prefix);
00068 
00069   void received(const RTPS::GapSubmessage& gap, const GuidPrefix_t& src_prefix);
00070 
00071   void received(const RTPS::HeartBeatSubmessage& heartbeat,
00072                 const GuidPrefix_t& src_prefix);
00073 
00074   void received(const RTPS::HeartBeatFragSubmessage& hb_frag,
00075                 const GuidPrefix_t& src_prefix);
00076 
00077   void received(const RTPS::AckNackSubmessage& acknack,
00078                 const GuidPrefix_t& src_prefix);
00079 
00080   void received(const RTPS::NackFragSubmessage& nackfrag,
00081                 const GuidPrefix_t& src_prefix);
00082 
00083   const GuidPrefix_t& local_prefix() const { return local_prefix_; }
00084 
00085   void add_locator(const RepoId& remote_id, const ACE_INET_Addr& address,
00086                    bool requires_inline_qos);
00087 
00088   /// Given a 'local_id' of a publication or subscription, populate the set of
00089   /// 'addrs' with the network addresses of any remote peers (or if 'local_id'
00090   /// is GUID_UNKNOWN, all known addresses).
00091   void get_locators(const RepoId& local_id,
00092                     OPENDDS_SET(ACE_INET_Addr)& addrs) const;
00093 
00094   ACE_INET_Addr get_locator(const RepoId& remote_id) const;
00095 
00096   void associated(const RepoId& local, const RepoId& remote,
00097                   bool local_reliable, bool remote_reliable,
00098                   bool local_durable, bool remote_durable);
00099 
00100   bool check_handshake_complete(const RepoId& local, const RepoId& remote);
00101 
00102   void register_for_reader(const RepoId& writerid,
00103                            const RepoId& readerid,
00104                            const ACE_INET_Addr& address,
00105                            OpenDDS::DCPS::DiscoveryListener* listener);
00106 
00107   void unregister_for_reader(const RepoId& writerid,
00108                              const RepoId& readerid);
00109 
00110   void register_for_writer(const RepoId& readerid,
00111                            const RepoId& writerid,
00112                            const ACE_INET_Addr& address,
00113                            OpenDDS::DCPS::DiscoveryListener* listener);
00114 
00115   void unregister_for_writer(const RepoId& readerid,
00116                              const RepoId& writerid);
00117 
00118   virtual void pre_stop_i();
00119 
00120   virtual void send_final_acks (const RepoId& readerid);
00121 
00122 private:
00123   virtual void stop_i();
00124   virtual void send_i(TransportQueueElement* element, bool relink = true);
00125 
00126   virtual TransportQueueElement* customize_queue_element(
00127     TransportQueueElement* element);
00128 
00129   virtual void release_remote_i(const RepoId& remote_id);
00130   virtual void release_reservations_i(const RepoId& remote_id,
00131                                       const RepoId& local_id);
00132 
00133   friend class ::DDS_TEST;
00134   /// static member used by testing code to force inline qos
00135   static bool force_inline_qos_;
00136   bool requires_inline_qos(const PublicationId& pub_id);
00137 
00138   typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_VECTOR(RepoId),GUID_tKeyLessThan) DestToEntityMap;
00139   void add_gap_submsg(RTPS::SubmessageSeq& msg,
00140                       const TransportQueueElement& tqe,
00141                       const DestToEntityMap& dtem);
00142 
00143   RtpsUdpInst* config_;
00144   TransportReactorTask_rch reactor_task_;
00145 
00146   RtpsUdpSendStrategy_rch send_strategy_;
00147   RtpsUdpReceiveStrategy_rch recv_strategy_;
00148 
00149   GuidPrefix_t local_prefix_;
00150 
00151   struct RemoteInfo {
00152     RemoteInfo() : addr_(), requires_inline_qos_(false) {}
00153     RemoteInfo(const ACE_INET_Addr& addr, bool iqos)
00154       : addr_(addr), requires_inline_qos_(iqos) {}
00155     ACE_INET_Addr addr_;
00156     bool requires_inline_qos_;
00157   };
00158   OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan) locators_;
00159 
00160   ACE_SOCK_Dgram unicast_socket_;
00161   ACE_SOCK_Dgram_Mcast multicast_socket_;
00162 
00163   RtpsCustomizedElementAllocator rtps_customized_element_allocator_;
00164 
00165   struct MultiSendBuffer : TransportSendBuffer {
00166 
00167     MultiSendBuffer(RtpsUdpDataLink* outer, size_t capacity)
00168       : TransportSendBuffer(capacity)
00169       , outer_(outer)
00170     {}
00171 
00172     void retain_all(RepoId pub_id);
00173     void insert(SequenceNumber sequence,
00174                 TransportSendStrategy::QueueType* queue,
00175                 ACE_Message_Block* chain);
00176 
00177     RtpsUdpDataLink* outer_;
00178 
00179   } multi_buff_;
00180 
00181 
00182   // RTPS reliability support for local writers:
00183 
00184   struct ReaderInfo {
00185     CORBA::Long acknack_recvd_count_, nackfrag_recvd_count_;
00186     OPENDDS_VECTOR(RTPS::SequenceNumberSet) requested_changes_;
00187     OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet) requested_frags_;
00188     SequenceNumber cur_cumulative_ack_;
00189     bool handshake_done_, durable_;
00190     OPENDDS_MAP(SequenceNumber, TransportQueueElement*) durable_data_;
00191     ACE_Time_Value durable_timestamp_;
00192 
00193     ReaderInfo()
00194       : acknack_recvd_count_(0)
00195       , nackfrag_recvd_count_(0)
00196       , handshake_done_(false)
00197       , durable_(false)
00198     {}
00199     ~ReaderInfo();
00200     void expire_durable_data();
00201     bool expecting_durable_data() const;
00202   };
00203 
00204   typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) ReaderInfoMap;
00205 
00206   struct RtpsWriter {
00207     ReaderInfoMap remote_readers_;
00208     RcHandle<SingleSendBuffer> send_buff_;
00209     SequenceNumber expected_;
00210     typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*) SnToTqeMap;
00211     SnToTqeMap elems_not_acked_;
00212     //Only accessed with RtpsUdpDataLink lock held
00213     SnToTqeMap to_deliver_;
00214     bool durable_;
00215 
00216     RtpsWriter() : durable_(false) {}
00217     ~RtpsWriter();
00218     SequenceNumber heartbeat_high(const ReaderInfo&) const;
00219     void add_elem_awaiting_ack(TransportQueueElement* element);
00220   };
00221 
00222   typedef OPENDDS_MAP_CMP(RepoId, RtpsWriter, GUID_tKeyLessThan) RtpsWriterMap;
00223   RtpsWriterMap writers_;
00224 
00225   void end_historic_samples(RtpsWriterMap::iterator writer,
00226                             const DataSampleHeader& header,
00227                             ACE_Message_Block* body);
00228 
00229 
00230   // RTPS reliability support for local readers:
00231 
00232   struct WriterInfo {
00233     DisjointSequence recvd_;
00234     OPENDDS_MAP(SequenceNumber, ReceivedDataSample) held_;
00235     SequenceRange hb_range_;
00236     OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t) frags_;
00237     bool ack_pending_, initial_hb_;
00238     CORBA::Long heartbeat_recvd_count_, hb_frag_recvd_count_,
00239       acknack_count_, nackfrag_count_;
00240 
00241     WriterInfo()
00242       : ack_pending_(false), initial_hb_(true), heartbeat_recvd_count_(0),
00243         hb_frag_recvd_count_(0), acknack_count_(0), nackfrag_count_(0) {}
00244 
00245     bool should_nack() const;
00246   };
00247 
00248   typedef OPENDDS_MAP_CMP(RepoId, WriterInfo, GUID_tKeyLessThan) WriterInfoMap;
00249 
00250   struct RtpsReader {
00251     WriterInfoMap remote_writers_;
00252     bool durable_;
00253     bool nack_durable(const WriterInfo& info);
00254   };
00255 
00256   typedef OPENDDS_MAP_CMP(RepoId, RtpsReader, GUID_tKeyLessThan) RtpsReaderMap;
00257   RtpsReaderMap readers_;
00258 
00259   typedef OPENDDS_MULTIMAP_CMP(RepoId, RtpsReaderMap::iterator, GUID_tKeyLessThan)
00260     RtpsReaderIndex;
00261   RtpsReaderIndex reader_index_; // keys are remote data writer GUIDs
00262 
00263   void deliver_held_data(const RepoId& readerId, WriterInfo& info, bool durable);
00264 
00265   /// lock_ protects data structures accessed by both the transport's thread
00266   /// (TransportReactorTask) and an external thread which is responsible
00267   /// for adding/removing associations from the DataLink.
00268   mutable ACE_Thread_Mutex lock_;
00269 
00270   size_t generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nack_frags,
00271                              WriterInfo& wi, const RepoId& pub_id);
00272 
00273   /// Extend the FragmentNumberSet to cover the fragments that are
00274   /// missing from our last known fragment to the extent
00275   /// @param fnSet FragmentNumberSet for the message sequence number
00276   /// in question
00277   /// @param extent is the highest fragement sequence number for this
00278   /// FragmentNumberSet
00279   static void extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
00280                                   CORBA::ULong extent);
00281 
00282   bool process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
00283                            const RepoId& src, RtpsReaderMap::value_type& rr);
00284 
00285   bool process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
00286                          const RepoId& src, RtpsReaderMap::value_type& rr);
00287 
00288   bool process_gap_i(const RTPS::GapSubmessage& gap, const RepoId& src,
00289                      RtpsReaderMap::value_type& rr);
00290 
00291   bool process_data_i(const RTPS::DataSubmessage& data, const RepoId& src,
00292                       RtpsReaderMap::value_type& rr);
00293 
00294   void durability_resend(TransportQueueElement* element);
00295   void send_durability_gaps(const RepoId& writer, const RepoId& reader,
00296                             const DisjointSequence& gaps);
00297   ACE_Message_Block* marshal_gaps(const RepoId& writer, const RepoId& reader,
00298                                   const DisjointSequence& gaps,
00299                                   bool durable = false);
00300 
00301   void send_nackfrag_replies(RtpsWriter& writer, DisjointSequence& gaps,
00302                              OPENDDS_SET(ACE_INET_Addr)& gap_recipients);
00303 
00304   template<typename T, typename FN>
00305   void datareader_dispatch(const T& submessage, const GuidPrefix_t& src_prefix,
00306                            const FN& func)
00307   {
00308     using std::pair;
00309     RepoId local;
00310     std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
00311     local.entityId = submessage.readerId;
00312 
00313     RepoId src;
00314     std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
00315     src.entityId = submessage.writerId;
00316 
00317     bool schedule_timer = false;
00318     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00319     if (local.entityId == ENTITYID_UNKNOWN) {
00320       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00321              reader_index_.equal_range(src);
00322            iters.first != iters.second; ++iters.first) {
00323         schedule_timer |= (this->*func)(submessage, src, *iters.first->second);
00324       }
00325 
00326     } else {
00327       const RtpsReaderMap::iterator rr = readers_.find(local);
00328       if (rr == readers_.end()) {
00329         return;
00330       }
00331       schedule_timer = (this->*func)(submessage, src, *rr);
00332     }
00333     g.release();
00334     if (schedule_timer) {
00335       heartbeat_reply_.schedule();
00336     }
00337   }
00338 
00339 
00340   // Timers for reliability:
00341   void send_nack_replies();
00342   void process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id);
00343   void send_heartbeats();
00344   void check_heartbeats();
00345   void send_heartbeats_manual(const TransportSendControlElement* tsce);
00346   void send_heartbeat_replies();
00347 
00348   CORBA::Long best_effort_heartbeat_count_;
00349 
00350   typedef void (RtpsUdpDataLink::*PMF)();
00351 
00352   struct TimedDelay : ACE_Event_Handler {
00353 
00354     TimedDelay(RtpsUdpDataLink* outer, PMF function,
00355                const ACE_Time_Value& timeout)
00356       : outer_(outer), function_(function), timeout_(timeout), scheduled_(false)
00357     {}
00358 
00359     void schedule();
00360     void cancel();
00361 
00362     int handle_timeout(const ACE_Time_Value&, const void*)
00363     {
00364       scheduled_ = false;
00365       (outer_->*function_)();
00366       return 0;
00367     }
00368 
00369     RtpsUdpDataLink* outer_;
00370     PMF function_;
00371     ACE_Time_Value timeout_;
00372     bool scheduled_;
00373 
00374   } nack_reply_, heartbeat_reply_;
00375 
00376   struct HeartBeat : ReactorInterceptor {
00377 
00378     explicit HeartBeat(ACE_Reactor* reactor, ACE_thread_t owner, RtpsUdpDataLink* outer, PMF function)
00379       : ReactorInterceptor(reactor, owner)
00380       , outer_(outer)
00381       , function_(function)
00382       , enabled_(false) {}
00383 
00384     void schedule_enable()
00385     {
00386       ScheduleEnableCommand c(this);
00387       execute_or_enqueue(c);
00388     }
00389 
00390     int handle_timeout(const ACE_Time_Value&, const void*)
00391     {
00392       (outer_->*function_)();
00393       return 0;
00394     }
00395 
00396     bool reactor_is_shut_down() const
00397     {
00398       return outer_->reactor_is_shut_down();
00399     }
00400 
00401     void enable();
00402     void disable();
00403 
00404     RtpsUdpDataLink* outer_;
00405     PMF function_;
00406     bool enabled_;
00407 
00408     struct ScheduleEnableCommand : public Command {
00409       ScheduleEnableCommand(HeartBeat* hb)
00410         : heartbeat_(hb)
00411       { }
00412 
00413       virtual void execute()
00414       {
00415         heartbeat_->enable();
00416       }
00417 
00418       HeartBeat* heartbeat_;
00419     };
00420 
00421   } heartbeat_, heartbeatchecker_;
00422 
00423   /// Data structure representing an "interesting" remote entity for static discovery.
00424   struct InterestingRemote {
00425     /// id of local entity that is interested in this remote.
00426     RepoId localid;
00427     /// address of this entity
00428     ACE_INET_Addr address;
00429     /// Callback to invoke.
00430     DiscoveryListener* listener;
00431     /// Timestamp indicating the last HeartBeat or AckNack received from the remote entity
00432     ACE_Time_Value last_activity;
00433     /// Current status of the remote entity.
00434     enum { DOES_NOT_EXIST, EXISTS } status;
00435 
00436     InterestingRemote() { }
00437     InterestingRemote(const RepoId& w, const ACE_INET_Addr& a, DiscoveryListener* l)
00438       : localid(w)
00439       , address(a)
00440       , listener(l)
00441       //, heartbeat_count(0)
00442       , status(DOES_NOT_EXIST)
00443     { }
00444   };
00445   typedef OPENDDS_MULTIMAP_CMP(RepoId, InterestingRemote, DCPS::GUID_tKeyLessThan) InterestingRemoteMapType;
00446   InterestingRemoteMapType interesting_readers_;
00447   InterestingRemoteMapType interesting_writers_;
00448 
00449   mutable ACE_Thread_Mutex writer_no_longer_exists_lock_,
00450                            reader_no_longer_exists_lock_;
00451 
00452   typedef std::pair<RepoId, InterestingRemote> CallbackType;
00453   OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks_;
00454   OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks_;
00455 
00456 
00457   typedef OPENDDS_MAP_CMP(RepoId, CORBA::Long, DCPS::GUID_tKeyLessThan) HeartBeatCountMapType;
00458   HeartBeatCountMapType heartbeat_counts_;
00459 
00460   struct InterestingAckNack {
00461     RepoId writerid;
00462     RepoId readerid;
00463     ACE_INET_Addr writer_address;
00464 
00465     InterestingAckNack() { }
00466     InterestingAckNack(const RepoId& w, const RepoId& r, const ACE_INET_Addr& wa)
00467       : writerid(w)
00468       , readerid(r)
00469       , writer_address(wa)
00470     { }
00471 
00472     bool operator<(const InterestingAckNack& other) const {
00473       if (writerid != other.writerid) {
00474         return DCPS::GUID_tKeyLessThan() (writerid, other.writerid);
00475       }
00476       return DCPS::GUID_tKeyLessThan() (readerid, other.readerid);
00477     }
00478   };
00479 
00480   typedef OPENDDS_SET(InterestingAckNack) InterestingAckNackSetType;
00481   InterestingAckNackSetType interesting_ack_nacks_;
00482 
00483   void send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag = false);
00484 };
00485 
00486 } // namespace DCPS
00487 } // namespace OpenDDS
00488 
00489 #ifdef __ACE_INLINE__
00490 # include "RtpsUdpDataLink.inl"
00491 #endif  /* __ACE_INLINE__ */
00492 
00493 #endif  /* DCPS_RTPSUDPDATALINK_H */

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7