/*
* Copyright 2011 Steven Gribble
*
* This file is part of the UW CSE 333 course project sequence
* (333proj).
*
* 333proj is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* 333proj is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with 333proj. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include "./ThreadPool.h"
namespace hw4 {
// Startup bundle for a worker thread.  The ThreadPool constructor
// heap-allocates one of these per worker and hands its address to
// pthread_create() as the (void *) start-routine argument; ThreadLoop()
// casts it back and deletes it once it has read the fields.
struct ThreadStartStruct {
  ThreadPool *tp;                     // pool the new worker belongs to
  ThreadPool::thread_init_fn initfn;  // per-thread init hook; may be NULL
  void *initarg;                      // argument forwarded to initfn
};
// This is the thread start routine, i.e., the function that worker
// threads are born into; the ThreadPool constructor passes it to
// pthread_create().  Despite the parameter name, the argument is a
// heap-allocated ThreadStartStruct cast to (void *), not a ThreadPool;
// the routine always returns NULL.
void *ThreadLoop(void *vpool);
// Constructs a thread pool with num_threads worker threads.
//
// Arguments:
//  - num_threads: the number of worker threads to spawn.
//  - f: optional per-thread initialization function.  When non-NULL,
//    each worker calls f(v) once (while holding the queue lock) and
//    stores the result in the thread_init_return field of every Task
//    that worker later runs.
//  - v: opaque argument forwarded to f.
//
// The constructor returns only after every worker has registered itself
// in num_threads_running_.
ThreadPool::ThreadPool(uint32_t num_threads,
                       thread_init_fn f,
                       void *v) {
  // Every pthread call is evaluated *outside* of assert(): assert()
  // expands to nothing when NDEBUG is defined, so a call placed inside
  // it would silently never run in a release build.
  int rc;

  // Initialize our member variables.
  num_threads_running_ = 0;
  killthreads_ = false;
  rc = pthread_mutex_init(&qlock_, NULL);
  assert(rc == 0);
  rc = pthread_cond_init(&qcond_, NULL);
  assert(rc == 0);

  // Allocate the array of pthread structures.
  thread_array_ = new pthread_t[num_threads];

  // Spawn the threads one by one, passing each a heap-allocated
  // ThreadStartStruct.  Ownership of the struct moves to the new
  // thread, which deletes it in ThreadLoop().
  rc = pthread_mutex_lock(&qlock_);
  assert(rc == 0);
  for (uint32_t i = 0; i < num_threads; i++) {
    ThreadStartStruct *tss = new ThreadStartStruct;
    tss->tp = this;
    tss->initfn = f;
    tss->initarg = v;
    rc = pthread_create(&(thread_array_[i]),
                        NULL,
                        &ThreadLoop,
                        static_cast<void *>(tss));
    assert(rc == 0);
  }

  // Wait for all of the threads to be born and initialized.  Workers
  // need qlock_ to bump num_threads_running_, so repeatedly drop and
  // retake the lock to let them in until the count is reached.
  while (num_threads_running_ != num_threads) {
    rc = pthread_mutex_unlock(&qlock_);
    assert(rc == 0);
    rc = pthread_mutex_lock(&qlock_);
    assert(rc == 0);
  }
  rc = pthread_mutex_unlock(&qlock_);
  assert(rc == 0);
  (void) rc;  // only read by assert(); avoids an unused warning w/ NDEBUG

  // Done!  The thread pool is ready: every worker holds qlock_ from
  // the moment it registers until it blocks on qcond_, so by the time
  // we observed the full count and reacquired the lock, all workers
  // were waiting to be notified of available work.
}
// Shuts the pool down: tells every worker to exit, joins them all,
// frees the thread array, runs any Tasks still sitting in the queue on
// the calling thread, and finally destroys the pool's mutex and
// condition variable.
ThreadPool::~ThreadPool() {
  // Every pthread call is evaluated *outside* of assert(): assert()
  // expands to nothing when NDEBUG is defined, so a call placed inside
  // it would silently never run in a release build.
  int rc;

  rc = pthread_mutex_lock(&qlock_);
  assert(rc == 0);
  uint32_t num_threads = num_threads_running_;

  // Tell all of the worker threads to kill themselves.
  killthreads_ = true;

  // Join with the running threads 1-by-1 until they have all died.
  for (uint32_t i = 0; i < num_threads; i++) {
    // Use a sledgehammer and broadcast every loop iteration, just to
    // be extra-certain that worker threads wake up and see the "kill
    // yourself" flag.
    rc = pthread_cond_broadcast(&qcond_);
    assert(rc == 0);
    // The lock must be released while joining: an exiting worker needs
    // qlock_ to decrement num_threads_running_.
    rc = pthread_mutex_unlock(&qlock_);
    assert(rc == 0);
    rc = pthread_join(thread_array_[i], NULL);
    assert(rc == 0);
    rc = pthread_mutex_lock(&qlock_);
    assert(rc == 0);
  }

  // All of the worker threads are dead, so clean up the thread
  // structures.  (delete[] on a null pointer is a no-op, so no guard
  // is needed.)
  assert(num_threads_running_ == 0);
  delete[] thread_array_;
  thread_array_ = NULL;
  rc = pthread_mutex_unlock(&qlock_);
  assert(rc == 0);

  // Empty the task queue, serially issuing any remaining work on this
  // thread.  Note that thread_init_return is not assigned for Tasks
  // run here.
  while (!work_queue_.empty()) {
    Task *nextTask = work_queue_.front();
    work_queue_.pop_front();
    nextTask->f_(nextTask);
  }

  // No thread can be using the synchronization objects any more, so
  // release whatever resources the pthread implementation tied to them.
  rc = pthread_cond_destroy(&qcond_);
  assert(rc == 0);
  rc = pthread_mutex_destroy(&qlock_);
  assert(rc == 0);
  (void) rc;  // only read by assert(); avoids an unused warning w/ NDEBUG
}
// Enqueue a Task for dispatch.  The Task pointer is appended to the
// back of the work queue under qlock_, and one thread waiting on
// qcond_ (if any) is signaled.  Must not be called once the pool has
// started shutting down (asserted via killthreads_).
void ThreadPool::Dispatch(Task *t) {
  // Every pthread call is evaluated *outside* of assert(): assert()
  // expands to nothing when NDEBUG is defined, so a call placed inside
  // it would silently never run in a release build, leaving the queue
  // unprotected and the workers never signaled.
  int rc = pthread_mutex_lock(&qlock_);
  assert(rc == 0);
  assert(killthreads_ == false);
  work_queue_.push_back(t);
  rc = pthread_cond_signal(&qcond_);
  assert(rc == 0);
  rc = pthread_mutex_unlock(&qlock_);
  assert(rc == 0);
  (void) rc;  // only read by assert(); avoids an unused warning w/ NDEBUG
}
// This is the main loop that all worker threads are born into. They
// wait for a signal on the work queue condition variable, then they
// grab work off the queue. Threads return (i.e., kill themselves)
// when they notice that killthreads_ is true.
void *ThreadLoop(void *vpool) {
ThreadStartStruct *tss = static_cast(vpool);
ThreadPool *pool = tss->tp;
void *thread_data = NULL;
// Grab the lock, increment the thread count so that the ThreadPool
// constructor knows this new thread is alive.
assert(pthread_mutex_lock(&(pool->qlock_)) == 0);
pool->num_threads_running_++;
// Call the customer-supplied initialization function under lock and
// stash away the return value.
if (tss->initfn != NULL) {
thread_data = tss->initfn(tss->initarg);
}
delete tss;
// This is our main thread work loop.
while (pool->killthreads_ == false) {
// Wait to be signaled that something has happened.
assert(pthread_cond_wait(&(pool->qcond_), &(pool->qlock_)) == 0);
// Keep trying to dequeue work until the work queue is empty.
while (!pool->work_queue_.empty() && (pool->killthreads_ == false)) {
ThreadPool::Task *nextTask = pool->work_queue_.front();
nextTask->thread_init_return = thread_data;
pool->work_queue_.pop_front();
// We picked up a Task, so invoke the task function with the
// lock released, then check so see if more tasks are waiting to
// be picked up.
assert(pthread_mutex_unlock(&(pool->qlock_)) == 0);
nextTask->f_(nextTask);
assert(pthread_mutex_lock(&(pool->qlock_)) == 0);
}
}
// All done, exit.
pool->num_threads_running_--;
assert(pthread_mutex_unlock(&(pool->qlock_)) == 0);
return NULL;
}
} // namespace hw4