00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_SHMEMTRANSPORT_H 00009 #define OPENDDS_SHMEMTRANSPORT_H 00010 00011 #include "Shmem_Export.h" 00012 00013 #include "ShmemDataLink_rch.h" 00014 #include "ShmemDataLink.h" 00015 #include "dds/DCPS/transport/framework/TransportImpl.h" 00016 00017 #include "dds/DCPS/PoolAllocator.h" 00018 00019 #include <string> 00020 00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00022 00023 namespace OpenDDS { 00024 namespace DCPS { 00025 00026 class ShmemInst; 00027 00028 class OpenDDS_Shmem_Export ShmemTransport : public TransportImpl { 00029 public: 00030 explicit ShmemTransport(ShmemInst& inst); 00031 00032 // used by our DataLink: 00033 ShmemAllocator* alloc() { return alloc_.get(); } 00034 std::string address(); 00035 void signal_semaphore(); 00036 00037 ShmemInst& config() const; 00038 00039 protected: 00040 virtual AcceptConnectResult connect_datalink(const RemoteTransport& remote, 00041 const ConnectionAttribs& attribs, 00042 const TransportClient_rch& client); 00043 00044 virtual AcceptConnectResult accept_datalink(const RemoteTransport& remote, 00045 const ConnectionAttribs& attribs, 00046 const TransportClient_rch& client); 00047 00048 virtual void stop_accepting_or_connecting(const TransportClient_wrch& client, 00049 const RepoId& remote_id); 00050 00051 bool configure_i(ShmemInst& config); 00052 00053 virtual void shutdown_i(); 00054 00055 virtual bool connection_info_i(TransportLocator& info) const; 00056 00057 virtual void release_datalink(DataLink* link); 00058 00059 virtual std::string transport_type() const { return "shmem"; } 00060 00061 private: 00062 00063 /// Create a new link (using make_datalink) and add it to the map 00064 DataLink_rch add_datalink(const std::string& remote_address); 00065 00066 /// Create the DataLink object and start it 00067 ShmemDataLink_rch make_datalink(const std::string& remote_address); 00068 00069 std::pair<std::string, std::string> blob_to_key(const TransportBLOB& blob); 00070 00071 void read_from_links(); // callback from ReadTask 00072 00073 typedef ACE_Thread_Mutex LockType; 00074 typedef ACE_Guard<LockType> GuardType; 00075 00076 LockType links_lock_; 00077 00078 /// Map of fully associated DataLinks for this transport. Protected 00079 /// by links_lock_. 00080 typedef OPENDDS_MAP(std::string, ShmemDataLink_rch) ShmemDataLinkMap; 00081 ShmemDataLinkMap links_; 00082 00083 unique_ptr<ShmemAllocator> alloc_; 00084 00085 struct ReadTask : ACE_Task_Base { 00086 ReadTask(ShmemTransport* outer, ACE_sema_t semaphore); 00087 int svc(); 00088 void stop(); 00089 00090 ShmemTransport* outer_; 00091 ACE_sema_t semaphore_; 00092 bool stopped_; 00093 00094 }; 00095 unique_ptr<ReadTask> read_task_; 00096 }; 00097 00098 } // namespace DCPS 00099 } // namespace OpenDDS 00100 00101 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00102 00103 #endif /* OPENDDS_SHMEMTRANSPORT_H */