00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef UPDATE_RECEIVER_T_H 00009 #define UPDATE_RECEIVER_T_H 00010 00011 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00012 #pragma once 00013 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00014 00015 #include "ace/Task.h" 00016 #include "dds/DCPS/unique_ptr.h" 00017 00018 #include <list> 00019 00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00021 00022 namespace DDS { 00023 00024 struct SampleInfo; 00025 00026 } // namespace DDS 00027 00028 namespace OpenDDS { 00029 namespace Federator { 00030 00031 template<class DataType> 00032 class UpdateProcessor; 00033 00034 template<class DataType> 00035 class UpdateReceiver : public virtual ACE_Task_Base { 00036 public: 00037 /// Construct with a processor reference. 00038 UpdateReceiver(UpdateProcessor<DataType>& processor); 00039 00040 virtual ~UpdateReceiver(); 00041 00042 // ACE_Task_Base methods. 00043 00044 virtual int open(void*); 00045 virtual int svc(); 00046 virtual int close(u_long flags = 0); 00047 00048 /** 00049 * @brief Sample enqueueing. 00050 * 00051 * @param sample - pointer to the received sample to be processed 00052 * @param info - pointer to the info about the sample to be processed. 00053 * 00054 * NOTE: We take ownership of this data and delete it when we are 00055 * done processing it. 00056 */ 00057 void add(OpenDDS::DCPS::unique_ptr<DataType> sample, OpenDDS::DCPS::unique_ptr<DDS::SampleInfo> info); 00058 00059 /// Synchronous termination. 00060 void stop(); 00061 00062 private: 00063 /// The object that we delegate update processing to. 00064 UpdateProcessor<DataType>& processor_; 00065 00066 /// Termination flag. 00067 bool stop_; 00068 00069 /// Protect queue modifications. 00070 ACE_SYNCH_MUTEX lock_; 00071 00072 /// Work to do indicator. 00073 ACE_Condition<ACE_SYNCH_MUTEX> workAvailable_; 00074 00075 /// Contents of the queue. 00076 typedef std::pair<DataType*, DDS::SampleInfo* > DataInfo; 00077 00078 /// Queue of publication data to process. 00079 std::list<DataInfo> queue_; 00080 }; 00081 00082 } // namespace Federator 00083 } // namespace OpenDDS 00084 00085 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00086 00087 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) 00088 #include "UpdateReceiver_T.cpp" 00089 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ 00090 00091 #if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) 00092 #pragma message ("UpdateReceiver_T.cpp template inst") 00093 #pragma implementation ("UpdateReceiver_T.cpp") 00094 #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ 00095 00096 #endif /* UPDATE_RECEIVER_T_H */