00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_DATALINKSET_H 00009 #define OPENDDS_DCPS_DATALINKSET_H 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "dds/DCPS/RcObject_T.h" 00013 #include "dds/DCPS/PoolAllocator.h" 00014 #include "DataLink_rch.h" 00015 #include "SendResponseListener.h" 00016 #include "TransportDefs.h" 00017 #include "TransportSendControlElement.h" 00018 00019 #include "ace/Synch.h" 00020 00021 namespace OpenDDS { 00022 namespace DCPS { 00023 00024 class TransportSendListener; 00025 class DataSampleElement; 00026 00027 class OpenDDS_Dcps_Export DataLinkSet : public RcObject<ACE_SYNCH_MUTEX> { 00028 public: 00029 00030 DataLinkSet(); 00031 virtual ~DataLinkSet(); 00032 00033 // Returns 0 for success, -1 for failure, and 1 for failure due 00034 // to duplicate entry (link is already a member of the set). 00035 int insert_link(DataLink* link); 00036 00037 void remove_link(const DataLink_rch& link); 00038 00039 /// Send to each DataLink in the set. 00040 void send(DataSampleElement* sample); 00041 00042 /// Send a control message that is wrapped in a DataSampleElement 00043 void send_control(DataSampleElement* sample); 00044 00045 /// Send control message to each DataLink in the set. 00046 SendControlStatus send_control(RepoId pub_id, 00047 TransportSendListener* listener, 00048 const DataSampleHeader& header, 00049 ACE_Message_Block* msg, 00050 TransportSendControlElementAllocator* allocator = 0); 00051 00052 void send_response(RepoId sub_id, 00053 const DataSampleHeader& header, 00054 ACE_Message_Block* response); 00055 00056 bool remove_sample(const DataSampleElement* sample); 00057 00058 bool remove_all_msgs(RepoId pub_id); 00059 00060 /// Calls send_start() on the links in link_set and also adds 00061 /// the links from link_set to *this. 00062 void send_start(DataLinkSet* link_set); 00063 00064 /// Calls send_stop() on the links with ID repoId and then 00065 /// clears the set. 00066 void send_stop(RepoId repoId); 00067 00068 DataLinkSet* select_links(const RepoId* remoteIds, 00069 const CORBA::ULong num_targets); 00070 00071 bool empty(); 00072 00073 void send_final_acks(const RepoId& readerid); 00074 00075 typedef ACE_SYNCH_MUTEX LockType; 00076 typedef ACE_Guard<LockType> GuardType; 00077 00078 typedef OPENDDS_MAP(DataLinkIdType, DataLink_rch) MapType; 00079 00080 //{@ 00081 /// Accessors for external iteration 00082 LockType& lock() { return lock_; } 00083 MapType& map() { return map_; } 00084 //@} 00085 00086 TransportSendControlElementAllocator& tsce_allocator() { 00087 return send_control_element_allocator_; 00088 } 00089 00090 private: 00091 00092 /// Hash map for DataLinks. 00093 MapType map_; 00094 00095 /// Allocator for TransportSendControlElement. 00096 TransportSendControlElementAllocator send_control_element_allocator_; 00097 00098 /// This lock will protect critical sections of code that play a 00099 /// role in the sending of data. 00100 LockType lock_; 00101 00102 /// Listener for TransportSendControlElements created in send_response 00103 SendResponseListener send_response_listener_; 00104 00105 /// lock and copy map for lock-free access 00106 void copy_map_to(MapType& target); 00107 }; 00108 00109 } // namespace DCPS 00110 } // namespace OpenDDS 00111 00112 #if defined (__ACE_INLINE__) 00113 #include "DataLinkSet.inl" 00114 #endif /* __ACE_INLINE__ */ 00115 00116 #endif /* OPENDDS_DCPS_DATALINKSET_H */