#include "EventLoop.h"
#include "ThreadPool.h"
#include "ThreadSafeQueue.hpp"

// NOTE(review): the header names in this list were missing from the reviewed
// text; they are reconstructed from the identifiers this file uses (pipe2,
// read/write/sleep/close, errno, strerror, strtoul/abort, assert, uint32_t).
// Confirm against the original include list.
extern "C" {
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>  // for uint32_t, etc.
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
}

#include <iostream>
#include <utility>

using namespace std;
using namespace hw4;

// A results queue, storing pairs of integers with booleans indicating whether
// or not they're prime.  Worker tasks push; the main thread's
// OutputDoneHandler pops.
ThreadSafeQueue<pair<uint32_t, bool> > results;

// A thread pool, initialized to 16 threads
ThreadPool pool(16);

void CalculatePrimality(ThreadPool::Task *arg);

// A unit of work for the thread pool: "decide whether pp is prime, then write
// one byte to fd to announce that a result has been queued".
class PrimalityTask : public ThreadPool::Task {
 public:
  // fd: write end of the signalling pipe.  pp: the number to test.
  PrimalityTask(int fd, uint32_t pp)
      : ThreadPool::Task(CalculatePrimality), fd_(fd), pp_(pp) {}

  uint32_t get_possible_prime() const { return pp_; }
  int get_signal_fd() const { return fd_; }

 private:
  int fd_;
  uint32_t pp_;
};

// Returns true iff n is prime.  Trial division only needs to test divisors up
// to sqrt(n); "i <= n / i" expresses that bound without risking overflow of
// i * i.  0 and 1 are not prime.
static bool IsPrime(uint32_t n) {
  if (n < 2) {
    return false;
  }
  for (uint32_t i = 2; i <= n / i; i++) {
    if (n % i == 0) {
      return false;
    }
  }
  return true;
}

// Parses a non-negative base-10 integer that fits in 32 bits from text.
// Leading blanks and trailing blanks / '\r' are tolerated; an empty string, a
// sign, trailing junk, or an out-of-range value is rejected.
// Returns true and stores the value in *out on success; returns false and
// leaves *out untouched otherwise.
static bool ParsePossiblePrime(const char *text, uint32_t *out) {
  while (*text == ' ' || *text == '\t') {
    text++;
  }
  // strtoul() would silently accept "-5" and wrap it, so insist on a digit.
  if (*text < '0' || *text > '9') {
    return false;
  }

  errno = 0;
  char *end = NULL;
  const unsigned long value = strtoul(text, &end, 10);
  if (errno == ERANGE || value > 0xFFFFFFFFUL) {
    return false;
  }

  while (*end == ' ' || *end == '\t' || *end == '\r') {
    end++;
  }
  if (*end != '\0') {
    return false;
  }

  *out = static_cast<uint32_t>(value);
  return true;
}

// Thread-pool entry point for a PrimalityTask.  Sleeps for pp seconds, tests
// pp for primality, pushes (pp, is_prime) onto the results queue and then
// writes a single byte to the task's signal fd.
//
// When called, this function owns the task object, and is responsible for
// deleting it.
void CalculatePrimality(ThreadPool::Task *arg) {
  PrimalityTask *ptask = static_cast<PrimalityTask *>(arg);
  const int signal_fd = ptask->get_signal_fd();
  const uint32_t pp = ptask->get_possible_prime();

  // Everything needed has been copied out.  Deleting through the derived
  // pointer is well-defined whether or not ThreadPool::Task declares a
  // virtual destructor.
  delete ptask;

  // NOTE(review): presumably an artificial delay so that results complete out
  // of order -- confirm that this is intentional.
  sleep(pp);

  // This queue is thread-safe.  Push before signalling so the result is
  // already queued when the reader sees the byte.
  results.push(pair<uint32_t, bool>(pp, IsPrime(pp)));

  // The write must not live inside assert(): it would disappear under NDEBUG.
  const char b = 0;  // contents don't matter
  ssize_t written;
  do {
    written = write(signal_fd, &b, 1);
  } while (written < 0 && errno == EINTR);
  if (written != 1) {
    cerr << "error signalling a finished result: " << strerror(errno) << endl;
    abort();
  }
}

