Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : #include "TransportImpl.h" 10 : #include "DataLink.h" 11 : #include "TransportExceptions.h" 12 : #include "dds/DCPS/BuiltInTopicUtils.h" 13 : #include "dds/DCPS/DataWriterImpl.h" 14 : #include "dds/DCPS/DataReaderImpl.h" 15 : #include "dds/DCPS/PublisherImpl.h" 16 : #include "dds/DCPS/SubscriberImpl.h" 17 : #include "dds/DCPS/Util.h" 18 : #include "dds/DCPS/MonitorFactory.h" 19 : #include "dds/DCPS/Service_Participant.h" 20 : #include "dds/DCPS/ServiceEventDispatcher.h" 21 : #include "tao/debug.h" 22 : #include "dds/DCPS/SafetyProfileStreams.h" 23 : 24 : #if !defined (__ACE_INLINE__) 25 : #include "TransportImpl.inl" 26 : #endif /* __ACE_INLINE__ */ 27 : 28 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 29 : 30 : namespace OpenDDS { 31 : namespace DCPS { 32 : 33 0 : TransportImpl::TransportImpl(TransportInst_rch config) 34 0 : : config_(config) 35 0 : , event_dispatcher_(make_rch<ServiceEventDispatcher>(1)) 36 0 : , is_shut_down_(false) 37 : { 38 : DBG_ENTRY_LVL("TransportImpl", "TransportImpl", 6); 39 0 : if (TheServiceParticipant->monitor_factory_) { 40 0 : monitor_.reset(TheServiceParticipant->monitor_factory_->create_transport_monitor(this)); 41 : } 42 0 : } 43 : 44 0 : TransportImpl::~TransportImpl() 45 : { 46 : DBG_ENTRY_LVL("TransportImpl", "~TransportImpl", 6); 47 0 : event_dispatcher_->shutdown(true); 48 0 : } 49 : 50 : bool 51 0 : TransportImpl::is_shut_down() const 52 : { 53 0 : return is_shut_down_; 54 : } 55 : 56 : void 57 0 : TransportImpl::shutdown() 58 : { 59 : DBG_ENTRY_LVL("TransportImpl", "shutdown", 6); 60 : 61 0 : is_shut_down_ = true; 62 : 63 0 : if (!this->reactor_task_.is_nil()) { 64 0 : this->reactor_task_->stop(); 65 : } 66 : 67 0 : event_dispatcher_->shutdown(true); 68 : 69 : // Tell our subclass about the "shutdown event". 70 0 : this->shutdown_i(); 71 0 : } 72 : 73 : 74 : bool 75 0 : TransportImpl::open() 76 : { 77 : // Success. 78 0 : if (this->monitor_) { 79 0 : this->monitor_->report(); 80 : } 81 : 82 0 : if (Transport_debug_level > 0) { 83 : 84 0 : ACE_DEBUG((LM_DEBUG, 85 : ACE_TEXT("(%P|%t) TransportImpl::open()\n%C"), 86 : dump_to_str().c_str())); 87 : } 88 : 89 0 : return true; 90 : } 91 : 92 : void 93 0 : TransportImpl::add_pending_connection(const TransportClient_rch& client, DataLink_rch link) 94 : { 95 0 : GuardType guard(pending_connections_lock_); 96 0 : pending_connections_.insert( PendConnMap::value_type(client, link) ); 97 0 : } 98 : 99 : void 100 0 : TransportImpl::create_reactor_task(bool useAsyncSend, const OPENDDS_STRING& name) 101 : { 102 0 : if (is_shut_down_ || this->reactor_task_.in()) { 103 0 : return; 104 : } 105 : 106 0 : this->reactor_task_= make_rch<ReactorTask>(useAsyncSend); 107 : 108 0 : if (reactor_task_->open_reactor_task(0, 109 0 : &TheServiceParticipant->get_thread_status_manager(), 110 : name)) { 111 0 : throw Transport::MiscProblem(); // error already logged by TRT::open() 112 : } 113 : } 114 : 115 : 116 : void 117 0 : TransportImpl::unbind_link(DataLink*) 118 : { 119 : // may be overridden by subclass 120 : DBG_ENTRY_LVL("TransportImpl", "unbind_link",6); 121 0 : } 122 : 123 : bool 124 0 : TransportImpl::release_link_resources(DataLink* link) 125 : { 126 : DBG_ENTRY_LVL("TransportImpl", "release_link_resources",6); 127 : 128 0 : DataLink_rch link_rch = rchandle_from(link); 129 0 : EventBase_rch do_clear = make_rch<DoClear>(link_rch); 130 0 : event_dispatcher_->dispatch(do_clear); 131 0 : return true; 132 0 : } 133 : 134 : void 135 0 : TransportImpl::report() 136 : { 137 0 : if (this->monitor_) { 138 0 : this->monitor_->report(); 139 : } 140 0 : } 141 : 142 : void 143 0 : TransportImpl::dump() 144 : { 145 0 : ACE_DEBUG((LM_DEBUG, 146 : ACE_TEXT("(%P|%t) TransportImpl::dump() -\n%C"), 147 : dump_to_str().c_str())); 148 0 : } 149 : 150 : OPENDDS_STRING 151 0 : TransportImpl::dump_to_str() 152 : { 153 0 : TransportInst_rch cfg = config_.lock(); 154 0 : return cfg ? cfg->dump_to_str() : OPENDDS_STRING(); 155 0 : } 156 : 157 : } 158 : } 159 : 160 : OPENDDS_END_VERSIONED_NAMESPACE_DECL