TransportReactorTask.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportReactorTask.h"
00010
00011 #if !defined (__ACE_INLINE__)
00012 #include "TransportReactorTask.inl"
00013 #endif
00014
00015 #include <ace/Select_Reactor.h>
00016 #include <ace/WFMO_Reactor.h>
00017 #include <ace/Proactor.h>
00018 #include <ace/Proactor_Impl.h>
00019 #include <ace/WIN32_Proactor.h>
00020
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022
00023 OpenDDS::DCPS::TransportReactorTask::TransportReactorTask(bool useAsyncSend)
00024 : barrier_(2)
00025 , state_(STATE_NOT_RUNNING)
00026 , condition_(this->lock_)
00027 , reactor_owner_(ACE_OS::NULL_thread)
00028 {
00029 DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6);
00030
00031 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00032
00033 if (useAsyncSend) {
00034 this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1);
00035
00036 ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
00037 this->proactor_ = new ACE_Proactor(proactor_impl, 1);
00038 this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle());
00039 return;
00040 }
00041 #else
00042 ACE_UNUSED_ARG(useAsyncSend);
00043 #endif
00044
00045 this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1);
00046 this->proactor_ = 0;
00047 }
00048
00049 OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask()
00050 {
00051 DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6);
00052
00053 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00054 if (this->proactor_) {
00055 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00056 ACE_Event_Handler::DONT_CALL);
00057 delete this->proactor_;
00058 }
00059 #endif
00060
00061 delete this->reactor_;
00062 }
00063
00064 int
00065 OpenDDS::DCPS::TransportReactorTask::open(void*)
00066 {
00067 DBG_ENTRY_LVL("TransportReactorTask","open",6);
00068
00069 GuardType guard(this->lock_);
00070
00071
00072 this->state_ = STATE_NOT_RUNNING;
00073
00074
00075
00076
00077
00078
00079
00080
00081 if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) {
00082 ACE_ERROR_RETURN((LM_ERROR,
00083 "(%P|%t) ERROR: TransportReactorTask Failed to activate "
00084 "itself.\n"),
00085 -1);
00086 }
00087
00088 this->wait_for_startup();
00089
00090
00091
00092
00093
00094
00095 if (this->state_ == STATE_NOT_RUNNING) {
00096 this->state_ = STATE_OPENING;
00097 this->condition_.wait();
00098 }
00099
00100 return 0;
00101 }
00102
00103 int
00104 OpenDDS::DCPS::TransportReactorTask::svc()
00105 {
00106 DBG_ENTRY_LVL("TransportReactorTask","svc",6);
00107
00108
00109
00110
00111
00112 this->_add_ref();
00113
00114
00115
00116
00117 sigset_t set;
00118 ACE_OS::sigfillset(&set);
00119 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00120
00121
00122
00123 if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) {
00124 ACE_ERROR((LM_ERROR,
00125 "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
00126 }
00127 this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00128 this->wait_for_startup();
00129
00130
00131 {
00132
00133
00134 GuardType guard(this->lock_);
00135
00136 if (this->state_ == STATE_OPENING) {
00137
00138 this->state_ = STATE_RUNNING;
00139
00140
00141 this->condition_.signal();
00142 }
00143 }
00144
00145
00146
00147
00148
00149
00150
00151 try {
00152
00153 this->reactor_->run_reactor_event_loop();
00154 } catch (const std::exception& e) {
00155 ACE_ERROR((LM_ERROR,
00156 "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n",
00157 e.what()));
00158 } catch (...) {
00159 ACE_ERROR((LM_ERROR,
00160 "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n"));
00161 }
00162
00163 return 0;
00164 }
00165
00166 int
00167 OpenDDS::DCPS::TransportReactorTask::close(u_long flags)
00168 {
00169 DBG_ENTRY_LVL("TransportReactorTask","close",6);
00170 ACE_UNUSED_ARG(flags);
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182 this->_remove_ref();
00183 return 0;
00184 }
00185
00186 void
00187 OpenDDS::DCPS::TransportReactorTask::stop()
00188 {
00189 DBG_ENTRY_LVL("TransportReactorTask","stop",6);
00190 {
00191 GuardType guard(this->lock_);
00192
00193 if (this->state_ == STATE_NOT_RUNNING) {
00194
00195 return;
00196 }
00197
00198 this->state_ = STATE_NOT_RUNNING;
00199 }
00200
00201 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00202
00203 if (this->proactor_) {
00204 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00205 ACE_Event_Handler::DONT_CALL);
00206 }
00207 #endif
00208
00209 this->reactor_->end_reactor_event_loop();
00210
00211
00212
00213 this->wait();
00214 }
00215
00216 OPENDDS_END_VERSIONED_NAMESPACE_DECL