00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportReactorTask.h"
00010
00011 #if !defined (__ACE_INLINE__)
00012 #include "TransportReactorTask.inl"
00013 #endif
00014
00015 #include <ace/Select_Reactor.h>
00016 #include <ace/WFMO_Reactor.h>
00017 #include <ace/Proactor.h>
00018 #include <ace/Proactor_Impl.h>
00019 #include <ace/WIN32_Proactor.h>
00020
00021 OpenDDS::DCPS::TransportReactorTask::TransportReactorTask(bool useAsyncSend)
00022 : barrier_(2)
00023 , state_(STATE_NOT_RUNNING)
00024 , condition_(this->lock_)
00025 {
00026 DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6);
00027
00028 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00029
00030 if (useAsyncSend) {
00031 this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1);
00032
00033 ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
00034 this->proactor_ = new ACE_Proactor(proactor_impl, 1);
00035 this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle());
00036 return;
00037 }
00038 #else
00039 ACE_UNUSED_ARG(useAsyncSend);
00040 #endif
00041
00042 this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1);
00043 this->proactor_ = 0;
00044 }
00045
00046 OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask()
00047 {
00048 DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6);
00049
00050 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00051 if (this->proactor_) {
00052 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00053 ACE_Event_Handler::DONT_CALL);
00054 delete this->proactor_;
00055 }
00056 #endif
00057
00058 delete this->reactor_;
00059 }
00060
00061 int
00062 OpenDDS::DCPS::TransportReactorTask::open(void*)
00063 {
00064 DBG_ENTRY_LVL("TransportReactorTask","open",6);
00065
00066 GuardType guard(this->lock_);
00067
00068
00069 this->state_ = STATE_NOT_RUNNING;
00070
00071
00072
00073
00074
00075
00076
00077
00078 if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) {
00079 ACE_ERROR_RETURN((LM_ERROR,
00080 "(%P|%t) ERROR: TransportReactorTask Failed to activate "
00081 "itself.\n"),
00082 -1);
00083 }
00084
00085 this->wait_for_startup();
00086
00087
00088
00089
00090
00091
00092 if (this->state_ == STATE_NOT_RUNNING) {
00093 this->state_ = STATE_OPENING;
00094 this->condition_.wait();
00095 }
00096
00097 return 0;
00098 }
00099
00100 int
00101 OpenDDS::DCPS::TransportReactorTask::svc()
00102 {
00103 DBG_ENTRY_LVL("TransportReactorTask","svc",6);
00104
00105
00106
00107
00108
00109 this->_add_ref();
00110
00111
00112
00113
00114 sigset_t set;
00115 ACE_OS::sigfillset(&set);
00116 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00117
00118
00119
00120 if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) {
00121 ACE_ERROR((LM_ERROR,
00122 "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
00123 }
00124 this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00125 this->wait_for_startup();
00126
00127
00128 {
00129
00130
00131 GuardType guard(this->lock_);
00132
00133 if (this->state_ == STATE_OPENING) {
00134
00135 this->state_ = STATE_RUNNING;
00136
00137
00138 this->condition_.signal();
00139 }
00140 }
00141
00142
00143
00144
00145
00146
00147
00148 try {
00149
00150 this->reactor_->run_reactor_event_loop();
00151 } catch (const std::exception& e) {
00152 ACE_ERROR((LM_ERROR,
00153 "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n",
00154 e.what()));
00155 } catch (...) {
00156 ACE_ERROR((LM_ERROR,
00157 "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n"));
00158 }
00159
00160 return 0;
00161 }
00162
00163 int
00164 OpenDDS::DCPS::TransportReactorTask::close(u_long flags)
00165 {
00166 DBG_ENTRY_LVL("TransportReactorTask","close",6);
00167 ACE_UNUSED_ARG(flags);
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179 this->_remove_ref();
00180 return 0;
00181 }
00182
00183 void
00184 OpenDDS::DCPS::TransportReactorTask::stop()
00185 {
00186 DBG_ENTRY_LVL("TransportReactorTask","stop",6);
00187 {
00188 GuardType guard(this->lock_);
00189
00190 if (this->state_ == STATE_NOT_RUNNING) {
00191
00192 return;
00193 }
00194
00195 this->state_ = STATE_NOT_RUNNING;
00196 }
00197
00198 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00199
00200 if (this->proactor_) {
00201 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00202 ACE_Event_Handler::DONT_CALL);
00203 }
00204 #endif
00205
00206 this->reactor_->end_reactor_event_loop();
00207
00208
00209
00210 this->wait();
00211 }