DataLink.inl
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "TransportSendStrategy.h"
00009 #include "TransportStrategy.h"
00010 #include "ThreadPerConnectionSendTask.h"
00011 #include "EntryExit.h"
00012 #include "dds/DCPS/GuidConverter.h"
00013
00014 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00015
00016 namespace OpenDDS {
00017 namespace DCPS {
00018
00019 ACE_INLINE
00020 Priority&
00021 DataLink::transport_priority()
00022 {
00023 return this->transport_priority_;
00024 }
00025
00026 ACE_INLINE
00027 Priority
00028 DataLink::transport_priority() const
00029 {
00030 return this->transport_priority_;
00031 }
00032
00033
00034 ACE_INLINE
00035 bool& DataLink::is_loopback()
00036 {
00037 return this->is_loopback_;
00038 }
00039
00040
00041 ACE_INLINE
00042 bool DataLink::is_loopback() const
00043 {
00044 return this->is_loopback_;
00045 }
00046
00047
00048 ACE_INLINE
00049 bool& DataLink::is_active()
00050 {
00051 return this->is_active_;
00052 }
00053
00054
00055 ACE_INLINE
00056 bool DataLink::is_active() const
00057 {
00058 return this->is_active_;
00059 }
00060
00061 ACE_INLINE const ACE_Time_Value&
00062 DataLink::datalink_release_delay() const
00063 {
00064 return this->datalink_release_delay_;
00065 }
00066
00067 ACE_INLINE void
00068 DataLink::send_start()
00069 {
00070 DBG_ENTRY_LVL("DataLink","send_start",6);
00071
00072 if (this->thr_per_con_send_task_ != 0) {
00073 this->thr_per_con_send_task_->add_request(SEND_START);
00074
00075 } else
00076 this->send_start_i();
00077 }
00078
00079 ACE_INLINE void
00080 DataLink::send_start_i()
00081 {
00082 DBG_ENTRY_LVL("DataLink","send_start_i",6);
00083
00084
00085
00086 TransportSendStrategy_rch strategy;
00087 {
00088 GuardType guard(this->strategy_lock_);
00089
00090 strategy = this->send_strategy_;
00091 }
00092
00093 if (!strategy.is_nil()) {
00094 strategy->send_start();
00095 }
00096 }
00097
00098 ACE_INLINE void
00099 DataLink::send(TransportQueueElement* element)
00100 {
00101 DBG_ENTRY_LVL("DataLink","send",6);
00102
00103 if (element->is_request_ack() &&
00104 this->handle_send_request_ack(element)) {
00105 return;
00106 }
00107
00108 element = this->customize_queue_element(element);
00109 if (!element) {
00110 return;
00111 }
00112
00113 if (this->thr_per_con_send_task_ != 0) {
00114 if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) {
00115 element->data_dropped(true);
00116 }
00117
00118 } else {
00119 this->send_i(element);
00120
00121 }
00122 }
00123
00124 ACE_INLINE void
00125 DataLink::send_i(TransportQueueElement* element, bool relink)
00126 {
00127 DBG_ENTRY_LVL("DataLink","send_i",6);
00128
00129
00130
00131 TransportSendStrategy_rch strategy;
00132 {
00133 GuardType guard(this->strategy_lock_);
00134
00135 strategy = this->send_strategy_;
00136 }
00137
00138 if (strategy) {
00139 strategy->send(element, relink);
00140 } else {
00141 element->data_dropped(true);
00142 }
00143 }
00144
00145 ACE_INLINE void
00146 DataLink::send_stop(RepoId repoId)
00147 {
00148 DBG_ENTRY_LVL("DataLink","send_stop",6);
00149
00150 if (this->thr_per_con_send_task_ != 0) {
00151 this->thr_per_con_send_task_->add_request(SEND_STOP);
00152
00153 } else
00154 this->send_stop_i(repoId);
00155 }
00156
00157 ACE_INLINE void
00158 DataLink::send_stop_i(RepoId repoId)
00159 {
00160 DBG_ENTRY_LVL("DataLink","send_stop_i",6);
00161
00162
00163
00164 TransportSendStrategy_rch strategy;
00165 {
00166 GuardType guard(this->strategy_lock_);
00167
00168 strategy = this->send_strategy_;
00169 }
00170
00171 if (!strategy.is_nil()) {
00172 strategy->send_stop(repoId);
00173 }
00174 }
00175
00176 ACE_INLINE
00177 void DataLink::set_scheduling_release(bool scheduling_release)
00178 {
00179 this->scheduling_release_ = scheduling_release;
00180 }
00181
00182 ACE_INLINE RemoveResult
00183 DataLink::remove_sample(const DataSampleElement* sample, void* context)
00184 {
00185 DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
00186
00187 if (this->thr_per_con_send_task_ != 0) {
00188 const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
00189 if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
00190 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00191 "Removed sample from ThreadPerConnection queue.\n"));
00192 return rr;
00193 }
00194 }
00195
00196 TransportSendStrategy_rch strategy;
00197 {
00198 GuardType guard(this->strategy_lock_);
00199
00200 strategy = this->send_strategy_;
00201 }
00202
00203 if (!strategy.is_nil()) {
00204 return strategy->remove_sample(sample, context);
00205 }
00206
00207 return REMOVE_NOT_FOUND;
00208 }
00209
00210 ACE_INLINE void
00211 DataLink::remove_all_msgs(RepoId pub_id)
00212 {
00213 DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
00214
00215
00216
00217
00218 TransportSendStrategy_rch strategy;
00219 {
00220 GuardType guard(this->strategy_lock_);
00221
00222 strategy = this->send_strategy_;
00223 }
00224
00225 if (!strategy.is_nil()) {
00226 strategy->remove_all_msgs(pub_id);
00227 }
00228 }
00229
00230 ACE_INLINE DataLinkIdType
00231 DataLink::id() const
00232 {
00233 DBG_ENTRY_LVL("DataLink","id",6);
00234 return id_;
00235 }
00236
00237 ACE_INLINE int
00238 DataLink::start(const TransportSendStrategy_rch& send_strategy,
00239 const TransportStrategy_rch& receive_strategy)
00240 {
00241 DBG_ENTRY_LVL("DataLink","start",6);
00242
00243
00244
00245
00246
00247
00248 if (send_strategy->start() != 0) {
00249
00250 invoke_on_start_callbacks(false);
00251 return -1;
00252 }
00253
00254 if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
00255
00256
00257
00258
00259 send_strategy->stop();
00260 invoke_on_start_callbacks(false);
00261 return -1;
00262 }
00263
00264
00265
00266 {
00267 GuardType guard(this->strategy_lock_);
00268
00269 this->send_strategy_ = send_strategy;
00270 this->receive_strategy_ = receive_strategy;
00271 }
00272 invoke_on_start_callbacks(true);
00273 {
00274
00275
00276
00277 GuardType guard(this->strategy_lock_);
00278 this->started_ = true;
00279 }
00280
00281
00282 invoke_on_start_callbacks(true);
00283 return 0;
00284 }
00285
00286
00287 ACE_INLINE
00288 const char*
00289 DataLink::connection_notice_as_str(ConnectionNotice notice)
00290 {
00291 static const char* NoticeStr[] = { "DISCONNECTED",
00292 "RECONNECTED",
00293 "LOST"
00294 };
00295
00296 return NoticeStr [notice];
00297 }
00298
00299 ACE_INLINE
00300 void
00301 DataLink::terminate_send()
00302 {
00303 this->send_strategy_->terminate_send(false);
00304 }
00305
00306 ACE_INLINE
00307 void
00308 DataLink::remove_listener(const RepoId& local_id)
00309 {
00310 GuardType guard(pub_sub_maps_lock_);
00311 {
00312 IdToSendListenerMap::iterator pos = send_listeners_.find(local_id);
00313 if (pos != send_listeners_.end()) {
00314 send_listeners_.erase(pos);
00315 if (Transport_debug_level > 5) {
00316 GuidConverter converter(local_id);
00317 ACE_DEBUG((LM_DEBUG,
00318 ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
00319 ACE_TEXT("removed %C from send_listeners\n"),
00320 OPENDDS_STRING(converter).c_str()));
00321 }
00322 return;
00323 }
00324 }
00325 {
00326 IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id);
00327 if (pos != recv_listeners_.end()) {
00328 recv_listeners_.erase(pos);
00329 if (Transport_debug_level > 5) {
00330 GuidConverter converter(local_id);
00331 ACE_DEBUG((LM_DEBUG,
00332 ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
00333 ACE_TEXT("removed %C from recv_listeners\n"),
00334 OPENDDS_STRING(converter).c_str()));
00335 }
00336 return;
00337 }
00338 }
00339 }
00340
00341 ACE_INLINE
00342 TransportSendListener_rch
00343 DataLink::send_listener_for(const RepoId& pub_id) const
00344 {
00345
00346
00347 IdToSendListenerMap::const_iterator found =
00348 this->send_listeners_.find(pub_id);
00349 if (found == this->send_listeners_.end()) {
00350 return TransportSendListener_rch();
00351 }
00352 return found->second.lock();
00353 }
00354
00355 ACE_INLINE
00356 TransportReceiveListener_rch
00357 DataLink::recv_listener_for(const RepoId& sub_id) const
00358 {
00359
00360
00361 IdToRecvListenerMap::const_iterator found =
00362 this->recv_listeners_.find(sub_id);
00363 if (found == this->recv_listeners_.end()) {
00364 return TransportReceiveListener_rch();
00365 }
00366 return found->second.lock();
00367 }
00368
00369 ACE_INLINE
00370 void
00371 DataLink::default_listener(const TransportReceiveListener_wrch& trl)
00372 {
00373 GuardType guard(this->pub_sub_maps_lock_);
00374 this->default_listener_ = trl;
00375 }
00376
00377 ACE_INLINE
00378 TransportReceiveListener_wrch
00379 DataLink::default_listener() const
00380 {
00381 GuardType guard(this->pub_sub_maps_lock_);
00382 return this->default_listener_;
00383 }
00384
00385 ACE_INLINE
00386 void
00387 DataLink::send_final_acks (const RepoId& )
00388 { }
00389
00390 }
00391 }
00392
00393 OPENDDS_END_VERSIONED_NAMESPACE_DECL