00001
00002
00003
00004
00005
00006
00007
00008 #include "TransportSendStrategy.h"
00009 #include "TransportStrategy.h"
00010 #include "ThreadPerConnectionSendTask.h"
00011 #include "EntryExit.h"
00012 #include "dds/DCPS/GuidConverter.h"
00013
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 ACE_INLINE
00019 bool
00020 DataLink::issues_on_deleted_callback() const
00021 {
00022 return false;
00023 }
00024
00025 ACE_INLINE
00026 Priority&
00027 DataLink::transport_priority()
00028 {
00029 return this->transport_priority_;
00030 }
00031
00032 ACE_INLINE
00033 Priority
00034 DataLink::transport_priority() const
00035 {
00036 return this->transport_priority_;
00037 }
00038
00039
00040 ACE_INLINE
00041 bool& DataLink::is_loopback()
00042 {
00043 return this->is_loopback_;
00044 }
00045
00046
00047 ACE_INLINE
00048 bool DataLink::is_loopback() const
00049 {
00050 return this->is_loopback_;
00051 }
00052
00053
00054 ACE_INLINE
00055 bool& DataLink::is_active()
00056 {
00057 return this->is_active_;
00058 }
00059
00060
00061 ACE_INLINE
00062 bool DataLink::is_active() const
00063 {
00064 return this->is_active_;
00065 }
00066
00067 ACE_INLINE const ACE_Time_Value&
00068 DataLink::datalink_release_delay() const
00069 {
00070 return this->datalink_release_delay_;
00071 }
00072
00073 ACE_INLINE void
00074 DataLink::send_start()
00075 {
00076 DBG_ENTRY_LVL("DataLink","send_start",6);
00077
00078 if (this->thr_per_con_send_task_ != 0) {
00079 this->thr_per_con_send_task_->add_request(SEND_START);
00080
00081 } else
00082 this->send_start_i();
00083 }
00084
00085 ACE_INLINE void
00086 DataLink::send_start_i()
00087 {
00088 DBG_ENTRY_LVL("DataLink","send_start_i",6);
00089
00090
00091
00092 TransportSendStrategy_rch strategy;
00093 {
00094 GuardType guard(this->strategy_lock_);
00095
00096 strategy = this->send_strategy_;
00097 }
00098
00099 if (!strategy.is_nil()) {
00100 strategy->send_start();
00101 }
00102 }
00103
00104 ACE_INLINE void
00105 DataLink::send(TransportQueueElement* element)
00106 {
00107 DBG_ENTRY_LVL("DataLink","send",6);
00108
00109 element = this->customize_queue_element(element);
00110 if (!element) {
00111 return;
00112 }
00113
00114 if (this->thr_per_con_send_task_ != 0) {
00115 this->thr_per_con_send_task_->add_request(SEND, element);
00116
00117 } else {
00118 this->send_i(element);
00119
00120 }
00121 }
00122
00123 ACE_INLINE void
00124 DataLink::send_i(TransportQueueElement* element, bool relink)
00125 {
00126 DBG_ENTRY_LVL("DataLink","send_i",6);
00127
00128
00129
00130 TransportSendStrategy_rch strategy;
00131 {
00132 GuardType guard(this->strategy_lock_);
00133
00134 strategy = this->send_strategy_;
00135 }
00136
00137 if (!strategy.is_nil()) {
00138 strategy->send(element, relink);
00139 }
00140 }
00141
00142 ACE_INLINE void
00143 DataLink::send_stop(RepoId repoId)
00144 {
00145 DBG_ENTRY_LVL("DataLink","send_stop",6);
00146
00147 if (this->thr_per_con_send_task_ != 0) {
00148 this->thr_per_con_send_task_->add_request(SEND_STOP);
00149
00150 } else
00151 this->send_stop_i(repoId);
00152 }
00153
00154 ACE_INLINE void
00155 DataLink::send_stop_i(RepoId repoId)
00156 {
00157 DBG_ENTRY_LVL("DataLink","send_stop_i",6);
00158
00159
00160
00161 TransportSendStrategy_rch strategy = 0;
00162 {
00163 GuardType guard(this->strategy_lock_);
00164
00165 strategy = this->send_strategy_;
00166 }
00167
00168 if (!strategy.is_nil()) {
00169 strategy->send_stop(repoId);
00170 }
00171 }
00172
00173 ACE_INLINE
00174 void DataLink::set_scheduling_release(bool scheduling_release)
00175 {
00176 this->scheduling_release_ = scheduling_release;
00177 }
00178
00179 ACE_INLINE RemoveResult
00180 DataLink::remove_sample(const DataSampleElement* sample)
00181 {
00182 DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
00183
00184 if (this->thr_per_con_send_task_ != 0) {
00185 const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
00186 if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
00187 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00188 "Removed sample from ThreadPerConnection queue.\n"));
00189 return rr;
00190 }
00191 }
00192
00193 TransportSendStrategy_rch strategy;
00194 {
00195 GuardType guard(this->strategy_lock_);
00196
00197 strategy = this->send_strategy_;
00198 }
00199
00200 if (!strategy.is_nil()) {
00201 return strategy->remove_sample(sample);
00202 }
00203
00204 return REMOVE_NOT_FOUND;
00205 }
00206
00207 ACE_INLINE void
00208 DataLink::remove_all_msgs(RepoId pub_id)
00209 {
00210 DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
00211
00212
00213
00214
00215 TransportSendStrategy_rch strategy;
00216 {
00217 GuardType guard(this->strategy_lock_);
00218
00219 strategy = this->send_strategy_;
00220 }
00221
00222 if (!strategy.is_nil()) {
00223 strategy->remove_all_msgs(pub_id);
00224 }
00225 }
00226
00227 ACE_INLINE DataLinkIdType
00228 DataLink::id() const
00229 {
00230 DBG_ENTRY_LVL("DataLink","id",6);
00231 return id_;
00232 }
00233
00234 ACE_INLINE int
00235 DataLink::start(const TransportSendStrategy_rch& send_strategy,
00236 const TransportStrategy_rch& receive_strategy)
00237 {
00238 DBG_ENTRY_LVL("DataLink","start",6);
00239
00240
00241
00242
00243
00244
00245 if (send_strategy->start() != 0) {
00246
00247 invoke_on_start_callbacks(false);
00248 return -1;
00249 }
00250
00251 if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
00252
00253
00254
00255
00256 send_strategy->stop();
00257 invoke_on_start_callbacks(false);
00258 return -1;
00259 }
00260
00261
00262
00263 {
00264 GuardType guard(this->strategy_lock_);
00265
00266 this->send_strategy_ = send_strategy;
00267 this->receive_strategy_ = receive_strategy;
00268 }
00269 invoke_on_start_callbacks(true);
00270 {
00271
00272
00273
00274 GuardType guard(this->strategy_lock_);
00275 this->started_ = true;
00276 }
00277
00278
00279 invoke_on_start_callbacks(true);
00280 return 0;
00281 }
00282
00283 ACE_INLINE
00284 bool
00285 DataLink::add_on_start_callback(TransportClient* client, const RepoId& remote)
00286 {
00287 GuardType guard(strategy_lock_);
00288
00289 if (started_ && !send_strategy_.is_nil()) {
00290 return false;
00291 }
00292 on_start_callbacks_.push_back(std::make_pair(client, remote));
00293
00294 return true;
00295 }
00296
00297 ACE_INLINE
00298 void
00299 DataLink::remove_on_start_callback(TransportClient* client, const RepoId& remote)
00300 {
00301 GuardType guard(strategy_lock_);
00302 on_start_callbacks_.erase(std::remove(on_start_callbacks_.begin(),
00303 on_start_callbacks_.end(),
00304 std::make_pair(client, remote)),
00305 on_start_callbacks_.end());
00306 }
00307
00308 ACE_INLINE
00309 const char*
00310 DataLink::connection_notice_as_str(ConnectionNotice notice)
00311 {
00312 static const char* NoticeStr[] = { "DISCONNECTED",
00313 "RECONNECTED",
00314 "LOST"
00315 };
00316
00317 return NoticeStr [notice];
00318 }
00319
00320 ACE_INLINE
00321 void
00322 DataLink::terminate_send()
00323 {
00324 this->send_strategy_->terminate_send(false);
00325 }
00326
00327 ACE_INLINE
00328 void
00329 DataLink::remove_listener(const RepoId& local_id)
00330 {
00331 GuardType guard(this->pub_sub_maps_lock_);
00332 {
00333 IdToSendListenerMap::iterator pos = this->send_listeners_.find(local_id);
00334 if (pos != this->send_listeners_.end()) {
00335 this->send_listeners_.erase(pos);
00336 if (Transport_debug_level > 5) {
00337 GuidConverter converter(local_id);
00338 ACE_DEBUG((LM_DEBUG,
00339 ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from send_listeners\n"),
00340 OPENDDS_STRING(converter).c_str()));
00341 }
00342 return;
00343 }
00344 }
00345 {
00346 IdToRecvListenerMap::iterator pos = this->recv_listeners_.find(local_id);
00347 if (pos != this->recv_listeners_.end()) {
00348 this->recv_listeners_.erase(pos);
00349 if (Transport_debug_level > 5) {
00350 GuidConverter converter(local_id);
00351 ACE_DEBUG((LM_DEBUG,
00352 ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from recv_listeners\n"),
00353 OPENDDS_STRING(converter).c_str()));
00354 }
00355 return;
00356 }
00357 }
00358 }
00359
00360 ACE_INLINE
00361 TransportSendListener*
00362 DataLink::send_listener_for(const RepoId& pub_id) const
00363 {
00364
00365
00366 IdToSendListenerMap::const_iterator found =
00367 this->send_listeners_.find(pub_id);
00368 if (found == this->send_listeners_.end()) {
00369 return 0;
00370 }
00371 return found->second;
00372 }
00373
00374 ACE_INLINE
00375 TransportReceiveListener*
00376 DataLink::recv_listener_for(const RepoId& sub_id) const
00377 {
00378
00379
00380 IdToRecvListenerMap::const_iterator found =
00381 this->recv_listeners_.find(sub_id);
00382 if (found == this->recv_listeners_.end()) {
00383 return 0;
00384 }
00385 return found->second;
00386 }
00387
00388 ACE_INLINE
00389 void
00390 DataLink::default_listener(TransportReceiveListener* trl)
00391 {
00392 GuardType guard(this->pub_sub_maps_lock_);
00393 this->default_listener_ = trl;
00394 }
00395
00396 ACE_INLINE
00397 TransportReceiveListener*
00398 DataLink::default_listener() const
00399 {
00400 GuardType guard(this->pub_sub_maps_lock_);
00401 return this->default_listener_;
00402 }
00403
00404 ACE_INLINE
00405 void
00406 DataLink::send_final_acks (const RepoId& )
00407 { }
00408
00409 }
00410 }