DataLink.inl

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "TransportSendStrategy.h"
00009 #include "TransportStrategy.h"
00010 #include "ThreadPerConnectionSendTask.h"
00011 #include "EntryExit.h"
00012 #include "dds/DCPS/GuidConverter.h"
00013 
00014 
00015 namespace OpenDDS {
00016 namespace DCPS {
00017 
00018 ACE_INLINE
00019 bool
00020 DataLink::issues_on_deleted_callback() const
00021 {
00022   return false;
00023 }
00024 
00025 ACE_INLINE
00026 Priority&
00027 DataLink::transport_priority()
00028 {
00029   return this->transport_priority_;
00030 }
00031 
00032 ACE_INLINE
00033 Priority
00034 DataLink::transport_priority() const
00035 {
00036   return this->transport_priority_;
00037 }
00038 
00039 
00040 ACE_INLINE
00041 bool& DataLink::is_loopback()
00042 {
00043   return this->is_loopback_;
00044 }
00045 
00046 
00047 ACE_INLINE
00048 bool  DataLink::is_loopback() const
00049 {
00050   return this->is_loopback_;
00051 }
00052 
00053 
00054 ACE_INLINE
00055 bool& DataLink::is_active()
00056 {
00057   return this->is_active_;
00058 }
00059 
00060 
00061 ACE_INLINE
00062 bool  DataLink::is_active() const
00063 {
00064   return this->is_active_;
00065 }
00066 
00067 ACE_INLINE const ACE_Time_Value&
00068 DataLink::datalink_release_delay() const
00069 {
00070   return this->datalink_release_delay_;
00071 }
00072 
00073 ACE_INLINE void
00074 DataLink::send_start()
00075 {
00076   DBG_ENTRY_LVL("DataLink","send_start",6);
00077 
00078   if (this->thr_per_con_send_task_ != 0) {
00079     this->thr_per_con_send_task_->add_request(SEND_START);
00080 
00081   } else
00082     this->send_start_i();
00083 }
00084 
00085 ACE_INLINE void
00086 DataLink::send_start_i()
00087 {
00088   DBG_ENTRY_LVL("DataLink","send_start_i",6);
00089   // This one is easy.  Simply delegate to our TransportSendStrategy
00090   // data member.
00091 
00092   TransportSendStrategy_rch strategy;
00093   {
00094     GuardType guard(this->strategy_lock_);
00095 
00096     strategy = this->send_strategy_;
00097   }
00098 
00099   if (!strategy.is_nil()) {
00100     strategy->send_start();
00101   }
00102 }
00103 
00104 ACE_INLINE void
00105 DataLink::send(TransportQueueElement* element)
00106 {
00107   DBG_ENTRY_LVL("DataLink","send",6);
00108 
00109   element = this->customize_queue_element(element);
00110   if (!element) {
00111     return;
00112   }
00113 
00114   if (this->thr_per_con_send_task_ != 0) {
00115     this->thr_per_con_send_task_->add_request(SEND, element);
00116 
00117   } else {
00118     this->send_i(element);
00119 
00120   }
00121 }
00122 
00123 ACE_INLINE void
00124 DataLink::send_i(TransportQueueElement* element, bool relink)
00125 {
00126   DBG_ENTRY_LVL("DataLink","send_i",6);
00127   // This one is easy.  Simply delegate to our TransportSendStrategy
00128   // data member.
00129 
00130   TransportSendStrategy_rch strategy;
00131   {
00132     GuardType guard(this->strategy_lock_);
00133 
00134     strategy = this->send_strategy_;
00135   }
00136 
00137   if (!strategy.is_nil()) {
00138     strategy->send(element, relink);
00139   }
00140 }
00141 
00142 ACE_INLINE void
00143 DataLink::send_stop(RepoId repoId)
00144 {
00145   DBG_ENTRY_LVL("DataLink","send_stop",6);
00146 
00147   if (this->thr_per_con_send_task_ != 0) {
00148     this->thr_per_con_send_task_->add_request(SEND_STOP);
00149 
00150   } else
00151     this->send_stop_i(repoId);
00152 }
00153 
00154 ACE_INLINE void
00155 DataLink::send_stop_i(RepoId repoId)
00156 {
00157   DBG_ENTRY_LVL("DataLink","send_stop_i",6);
00158   // This one is easy.  Simply delegate to our TransportSendStrategy
00159   // data member.
00160 
00161   TransportSendStrategy_rch strategy = 0;
00162   {
00163     GuardType guard(this->strategy_lock_);
00164 
00165     strategy = this->send_strategy_;
00166   }
00167 
00168   if (!strategy.is_nil()) {
00169     strategy->send_stop(repoId);
00170   }
00171 }
00172 
00173 ACE_INLINE
00174 void  DataLink::set_scheduling_release(bool scheduling_release)
00175 {
00176   this->scheduling_release_ = scheduling_release;
00177 }
00178 
00179 ACE_INLINE RemoveResult
00180 DataLink::remove_sample(const DataSampleElement* sample)
00181 {
00182   DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
00183 
00184   if (this->thr_per_con_send_task_ != 0) {
00185     const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
00186     if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {
00187       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00188             "Removed sample from ThreadPerConnection queue.\n"));
00189       return rr;
00190     }
00191   }
00192 
00193   TransportSendStrategy_rch strategy;
00194   {
00195     GuardType guard(this->strategy_lock_);
00196 
00197     strategy = this->send_strategy_;
00198   }
00199 
00200   if (!strategy.is_nil()) {
00201     return strategy->remove_sample(sample);
00202   }
00203 
00204   return REMOVE_NOT_FOUND;
00205 }
00206 
00207 ACE_INLINE void
00208 DataLink::remove_all_msgs(RepoId pub_id)
00209 {
00210   DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
00211 
00212   // This one is easy.  Simply delegate to our TransportSendStrategy
00213   // data member.
00214 
00215   TransportSendStrategy_rch strategy;
00216   {
00217     GuardType guard(this->strategy_lock_);
00218 
00219     strategy = this->send_strategy_;
00220   }
00221 
00222   if (!strategy.is_nil()) {
00223     strategy->remove_all_msgs(pub_id);
00224   }
00225 }
00226 
00227 ACE_INLINE DataLinkIdType
00228 DataLink::id() const
00229 {
00230   DBG_ENTRY_LVL("DataLink","id",6);
00231   return id_;
00232 }
00233 
00234 ACE_INLINE int
00235 DataLink::start(const TransportSendStrategy_rch& send_strategy,
00236                 const TransportStrategy_rch& receive_strategy)
00237 {
00238   DBG_ENTRY_LVL("DataLink","start",6);
00239 
00240   // We assume that the send_strategy is not NULL, but the receive_strategy
00241   // is allowed to be NULL.
00242 
00243   // Attempt to start the strategies, and if there is a start() failure,
00244   // make sure to stop() any strategy that was already start()'ed.
00245   if (send_strategy->start() != 0) {
00246     // Failed to start the TransportSendStrategy.
00247     invoke_on_start_callbacks(false);
00248     return -1;
00249   }
00250 
00251   if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
00252     // Failed to start the TransportReceiveStrategy.
00253 
00254     // Remember to stop() the TransportSendStrategy since we did start it,
00255     // and now need to "undo" that action.
00256     send_strategy->stop();
00257     invoke_on_start_callbacks(false);
00258     return -1;
00259   }
00260 
00261   // We started both strategy objects.  Save them to data members since
00262   // we will now take ownership of them.
00263   {
00264     GuardType guard(this->strategy_lock_);
00265 
00266     this->send_strategy_    = send_strategy;
00267     this->receive_strategy_ = receive_strategy;
00268   }
00269   invoke_on_start_callbacks(true);
00270   {
00271     //catch any associations added during initial invoke_on_start_callbacks
00272     //only after first use_datalink has resolved does datalink's state truly
00273     //change to started, thus can't let pending associations proceed normally yet
00274     GuardType guard(this->strategy_lock_);
00275     this->started_ = true;
00276   }
00277   //Now state transitioned to started so no new on_start_callbacks will be added
00278   //so resolve any added during transition to started.
00279   invoke_on_start_callbacks(true);
00280   return 0;
00281 }
00282 
00283 ACE_INLINE
00284 bool
00285 DataLink::add_on_start_callback(TransportClient* client, const RepoId& remote)
00286 {
00287   GuardType guard(strategy_lock_);
00288 
00289   if (started_ && !send_strategy_.is_nil()) {
00290     return false; // link already started
00291   }
00292   on_start_callbacks_.push_back(std::make_pair(client, remote));
00293 
00294   return true;
00295 }
00296 
00297 ACE_INLINE
00298 void
00299 DataLink::remove_on_start_callback(TransportClient* client, const RepoId& remote)
00300 {
00301   GuardType guard(strategy_lock_);
00302   on_start_callbacks_.erase(std::remove(on_start_callbacks_.begin(),
00303                                         on_start_callbacks_.end(),
00304                                         std::make_pair(client, remote)),
00305                             on_start_callbacks_.end());
00306 }
00307 
00308 ACE_INLINE
00309 const char*
00310 DataLink::connection_notice_as_str(ConnectionNotice notice)
00311 {
00312   static const char* NoticeStr[] = { "DISCONNECTED",
00313                                      "RECONNECTED",
00314                                      "LOST"
00315                                    };
00316 
00317   return NoticeStr [notice];
00318 }
00319 
00320 ACE_INLINE
00321 void
00322 DataLink::terminate_send()
00323 {
00324   this->send_strategy_->terminate_send(false);
00325 }
00326 
00327 ACE_INLINE
00328 void
00329 DataLink::remove_listener(const RepoId& local_id)
00330 {
00331   GuardType guard(this->pub_sub_maps_lock_);
00332   {
00333     IdToSendListenerMap::iterator pos = this->send_listeners_.find(local_id);
00334     if (pos != this->send_listeners_.end()) {
00335       this->send_listeners_.erase(pos);
00336       if (Transport_debug_level > 5) {
00337         GuidConverter converter(local_id);
00338         ACE_DEBUG((LM_DEBUG,
00339                    ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from send_listeners\n"),
00340                    OPENDDS_STRING(converter).c_str()));
00341       }
00342       return;
00343     }
00344   }
00345   {
00346     IdToRecvListenerMap::iterator pos = this->recv_listeners_.find(local_id);
00347     if (pos != this->recv_listeners_.end()) {
00348       this->recv_listeners_.erase(pos);
00349       if (Transport_debug_level > 5) {
00350         GuidConverter converter(local_id);
00351         ACE_DEBUG((LM_DEBUG,
00352                    ACE_TEXT("(%P|%t) DataLink::remove_listener: removed %C from recv_listeners\n"),
00353                    OPENDDS_STRING(converter).c_str()));
00354       }
00355       return;
00356     }
00357   }
00358 }
00359 
00360 ACE_INLINE
00361 TransportSendListener*
00362 DataLink::send_listener_for(const RepoId& pub_id) const
00363 {
00364   // pub_map_ (and send_listeners_) are already locked when entering this
00365   // private method.
00366   IdToSendListenerMap::const_iterator found =
00367     this->send_listeners_.find(pub_id);
00368   if (found == this->send_listeners_.end()) {
00369     return 0;
00370   }
00371   return found->second;
00372 }
00373 
00374 ACE_INLINE
00375 TransportReceiveListener*
00376 DataLink::recv_listener_for(const RepoId& sub_id) const
00377 {
00378   // sub_map_ (and recv_listeners_) are already locked when entering this
00379   // private method.
00380   IdToRecvListenerMap::const_iterator found =
00381     this->recv_listeners_.find(sub_id);
00382   if (found == this->recv_listeners_.end()) {
00383     return 0;
00384   }
00385   return found->second;
00386 }
00387 
00388 ACE_INLINE
00389 void
00390 DataLink::default_listener(TransportReceiveListener* trl)
00391 {
00392   GuardType guard(this->pub_sub_maps_lock_);
00393   this->default_listener_ = trl;
00394 }
00395 
00396 ACE_INLINE
00397 TransportReceiveListener*
00398 DataLink::default_listener() const
00399 {
00400   GuardType guard(this->pub_sub_maps_lock_);
00401   return this->default_listener_;
00402 }
00403 
00404 ACE_INLINE
00405 void
00406 DataLink::send_final_acks (const RepoId& /*readerid*/)
00407 { }
00408 
00409 }
00410 }

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7