#include <DataLinkSet.h>
Inheritance diagram for OpenDDS::DCPS::DataLinkSet:
Public Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
Public Member Functions | |
DataLinkSet () | |
virtual | ~DataLinkSet () |
int | insert_link (DataLink *link) |
void | remove_link (const DataLink_rch &link) |
void | send (DataSampleElement *sample) |
Send to each DataLink in the set. | |
void | send_control (DataSampleElement *sample) |
Send a control message that is wrapped in a DataSampleElement. | |
SendControlStatus | send_control (RepoId pub_id, TransportSendListener *listener, const DataSampleHeader &header, ACE_Message_Block *msg, TransportSendControlElementAllocator *allocator=0) |
Send control message to each DataLink in the set. | |
void | send_response (RepoId sub_id, const DataSampleHeader &header, ACE_Message_Block *response) |
bool | remove_sample (const DataSampleElement *sample) |
bool | remove_all_msgs (RepoId pub_id) |
void | send_start (DataLinkSet *link_set) |
void | send_stop (RepoId repoId) |
DataLinkSet * | select_links (const RepoId *remoteIds, const CORBA::ULong num_targets) |
bool | empty () |
void | send_final_acks (const RepoId &readerid) |
typedef | OPENDDS_MAP (DataLinkIdType, DataLink_rch) MapType |
LockType & | lock () |
Accessors for external iteration. | |
MapType & | map () |
TransportSendControlElementAllocator & | tsce_allocator () |
Private Member Functions | |
void | copy_map_to (MapType &target) |
lock and copy map for lock-free access | |
Private Attributes | |
MapType | map_ |
Hash map for DataLinks. | |
TransportSendControlElementAllocator | send_control_element_allocator_ |
Allocator for TransportSendControlElement. | |
LockType | lock_ |
SendResponseListener | send_response_listener_ |
Listener for TransportSendControlElements created in send_response. |
Definition at line 27 of file DataLinkSet.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::DataLinkSet::GuardType |
Definition at line 76 of file DataLinkSet.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::DataLinkSet::LockType |
Definition at line 75 of file DataLinkSet.h.
OpenDDS::DCPS::DataLinkSet::DataLinkSet | ( | ) |
Definition at line 30 of file DataLinkSet.cpp.
References DBG_ENTRY_LVL, NUM_SEND_CONTROL_ELEMENT_CHUNKS, send_control_element_allocator_, and OpenDDS::DCPS::Transport_debug_level.
Referenced by select_links().
00031 : send_control_element_allocator_(NUM_SEND_CONTROL_ELEMENT_CHUNKS) 00032 , send_response_listener_("DataLinkSet") 00033 { 00034 DBG_ENTRY_LVL("DataLinkSet","DataLinkSet",6); 00035 00036 if (OpenDDS::DCPS::Transport_debug_level > 3) { 00037 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLinkSet send_control_element_allocator %x with %d chunks\n", 00038 &send_control_element_allocator_, NUM_SEND_CONTROL_ELEMENT_CHUNKS)); 00039 } 00040 }
OpenDDS::DCPS::DataLinkSet::~DataLinkSet | ( | ) | [virtual] |
Definition at line 42 of file DataLinkSet.cpp.
References DBG_ENTRY_LVL.
00043 { 00044 DBG_ENTRY_LVL("DataLinkSet","~DataLinkSet",6); 00045 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::copy_map_to | ( | MapType & | target | ) | [private] |
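Clears target, then, while holding this set's lock, copies every entry of the map into it so that the caller can iterate over the copy without holding the lock.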
Definition at line 238 of file DataLinkSet.inl.
References map_.
Referenced by send_control().
00239 { 00240 target.clear(); 00241 00242 // Lock the existing map 00243 GuardType guard(this->lock_); 00244 00245 // Copy to target 00246 for (MapType::iterator itr = map_.begin(); 00247 itr != map_.end(); 00248 ++itr) { 00249 target.insert(*itr); 00250 } 00251 }
bool OpenDDS::DCPS::DataLinkSet::empty | ( | ) |
int OpenDDS::DCPS::DataLinkSet::insert_link | ( | DataLink * | link | ) |
Definition at line 48 of file DataLinkSet.cpp.
References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, and map_.
Referenced by OpenDDS::DCPS::TransportClient::add_link(), OpenDDS::DCPS::TransportClient::send_control_to(), and OpenDDS::DCPS::TransportClient::send_response().
00049 { 00050 DBG_ENTRY_LVL("DataLinkSet","insert_link",6); 00051 DataLink_rch mylink(link, false); 00052 GuardType guard(this->lock_); 00053 return OpenDDS::DCPS::bind(map_, mylink->id(), mylink); 00054 }
LockType& OpenDDS::DCPS::DataLinkSet::lock | ( | ) | [inline] |
Accessors for external iteration.
Definition at line 82 of file DataLinkSet.h.
00082 { return lock_; }
MapType& OpenDDS::DCPS::DataLinkSet::map | ( | ) | [inline] |
Definition at line 83 of file DataLinkSet.h.
Referenced by OpenDDS::DCPS::TransportClient::transport_detached(), and OpenDDS::DCPS::TransportClient::~TransportClient().
00083 { return map_; }
typedef OPENDDS_MAP(DataLinkIdType, DataLink_rch) OpenDDS::DCPS::DataLinkSet::MapType |
ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_all_msgs | ( | RepoId | pub_id | ) |
Definition at line 181 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, and map_.
Referenced by OpenDDS::DCPS::TransportClient::remove_all_msgs().
00182 { 00183 DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6); 00184 GuardType guard(this->lock_); 00185 const MapType::iterator end = this->map_.end(); 00186 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) { 00187 itr->second->remove_all_msgs(pub_id); 00188 } 00189 00190 return true; 00191 }
void OpenDDS::DCPS::DataLinkSet::remove_link | ( | const DataLink_rch & | link | ) |
Definition at line 57 of file DataLinkSet.cpp.
References DBG_ENTRY_LVL, map_, OpenDDS::DCPS::unbind(), and VDBG.
Referenced by OpenDDS::DCPS::TransportClient::disassociate().
00058 { 00059 DBG_ENTRY_LVL("DataLinkSet", "remove_link", 6); 00060 GuardType guard1(this->lock_); 00061 if (unbind(map_, link->id()) != 0) { 00062 // Just report to the log that we tried. 00063 VDBG((LM_DEBUG, 00064 ACE_TEXT("(%P|%t) DataLinkSet::remove_links: ") 00065 ACE_TEXT("link_id %d not found in map.\n"), 00066 link->id())); 00067 } 00068 }
ACE_INLINE bool OpenDDS::DCPS::DataLinkSet::remove_sample | ( | const DataSampleElement * | sample | ) |
Definition at line 165 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, map_, and OpenDDS::DCPS::REMOVE_RELEASED.
Referenced by OpenDDS::DCPS::TransportClient::remove_sample().
00166 { 00167 DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6); 00168 GuardType guard(this->lock_); 00169 const MapType::iterator end = this->map_.end(); 00170 for (MapType::iterator itr = this->map_.begin(); itr != end; ++itr) { 00171 00172 if (itr->second->remove_sample(sample) == REMOVE_RELEASED) { 00173 return true; 00174 } 00175 } 00176 00177 return false; 00178 }
OpenDDS::DCPS::DataLinkSet * OpenDDS::DCPS::DataLinkSet::select_links | ( | const RepoId * | remoteIds, | |
const CORBA::ULong | num_targets | |||
) |
Definition at line 71 of file DataLinkSet.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::bind(), DataLinkSet(), DBG_ENTRY_LVL, and map_.
Referenced by OpenDDS::DCPS::TransportClient::send_i().
00073 { 00074 DBG_ENTRY_LVL("DataLinkSet","select_links",6); 00075 00076 DataLinkSet_rch selected_links = new DataLinkSet(); 00077 GuardType guard(this->lock_); 00078 for (MapType::iterator itr = map_.begin(); 00079 itr != map_.end(); 00080 ++itr) { 00081 for (CORBA::ULong i = 0; i < num_targets; ++i) { 00082 if (itr->second->is_target(remoteIds[i])) { 00083 OpenDDS::DCPS::bind(selected_links->map_, 00084 itr->second->id(), itr->second); 00085 break; 00086 } 00087 } 00088 } 00089 00090 return selected_links._retn(); 00091 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send | ( | DataSampleElement * | sample | ) |
Send to each DataLink in the set.
Definition at line 22 of file DataLinkSet.inl.
References OpenDDS::DCPS::DataSampleHeader::add_cfentries(), OpenDDS::DCPS::TransportCustomizedElement::alloc(), OpenDDS::DCPS::TransportSendElement::alloc(), OpenDDS::DCPS::CONTENT_FILTER_FLAG, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_filter_per_link(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::DataSampleElement::get_transport_customized_element_allocator(), map_, OpenDDS::DCPS::TransportCustomizedElement::set_msg(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and VDBG_LVL.
00023 { 00024 DBG_ENTRY_LVL("DataLinkSet", "send", 6); 00025 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n", 00026 sample), 5); 00027 GuardType guard(this->lock_); 00028 TransportSendElement* send_element = 00029 TransportSendElement::alloc(static_cast<int>(map_.size()), sample); 00030 00031 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00032 const bool customHeader = 00033 DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample()); 00034 #endif 00035 00036 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) { 00037 00038 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00039 if (customHeader) { 00040 typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter; 00041 FilterIter fi = sample->get_filter_per_link().find(itr->first); 00042 GUIDSeq* guids = 0; 00043 if (fi != sample->get_filter_per_link().end()) { 00044 guids = fi->second.ptr(); 00045 } 00046 00047 VDBG_LVL((LM_DEBUG, 00048 "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n", 00049 itr->second.in(), guids ? guids->length() : 0), 5); 00050 00051 ACE_Message_Block* mb = sample->get_sample()->duplicate(); 00052 00053 DataSampleHeader::add_cfentries(guids, mb); 00054 00055 TransportCustomizedElement* tce = 00056 TransportCustomizedElement::alloc(send_element, false, 00057 sample->get_transport_customized_element_allocator()); 00058 tce->set_msg(mb); // tce now owns ACE_Message_Block chain 00059 00060 itr->second->send(tce); 00061 00062 } else { 00063 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00064 00065 // Tell the DataLink to send it. 00066 itr->second->send(send_element); 00067 00068 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 00069 } 00070 #endif 00071 } 00072 }
ACE_INLINE OpenDDS::DCPS::SendControlStatus OpenDDS::DCPS::DataLinkSet::send_control | ( | RepoId | pub_id, | |
TransportSendListener * | listener, | |||
const DataSampleHeader & | header, | |||
ACE_Message_Block * | msg, | |||
TransportSendControlElementAllocator * | allocator = 0 | |||
) |
Send control message to each DataLink in the set.
Definition at line 94 of file DataLinkSet.inl.
References OpenDDS::DCPS::TransportSendControlElement::alloc(), OpenDDS::DCPS::TransportSendListener::control_delivered(), copy_map_to(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, header, OPENDDS_STRING, send_control_element_allocator_, OpenDDS::DCPS::SEND_CONTROL_ERROR, and OpenDDS::DCPS::SEND_CONTROL_OK.
00099 { 00100 DBG_ENTRY_LVL("DataLinkSet","send_control",6); 00101 //Optimized - use cached allocator. 00102 00103 MapType dup_map; 00104 copy_map_to(dup_map); 00105 00106 if (dup_map.empty()) { 00107 // similar to the "no links" case in TransportClient::send() 00108 if (DCPS_debug_level > 4) { 00109 const GuidConverter converter(pub_id); 00110 ACE_DEBUG((LM_DEBUG, 00111 ACE_TEXT("(%P|%t) DataLinkSet::send_control: ") 00112 ACE_TEXT("no links for publication %C, ") 00113 ACE_TEXT("not sending control message.\n"), 00114 OPENDDS_STRING(converter).c_str())); 00115 } 00116 listener->control_delivered(msg); 00117 return SEND_CONTROL_OK; 00118 } 00119 00120 TransportSendControlElementAllocator& use_alloc = 00121 allocator ? *allocator : send_control_element_allocator_; 00122 00123 TransportSendControlElement* const send_element = 00124 TransportSendControlElement::alloc(static_cast<int>(dup_map.size()), pub_id, 00125 listener, header, msg, &use_alloc); 00126 if (!send_element) return SEND_CONTROL_ERROR; 00127 00128 for (MapType::iterator itr = dup_map.begin(); 00129 itr != dup_map.end(); 00130 ++itr) { 00131 itr->second->send_start(); 00132 itr->second->send(send_element); 00133 itr->second->send_stop(pub_id); 00134 } 00135 00136 return SEND_CONTROL_OK; 00137 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_control | ( | DataSampleElement * | sample | ) |
Send a control message that is wrapped in a DataSampleElement.
Definition at line 75 of file DataLinkSet.inl.
References OpenDDS::DCPS::TransportSendControlElement::alloc(), DBG_ENTRY_LVL, map_, send_control_element_allocator_, and VDBG.
Referenced by OpenDDS::DCPS::TransportClient::send_control(), and OpenDDS::DCPS::TransportClient::send_control_to().
00076 { 00077 DBG_ENTRY_LVL("DataLinkSet", "send_control", 6); 00078 VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample)); 00079 GuardType guard(this->lock_); 00080 TransportSendControlElement* send_element = 00081 TransportSendControlElement::alloc(static_cast<int>(map_.size()), sample, 00082 &send_control_element_allocator_); 00083 if (!send_element) { 00084 ACE_ERROR((LM_ERROR, "(%P|%t) DataLinkSet::send_control allocation of " 00085 "TransportSendControlElement failed\n")); 00086 return; 00087 } 00088 for (MapType::iterator itr = map_.begin(); itr != map_.end(); ++itr) { 00089 itr->second->send(send_element); 00090 } 00091 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_final_acks | ( | const RepoId & | readerid | ) |
Definition at line 254 of file DataLinkSet.inl.
References map_.
Referenced by OpenDDS::DCPS::TransportClient::send_final_acks().
00255 { 00256 GuardType guard(this->lock_); 00257 for (MapType::iterator itr = map_.begin(); 00258 itr != map_.end(); 00259 ++itr) { 00260 itr->second->send_final_acks(readerid); 00261 } 00262 00263 map_.clear(); 00264 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_response | ( | RepoId | sub_id, | |
const DataSampleHeader & | header, | |||
ACE_Message_Block * | response | |||
) |
Definition at line 140 of file DataLinkSet.inl.
References OpenDDS::DCPS::TransportSendControlElement::alloc(), DBG_ENTRY_LVL, header, map_, send_control_element_allocator_, send_response_listener_, and OpenDDS::DCPS::SendResponseListener::track_message().
Referenced by OpenDDS::DCPS::TransportClient::send_response().
00144 { 00145 DBG_ENTRY_LVL("DataLinkSet","send_response",6); 00146 GuardType guard(this->lock_); 00147 00148 TransportSendControlElement* const send_element = 00149 TransportSendControlElement::alloc(static_cast<int>(map_.size()), pub_id, 00150 &send_response_listener_, header, 00151 response, &send_control_element_allocator_); 00152 if (!send_element) return; 00153 send_response_listener_.track_message(); 00154 00155 for (MapType::iterator itr = map_.begin(); 00156 itr != map_.end(); 00157 ++itr) { 00158 itr->second->send_start(); 00159 itr->second->send(send_element); 00160 itr->second->send_stop(pub_id); 00161 } 00162 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_start | ( | DataLinkSet * | link_set | ) |
Adds the links from link_set to *this and calls send_start() on each link that was newly added; links that were already members of *this are skipped.
Definition at line 194 of file DataLinkSet.inl.
References OpenDDS::DCPS::bind(), DBG_ENTRY_LVL, lock_, and map_.
Referenced by OpenDDS::DCPS::TransportClient::send_i().
00195 { 00196 DBG_ENTRY_LVL("DataLinkSet","send_start",6); 00197 GuardType guard1(this->lock_); 00198 GuardType guard2(link_set->lock_); 00199 for (MapType::iterator itr = link_set->map_.begin(); 00200 itr != link_set->map_.end(); 00201 ++itr) { 00202 // Attempt to add the current DataLink to this set. 00203 int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second); 00204 00205 if (result == 0) { 00206 // We successfully added the current DataLink to this set, 00207 // meaning that it wasn't already a member. We should tell 00208 // the DataLink about the send_start() event. 00209 itr->second->send_start(); 00210 00211 } else if (result == -1) { 00212 ACE_ERROR((LM_ERROR, 00213 "(%P|%t) ERROR: Failed to bind data link into set.\n")); 00214 } 00215 00216 // Note that there is a possibility that the result == 1, which 00217 // means that the DataLink already exists in our map_-> We skip 00218 // all of these cases. 00219 } 00220 }
ACE_INLINE void OpenDDS::DCPS::DataLinkSet::send_stop | ( | RepoId | repoId | ) |
Calls send_stop(repoId) on every link in the set and then clears the set.
Definition at line 223 of file DataLinkSet.inl.
References DBG_ENTRY_LVL, and map_.
Referenced by OpenDDS::DCPS::TransportClient::send_i().
00224 { 00225 DBG_ENTRY_LVL("DataLinkSet","send_stop",6); 00226 // Iterate over our map_ and tell each DataLink about the send_stop() event. 00227 GuardType guard(this->lock_); 00228 for (MapType::iterator itr = map_.begin(); 00229 itr != map_.end(); 00230 ++itr) { 00231 itr->second->send_stop(repoId); 00232 } 00233 00234 map_.clear(); 00235 }
TransportSendControlElementAllocator& OpenDDS::DCPS::DataLinkSet::tsce_allocator | ( | ) | [inline] |
Definition at line 86 of file DataLinkSet.h.
Referenced by OpenDDS::DCPS::TransportClient::send_control_to().
00086 { 00087 return send_control_element_allocator_; 00088 }
LockType OpenDDS::DCPS::DataLinkSet::lock_ [private] |
This lock will protect critical sections of code that play a role in the sending of data.
Definition at line 100 of file DataLinkSet.h.
Referenced by send_start().
MapType OpenDDS::DCPS::DataLinkSet::map_ [private] |
Hash map for DataLinks.
Definition at line 93 of file DataLinkSet.h.
Referenced by copy_map_to(), empty(), insert_link(), remove_all_msgs(), remove_link(), remove_sample(), select_links(), send(), send_control(), send_final_acks(), send_response(), send_start(), and send_stop().
TransportSendControlElementAllocator OpenDDS::DCPS::DataLinkSet::send_control_element_allocator_ [private] |
Allocator for TransportSendControlElement.
Definition at line 96 of file DataLinkSet.h.
Referenced by DataLinkSet(), send_control(), and send_response().
SendResponseListener OpenDDS::DCPS::DataLinkSet::send_response_listener_ [private] |
Listener for TransportSendControlElements created in send_response.
Definition at line 103 of file DataLinkSet.h.
Referenced by send_response().