TransportImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportImpl.h"
00010 #include "DataLink.h"
00011 #include "TransportExceptions.h"
00012 #include "dds/DCPS/BuiltInTopicUtils.h"
00013 #include "dds/DCPS/DataWriterImpl.h"
00014 #include "dds/DCPS/DataReaderImpl.h"
00015 #include "dds/DCPS/PublisherImpl.h"
00016 #include "dds/DCPS/SubscriberImpl.h"
00017 #include "dds/DCPS/Util.h"
00018 #include "dds/DCPS/MonitorFactory.h"
00019 #include "dds/DCPS/Service_Participant.h"
00020 #include "tao/debug.h"
00021 #include "dds/DCPS/SafetyProfileStreams.h"
00022 
00023 
00024 #if !defined (__ACE_INLINE__)
00025 #include "TransportImpl.inl"
00026 #endif /* __ACE_INLINE__ */
00027 
00028 namespace OpenDDS {
00029 namespace DCPS {
00030 
00031 TransportImpl::TransportImpl()
00032   : monitor_(0),
00033     last_link_(0),
00034     is_shut_down_(false)
00035 {
00036   DBG_ENTRY_LVL("TransportImpl", "TransportImpl", 6);
00037   if (TheServiceParticipant->monitor_factory_) {
00038     monitor_ = TheServiceParticipant->monitor_factory_->create_transport_monitor(this);
00039   }
00040 }
00041 
00042 TransportImpl::~TransportImpl()
00043 {
00044   DBG_ENTRY_LVL("TransportImpl", "~TransportImpl", 6);
00045 }
00046 
00047 bool
00048 TransportImpl::is_shut_down() const
00049 {
00050   return is_shut_down_;
00051 }
00052 
00053 void
00054 TransportImpl::shutdown()
00055 {
00056   DBG_ENTRY_LVL("TransportImpl", "shutdown", 6);
00057 
00058   is_shut_down_ = true;
00059 
00060   // Stop datalink clean task.
00061   this->dl_clean_task_.close(1);
00062 
00063   if (!this->reactor_task_.is_nil()) {
00064     this->reactor_task_->stop();
00065   }
00066 
00067   this->pre_shutdown_i();
00068 
00069   OPENDDS_SET(TransportClient*) local_clients;
00070 
00071   {
00072     GuardType guard(this->lock_);
00073 
00074     if (this->config_.is_nil()) {
00075       // This TransportImpl is already shutdown.
00076 //MJM: So, I read here that config_i() actually "starts" us?
00077       return;
00078     }
00079 
00080     local_clients.swap(this->clients_);
00081 
00082     // We can release our lock_ now.
00083   }
00084 
00085   for (OPENDDS_SET(TransportClient*)::iterator it = local_clients.begin();
00086        it != local_clients.end(); ++it) {
00087     (*it)->transport_detached(this);
00088   }
00089 
00090   // Tell our subclass about the "shutdown event".
00091   this->shutdown_i();
00092 
00093   {
00094     GuardType guard(this->lock_);
00095     this->reactor_task_ = 0;
00096     // The shutdown_i() path may access the configuration so remove configuration
00097     // reference after shutdown is performed.
00098 
00099     // Drop our references to the config_.
00100     this->config_ = 0;
00101   }
00102 }
00103 
00104 bool
00105 TransportImpl::configure(TransportInst* config)
00106 {
00107   DBG_ENTRY_LVL("TransportImpl","configure",6);
00108 
00109   GuardType guard(this->lock_);
00110 
00111   if (config == 0) {
00112     ACE_ERROR_RETURN((LM_ERROR,
00113                       "(%P|%t) ERROR: invalid configuration.\n"),
00114                      false);
00115   }
00116 
00117   if (!this->config_.is_nil()) {
00118     // We are rejecting this configuration attempt since this
00119     // TransportImpl object has already been configured.
00120     ACE_ERROR_RETURN((LM_ERROR,
00121                       "(%P|%t) ERROR: TransportImpl already configured.\n"),
00122                      false);
00123   }
00124 
00125   config->_add_ref();
00126   this->config_ = config;
00127 
00128   // Let our subclass take a shot at the configuration object.
00129   if (this->configure_i(config) == false) {
00130     if (Transport_debug_level > 0) {
00131       dump();
00132     }
00133 
00134     guard.release();
00135     shutdown();
00136 
00137     // The subclass rejected the configuration attempt.
00138     ACE_ERROR_RETURN((LM_ERROR,
00139                       "(%P|%t) ERROR: TransportImpl configuration failed.\n"),
00140                      false);
00141   }
00142 
00143   // Open the DL Cleanup task
00144   // We depend upon the existing config logic to ensure the
00145   // DL Cleanup task is opened only once
00146   if (this->dl_clean_task_.open()) {
00147     ACE_ERROR_RETURN((LM_ERROR,
00148                       "(%P|%t) ERROR: DL Cleanup task failed to open : %p\n",
00149                       ACE_TEXT("open")), false);
00150   }
00151 
00152   // Success.
00153   if (this->monitor_) {
00154     this->monitor_->report();
00155   }
00156 
00157   if (Transport_debug_level > 0) {
00158 
00159     ACE_DEBUG((LM_DEBUG,
00160                ACE_TEXT("(%P|%t) TransportImpl::configure()\n%C"),
00161                dump_to_str().c_str()));
00162   }
00163 
00164 
00165   return true;
00166 }
00167 
00168 void
00169 TransportImpl::add_pending_connection(TransportClient* client, DataLink* link)
00170 {
00171   pending_connections_.insert(std::pair<TransportClient* const, DataLink_rch>(
00172     client, DataLink_rch(link, false)));
00173 }
00174 
00175 void
00176 TransportImpl::create_reactor_task(bool useAsyncSend)
00177 {
00178   if (this->reactor_task_.in()) {
00179     return;
00180   }
00181 
00182   this->reactor_task_ = new TransportReactorTask(useAsyncSend);
00183   if (0 != this->reactor_task_->open(0)) {
00184     throw Transport::MiscProblem(); // error already logged by TRT::open()
00185   }
00186 }
00187 
00188 void
00189 TransportImpl::attach_client(TransportClient* client)
00190 {
00191   DBG_ENTRY_LVL("TransportImpl", "attach_client", 6);
00192 
00193   GuardType guard(this->lock_);
00194   clients_.insert(client);
00195 }
00196 
00197 void
00198 TransportImpl::detach_client(TransportClient* client)
00199 {
00200   DBG_ENTRY_LVL("TransportImpl", "detach_client", 6);
00201 
00202   pre_detach(client);
00203   GuardType guard(this->lock_);
00204   clients_.erase(client);
00205 }
00206 
00207 void
00208 TransportImpl::unbind_link(DataLink*)
00209 {
00210   // may be overridden by subclass
00211   DBG_ENTRY_LVL("TransportImpl", "unbind_link",6);
00212 }
00213 
00214 bool
00215 TransportImpl::release_link_resources(DataLink* link)
00216 {
00217   DBG_ENTRY_LVL("TransportImpl", "release_link_resources",6);
00218 
00219   // Create a smart pointer without ownership (bumps up ref count)
00220   DataLink_rch dl(link, false);
00221 
00222   dl_clean_task_.add(dl);
00223 
00224   return true;
00225 }
00226 
00227 void
00228 TransportImpl::report()
00229 {
00230   if (this->monitor_) {
00231     this->monitor_->report();
00232   }
00233 }
00234 
00235 void
00236 TransportImpl::dump()
00237 {
00238   ACE_DEBUG((LM_DEBUG,
00239              ACE_TEXT("(%P|%t) TransportImpl::dump() -\n%C"),
00240              dump_to_str().c_str()));
00241 }
00242 
00243 OPENDDS_STRING
00244 TransportImpl::dump_to_str()
00245 {
00246   if (this->config_.is_nil()) {
00247     return OPENDDS_STRING(" (not configured)\n");
00248   } else {
00249     return this->config_->dump_to_str();
00250   }
00251 }
00252 
00253 }
00254 }

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7