RtpsUdpDataLink.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_RTPSUDPDATALINK_H
00009 #define DCPS_RTPSUDPDATALINK_H
00010 
00011 #include "Rtps_Udp_Export.h"
00012 
00013 #include "RtpsUdpSendStrategy.h"
00014 #include "RtpsUdpSendStrategy_rch.h"
00015 #include "RtpsUdpReceiveStrategy.h"
00016 #include "RtpsUdpReceiveStrategy_rch.h"
00017 #include "RtpsCustomizedElement.h"
00018 
00019 #include "ace/Basic_Types.h"
00020 #include "ace/SOCK_Dgram.h"
00021 #include "ace/SOCK_Dgram_Mcast.h"
00022 
00023 #include "dds/DCPS/transport/framework/DataLink.h"
00024 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00025 #include "dds/DCPS/transport/framework/TransportReactorTask_rch.h"
00026 #include "dds/DCPS/transport/framework/TransportSendBuffer.h"
00027 
00028 #include "dds/DCPS/DataSampleElement.h"
00029 #include "dds/DCPS/DisjointSequence.h"
00030 #include "dds/DCPS/GuidConverter.h"
00031 #include "dds/DCPS/PoolAllocator.h"
00032 #include "dds/DCPS/DiscoveryListener.h"
00033 #include "dds/DCPS/ReactorInterceptor.h"
00034 #include "dds/DCPS/RcEventHandler.h"
00035 
00036 #if defined(OPENDDS_SECURITY)
00037 #include "dds/DdsSecurityCoreC.h"
00038 #include "dds/DCPS/security/framework/SecurityConfig_rch.h"
00039 #endif
00040 
00041 class DDS_TEST;
00042 
00043 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00044 
00045 namespace OpenDDS {
00046 namespace DCPS {
00047 
00048 class RtpsUdpInst;
00049 class RtpsUdpTransport;
00050 class ReceivedDataSample;
00051 typedef RcHandle<RtpsUdpInst> RtpsUdpInst_rch;
00052 typedef RcHandle<RtpsUdpTransport> RtpsUdpTransport_rch;
00053 
00054 class OpenDDS_Rtps_Udp_Export RtpsUdpDataLink : public DataLink {
00055 public:
00056 
00057   RtpsUdpDataLink(RtpsUdpTransport& transport,
00058                   const GuidPrefix_t& local_prefix,
00059                   const RtpsUdpInst& config,
00060                   const TransportReactorTask_rch& reactor_task);
00061 
00062   bool add_delayed_notification(TransportQueueElement* element);
00063 
00064   void do_remove_sample(const RepoId& pub_id,
00065                         const TransportQueueElement::MatchCriteria& criteria,
00066                         ACE_Guard<ACE_Thread_Mutex>& guard);
00067 
00068   RtpsUdpInst& config() const;
00069 
00070   ACE_Reactor* get_reactor();
00071   bool reactor_is_shut_down();
00072 
00073   ACE_SOCK_Dgram& unicast_socket();
00074   ACE_SOCK_Dgram_Mcast& multicast_socket();
00075 
00076   bool open(const ACE_SOCK_Dgram& unicast_socket);
00077 
00078   void received(const RTPS::DataSubmessage& data,
00079                 const GuidPrefix_t& src_prefix);
00080 
00081   void received(const RTPS::GapSubmessage& gap, const GuidPrefix_t& src_prefix);
00082 
00083   void received(const RTPS::HeartBeatSubmessage& heartbeat,
00084                 const GuidPrefix_t& src_prefix);
00085 
00086   void received(const RTPS::HeartBeatFragSubmessage& hb_frag,
00087                 const GuidPrefix_t& src_prefix);
00088 
00089   void received(const RTPS::AckNackSubmessage& acknack,
00090                 const GuidPrefix_t& src_prefix);
00091 
00092   void received(const RTPS::NackFragSubmessage& nackfrag,
00093                 const GuidPrefix_t& src_prefix);
00094 
00095   const GuidPrefix_t& local_prefix() const { return local_prefix_; }
00096 
00097   void add_locator(const RepoId& remote_id, const ACE_INET_Addr& address,
00098                    bool requires_inline_qos);
00099 
00100   /// Given a 'local_id' of a publication or subscription, populate the set of
00101   /// 'addrs' with the network addresses of any remote peers (or if 'local_id'
00102   /// is GUID_UNKNOWN, all known addresses).
00103   void get_locators(const RepoId& local_id,
00104                     OPENDDS_SET(ACE_INET_Addr)& addrs) const;
00105 
00106   ACE_INET_Addr get_locator(const RepoId& remote_id) const;
00107 
00108   void associated(const RepoId& local, const RepoId& remote,
00109                   bool local_reliable, bool remote_reliable,
00110                   bool local_durable, bool remote_durable);
00111 
00112   bool check_handshake_complete(const RepoId& local, const RepoId& remote);
00113 
00114   void register_for_reader(const RepoId& writerid,
00115                            const RepoId& readerid,
00116                            const ACE_INET_Addr& address,
00117                            OpenDDS::DCPS::DiscoveryListener* listener);
00118 
00119   void unregister_for_reader(const RepoId& writerid,
00120                              const RepoId& readerid);
00121 
00122   void register_for_writer(const RepoId& readerid,
00123                            const RepoId& writerid,
00124                            const ACE_INET_Addr& address,
00125                            OpenDDS::DCPS::DiscoveryListener* listener);
00126 
00127   void unregister_for_writer(const RepoId& readerid,
00128                              const RepoId& writerid);
00129 
00130   virtual void pre_stop_i();
00131 
00132   virtual void send_final_acks (const RepoId& readerid);
00133 
00134 #if defined(OPENDDS_SECURITY)
00135   Security::SecurityConfig_rch security_config() const
00136   { return security_config_; }
00137 
00138   DDS::Security::ParticipantCryptoHandle local_crypto_handle() const;
00139   void local_crypto_handle(DDS::Security::ParticipantCryptoHandle pch);
00140 
00141   DDS::Security::ParticipantCryptoHandle peer_crypto_handle(const RepoId& peer) const;
00142   DDS::Security::DatawriterCryptoHandle writer_crypto_handle(const RepoId& writer) const;
00143   DDS::Security::DatareaderCryptoHandle reader_crypto_handle(const RepoId& reader) const;
00144 
00145   void populate_security_handles(const RepoId& local_id, const RepoId& remote_id,
00146                                  const unsigned char* buffer,
00147                                  unsigned int buffer_size);
00148 #endif
00149 
00150 private:
00151   virtual void stop_i();
00152   virtual void send_i(TransportQueueElement* element, bool relink = true);
00153   RemoveResult remove_sample(const DataSampleElement* sample, void* context);
00154 
00155   virtual TransportQueueElement* customize_queue_element(
00156     TransportQueueElement* element);
00157 
00158   virtual void release_remote_i(const RepoId& remote_id);
00159   virtual void release_reservations_i(const RepoId& remote_id,
00160                                       const RepoId& local_id);
00161 
00162   friend class ::DDS_TEST;
00163   /// static member used by testing code to force inline qos
00164   static bool force_inline_qos_;
00165   bool requires_inline_qos(const GUIDSeq_var & peers);
00166 
00167   typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_VECTOR(RepoId),GUID_tKeyLessThan) DestToEntityMap;
00168   void add_gap_submsg(RTPS::SubmessageSeq& msg,
00169                       const TransportQueueElement& tqe,
00170                       const DestToEntityMap& dtem);
00171 
00172   TransportReactorTask_rch reactor_task_;
00173 
00174   RtpsUdpSendStrategy* send_strategy();
00175   RtpsUdpReceiveStrategy* receive_strategy();
00176 
00177   GuidPrefix_t local_prefix_;
00178 
00179   struct RemoteInfo {
00180     RemoteInfo() : addr_(), requires_inline_qos_(false) {}
00181     RemoteInfo(const ACE_INET_Addr& addr, bool iqos)
00182       : addr_(addr), requires_inline_qos_(iqos) {}
00183     ACE_INET_Addr addr_;
00184     bool requires_inline_qos_;
00185   };
00186   OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan) locators_;
00187 
00188   ACE_SOCK_Dgram unicast_socket_;
00189   ACE_SOCK_Dgram_Mcast multicast_socket_;
00190 
00191   struct MultiSendBuffer : TransportSendBuffer {
00192 
00193     MultiSendBuffer(RtpsUdpDataLink* outer, size_t capacity)
00194       : TransportSendBuffer(capacity)
00195       , outer_(outer)
00196     {}
00197 
00198     void retain_all(RepoId pub_id);
00199     void insert(SequenceNumber sequence,
00200                 TransportSendStrategy::QueueType* queue,
00201                 ACE_Message_Block* chain);
00202 
00203     RtpsUdpDataLink* outer_;
00204 
00205   } multi_buff_;
00206 
00207 
00208   // RTPS reliability support for local writers:
00209 
00210   struct ReaderInfo {
00211     CORBA::Long acknack_recvd_count_, nackfrag_recvd_count_;
00212     OPENDDS_VECTOR(RTPS::SequenceNumberSet) requested_changes_;
00213     OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet) requested_frags_;
00214     SequenceNumber cur_cumulative_ack_;
00215     bool handshake_done_, durable_;
00216     OPENDDS_MAP(SequenceNumber, TransportQueueElement*) durable_data_;
00217     ACE_Time_Value durable_timestamp_;
00218 
00219     ReaderInfo()
00220       : acknack_recvd_count_(0)
00221       , nackfrag_recvd_count_(0)
00222       , handshake_done_(false)
00223       , durable_(false)
00224     {}
00225     ~ReaderInfo();
00226     void expire_durable_data();
00227     bool expecting_durable_data() const;
00228   };
00229 
00230   typedef OPENDDS_MAP_CMP(RepoId, ReaderInfo, GUID_tKeyLessThan) ReaderInfoMap;
00231 
00232   struct RtpsWriter {
00233     ReaderInfoMap remote_readers_;
00234     RcHandle<SingleSendBuffer> send_buff_;
00235     SequenceNumber expected_;
00236     typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*) SnToTqeMap;
00237     SnToTqeMap elems_not_acked_;
00238     //Only accessed with RtpsUdpDataLink lock held
00239     SnToTqeMap to_deliver_;
00240     bool durable_;
00241 
00242     RtpsWriter() : durable_(false) {}
00243     ~RtpsWriter();
00244     SequenceNumber heartbeat_high(const ReaderInfo&) const;
00245     void add_elem_awaiting_ack(TransportQueueElement* element);
00246   };
00247 
00248   typedef OPENDDS_MAP_CMP(RepoId, RtpsWriter, GUID_tKeyLessThan) RtpsWriterMap;
00249   RtpsWriterMap writers_;
00250 
00251   void end_historic_samples(RtpsWriterMap::iterator writer,
00252                             const DataSampleHeader& header,
00253                             ACE_Message_Block* body);
00254 
00255 
00256   // RTPS reliability support for local readers:
00257 
00258   struct WriterInfo {
00259     DisjointSequence recvd_;
00260     OPENDDS_MAP(SequenceNumber, ReceivedDataSample) held_;
00261     SequenceRange hb_range_;
00262     OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t) frags_;
00263     bool ack_pending_, initial_hb_;
00264     CORBA::Long heartbeat_recvd_count_, hb_frag_recvd_count_,
00265       acknack_count_, nackfrag_count_;
00266 
00267     WriterInfo()
00268       : ack_pending_(false), initial_hb_(true), heartbeat_recvd_count_(0),
00269         hb_frag_recvd_count_(0), acknack_count_(0), nackfrag_count_(0) {}
00270 
00271     bool should_nack() const;
00272   };
00273 
00274   typedef OPENDDS_MAP_CMP(RepoId, WriterInfo, GUID_tKeyLessThan) WriterInfoMap;
00275 
00276   struct RtpsReader {
00277     WriterInfoMap remote_writers_;
00278     bool durable_;
00279     bool nack_durable(const WriterInfo& info);
00280   };
00281 
00282   typedef OPENDDS_MAP_CMP(RepoId, RtpsReader, GUID_tKeyLessThan) RtpsReaderMap;
00283   RtpsReaderMap readers_;
00284 
00285   typedef OPENDDS_MULTIMAP_CMP(RepoId, RtpsReaderMap::iterator, GUID_tKeyLessThan)
00286     RtpsReaderIndex;
00287   RtpsReaderIndex reader_index_; // keys are remote data writer GUIDs
00288 
00289   void deliver_held_data(const RepoId& readerId, WriterInfo& info, bool durable);
00290 
00291   /// lock_ protects data structures accessed by both the transport's thread
00292   /// (TransportReactorTask) and an external thread which is responsible
00293   /// for adding/removing associations from the DataLink.
00294   mutable ACE_Thread_Mutex lock_;
00295 
00296   size_t generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nack_frags,
00297                              WriterInfo& wi, const RepoId& pub_id);
00298 
00299   /// Extend the FragmentNumberSet to cover the fragments that are
00300   /// missing from our last known fragment to the extent
00301   /// @param fnSet FragmentNumberSet for the message sequence number
00302   /// in question
00303   /// @param extent is the highest fragment sequence number for this
00304   /// FragmentNumberSet
00305   static void extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
00306                                   CORBA::ULong extent);
00307 
00308   bool process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
00309                            const RepoId& src, RtpsReaderMap::value_type& rr);
00310 
00311   bool process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
00312                          const RepoId& src, RtpsReaderMap::value_type& rr);
00313 
00314   bool process_gap_i(const RTPS::GapSubmessage& gap, const RepoId& src,
00315                      RtpsReaderMap::value_type& rr);
00316 
00317   bool process_data_i(const RTPS::DataSubmessage& data, const RepoId& src,
00318                       RtpsReaderMap::value_type& rr);
00319 
00320   void durability_resend(TransportQueueElement* element);
00321   void send_durability_gaps(const RepoId& writer, const RepoId& reader,
00322                             const DisjointSequence& gaps);
00323   ACE_Message_Block* marshal_gaps(const RepoId& writer, const RepoId& reader,
00324                                   const DisjointSequence& gaps,
00325                                   bool durable = false);
00326 
00327   void send_nackfrag_replies(RtpsWriter& writer, DisjointSequence& gaps,
00328                              OPENDDS_SET(ACE_INET_Addr)& gap_recipients);
00329 
00330   template<typename T, typename FN>
00331   void datareader_dispatch(const T& submessage, const GuidPrefix_t& src_prefix,
00332                            const FN& func)
00333   {
00334     using std::pair;
00335     RepoId local;
00336     std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
00337     local.entityId = submessage.readerId;
00338 
00339     RepoId src;
00340     std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
00341     src.entityId = submessage.writerId;
00342 
00343     bool schedule_timer = false;
00344     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00345     if (local.entityId == ENTITYID_UNKNOWN) {
00346       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00347              reader_index_.equal_range(src);
00348            iters.first != iters.second; ++iters.first) {
00349         schedule_timer |= (this->*func)(submessage, src, *iters.first->second);
00350       }
00351 
00352     } else {
00353       const RtpsReaderMap::iterator rr = readers_.find(local);
00354       if (rr == readers_.end()) {
00355         return;
00356       }
00357       schedule_timer = (this->*func)(submessage, src, *rr);
00358     }
00359     g.release();
00360     if (schedule_timer) {
00361       heartbeat_reply_.schedule();
00362     }
00363   }
00364 
00365   void send_nack_replies();
00366   void send_directed_nack_replies(const RepoId& writerId, RtpsWriter& writer,
00367                                   const RepoId& readerId, ReaderInfo& reader);
00368   void process_requested_changes(DisjointSequence& requests,
00369                                  const RtpsWriter& writer,
00370                                  const ReaderInfo& reader);
00371   void process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id);
00372   void send_heartbeats();
00373   void send_directed_heartbeats(OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)& hbs);
00374   void check_heartbeats();
00375   void send_heartbeats_manual(const TransportSendControlElement* tsce);
00376   void send_heartbeat_replies();
00377 
00378   CORBA::Long best_effort_heartbeat_count_;
00379 
00380   typedef void (RtpsUdpDataLink::*PMF)();
00381 
00382   struct TimedDelay : ACE_Event_Handler {
00383 
00384     TimedDelay(RtpsUdpDataLink* outer, PMF function,
00385                const ACE_Time_Value& timeout)
00386       : outer_(outer), function_(function), timeout_(timeout), scheduled_(false)
00387     {}
00388 
00389     void schedule();
00390     void cancel();
00391 
00392     int handle_timeout(const ACE_Time_Value&, const void*)
00393     {
00394       scheduled_ = false;
00395       (outer_->*function_)();
00396       return 0;
00397     }
00398 
00399     RtpsUdpDataLink* outer_;
00400     PMF function_;
00401     ACE_Time_Value timeout_;
00402     bool scheduled_;
00403 
00404   } nack_reply_, heartbeat_reply_;
00405 
00406   struct HeartBeat : ReactorInterceptor {
00407 
00408     explicit HeartBeat(ACE_Reactor* reactor, ACE_thread_t owner, RtpsUdpDataLink* outer, PMF function)
00409       : ReactorInterceptor(reactor, owner)
00410       , outer_(outer)
00411       , function_(function)
00412       , enabled_(false) {}
00413 
00414     void schedule_enable()
00415     {
00416       ScheduleEnableCommand c(this);
00417       execute_or_enqueue(c);
00418     }
00419 
00420     int handle_timeout(const ACE_Time_Value&, const void*)
00421     {
00422       (outer_->*function_)();
00423       return 0;
00424     }
00425 
00426     bool reactor_is_shut_down() const
00427     {
00428       return outer_->reactor_is_shut_down();
00429     }
00430 
00431     void enable();
00432     void disable();
00433 
00434     RtpsUdpDataLink* outer_;
00435     PMF function_;
00436     bool enabled_;
00437 
00438     struct ScheduleEnableCommand : public Command {
00439       ScheduleEnableCommand(HeartBeat* hb)
00440         : heartbeat_(hb)
00441       { }
00442 
00443       virtual void execute()
00444       {
00445         heartbeat_->enable();
00446       }
00447 
00448       HeartBeat* heartbeat_;
00449     };
00450 
00451   };
00452 
00453   RcHandle<HeartBeat> heartbeat_, heartbeatchecker_;
00454 
00455   /// Data structure representing an "interesting" remote entity for static discovery.
00456   struct InterestingRemote {
00457     /// id of local entity that is interested in this remote.
00458     RepoId localid;
00459     /// address of this entity
00460     ACE_INET_Addr address;
00461     /// Callback to invoke.
00462     DiscoveryListener* listener;
00463     /// Timestamp indicating the last HeartBeat or AckNack received from the remote entity
00464     ACE_Time_Value last_activity;
00465     /// Current status of the remote entity.
00466     enum { DOES_NOT_EXIST, EXISTS } status;
00467 
00468     InterestingRemote() { }
00469     InterestingRemote(const RepoId& w, const ACE_INET_Addr& a, DiscoveryListener* l)
00470       : localid(w)
00471       , address(a)
00472       , listener(l)
00473       //, heartbeat_count(0)
00474       , status(DOES_NOT_EXIST)
00475     { }
00476   };
00477   typedef OPENDDS_MULTIMAP_CMP(RepoId, InterestingRemote, DCPS::GUID_tKeyLessThan) InterestingRemoteMapType;
00478   InterestingRemoteMapType interesting_readers_;
00479   InterestingRemoteMapType interesting_writers_;
00480 
00481   typedef std::pair<RepoId, InterestingRemote> CallbackType;
00482 
00483 
00484   typedef OPENDDS_MAP_CMP(RepoId, CORBA::Long, DCPS::GUID_tKeyLessThan) HeartBeatCountMapType;
00485   HeartBeatCountMapType heartbeat_counts_;
00486 
00487   struct InterestingAckNack {
00488     RepoId writerid;
00489     RepoId readerid;
00490     ACE_INET_Addr writer_address;
00491 
00492     InterestingAckNack() { }
00493     InterestingAckNack(const RepoId& w, const RepoId& r, const ACE_INET_Addr& wa)
00494       : writerid(w)
00495       , readerid(r)
00496       , writer_address(wa)
00497     { }
00498 
00499     bool operator<(const InterestingAckNack& other) const {
00500       if (writerid != other.writerid) {
00501         return DCPS::GUID_tKeyLessThan() (writerid, other.writerid);
00502       }
00503       return DCPS::GUID_tKeyLessThan() (readerid, other.readerid);
00504     }
00505   };
00506 
00507   typedef OPENDDS_SET(InterestingAckNack) InterestingAckNackSetType;
00508   InterestingAckNackSetType interesting_ack_nacks_;
00509 
00510   void send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag = false);
00511 
00512   class HeldDataDeliveryHandler : public RcEventHandler {
00513   public:
00514     HeldDataDeliveryHandler(RtpsUdpDataLink* link)
00515       : link_(link) {
00516       }
00517 
00518       //Reactor invokes this after being notified in schedule_stop or cancel_release
00519     int handle_exception(ACE_HANDLE /* fd */);
00520 
00521     void notify_delivery(const RepoId& readerId, WriterInfo& info);
00522 
00523     virtual ACE_Event_Handler::Reference_Count add_reference();
00524     virtual ACE_Event_Handler::Reference_Count remove_reference();
00525   private:
00526     RtpsUdpDataLink* link_;
00527     typedef std::pair<ReceivedDataSample, RepoId> HeldDataEntry;
00528     typedef OPENDDS_VECTOR(HeldDataEntry) HeldData;
00529     HeldData held_data_;
00530   };
00531   HeldDataDeliveryHandler held_data_delivery_handler_;
00532 
00533 #if defined(OPENDDS_SECURITY)
00534   Security::SecurityConfig_rch security_config_;
00535   DDS::Security::ParticipantCryptoHandle local_crypto_handle_;
00536 
00537   typedef OPENDDS_MAP_CMP(RepoId, DDS::Security::NativeCryptoHandle,
00538                           GUID_tKeyLessThan) PeerHandlesMap;
00539   PeerHandlesMap peer_crypto_handles_;
00540 
00541   typedef OPENDDS_MAP_CMP(RepoId, DDS::Security::NativeCryptoHandle,
00542                           GUID_tKeyLessThan)::const_iterator PeerHandlesCIter;
00543 #endif
00544 
00545 };
00546 
00547 } // namespace DCPS
00548 } // namespace OpenDDS
00549 
00550 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00551 
00552 #ifdef __ACE_INLINE__
00553 # include "RtpsUdpDataLink.inl"
00554 #endif  /* __ACE_INLINE__ */
00555 
00556 #endif  /* DCPS_RTPSUDPDATALINK_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1