In this lab, you will augment SimpleDB to run in parallel using multiple processes. These processes may all be on a single machine or may be spread across multiple physical machines! If you have access to multiple machines, you may find it fun to test the latter. Otherwise, you can complete the whole assignment on a single machine by running multiple processes on it.
The main tasks of the lab include:
These three tasks will expose you to three core aspects of parallel data processing: (1) executing queries using more than one process, (2) exchanging data between processes for efficient parallel processing, and (3) optimizing operators for a parallel architecture.
As usual, you are free to choose your own design to implement the various components, but we provide guidelines along the way.
The remainder of this document describes what is involved in building a parallel version of SimpleDB and provides a basic outline of how you might add this functionality to SimpleDB.
As with the previous labs, we recommend that you start as early as possible!
We have provided you with extra test cases as well as source code files for this lab that are not in the original code distribution you received. We reiterate that the unit tests we provide are to help guide your implementation along, but they are not intended to be comprehensive or to establish correctness. You will need to add these new test cases to your release. The easiest way to do this is to untar the new code in the same directory as your top-level simpledb directory, as follows:
$ tar -cvzf CSE444-lab3-submitted.tar.gz CSE444-lab3
$ mv CSE444-lab3 CSE444-lab6
$ wget http://www.cs.washington.edu/education/courses/cse444/12sp/labs/lab6/CSE444-lab6-supplement.tar.gz
$ tar -xvzf CSE444-lab6-supplement.tar.gz
We suggest exercises along this document to guide your implementation, but you may find that a different order makes more sense for you. As before, we will grade your assignment by looking at your code and verifying that you pass the tests for the ant targets test and systemtest. See Section 3.4 for a complete discussion of grading and the tests you will need to pass.
Here's a rough outline of one way you might proceed with this lab. More details on these steps are given in Section 2 below.
SimpleDB follows a standard architecture for a parallel database management system as illustrated in Figure 1.
Figure 1: Overview of parallel SimpleDB
When executing a single instance of SimpleDB, as we did in the previous assignments, we run a single SimpleDB process (i.e., java application), which receives commands from the user in the form of SQL queries, parses, optimizes, and executes these queries, then returns results to the user. In the parallel deployment, we execute multiple instances of SimpleDB, each in its own process (i.e., separate java applications). One process performs the special role of the coordinator, while all other processes are workers. Each relation in the database is partitioned horizontally and each worker sees only one fragment of each relation.
Note:
The major difference between single-node SimpleDB and parallel SimpleDB is that each query execution is now completed by a set of concurrently executing processes that can only communicate over the network or through inter-process communication techniques (if all processes run on a single machine). SimpleDB accomplishes data transfers, such as when joining two tables, using four operators dedicated to data sharing. They are described below and illustrated in Figure 2.
An IPC action is accomplished by a sender process sending some data to a receiver process. In SimpleDB, all senders are instances of type Producer, and all receivers are instances of type Consumer. Thus, both data transfer operations, that is, shuffling and collecting, are implemented using a Producer-Consumer pair, where the Producer and the Consumer reside at different workers. In other words, CollectProducer and CollectConsumer together perform the function of collecting tuples, and ShuffleProducer and ShuffleConsumer together perform the function of shuffling tuples.
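As a toy illustration of this pairing (not SimpleDB's actual networking code), the following sketch uses a Java BlockingQueue to stand in for the TCP connection between a Producer at one worker and the Consumer that gathers its tuples. All class and variable names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CollectDemo {
    static final String EOS = "EOS"; // end-of-stream marker

    static List<String> collect(String[] tuples) throws InterruptedException {
        // The queue stands in for the network link between the pair.
        BlockingQueue<String> link = new ArrayBlockingQueue<>(16);

        // Producer side: push every tuple, then the end-of-stream marker.
        Thread producer = new Thread(() -> {
            try {
                for (String t : tuples) link.put(t);
                link.put(EOS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumer side: drain tuples until end-of-stream.
        List<String> received = new ArrayList<>();
        for (String t = link.take(); !t.equals(EOS); t = link.take()) {
            received.add(t);
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(new String[] {"t1", "t2", "t3"}));
    }
}
```

The real operators do the same push/pull dance, except that the "queue" is a TCP connection between two SimpleDB processes.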
SimpleDB performs the following sequence of actions to execute a query in parallel:
As part of this lab, you will implement three components of the parallel SimpleDB system: pieces of the worker process, the Shuffle operator, and the AggregateOptimizer.
We provide a script to deploy SimpleDB in parallel either on a single machine or on separate physical machines.
You can find the Bash script under src/bin/startSimpleDB.sh. The script is designed for use on Linux, Mac OS and Windows/CYGWIN.
This script performs the following actions in order:
You can configure the IP and port number for the coordinator and the worker processes by editing the files conf/server.conf and conf/workers.conf. To add more workers, simply add more lines with new port numbers in the file conf/workers.conf.
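For example, assuming a host:port-per-line format (an assumption; check the sample files we provide for the exact syntax), a conf/workers.conf for three workers on one machine might look like:

```
localhost:24448
localhost:24449
localhost:24450
```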
The script uses ssh/rsync in the first data copying step. You are required to install an ssh server on each worker machine.
SSH supports two user authentication mechanisms: username/password pairs and public-key authentication. To run the script, public-key authentication is required. The configuration steps are generally the same on all three platforms.
If you are going to run all processes on the same machine, the simplest configuration that you can try looks as follows:
cd ~/.ssh
# When you execute the following, press ENTER to accept all the default options.
# You don't have to enter a passphrase.
ssh-keygen -t rsa
cat id_rsa.pub >> authorized_keys
If you are going to use multiple machines, the last step requires that you copy the local public key (id_rsa.pub) and add it to the authorized_keys file on the remote machines.
If these simple instructions fail, you can find more detailed instructions here: https://hkn.eecs.berkeley.edu/~dhsu/ssh_public_key_howto.html.
To start your parallel SimpleDB deployment, you should execute this script as follows:
# Step 1: Build SimpleDB either through eclipse or by running "ant" on the command line.
# Step 2: Make the script executable:
chmod u+x bin/startSimpleDB.sh
# Step 3: Run the script located in the ***top-level bin directory***:
./bin/startSimpleDB.sh catalogFile [-explain] [-f queryFile]
# For example:
./bin/startSimpleDB.sh /PATH/TO/DATABASE/imdb.schema
# The output at the coordinator should look as follows:
Start splitting data files to worker partitions
... more messages here ...
Computing table stats.
Done.
SimpleDB>

# For each worker, a new terminal window should appear with some messages, including:
... some messages here ...
Worker started at port:24448

# Step 4: You can now execute queries by entering SQL statements at the coordinator.
# While the query will appear to execute, it will return an empty result because the
# worker process implementation is not complete.
SimpleDB> select * from Actor;

# Step 5: To quit the system, simply run "quit" on the coordinator.
# The workers will automatically terminate after the coordinator terminates.
SimpleDB> quit;
# or
SimpleDB> exit;
# or Ctrl-C
Debugging hints: Debugging distributed systems can be challenging. You may find the following information useful:
In the first part of the assignment, we will execute a parallel selection query over the IMDB database that we used in lab 4. In this assignment, we will use the 10% version of IMDB: sample-0.1.tar.bz2 but feel free to use the smaller samples for debugging.
When executing a selection query in parallel, worker nodes need not talk to each other. Each worker will execute the selection locally and will send the results to the coordinator using the Collect operator as illustrated in Figure 2.
We have provided you with the code for the coordinator (see simpledb.parallel.Server.java), but you need to complete the implementation of the workers in simpledb.parallel.Worker.java.
Note that until you implement the actual query execution on the workers, the workers will always return an empty result set to the coordinator for each query.
The majority of the Worker class is already provided. You only need to implement two methods: localizeQueryPlan and executeQuery.
After a queryplan is received from the coordinator, a worker needs to replace some database information in the queryplan with local versions of the same information. Three changes need to be made to the query plan inside the localizeQueryPlan method:
The executeQuery method does the real query execution. Initially, it is implemented to always return an empty result set to the coordinator. Replace this code with a real query execution implementation.
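The core of a real executeQuery is a standard pull-based iterator loop over the root of the localized plan. The following self-contained sketch illustrates the shape of that loop; the Op interface and scanOf helper are stand-ins invented for this example, not SimpleDB classes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ExecuteLoopDemo {
    interface Op {               // stand-in for SimpleDB's operator iterator
        void open();
        boolean hasNext();
        String next();           // stand-in for Tuple
        void close();
    }

    // Wrap a fixed list of "tuples" as an operator, playing the role of
    // a leaf scan in a real query plan.
    static Op scanOf(List<String> tuples) {
        return new Op() {
            private Iterator<String> it;
            public void open() { it = tuples.iterator(); }
            public boolean hasNext() { return it.hasNext(); }
            public String next() { return it.next(); }
            public void close() { it = null; }
        };
    }

    // The core of executeQuery: open, drain, close. Returns the number
    // of tuples pulled from the plan.
    static int run(Op root, List<String> out) {
        root.open();
        int n = 0;
        while (root.hasNext()) {
            out.add(root.next()); // real code would ship the tuple to a Producer
            n++;
        }
        root.close();
        return n;
    }

    public static void main(String[] args) {
        List<String> out = new java.util.ArrayList<>();
        int n = run(scanOf(Arrays.asList("a", "b", "c")), out);
        System.out.println(n);
    }
}
```

In the actual Worker, the tuples produced by this loop flow into the CollectProducer at the top of the localized plan rather than into a list.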
select * from Actor where id < 1000;
In order to execute join queries in parallel, workers need the ability to exchange data using a Shuffle operator as illustrated in Figure 2.
In this assignment, we will only implement parallel equijoins. To execute an equijoin in parallel, SimpleDB must ensure that tuples from the two relations that we want to join are sent to the same worker. For example, if we want to join Actor and Casts using the predicate Actor.id = Casts.pid, we must ensure that tuples from Actor and tuples from Casts that share the same value of an actor ID are sent to the same worker. The shuffle operator performs this data re-partitioning.
The shuffle operator has two components: a ShuffleProducer and a ShuffleConsumer.
The ShuffleProducer pulls data from the operator below it in the query plan. It applies a hash function to the value of the join attribute of each input tuple. The computed hash value determines the worker where the tuple should be routed. The ShuffleProducer maintains one output queue and one TCP connection for each worker.
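The routing decision itself is small. The sketch below (illustrative names, not SimpleDB's actual hash function) shows the property the shuffle relies on: any deterministic hash works, as long as both relations are partitioned with the same function, so matching join keys always map to the same worker.

```java
// Hypothetical sketch of hash-based tuple routing for a shuffle.
public class HashPartitionDemo {
    static int workerFor(int joinKey, int numWorkers) {
        // Math.floorMod avoids negative indices for negative hash codes.
        return Math.floorMod(Integer.hashCode(joinKey), numWorkers);
    }

    public static void main(String[] args) {
        int numWorkers = 4;
        int actorId = 123;   // a tuple from Actor with id = 123
        int castsPid = 123;  // a tuple from Casts with pid = 123
        // Both tuples are routed to the same worker, so the local join
        // at that worker can find the match.
        System.out.println(workerFor(actorId, numWorkers)
                == workerFor(castsPid, numWorkers)); // prints "true"
    }
}
```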
The ShuffleConsumer unions all the tuples received from upstream ShuffleProducers and makes them available to the join operator using the standard Iterator interface.
select m.name,m.year,g.genre from Movie m,Director d,Genre g,Movie_Director md where d.fname='Steven' and d.lname='Spielberg' and d.id=md.did and md.mid=m.id and g.mid=m.id;
The computation of aggregates has two different cases:
These basic implementations leave significant room for improvement. For example, when executing count(*) on Actor, each worker can compute the local count and then send this partial count to the collector worker. The same mechanism can be applied in aggregates with group by. For example, if we want to compute the count of all actors that appeared in each movie, we can scan the Casts relation in parallel on all the workers. Each worker can group the local data on the movie ID (mid) and can count the total number of actors for each movie locally. It can then re-shuffle the resulting partial aggregates such that all partial results for the same movie land on the same worker, which will perform the final aggregate computation by summing up the intermediate counts.
The above optimization works for all algebraic aggregates including min, max, count, and average (note that for average, the partial aggregate takes the form of a sum and count). It does not work for holistic aggregates such as median.
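To see why average must ship a (sum, count) pair rather than a local average, consider this small self-contained sketch (hypothetical data and helper names, not the SC_AVG implementation):

```java
public class PartialAvgDemo {
    // Each worker computes a (sum, count) partial aggregate locally.
    static long[] partial(int[] values) {
        long sum = 0;
        for (int v : values) sum += v;
        return new long[] { sum, values.length };
    }

    // The master worker merges the partials into the final average.
    static double merge(long[] a, long[] b) {
        return (double) (a[0] + b[0]) / (a[1] + b[1]);
    }

    public static void main(String[] args) {
        int[] worker1 = {1, 2, 3};  // hypothetical partition on worker 1
        int[] worker2 = {10, 20};   // hypothetical partition on worker 2

        // Correct: combine (sum, count) pairs -> 36 / 5 = 7.2
        double globalAvg = merge(partial(worker1), partial(worker2));
        System.out.println(globalAvg);

        // Wrong: averaging the local averages gives (2.0 + 15.0) / 2 = 8.5,
        // because it ignores that the partitions have different sizes.
    }
}
```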
In this last exercise, we ask you to implement this optimization for the following standard algebraic aggregates: min, count, sum and average. The implementation for max is already provided as example. Note that, for avg, what you are required to implement is not the optimization code, but the SC_AVG aggregate function.
If the aggregate is accompanied by a group by, every worker receives the same query plan and then directly reports the results to the coordinator. If group by is not in the query, we need a worker to collect the partial aggregate results from all the workers (including itself) and then do the final aggregation. The coordinator picks this collector worker (randomly in the current implementation) before the query plans get dispatched. We call the selected worker the Master Worker. In this case, the query plan of the master worker is different from that of the other workers. The query plan of every other worker ends by sending its partial aggregate results to the master worker, while the master's query plan performs both the partial aggregation (on its own partition of the database) and the final aggregation of the partial results. The master worker finally sends the final aggregate result to the coordinator.
select m.name,count(a.id) from Movie m,Director d,Movie_Director md, Actor a,Casts c where d.fname='Steven' and d.lname='Spielberg' and d.id=md.did and md.mid=m.id and c.mid=m.id and c.pid=a.id group by m.name;
You must submit your code (see below) as well as a short (2 pages, maximum) write-up describing your approach. This write-up should:
To submit your code, please create a CSE444-lab6.tar.gz tarball (such that, untarred, it creates a CSE444-lab6 directory with your source code at CSE444-lab6/src/java/simpledb). Include your individual writeup in the tarball that you submit:
$ cp lab6_writeup.pdf CSE444-lab6
$ tar -cvzf CSE444-lab6.tar.gz CSE444-lab6
Please do not use the ant handin target to create your submission archive.
Submit your tarball for the Lab 6 assignment to the dropbox. You may submit your code multiple times; we will use the latest version you submit that arrives before the deadline (before 11:59pm on the due date). Please submit your writeup as a PDF, plain text file, or Word document (.doc or .docx).
SimpleDB is a relatively complex piece of code. It is very possible you are going to find bugs, inconsistencies, and bad, outdated, or incorrect documentation, etc.
We ask you, therefore, to do this lab with an adventurous mindset. Don't get mad if something is not clear, or even wrong; rather, try to figure it out yourself or send us a friendly email. Please submit (friendly!) bug reports to the course staff. When you do, please try to include:
- A .java file with a test case that we can drop into the test/simpledb directory, compile, and run.
- A .txt file with the data that exhibits the bug, which we can convert into a .dat file using HeapFileEncoder.
50% of your grade will be based on whether or not your code passes the test suite we will run over it. These tests will be a superset of the tests we have provided. Before handing in your code, you should make sure it produces no errors (passes all of the tests) from both ant test and ant systemtest.
Important: before testing, we will replace your build.xml, HeapFileEncoder.java, and the entire contents of the test/ directory with our version of these files! This means you cannot change the format of .dat files! You should therefore be careful changing our APIs. This also means you need to test whether your code compiles with our test programs. In other words, we will untar your tarball, replace the files mentioned above, compile it, and then grade it. It will look roughly like this:
$ gunzip CSE444-lab6.tar.gz
$ tar xvf CSE444-lab6.tar
$ cd CSE444-lab6
[replace build.xml, HeapFileEncoder.java, and test]
$ ant test
$ ant systemtest
[additional tests]
If any of these commands fail, we'll be unhappy, and, therefore, so will your grade.
An additional 50% of your grade will be based on the quality of your writeup and our subjective evaluation of your code.
We've had a lot of fun designing this assignment, and we hope you enjoy hacking on it!