28 , work_available_(
lock_)
29 , shutdown_initiated_(false)
34 DBG_ENTRY_LVL(
"ThreadPerConnectionSendTask",
"ThreadPerConnectionSendTask", 6);
39 DBG_ENTRY_LVL(
"ThreadPerConnectionSendTask",
"~ThreadPerConnectionSendTask", 6);
45 DBG_ENTRY(
"ThreadPerConnectionSendTask",
"add");
49 req->element_ = element;
70 ACE_TEXT(
"(%P|%t) ERROR: ThreadPerConnectionSendTask::add %p\n"),
79 DBG_ENTRY(
"ThreadPerConnectionSendTask",
"open");
87 "(%P|%t) ThreadPerConnectionSendTask failed to open. " 88 "Task has previously been open()'ed.\n"),
93 int priority = mapper.thread_priority();
95 long flags = THR_NEW_LWP | THR_JOINABLE ;
101 flags |= THR_INHERIT_SCHED;
106 ACE_TEXT(
"(%P|%t) ThreadPerConnectionSendTask::open(): ")
107 ACE_TEXT(
"activating thread with flags 0x%08.8x ")
114 if (
activate(flags, 1, 0, priority) != 0) {
118 "(%P|%t) ThreadPerConnectionSendTask failed to activate " 119 "the worker threads.\n"),
179 if (!reqs.empty() && reqs.back()->op_ ==
SEND_STOP) {
180 for (
size_t i = 0; i < reqs.size(); ++i) {
193 DBG_ENTRY(
"ThreadPerConnectionSendTask",
"close");
222 DBG_ENTRY(
"ThreadPerConnectionSendTask",
"remove_sample");
255 ACE_ERROR((
LM_ERROR,
"(%P|%t) ERROR: ThreadPerConnectionSendTask::execute unknown command %d\n",
const GUID_t GUID_UNKNOWN
Nil value for GUID.
bool opened_
Flag used to avoid multiple open() calls.
virtual ~ThreadPerConnectionSendTask()
bool notify_one()
Unblock one of the threads waiting on this condition.
virtual void send_i(TransportQueueElement *element, bool relink=true)
ACE_thread_t thr_self(void)
ACE_thread_t thr_id_
The id of the thread created by this task.
ACE_Guard< ACE_Thread_Mutex > lock_
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
virtual int svc()
The "mainline" executed by the worker thread.
DataSample * get_sample() const
virtual void execute(SendRequest &req)
Handle the request.
virtual int open(void *=0)
Activate the worker threads.
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
Priority & transport_priority()
TransportQueueElement * element_
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
ACE_Message_Block * cont(void) const
virtual int priority(void) const
void send_stop_i(GUID_t repoId)
size_t size() const
Accessor for the current number of elements in the queue.
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
#define DBG_ENTRY(CNAME, MNAME)
bool notify_all()
Unblock all of the threads waiting on this condition.
ThreadPerConnectionSendTask(DataLink *link)
map TRANSPORT_PRIORITY values directly.
RemoveResult remove_sample(const DataSampleElement *element)
Remove sample from the thread per connection queue.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
ConditionVariableType work_available_
RemoveResult status() const
True if the visitor found and removed the sample.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void accept_visitor(VisitorType &visitor) const
QueueType queue_
The request queue.
#define ACE_ERROR_RETURN(X, Y)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
#define TheServiceParticipant
int add_request(SendStrategyOpType op, TransportQueueElement *element=0)
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
int sigfillset(sigset_t *s)
virtual int close(u_long flag=0)
Called when the thread exits.
The Internal API and Implementation of OpenDDS.
DataLink * link_
The datalink to send the samples or control messages.
int put(T *elem)
Put a pointer to an element (T*) on to the queue.
Base wrapper class around a data/control sample to be sent.