DataLink.inl

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "TransportSendStrategy.h"
00009 #include "TransportStrategy.h"
00010 #include "ThreadPerConnectionSendTask.h"
00011 #include "EntryExit.h"
00012 #include "dds/DCPS/GuidConverter.h"
00013 
00014 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00015 
00016 namespace OpenDDS {
00017 namespace DCPS {
00018 
00019 ACE_INLINE
00020 Priority&
00021 DataLink::transport_priority()
00022 {
00023   return this->transport_priority_;
00024 }
00025 
00026 ACE_INLINE
00027 Priority
00028 DataLink::transport_priority() const
00029 {
00030   return this->transport_priority_;
00031 }
00032 
00033 
00034 ACE_INLINE
00035 bool& DataLink::is_loopback()
00036 {
00037   return this->is_loopback_;
00038 }
00039 
00040 
00041 ACE_INLINE
00042 bool  DataLink::is_loopback() const
00043 {
00044   return this->is_loopback_;
00045 }
00046 
00047 
00048 ACE_INLINE
00049 bool& DataLink::is_active()
00050 {
00051   return this->is_active_;
00052 }
00053 
00054 
00055 ACE_INLINE
00056 bool  DataLink::is_active() const
00057 {
00058   return this->is_active_;
00059 }
00060 
00061 ACE_INLINE const ACE_Time_Value&
00062 DataLink::datalink_release_delay() const
00063 {
00064   return this->datalink_release_delay_;
00065 }
00066 
00067 ACE_INLINE void
00068 DataLink::send_start()
00069 {
00070   DBG_ENTRY_LVL("DataLink","send_start",6);
00071 
00072   if (this->thr_per_con_send_task_ != 0) {
00073     this->thr_per_con_send_task_->add_request(SEND_START);
00074 
00075   } else
00076     this->send_start_i();
00077 }
00078 
00079 ACE_INLINE void
00080 DataLink::send_start_i()
00081 {
00082   DBG_ENTRY_LVL("DataLink","send_start_i",6);
00083   // This one is easy.  Simply delegate to our TransportSendStrategy
00084   // data member.
00085 
00086   TransportSendStrategy_rch strategy;
00087   {
00088     GuardType guard(this->strategy_lock_);
00089 
00090     strategy = this->send_strategy_;
00091   }
00092 
00093   if (!strategy.is_nil()) {
00094     strategy->send_start();
00095   }
00096 }
00097 
00098 ACE_INLINE void
00099 DataLink::send(TransportQueueElement* element)
00100 {
00101   DBG_ENTRY_LVL("DataLink","send",6);
00102 
00103   if (element->is_request_ack() &&
00104       this->handle_send_request_ack(element)) {
00105     return;
00106   }
00107 
00108   element = this->customize_queue_element(element);
00109   if (!element) {
00110     return;
00111   }
00112 
00113   if (this->thr_per_con_send_task_ != 0) {
00114     if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) {
00115       element->data_dropped(true);
00116     }
00117 
00118   } else {
00119     this->send_i(element);
00120 
00121   }
00122 }
00123 
00124 ACE_INLINE void
00125 DataLink::send_i(TransportQueueElement* element, bool relink)
00126 {
00127   DBG_ENTRY_LVL("DataLink","send_i",6);
00128   // This one is easy.  Simply delegate to our TransportSendStrategy
00129   // data member.
00130 
00131   TransportSendStrategy_rch strategy;
00132   {
00133     GuardType guard(this->strategy_lock_);
00134 
00135     strategy = this->send_strategy_;
00136   }
00137 
00138   if (strategy) {
00139     strategy->send(element, relink);
00140   } else {
00141     element->data_dropped(true);
00142   }
00143 }
00144 
00145 ACE_INLINE void
00146 DataLink::send_stop(RepoId repoId)
00147 {
00148   DBG_ENTRY_LVL("DataLink","send_stop",6);
00149 
00150   if (this->thr_per_con_send_task_ != 0) {
00151     this->thr_per_con_send_task_->add_request(SEND_STOP);
00152 
00153   } else
00154     this->send_stop_i(repoId);
00155 }
00156 
00157 ACE_INLINE void
00158 DataLink::send_stop_i(RepoId repoId)
00159 {
00160   DBG_ENTRY_LVL("DataLink","send_stop_i",6);
00161   // This one is easy.  Simply delegate to our TransportSendStrategy
00162   // data member.
00163 
00164   TransportSendStrategy_rch strategy;
00165   {
00166     GuardType guard(this->strategy_lock_);
00167 
00168     strategy = this->send_strategy_;
00169   }
00170 
00171   if (!strategy.is_nil()) {
00172     strategy->send_stop(repoId);
00173   }
00174 }
00175 
00176 ACE_INLINE
00177 void  DataLink::set_scheduling_release(bool scheduling_release)
00178 {
00179   this->scheduling_release_ = scheduling_release;
00180 }
00181 
00182 ACE_INLINE RemoveResult
00183 DataLink::remove_sample(const DataSampleElement* sample, void* context)
00184 {
00185   DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
00186 
00187   if (this->thr_per_con_send_task_ != 0) {
00188     const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
00189     if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
00190       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00191             "Removed sample from ThreadPerConnection queue.\n"));
00192       return rr;
00193     }
00194   }
00195 
00196   TransportSendStrategy_rch strategy;
00197   {
00198     GuardType guard(this->strategy_lock_);
00199 
00200     strategy = this->send_strategy_;
00201   }
00202 
00203   if (!strategy.is_nil()) {
00204     return strategy->remove_sample(sample, context);
00205   }
00206 
00207   return REMOVE_NOT_FOUND;
00208 }
00209 
00210 ACE_INLINE void
00211 DataLink::remove_all_msgs(RepoId pub_id)
00212 {
00213   DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
00214 
00215   // This one is easy.  Simply delegate to our TransportSendStrategy
00216   // data member.
00217 
00218   TransportSendStrategy_rch strategy;
00219   {
00220     GuardType guard(this->strategy_lock_);
00221 
00222     strategy = this->send_strategy_;
00223   }
00224 
00225   if (!strategy.is_nil()) {
00226     strategy->remove_all_msgs(pub_id);
00227   }
00228 }
00229 
00230 ACE_INLINE DataLinkIdType
00231 DataLink::id() const
00232 {
00233   DBG_ENTRY_LVL("DataLink","id",6);
00234   return id_;
00235 }
00236 
00237 ACE_INLINE int
00238 DataLink::start(const TransportSendStrategy_rch& send_strategy,
00239                 const TransportStrategy_rch& receive_strategy)
00240 {
00241   DBG_ENTRY_LVL("DataLink","start",6);
00242 
00243   // We assume that the send_strategy is not NULL, but the receive_strategy
00244   // is allowed to be NULL.
00245 
00246   // Attempt to start the strategies, and if there is a start() failure,
00247   // make sure to stop() any strategy that was already start()'ed.
00248   if (send_strategy->start() != 0) {
00249     // Failed to start the TransportSendStrategy.
00250     invoke_on_start_callbacks(false);
00251     return -1;
00252   }
00253 
00254   if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
00255     // Failed to start the TransportReceiveStrategy.
00256 
00257     // Remember to stop() the TransportSendStrategy since we did start it,
00258     // and now need to "undo" that action.
00259     send_strategy->stop();
00260     invoke_on_start_callbacks(false);
00261     return -1;
00262   }
00263 
00264   // We started both strategy objects.  Save them to data members since
00265   // we will now take ownership of them.
00266   {
00267     GuardType guard(this->strategy_lock_);
00268 
00269     this->send_strategy_    = send_strategy;
00270     this->receive_strategy_ = receive_strategy;
00271   }
00272   invoke_on_start_callbacks(true);
00273   {
00274     //catch any associations added during initial invoke_on_start_callbacks
00275     //only after first use_datalink has resolved does datalink's state truly
00276     //change to started, thus can't let pending associations proceed normally yet
00277     GuardType guard(this->strategy_lock_);
00278     this->started_ = true;
00279   }
00280   //Now state transitioned to started so no new on_start_callbacks will be added
00281   //so resolve any added during transition to started.
00282   invoke_on_start_callbacks(true);
00283   return 0;
00284 }
00285 
00286 
00287 ACE_INLINE
00288 const char*
00289 DataLink::connection_notice_as_str(ConnectionNotice notice)
00290 {
00291   static const char* NoticeStr[] = { "DISCONNECTED",
00292                                      "RECONNECTED",
00293                                      "LOST"
00294                                    };
00295 
00296   return NoticeStr [notice];
00297 }
00298 
00299 ACE_INLINE
00300 void
00301 DataLink::terminate_send()
00302 {
00303   this->send_strategy_->terminate_send(false);
00304 }
00305 
00306 ACE_INLINE
00307 void
00308 DataLink::remove_listener(const RepoId& local_id)
00309 {
00310   GuardType guard(pub_sub_maps_lock_);
00311   {
00312     IdToSendListenerMap::iterator pos = send_listeners_.find(local_id);
00313     if (pos != send_listeners_.end()) {
00314       send_listeners_.erase(pos);
00315       if (Transport_debug_level > 5) {
00316         GuidConverter converter(local_id);
00317         ACE_DEBUG((LM_DEBUG,
00318                    ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
00319                    ACE_TEXT("removed %C from send_listeners\n"),
00320                    OPENDDS_STRING(converter).c_str()));
00321       }
00322       return;
00323     }
00324   }
00325   {
00326     IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id);
00327     if (pos != recv_listeners_.end()) {
00328       recv_listeners_.erase(pos);
00329       if (Transport_debug_level > 5) {
00330         GuidConverter converter(local_id);
00331         ACE_DEBUG((LM_DEBUG,
00332                    ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
00333                    ACE_TEXT("removed %C from recv_listeners\n"),
00334                    OPENDDS_STRING(converter).c_str()));
00335       }
00336       return;
00337     }
00338   }
00339 }
00340 
00341 ACE_INLINE
00342 TransportSendListener_rch
00343 DataLink::send_listener_for(const RepoId& pub_id) const
00344 {
00345   // pub_map_ (and send_listeners_) are already locked when entering this
00346   // private method.
00347   IdToSendListenerMap::const_iterator found =
00348     this->send_listeners_.find(pub_id);
00349   if (found == this->send_listeners_.end()) {
00350     return TransportSendListener_rch();
00351   }
00352   return found->second.lock();
00353 }
00354 
00355 ACE_INLINE
00356 TransportReceiveListener_rch
00357 DataLink::recv_listener_for(const RepoId& sub_id) const
00358 {
00359   // sub_map_ (and recv_listeners_) are already locked when entering this
00360   // private method.
00361   IdToRecvListenerMap::const_iterator found =
00362     this->recv_listeners_.find(sub_id);
00363   if (found == this->recv_listeners_.end()) {
00364     return TransportReceiveListener_rch();
00365   }
00366   return found->second.lock();
00367 }
00368 
00369 ACE_INLINE
00370 void
00371 DataLink::default_listener(const TransportReceiveListener_wrch& trl)
00372 {
00373   GuardType guard(this->pub_sub_maps_lock_);
00374   this->default_listener_ = trl;
00375 }
00376 
00377 ACE_INLINE
00378 TransportReceiveListener_wrch
00379 DataLink::default_listener() const
00380 {
00381   GuardType guard(this->pub_sub_maps_lock_);
00382   return this->default_listener_;
00383 }
00384 
00385 ACE_INLINE
00386 void
00387 DataLink::send_final_acks (const RepoId& /*readerid*/)
00388 { }
00389 
00390 }
00391 }
00392 
00393 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1