Line data Source code
1 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/ 2 : 3 : #include "DcpsUpcalls.h" 4 : 5 : #include "Service_Participant.h" 6 : #include "ThreadStatusManager.h" 7 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 8 : 9 : namespace OpenDDS { 10 : namespace DCPS { 11 : 12 0 : DcpsUpcalls::DcpsUpcalls( 13 : DataReaderCallbacks_rch drr, 14 : const GUID_t& reader, 15 : const WriterAssociation& wa, 16 : bool active, 17 0 : DataWriterCallbacks_rch dwr) 18 0 : : drr_(drr) 19 0 : , reader_(reader) 20 0 : , wa_(wa) 21 0 : , active_(active) 22 0 : , dwr_(dwr) 23 0 : , reader_done_(false) 24 0 : , writer_done_(false) 25 0 : , cnd_(mtx_) 26 : { 27 0 : } 28 : 29 0 : int DcpsUpcalls::svc() 30 : { 31 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager(); 32 0 : const TimeDuration thread_status_interval = thread_status_manager.thread_status_interval(); 33 0 : const bool update_thread_status = thread_status_manager.update_thread_status(); 34 : 35 0 : ThreadStatusManager::Start s(thread_status_manager, "DcpsUpcalls"); 36 : 37 0 : MonotonicTimePoint expire = MonotonicTimePoint::now() + thread_status_interval; 38 : 39 0 : DataReaderCallbacks_rch drr = drr_.lock(); 40 0 : if (!drr) { 41 0 : return 0; 42 : } 43 0 : drr->add_association(reader_, wa_, active_); 44 : 45 : { 46 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_, -1); 47 0 : reader_done_ = true; 48 0 : cnd_.notify_one(); 49 0 : while (!writer_done_) { 50 0 : if (update_thread_status) { 51 0 : switch (cnd_.wait_until(expire, thread_status_manager)) { 52 0 : case CvStatus_NoTimeout: 53 0 : break; 54 : 55 0 : case CvStatus_Timeout: 56 0 : expire = MonotonicTimePoint::now() + thread_status_interval; 57 0 : break; 58 : 59 0 : case CvStatus_Error: 60 0 : if (DCPS_debug_level) { 61 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DcpsUpcalls::svc: error in wait_utill\n")); 62 : } 63 0 : return -1; 64 : } 65 0 : } else if (cnd_.wait(thread_status_manager) == CvStatus_Error) { 66 0 : if (DCPS_debug_level) { 67 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DcpsUpcalls::svc: error in wait\n")); 68 : } 69 0 : return -1; 70 : } 71 : } 72 0 : } 73 : 74 0 : return 0; 75 0 : } 76 : 77 0 : void DcpsUpcalls::writer_done() 78 : { 79 : { 80 0 : ACE_GUARD(ACE_Thread_Mutex, g, mtx_); 81 0 : writer_done_ = true; 82 0 : cnd_.notify_one(); 83 0 : } 84 : 85 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager(); 86 0 : ThreadStatusManager::Sleeper sleeper(thread_status_manager); 87 0 : wait(); // ACE_Task_Base::wait does not accept a timeout 88 0 : } 89 : 90 : } // namespace DCPS 91 : } // namespace OpenDDS 92 : 93 : OPENDDS_END_VERSIONED_NAMESPACE_DECL