00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_DATALINKSET_H 00009 #define OPENDDS_DCPS_DATALINKSET_H 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "dds/DCPS/RcObject.h" 00013 #include "dds/DCPS/PoolAllocator.h" 00014 #include "DataLink_rch.h" 00015 #include "SendResponseListener.h" 00016 #include "TransportDefs.h" 00017 #include "TransportSendControlElement.h" 00018 #include "ace/Synch_Traits.h" 00019 00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00021 00022 namespace OpenDDS { 00023 namespace DCPS { 00024 00025 class TransportSendListener; 00026 class DataSampleElement; 00027 class DataLinkSet; 00028 typedef RcHandle<DataLinkSet> DataLinkSet_rch; 00029 00030 class OpenDDS_Dcps_Export DataLinkSet : public RcObject { 00031 public: 00032 00033 DataLinkSet(); 00034 virtual ~DataLinkSet(); 00035 00036 // Returns 0 for success, -1 for failure, and 1 for failure due 00037 // to duplicate entry (link is already a member of the set). 00038 int insert_link(const DataLink_rch& link); 00039 00040 void remove_link(const DataLink_rch& link); 00041 00042 /// Send to each DataLink in the set. 00043 void send(DataSampleElement* sample); 00044 00045 /// Send a control message that is wrapped in a DataSampleElement 00046 void send_control(DataSampleElement* sample); 00047 00048 /// Send control message to each DataLink in the set. 00049 SendControlStatus send_control(RepoId pub_id, 00050 const TransportSendListener_rch& listener, 00051 const DataSampleHeader& header, 00052 Message_Block_Ptr msg); 00053 00054 void send_response(RepoId sub_id, 00055 const DataSampleHeader& header, 00056 Message_Block_Ptr response); 00057 00058 bool remove_sample(const DataSampleElement* sample); 00059 00060 bool remove_all_msgs(RepoId pub_id); 00061 00062 /// Calls send_start() on the links in link_set and also adds 00063 /// the links from link_set to *this. 00064 void send_start(DataLinkSet* link_set); 00065 00066 /// Calls send_stop() on the links with ID repoId and then 00067 /// clears the set. 00068 void send_stop(RepoId repoId); 00069 00070 DataLinkSet_rch select_links(const RepoId* remoteIds, 00071 const CORBA::ULong num_targets); 00072 00073 bool empty(); 00074 00075 void send_final_acks(const RepoId& readerid); 00076 00077 typedef ACE_SYNCH_MUTEX LockType; 00078 typedef ACE_Guard<LockType> GuardType; 00079 00080 typedef OPENDDS_MAP(DataLinkIdType, DataLink_rch) MapType; 00081 00082 //{@ 00083 /// Accessors for external iteration 00084 LockType& lock() { return lock_; } 00085 MapType& map() { return map_; } 00086 //@} 00087 00088 private: 00089 00090 /// Hash map for DataLinks. 00091 MapType map_; 00092 00093 /// This lock will protect critical sections of code that play a 00094 /// role in the sending of data. 00095 LockType lock_; 00096 00097 /// Listener for TransportSendControlElements created in send_response 00098 SendResponseListener send_response_listener_; 00099 00100 /// lock and copy map for lock-free access 00101 void copy_map_to(MapType& target); 00102 }; 00103 00104 } // namespace DCPS 00105 } // namespace OpenDDS 00106 00107 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00108 00109 #if defined (__ACE_INLINE__) 00110 #include "DataLinkSet.inl" 00111 #endif /* __ACE_INLINE__ */ 00112 00113 #endif /* OPENDDS_DCPS_DATALINKSET_H */