00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef DCPS_MULTICASTDATALINK_H 00009 #define DCPS_MULTICASTDATALINK_H 00010 00011 #include "Multicast_Export.h" 00012 00013 #include "MulticastInst.h" 00014 #include "MulticastSendStrategy.h" 00015 #include "MulticastSendStrategy_rch.h" 00016 #include "MulticastReceiveStrategy.h" 00017 #include "MulticastReceiveStrategy_rch.h" 00018 #include "MulticastSession_rch.h" 00019 #include "MulticastSessionFactory.h" 00020 #include "MulticastSessionFactory_rch.h" 00021 #include "MulticastTransport.h" 00022 #include "MulticastTypes.h" 00023 00024 #include "dds/DCPS/DisjointSequence.h" 00025 #include "dds/DCPS/PoolAllocator.h" 00026 00027 #include "dds/DCPS/transport/framework/DataLink.h" 00028 #include "dds/DCPS/transport/framework/TransportReactorTask.h" 00029 #include "dds/DCPS/transport/framework/TransportSendBuffer.h" 00030 00031 #include "ace/SOCK_Dgram_Mcast.h" 00032 #include "ace/Synch_Traits.h" 00033 00034 namespace OpenDDS { 00035 namespace DCPS { 00036 00037 class MulticastTransport; 00038 00039 class OpenDDS_Multicast_Export MulticastDataLink 00040 : public DataLink { 00041 public: 00042 MulticastDataLink(MulticastTransport* transport, 00043 MulticastSessionFactory* session_factory, 00044 MulticastPeer local_peer, 00045 bool is_active); 00046 virtual ~MulticastDataLink(); 00047 00048 MulticastTransport* transport(); 00049 00050 MulticastPeer local_peer() const; 00051 00052 void configure(MulticastInst* config, 00053 TransportReactorTask* reactor_task); 00054 00055 void send_strategy(MulticastSendStrategy* send_strategy); 00056 MulticastSendStrategy* send_strategy(); 00057 00058 void receive_strategy(MulticastReceiveStrategy* recv_strategy); 00059 MulticastReceiveStrategy* receive_strategy(); 00060 00061 SingleSendBuffer* send_buffer(); 00062 00063 MulticastInst* config(); 00064 00065 TransportReactorTask* reactor_task(); 00066 ACE_Reactor* get_reactor(); 00067 ACE_Proactor* get_proactor(); 00068 00069 ACE_SOCK_Dgram_Mcast& socket(); 00070 00071 bool join(const ACE_INET_Addr& group_address); 00072 00073 MulticastSession* find_or_create_session(MulticastPeer remote_peer); 00074 MulticastSession* find_session(MulticastPeer remote_peer); 00075 00076 bool check_header(const TransportHeader& header); 00077 bool check_header(const DataSampleHeader& header); 00078 void sample_received(ReceivedDataSample& sample); 00079 00080 bool reassemble(ReceivedDataSample& data, const TransportHeader& header); 00081 00082 private: 00083 MulticastTransport* transport_; 00084 00085 MulticastSessionFactory_rch session_factory_; 00086 00087 MulticastPeer local_peer_; 00088 00089 MulticastInst* config_; 00090 00091 TransportReactorTask* reactor_task_; 00092 00093 MulticastSendStrategy_rch send_strategy_; 00094 MulticastReceiveStrategy_rch recv_strategy_; 00095 00096 SingleSendBuffer* send_buffer_; 00097 00098 ACE_SOCK_Dgram_Mcast socket_; 00099 00100 ACE_SYNCH_RECURSIVE_MUTEX session_lock_; 00101 00102 typedef OPENDDS_MAP(MulticastPeer, MulticastSession_rch) MulticastSessionMap; 00103 MulticastSessionMap sessions_; 00104 00105 virtual void stop_i(); 00106 00107 void syn_received_no_session(MulticastPeer source, ACE_Message_Block* data, 00108 bool swap_bytes); 00109 00110 void release_remote_i(const RepoId& remote); 00111 RepoIdSet readers_selected_, readers_withheld_; 00112 bool ready_to_deliver(const ReceivedDataSample& data); 00113 }; 00114 00115 } // namespace DCPS 00116 } // namespace OpenDDS 00117 00118 #ifdef __ACE_INLINE__ 00119 # include "MulticastDataLink.inl" 00120 #endif /* __ACE_INLINE__ */ 00121 00122 #endif /* DCPS_MULTICASTDATALINK_H */