00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportImpl.h"
00010 #include "DataLink.h"
00011 #include "TransportExceptions.h"
00012 #include "dds/DCPS/BuiltInTopicUtils.h"
00013 #include "dds/DCPS/DataWriterImpl.h"
00014 #include "dds/DCPS/DataReaderImpl.h"
00015 #include "dds/DCPS/PublisherImpl.h"
00016 #include "dds/DCPS/SubscriberImpl.h"
00017 #include "dds/DCPS/Util.h"
00018 #include "dds/DCPS/MonitorFactory.h"
00019 #include "dds/DCPS/Service_Participant.h"
00020 #include "tao/debug.h"
00021 #include "dds/DCPS/SafetyProfileStreams.h"
00022
00023
00024 #if !defined (__ACE_INLINE__)
00025 #include "TransportImpl.inl"
00026 #endif
00027
00028 namespace OpenDDS {
00029 namespace DCPS {
00030
00031 TransportImpl::TransportImpl()
00032 : monitor_(0),
00033 last_link_(0),
00034 is_shut_down_(false)
00035 {
00036 DBG_ENTRY_LVL("TransportImpl", "TransportImpl", 6);
00037 if (TheServiceParticipant->monitor_factory_) {
00038 monitor_ = TheServiceParticipant->monitor_factory_->create_transport_monitor(this);
00039 }
00040 }
00041
00042 TransportImpl::~TransportImpl()
00043 {
00044 DBG_ENTRY_LVL("TransportImpl", "~TransportImpl", 6);
00045 }
00046
00047 bool
00048 TransportImpl::is_shut_down() const
00049 {
00050 return is_shut_down_;
00051 }
00052
00053 void
00054 TransportImpl::shutdown()
00055 {
00056 DBG_ENTRY_LVL("TransportImpl", "shutdown", 6);
00057
00058 is_shut_down_ = true;
00059
00060
00061 this->dl_clean_task_.close(1);
00062
00063 if (!this->reactor_task_.is_nil()) {
00064 this->reactor_task_->stop();
00065 }
00066
00067 this->pre_shutdown_i();
00068
00069 OPENDDS_SET(TransportClient*) local_clients;
00070
00071 {
00072 GuardType guard(this->lock_);
00073
00074 if (this->config_.is_nil()) {
00075
00076
00077 return;
00078 }
00079
00080 local_clients.swap(this->clients_);
00081
00082
00083 }
00084
00085 for (OPENDDS_SET(TransportClient*)::iterator it = local_clients.begin();
00086 it != local_clients.end(); ++it) {
00087 (*it)->transport_detached(this);
00088 }
00089
00090
00091 this->shutdown_i();
00092
00093 {
00094 GuardType guard(this->lock_);
00095 this->reactor_task_ = 0;
00096
00097
00098
00099
00100 this->config_ = 0;
00101 }
00102 }
00103
00104 bool
00105 TransportImpl::configure(TransportInst* config)
00106 {
00107 DBG_ENTRY_LVL("TransportImpl","configure",6);
00108
00109 GuardType guard(this->lock_);
00110
00111 if (config == 0) {
00112 ACE_ERROR_RETURN((LM_ERROR,
00113 "(%P|%t) ERROR: invalid configuration.\n"),
00114 false);
00115 }
00116
00117 if (!this->config_.is_nil()) {
00118
00119
00120 ACE_ERROR_RETURN((LM_ERROR,
00121 "(%P|%t) ERROR: TransportImpl already configured.\n"),
00122 false);
00123 }
00124
00125 config->_add_ref();
00126 this->config_ = config;
00127
00128
00129 if (this->configure_i(config) == false) {
00130 if (Transport_debug_level > 0) {
00131 dump();
00132 }
00133
00134 guard.release();
00135 shutdown();
00136
00137
00138 ACE_ERROR_RETURN((LM_ERROR,
00139 "(%P|%t) ERROR: TransportImpl configuration failed.\n"),
00140 false);
00141 }
00142
00143
00144
00145
00146 if (this->dl_clean_task_.open()) {
00147 ACE_ERROR_RETURN((LM_ERROR,
00148 "(%P|%t) ERROR: DL Cleanup task failed to open : %p\n",
00149 ACE_TEXT("open")), false);
00150 }
00151
00152
00153 if (this->monitor_) {
00154 this->monitor_->report();
00155 }
00156
00157 if (Transport_debug_level > 0) {
00158
00159 ACE_DEBUG((LM_DEBUG,
00160 ACE_TEXT("(%P|%t) TransportImpl::configure()\n%C"),
00161 dump_to_str().c_str()));
00162 }
00163
00164
00165 return true;
00166 }
00167
00168 void
00169 TransportImpl::add_pending_connection(TransportClient* client, DataLink* link)
00170 {
00171 pending_connections_.insert(std::pair<TransportClient* const, DataLink_rch>(
00172 client, DataLink_rch(link, false)));
00173 }
00174
00175 void
00176 TransportImpl::create_reactor_task(bool useAsyncSend)
00177 {
00178 if (this->reactor_task_.in()) {
00179 return;
00180 }
00181
00182 this->reactor_task_ = new TransportReactorTask(useAsyncSend);
00183 if (0 != this->reactor_task_->open(0)) {
00184 throw Transport::MiscProblem();
00185 }
00186 }
00187
00188 void
00189 TransportImpl::attach_client(TransportClient* client)
00190 {
00191 DBG_ENTRY_LVL("TransportImpl", "attach_client", 6);
00192
00193 GuardType guard(this->lock_);
00194 clients_.insert(client);
00195 }
00196
00197 void
00198 TransportImpl::detach_client(TransportClient* client)
00199 {
00200 DBG_ENTRY_LVL("TransportImpl", "detach_client", 6);
00201
00202 pre_detach(client);
00203 GuardType guard(this->lock_);
00204 clients_.erase(client);
00205 }
00206
00207 void
00208 TransportImpl::unbind_link(DataLink*)
00209 {
00210
00211 DBG_ENTRY_LVL("TransportImpl", "unbind_link",6);
00212 }
00213
00214 bool
00215 TransportImpl::release_link_resources(DataLink* link)
00216 {
00217 DBG_ENTRY_LVL("TransportImpl", "release_link_resources",6);
00218
00219
00220 DataLink_rch dl(link, false);
00221
00222 dl_clean_task_.add(dl);
00223
00224 return true;
00225 }
00226
00227 void
00228 TransportImpl::report()
00229 {
00230 if (this->monitor_) {
00231 this->monitor_->report();
00232 }
00233 }
00234
00235 void
00236 TransportImpl::dump()
00237 {
00238 ACE_DEBUG((LM_DEBUG,
00239 ACE_TEXT("(%P|%t) TransportImpl::dump() -\n%C"),
00240 dump_to_str().c_str()));
00241 }
00242
00243 OPENDDS_STRING
00244 TransportImpl::dump_to_str()
00245 {
00246 if (this->config_.is_nil()) {
00247 return OPENDDS_STRING(" (not configured)\n");
00248 } else {
00249 return this->config_->dump_to_str();
00250 }
00251 }
00252
00253 }
00254 }