#include <DataLinkSet.h>
Public Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
Public Member Functions | |
DataLinkSet () | |
virtual | ~DataLinkSet () |
int | insert_link (const DataLink_rch &link) |
void | remove_link (const DataLink_rch &link) |
void | send (DataSampleElement *sample) |
Send to each DataLink in the set. | |
void | send_control (DataSampleElement *sample) |
Send a control message that is wrapped in a DataSampleElement. | |
SendControlStatus | send_control (RepoId pub_id, const TransportSendListener_rch &listener, const DataSampleHeader &header, Message_Block_Ptr msg) |
Send control message to each DataLink in the set. | |
void | send_response (RepoId sub_id, const DataSampleHeader &header, Message_Block_Ptr response) |
bool | remove_sample (const DataSampleElement *sample) |
bool | remove_all_msgs (RepoId pub_id) |
void | send_start (DataLinkSet *link_set) |
void | send_stop (RepoId repoId) |
DataLinkSet_rch | select_links (const RepoId *remoteIds, const CORBA::ULong num_targets) |
bool | empty () |
void | send_final_acks (const RepoId &readerid) |
typedef | OPENDDS_MAP (DataLinkIdType, DataLink_rch) MapType |
LockType & | lock () |
Accessors for external iteration. | |
MapType & | map () |
Private Member Functions | |
void | copy_map_to (MapType &target) |
Lock the set and copy its map into target so the caller can iterate over the copy without holding the lock. | |
Private Attributes | |
MapType | map_ |
Map of DataLinks keyed by DataLinkIdType. | |
LockType | lock_ |
SendResponseListener | send_response_listener_ |
Listener for TransportSendControlElements created in send_response. |
Definition at line 30 of file DataLinkSet.h.
typedef ACE_Guard< LockType > OpenDDS::DCPS::DataLinkSet::GuardType |
Definition at line 78 of file DataLinkSet.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLinkSet::LockType |
Definition at line 77 of file DataLinkSet.h.
OpenDDS::DCPS::DataLinkSet::DataLinkSet | ( | ) |
Definition at line 30 of file DataLinkSet.cpp.
References DBG_ENTRY_LVL.
00031 : send_response_listener_("DataLinkSet") 00032 { 00033 DBG_ENTRY_LVL("DataLinkSet","DataLinkSet",6); 00034 }
OpenDDS::DCPS::DataLinkSet::~DataLinkSet | ( | ) | [virtual] |
Definition at line 36 of file DataLinkSet.cpp.
References DBG_ENTRY_LVL.
00037 { 00038 DBG_ENTRY_LVL("DataLinkSet","~DataLinkSet",6); 00039 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::copy_map_to | ( | MapType & | target | ) | [private] |
Lock the set and copy its map into target so the caller can iterate over the copy without holding the lock.
Definition at line 227 of file DataLinkSet.inl.
Referenced by send_control().
00228 { 00229 target.clear(); 00230 00231 // Lock the existing map 00232 GuardType guard(this->lock_); 00233 00234 // Copy to target 00235 for (MapType::iterator itr = map_.begin(); 00236 itr != map_.end(); 00237 ++itr) { 00238 target.insert(*itr); 00239 } 00240 }
bool OpenDDS::DCPS::DataLinkSet::empty | ( | void | ) |
int OpenDDS::DCPS::DataLinkSet::insert_link | ( | const DataLink_rch & | link | ) |
Definition at line 42 of file DataLinkSet.cpp.
References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, lock_, and map_.
Referenced by OpenDDS::DCPS::TransportClient::add_link(), OpenDDS::DCPS::TransportClient::send_control_to(), and OpenDDS::DCPS::TransportClient::send_response().
00043 { 00044 DBG_ENTRY_LVL("DataLinkSet","insert_link",6); 00045 GuardType guard(this->lock_); 00046 return OpenDDS::DCPS::bind(map_, link->id(), link); 00047 }
LockType& OpenDDS::DCPS::DataLinkSet::lock | ( | void | ) | [inline] |
Accessors for external iteration.
Definition at line 84 of file DataLinkSet.h.
00084 { return lock_; }
MapType& OpenDDS::DCPS::DataLinkSet::map | ( | void | ) | [inline] |
Definition at line 85 of file DataLinkSet.h.
00085 { return map_; }
typedef OpenDDS::DCPS::DataLinkSet::OPENDDS_MAP | ( | DataLinkIdType | , | |
DataLink_rch | ||||
) |
ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_all_msgs | ( | RepoId | pub_id | ) |
Definition at line 170 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, lock_, and map_.
Referenced by OpenDDS::DCPS::TransportClient::remove_all_msgs().
00171 { 00172 DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6); 00173 GuardType guard(this->lock_); 00174 const MapType::iterator end = this->map_.end(); 00175 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) { 00176 itr->second->remove_all_msgs(pub_id); 00177 } 00178 00179 return true; 00180 }
void OpenDDS::DCPS::DataLinkSet::remove_link | ( | const DataLink_rch & | link | ) |
Definition at line 50 of file DataLinkSet.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, LM_DEBUG, lock_, map_, OpenDDS::DCPS::unbind(), and VDBG.
Referenced by OpenDDS::DCPS::TransportClient::disassociate().
00051 { 00052 DBG_ENTRY_LVL("DataLinkSet", "remove_link", 6); 00053 GuardType guard1(this->lock_); 00054 if (unbind(map_, link->id()) != 0) { 00055 // Just report to the log that we tried. 00056 VDBG((LM_DEBUG, 00057 ACE_TEXT("(%P|%t) DataLinkSet::remove_links: ") 00058 ACE_TEXT("link_id %d not found in map.\n"), 00059 link->id())); 00060 } 00061 }
ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_sample | ( | const DataSampleElement * | sample | ) |
Definition at line 154 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, lock_, map_, and OpenDDS::DCPS::REMOVE_RELEASED.
Referenced by OpenDDS::DCPS::TransportClient::remove_sample().
00155 { 00156 DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6); 00157 GuardType guard(this->lock_); 00158 const MapType::iterator end = this->map_.end(); 00159 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) { 00160 00161 if (itr->second->remove_sample(sample, 0) == REMOVE_RELEASED) { 00162 return true; 00163 } 00164 } 00165 00166 return false; 00167 }
OpenDDS::DCPS::DataLinkSet_rch OpenDDS::DCPS::DataLinkSet::select_links | ( | const RepoId * | remoteIds, | |
const CORBA::ULong | num_targets | |||
) |
Definition at line 64 of file DataLinkSet.cpp.
References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, lock_, and map_.
Referenced by OpenDDS::DCPS::TransportClient::send_i().
00066 { 00067 DBG_ENTRY_LVL("DataLinkSet","select_links",6); 00068 00069 DataLinkSet_rch selected_links ( make_rch<DataLinkSet>() ); 00070 GuardType guard(this->lock_); 00071 for (MapType::iterator itr = map_.begin(); 00072 itr != map_.end(); 00073 ++itr) { 00074 for (CORBA::ULong i = 0; i < num_targets; ++i) { 00075 if (itr->second->is_target(remoteIds[i])) { 00076 OpenDDS::DCPS::bind(selected_links->map_, 00077 itr->second->id(), itr->second); 00078 break; 00079 } 00080 } 00081 } 00082 00083 return selected_links; 00084 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send | ( | DataSampleElement * | sample | ) |
Send to each DataLink in the set.
Definition at line 22 of file DataLinkSet.inl.
References OpenDDS::DCPS::DataSampleHeader::add_cfentries(), OpenDDS::DCPS::CONTENT_FILTER_FLAG, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_filter_per_link(), OpenDDS::DCPS::DataSampleElement::get_sample(), LM_DEBUG, lock_, map_, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and VDBG_LVL.
00023 { 00024 DBG_ENTRY_LVL("DataLinkSet", "send", 6); 00025 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n", 00026 sample), 5); 00027 GuardType guard(this->lock_); 00028 00029 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00030 const bool customHeader = 00031 DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample()); 00032 #endif 00033 00034 TransportSendElement* send_element = new TransportSendElement(static_cast<int>(map_.size()), sample); 00035 00036 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) { 00037 00038 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00039 if (customHeader) { 00040 typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter; 00041 FilterIter fi = sample->get_filter_per_link().find(itr->first); 00042 GUIDSeq* guids = 0; 00043 if (fi != sample->get_filter_per_link().end()) { 00044 guids = fi->second.ptr(); 00045 } 00046 00047 VDBG_LVL((LM_DEBUG, 00048 "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n", 00049 itr->second.in(), guids ? guids->length() : 0), 5); 00050 00051 Message_Block_Ptr mb (send_element->msg()->duplicate()); 00052 00053 DataSampleHeader::add_cfentries(guids, mb.get()); 00054 00055 TransportCustomizedElement* tce = 00056 new TransportCustomizedElement(send_element, false); 00057 tce->set_msg(move(mb)); // tce now owns ACE_Message_Block chain 00058 00059 itr->second->send(tce); 00060 00061 } else { 00062 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00063 00064 // Tell the DataLink to send it. 00065 itr->second->send(send_element); 00066 00067 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00068 } 00069 #endif 00070 } 00071 }
ACE_INLINE OpenDDS::DCPS::SendControlStatus OpenDDS::DCPS::DataLinkSet::send_control | ( | RepoId | pub_id, | |
const TransportSendListener_rch & | listener, | |||
const DataSampleHeader & | header, | |||
Message_Block_Ptr | msg | |||
) |
Send control message to each DataLink in the set.
Definition at line 88 of file DataLinkSet.inl.
References ACE_TEXT(), copy_map_to(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, OpenDDS::DCPS::move(), OPENDDS_STRING, and OpenDDS::DCPS::SEND_CONTROL_OK.
00092 { 00093 DBG_ENTRY_LVL("DataLinkSet","send_control",6); 00094 //Optimized - use cached allocator. 00095 00096 MapType dup_map; 00097 copy_map_to(dup_map); 00098 00099 if (dup_map.empty()) { 00100 // similar to the "no links" case in TransportClient::send() 00101 if (DCPS_debug_level > 4) { 00102 const GuidConverter converter(pub_id); 00103 ACE_DEBUG((LM_DEBUG, 00104 ACE_TEXT("(%P|%t) DataLinkSet::send_control: ") 00105 ACE_TEXT("no links for publication %C, ") 00106 ACE_TEXT("not sending control message.\n"), 00107 OPENDDS_STRING(converter).c_str())); 00108 } 00109 listener->control_delivered(msg); 00110 return SEND_CONTROL_OK; 00111 } 00112 00113 TransportSendControlElement* const send_element = 00114 new TransportSendControlElement(static_cast<int>(dup_map.size()), pub_id, 00115 listener.in(), header, move(msg)); 00116 00117 for (MapType::iterator itr = dup_map.begin(); 00118 itr != dup_map.end(); 00119 ++itr) { 00120 itr->second->send_start(); 00121 itr->second->send(send_element); 00122 itr->second->send_stop(pub_id); 00123 } 00124 00125 return SEND_CONTROL_OK; 00126 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_control | ( | DataSampleElement * | sample | ) |
Send a control message that is wrapped in a DataSampleElement.
Definition at line 74 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, LM_DEBUG, lock_, map_, and VDBG.
Referenced by OpenDDS::DCPS::TransportClient::send_control(), and OpenDDS::DCPS::TransportClient::send_control_to().
00075 { 00076 DBG_ENTRY_LVL("DataLinkSet", "send_control", 6); 00077 VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample)); 00078 GuardType guard(this->lock_); 00079 TransportSendControlElement* send_element = 00080 new TransportSendControlElement(static_cast<int>(map_.size()), sample); 00081 00082 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) { 00083 itr->second->send(send_element); 00084 } 00085 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_final_acks | ( | const RepoId & | readerid | ) |
Definition at line 243 of file DataLinkSet.inl.
Referenced by OpenDDS::DCPS::TransportClient::send_final_acks().
00244 { 00245 GuardType guard(this->lock_); 00246 for (MapType::iterator itr = map_.begin(); 00247 itr != map_.end(); 00248 ++itr) { 00249 itr->second->send_final_acks(readerid); 00250 } 00251 00252 map_.clear(); 00253 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_response | ( | RepoId | sub_id, | |
const DataSampleHeader & | header, | |||
Message_Block_Ptr | response | |||
) |
Definition at line 129 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, lock_, map_, OpenDDS::DCPS::move(), send_response_listener_, and OpenDDS::DCPS::SendResponseListener::track_message().
Referenced by OpenDDS::DCPS::TransportClient::send_response().
00133 { 00134 DBG_ENTRY_LVL("DataLinkSet","send_response",6); 00135 GuardType guard(this->lock_); 00136 00137 TransportSendControlElement* const send_element = 00138 new TransportSendControlElement(static_cast<int>(map_.size()), pub_id, 00139 &send_response_listener_, header, 00140 move(response)); 00141 if (!send_element) return; 00142 send_response_listener_.track_message(); 00143 00144 for (MapType::iterator itr = map_.begin(); 00145 itr != map_.end(); 00146 ++itr) { 00147 itr->second->send_start(); 00148 itr->second->send(send_element); 00149 itr->second->send_stop(pub_id); 00150 } 00151 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_start | ( | DataLinkSet * | link_set | ) |
Adds the links from link_set to *this and calls send_start() on each link that was not already a member of this set; links already present are skipped.
Definition at line 183 of file DataLinkSet.inl.
References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, LM_ERROR, lock_, and map_.
Referenced by OpenDDS::DCPS::TransportClient::send_i().
00184 { 00185 DBG_ENTRY_LVL("DataLinkSet","send_start",6); 00186 GuardType guard1(this->lock_); 00187 GuardType guard2(link_set->lock_); 00188 for (MapType::iterator itr = link_set->map_.begin(); 00189 itr != link_set->map_.end(); 00190 ++itr) { 00191 // Attempt to add the current DataLink to this set. 00192 int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second); 00193 00194 if (result == 0) { 00195 // We successfully added the current DataLink to this set, 00196 // meaning that it wasn't already a member. We should tell 00197 // the DataLink about the send_start() event. 00198 itr->second->send_start(); 00199 00200 } else if (result == -1) { 00201 ACE_ERROR((LM_ERROR, 00202 "(%P|%t) ERROR: Failed to bind data link into set.\n")); 00203 } 00204 00205 // Note that there is a possibility that the result == 1, which 00206 // means that the DataLink already exists in our map_-> We skip 00207 // all of these cases. 00208 } 00209 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_stop | ( | RepoId | repoId | ) |
Calls send_stop(repoId) on every link in the set and then clears the set.
Definition at line 212 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, lock_, and map_.
Referenced by OpenDDS::DCPS::TransportClient::send_i().
00213 { 00214 DBG_ENTRY_LVL("DataLinkSet","send_stop",6); 00215 // Iterate over our map_ and tell each DataLink about the send_stop() event. 00216 GuardType guard(this->lock_); 00217 for (MapType::iterator itr = map_.begin(); 00218 itr != map_.end(); 00219 ++itr) { 00220 itr->second->send_stop(repoId); 00221 } 00222 00223 map_.clear(); 00224 }
LockType OpenDDS::DCPS::DataLinkSet::lock_ [private] |
This lock will protect critical sections of code that play a role in the sending of data.
Definition at line 95 of file DataLinkSet.h.
Referenced by copy_map_to(), empty(), insert_link(), remove_all_msgs(), remove_link(), remove_sample(), select_links(), send(), send_control(), send_final_acks(), send_response(), send_start(), and send_stop().
MapType OpenDDS::DCPS::DataLinkSet::map_ [private] |
Map of DataLinks keyed by DataLinkIdType.
Definition at line 91 of file DataLinkSet.h.
Referenced by copy_map_to(), empty(), insert_link(), remove_all_msgs(), remove_link(), remove_sample(), select_links(), send(), send_control(), send_final_acks(), send_response(), send_start(), and send_stop().
SendResponseListener OpenDDS::DCPS::DataLinkSet::send_response_listener_ [private] |
Listener for TransportSendControlElements created in send_response.
Definition at line 98 of file DataLinkSet.h.
Referenced by send_response().