// An EventLoopHandler to run when some output result is ready
class OutputDoneHandler : public EventLoopHandler {
 public:
  // Consumes one signal byte from fd and prints the matching queued result.
  virtual void Handle(const int fd, const unsigned int eventfield, EventLoop *looper) {
    assert(eventfield & EventLoop::FD_IS_READABLE);

    char c;
    ssize_t got;
    do {
      got = read(fd, &c, 1);
    } while (got < 0 && errno == EINTR);

    // The pipe is non-blocking: if no byte is available after all, return to
    // the event loop instead of spinning; nothing is popped without a byte.
    if (got < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
      return;
    }
    if (got != 1) {
      cerr << "error reading from the result pipe: "
           << (got == 0 ? "unexpected end of file" : strerror(errno)) << endl;
      abort();
    }

    // One byte was consumed, so pop exactly one result.
    const pair<uint32_t, bool> p = results.pop();
    cout << "The number " << p.first << (p.second ? " is prime." : " is not prime") << endl;
  }

  virtual void Quitting(const int fd) { }
};

// An EventLoopHandler to run when the user has entered input
class InputEventHandler : public EventLoopHandler {
 public:
  // fd: write end of the signalling pipe, handed to every dispatched task.
  explicit InputEventHandler(int fd) : signal_fd_(fd) {}

  // Reads one line from fd.  A line starting with 'q' (or end of input) quits
  // the event loop; a valid number is dispatched to the thread pool; anything
  // else is reported on stderr and ignored.
  virtual void Handle(const int fd, const unsigned int eventfield, EventLoop *looper) {
    // Read the user input, keeping one byte spare for the terminator.
    char buf[100];
    const int capacity = static_cast<int>(sizeof(buf)) - 1;
    int bytes_read = 0;
    while (bytes_read < capacity && (bytes_read == 0 || buf[bytes_read - 1] != '\n')) {
      // Append after what has already been read rather than overwriting it.
      const ssize_t got = read(fd, buf + bytes_read, capacity - bytes_read);
      if (got < 0 && (errno == EAGAIN || errno == EINTR))
        continue;
      if (got < 0) {
        cerr << "error reading from stdin: " << strerror(errno) << endl;
        abort();
      }
      if (got == 0)
        break;  // end of input: stop instead of looping forever
      bytes_read += static_cast<int>(got);
    }

    // End of input with nothing pending: there will never be more commands.
    if (bytes_read == 0) {
      looper->Quit();
      return;
    }

    // Terminate the buffer and keep only the first line of what was read.
    buf[bytes_read] = '\0';
    char *newline = strchr(buf, '\n');
    if (newline != NULL) {
      *newline = '\0';
    }

    // If we read a q, quit, otherwise parse the integer and enqueue a new task to the thread pool
    if (buf[0] == 'q') {
      looper->Quit();
      return;
    }

    uint32_t pp = 0;
    if (!ParsePossiblePrime(buf, &pp)) {
      cerr << "ignoring invalid input (expected a non-negative integer or q): "
           << buf << endl;
      return;
    }
    pool.Dispatch(new PrimalityTask(signal_fd_, pp));
  }

  virtual void Quitting(const int fd) { }

 private:
  int signal_fd_;
};

int main() {
  EventLoop loop;

  // Create a pipe, a pair of file descriptors that talk to each other.
  // The prime checker threads in the thread pool will write a byte here to
  // signal when they're done checking a prime, and the result is ready.
  // The main thread's event loop will wait on the other descriptor, and
  // dequeue a result when a byte arrives.
  //
  // Calls with side effects are kept out of assert() so that the program
  // still works when compiled with NDEBUG.
  int fds[2];
  if (pipe2(fds, O_NONBLOCK) != 0) {
    cerr << "error creating the signalling pipe: " << strerror(errno) << endl;
    return EXIT_FAILURE;
  }
  const int output_in_q_fd = fds[0];
  const int signal_fd = fds[1];

  // Trigger an InputEventHandler when user input is entered
  if (!loop.RegisterFD(0, EventLoop::FD_IS_READABLE, new InputEventHandler(signal_fd))) {
    cerr << "error registering the stdin handler" << endl;
    close(fds[0]);
    close(fds[1]);
    return EXIT_FAILURE;
  }

  // Trigger an OutputDoneHandler when a worker thread indicates a result is computed
  if (!loop.RegisterFD(output_in_q_fd, EventLoop::FD_IS_READABLE, new OutputDoneHandler())) {
    cerr << "error registering the result handler" << endl;
    close(fds[0]);
    close(fds[1]);
    return EXIT_FAILURE;
  }

  // Kick off the event loop
  loop.Run();

  close(fds[0]);
  close(fds[1]);
  return 0;
}