CSE444 Lab 6: Parallel Processing

Assigned: Friday, May 18, 2012
Due: Friday, June 1, 2012

Version History:

In this lab, you will augment SimpleDB to run in parallel using multiple processes. These processes may all be on a single machine or may be spread across multiple physical machines! If you have access to multiple machines, you may find it fun to test the latter. Otherwise, you can complete the whole assignment on a single machine by running multiple processes on it.

The main tasks of the lab include:

  1. Completing the implementation of the worker process (the Worker class).
  2. Implementing the shuffle operator (ShuffleProducer and ShuffleConsumer) so that workers can exchange data.
  3. Optimizing the parallel aggregation operator by completing the AggregateOptimizer and the SC_AVG aggregate function.

These three tasks will expose you to three core aspects of parallel data processing: (1) executing queries using more than one process, (2) exchanging data between processes for efficient parallel processing, and (3) optimizing operators for a parallel architecture.

As usual, you are free to choose your own design to implement the various components, but we provide guidelines along the way.

The remainder of this document describes what is involved in building a parallel version of SimpleDB and provides a basic outline of how you might add this functionality to SimpleDB.

As with the previous labs, we recommend that you start as early as possible!

1. Getting started

You should begin with the code you submitted for Lab 3. (If you did not submit code for Lab 3, or your solution didn't work properly, contact us to discuss your options.)

We have provided you with extra test cases as well as source code files for this lab that are not in the original code distribution you received. We reiterate that the unit tests we provide are to help guide your implementation along, but they are not intended to be comprehensive or to establish correctness. You will need to add these new test cases to your release. The easiest way to do this is to untar the new code in the same directory as your top-level simpledb directory, as follows:

Implementation hints

The exercises in this document are ordered to guide your implementation, but you may find that a different order makes more sense for you. As before, we will grade your assignment by looking at your code and verifying that it passes the tests for the ant targets test and systemtest. See Section 3.4 for a complete discussion of grading and the tests you will need to pass.

Here's a rough outline of one way you might proceed with this lab. More details on these steps are given in Section 2 below.

  1. Start a parallel SimpleDB deployment and make sure you can run queries end to end (Section 2.1).
  2. Complete the Worker class and run parallel selection queries (Section 2.2).
  3. Implement the ShuffleProducer and ShuffleConsumer and run parallel join queries (Section 2.3).
  4. Complete the AggregateOptimizer and the SC_AVG aggregate function and run parallel aggregate queries (Section 2.4).

 

2. Parallel SimpleDB outline

SimpleDB follows a standard architecture for a parallel database management system as illustrated in Figure 1.

Figure 1: Overview of parallel SimpleDB

When executing a single instance of SimpleDB, as we did in the previous assignments, we run a single SimpleDB process (i.e., a single Java application), which receives commands from the user in the form of SQL queries; parses, optimizes, and executes these queries; and returns results to the user. In the parallel deployment, we execute multiple instances of SimpleDB, each in its own process (i.e., separate Java applications). One process performs the special role of the coordinator, while all other processes are workers. Each relation in the database is partitioned horizontally, and each worker sees only one fragment of each relation.


The major difference between single-node SimpleDB and parallel SimpleDB is that each query is now executed by a set of concurrently running processes that can communicate only over the network or through inter-process communication techniques (if all processes run on a single machine). SimpleDB accomplishes data transfers, such as those needed when joining two tables, using four operators dedicated to data sharing. They are described below and illustrated in Figure 2.

Figure 2: The data sharing operators (Collect and Shuffle)

Both data sharing operations require inter-process communication (IPC). We use TCP connections to implement it; more specifically, we use the Apache Mina framework. More information on Java socket programming can be found here: http://docs.oracle.com/javase/tutorial/networking/sockets/index.html. More information about Mina can be found here: http://mina.apache.org/documentation.html.
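
To give you a feel for the style of API used by the provided Producer and Consumer classes, here is a minimal, self-contained sketch of a sender and a receiver exchanging one message over TCP with Mina. This is not SimpleDB's actual wiring, and it assumes the Mina 2.x API; it is only meant to illustrate the acceptor/connector model you will encounter when reading the provided code.

import java.net.InetSocketAddress;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MinaSketch {
    public static void main(String[] args) throws Exception {
        int port = 34567; // arbitrary port for this sketch

        // Receiver side: accept connections and print every message that arrives.
        NioSocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        acceptor.setHandler(new IoHandlerAdapter() {
            public void messageReceived(IoSession session, Object message) {
                System.out.println("received: " + message);
            }
        });
        acceptor.bind(new InetSocketAddress(port));

        // Sender side: connect to the receiver and write one serializable object.
        NioSocketConnector connector = new NioSocketConnector();
        connector.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        connector.setHandler(new IoHandlerAdapter());
        ConnectFuture future = connector.connect(new InetSocketAddress("localhost", port));
        future.awaitUninterruptibly();
        IoSession session = future.getSession();
        session.write("hello from a producer");
    }
}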

An IPC action is accomplished by a sender process sending some data to a receiver process. In SimpleDB, all senders are instances of type Producer, and all receivers are instances of type Consumer. Thus, both data transfer operations, that is, shuffling and collecting, are implemented using a Producer-Consumer pair, where the Producer and the Consumer reside at different workers. In other words, CollectProducer and CollectConsumer together perform the function of collecting tuples, and ShuffleProducer and ShuffleConsumer together perform the function of shuffling tuples.

SimpleDB performs the following sequence of actions to execute a query in parallel:

  1. The coordinator handles all interactions with the user. As in the previous labs, the user submits a query either as a command-line input or as a command-line argument specified using the -f option during system startup. The latter is useful for testing.
  2. The coordinator translates the SQL query to a sequential execution plan that can be executed on a stand-alone (not parallel) SimpleDB instance.
  3. Then, the coordinator inserts the data sharing operators into the sequential plan. A (ShuffleProducer, ShuffleConsumer) pair is inserted between relational operators that require data shuffling such as between a join operator and each of its children. A (CollectProducer, CollectConsumer) pair is inserted when data needs to be collected at a single worker, for example, just before an aggregate operator. This produces a parallel query plan that the parallel SimpleDB can execute.
  4. Although we have a functional parallel query plan at this point, it might not be efficient. Unlike in a single-site DBMS, inter-process data transfers in a parallel DBMS are expensive, so a good parallel query execution plan should try to minimize the amount of data exchanged between the processes. During query optimization, each parallel query plan goes through a chain of optimizers, where each optimizer improves a distinct aspect of its input query plan. SimpleDB's optimizer chain is implemented by the ParallelQueryPlanOptimizer class and, by default, applies all of the implemented optimizers: ProjectOptimizer, FilterOptimizer, BloomFilterOptimizer, and AggregateOptimizer (see the sketch after this list). You can switch optimizers on and off in the processQuery method in Server.java. For example, to switch off the ProjectOptimizer, just comment out the line:
    p = ParallelQueryPlan.optimize(tid, p, new ProjectOptimizer());
    Among the optimizers, you are only required to implement some parts of the AggregateOptimizer. All the other optimizers are already implemented for you.
  5. After optimizing the plan, the coordinator dispatches the optimized query plan to the workers. The workers execute the query plans on their locally available datasets and return the final query output to the coordinator.
  6. Finally, the coordinator outputs the results back to the user.
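
For reference, here is a minimal sketch of what the full optimizer chain might look like inside processQuery, following the call pattern shown in step 4 above (the surrounding code in Server.java may differ slightly):

    // apply each optimizer in turn; each call returns a rewritten parallel plan
    p = ParallelQueryPlan.optimize(tid, p, new ProjectOptimizer());
    p = ParallelQueryPlan.optimize(tid, p, new FilterOptimizer());
    p = ParallelQueryPlan.optimize(tid, p, new BloomFilterOptimizer());
    p = ParallelQueryPlan.optimize(tid, p, new AggregateOptimizer());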

As part of this lab, you will implement three components of the parallel SimpleDB system: pieces of the worker process, the Shuffle operator, and the AggregateOptimizer.

 

2.1 Starting a Parallel SimpleDB Deployment

We provide a script to deploy SimpleDB in parallel either on a single machine or on separate physical machines.

You can find the Bash script under src/bin/startSimpleDB.sh. The script is designed for use on Linux, Mac OS, and Windows/Cygwin.

This script performs the following actions in order:

  1. Splits all relations in the database horizontally. The number of data partitions equals the number of workers. Each worker will get one data partition.
  2. Copies each database fragment to the physical machine where the corresponding worker will execute.
  3. Starts worker processes on their physical machines.
  4. Starts the coordinator process on its physical machine.

You can configure the IP and port number for the coordinator and the worker processes by editing the files conf/server.conf and conf/workers.conf. To add more workers, simply add more lines with new port numbers in the file conf/workers.conf.
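
For example, a two-worker deployment that runs everything on the local machine might use a conf/workers.conf like the following (one worker per line; the host:port syntax shown here is an assumption, so check the provided configuration files for the exact format):

localhost:24448
localhost:24449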

The script uses ssh/rsync for the data copying step, so you must install an ssh server on each worker machine.

SSH supports two user authentication mechanisms: username/password and public-key authentication. To run the script, public-key authentication is required. The configuration steps are generally the same on all three platforms.

If you are going to run all processes on the same machine, the simplest configuration that you can try looks as follows:

cd ~/.ssh

// When you execute the following, type ENTER to accept all the default options.
// You don't have to enter a passphrase
ssh-keygen -t rsa

cat id_rsa.pub >> authorized_keys

If you are going to use multiple machines, the last step requires that you copy the local public key (id_rsa.pub) and add it to the authorized_keys file on the remote machines.

If these simple instructions fail, you can find more detailed instructions here: https://hkn.eecs.berkeley.edu/~dhsu/ssh_public_key_howto.html.

 

To start your parallel SimpleDB deployment, you should execute this script as follows:

// Step 1: Build SimpleDB either through eclipse or by running "ant" on the command line

// Step 2: Make the script executable:
chmod u+x bin/startSimpleDB.sh

// Step 3: Run the script located in the ***top-level bin directory***:
./bin/startSimpleDB.sh catalogFile [-explain] [-f queryFile]

// For example
./bin/startSimpleDB.sh /PATH/TO/DATABASE/imdb.schema

// The output at the coordinator should look as follows
 
Start splitting data files to worker partitions
... more messages here ...
Computing table stats.
Done.
SimpleDB>

// For each worker, a new terminal window should appear with some messages including
... some messages here ...
Worker started at port:24448

// Step 4: You can now execute queries by entering SQL statements at the coordinator.
// While the query will appear to execute, it will return an empty result because the worker
// process implementation is not complete.
SimpleDB> select * from Actor;

// Step 5: To quit the system, simply run "quit" on the coordinator.
// The workers will automatically terminate after the coordinator terminates.
SimpleDB> quit;
or
SimpleDB> exit;
or
Ctrl-C

Debugging hints: Debugging distributed systems can be challenging. You may find the following information useful:

 

2.2 Worker Process and Parallel Selection Queries

In the first part of the assignment, we will execute a parallel selection query over the IMDB database that we used in Lab 4. We will use the 10% version of IMDB (sample-0.1.tar.bz2), but feel free to use the smaller samples for debugging.

When executing a selection query in parallel, worker nodes need not talk to each other. Each worker will execute the selection locally and will send the results to the coordinator using the Collect operator as illustrated in Figure 2.

We provide you with the code for the coordinator (see simpledb.parallel.Server.java), but you need to complete the implementation of the workers in simpledb.parallel.Worker.java.

Note that until you implement the actual query execution on the workers, every query will return an empty result set to the coordinator.

Most of the Worker class is already provided. You only need to implement two methods: localizeQueryPlan and executeQuery.

After a query plan is received from the coordinator, a worker needs to replace some database information in the query plan with local versions of the same information. Three changes need to be made to the query plan inside the localizeQueryPlan method:

The executeQuery method does the real query execution. Initially, it is implemented to always return an empty result set to the coordinator. Replace this code with a real query execution implementation, as sketched below.
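
As a rough guide, executeQuery typically just drives the localized plan from its root: on a worker, the root operator is a Producer, so iterating over it is what ships the result tuples to the appropriate Consumer. Here is a minimal sketch, assuming the localized root is available as a DbIterator called queryPlan (the actual field names and error handling in Worker.java will differ):

    // open the root operator, drain it, then close it;
    // pulling tuples through the root Producer pushes them over the network
    queryPlan.open();
    while (queryPlan.hasNext()) {
        queryPlan.next();
    }
    queryPlan.close();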

 

Exercise 1: Complete the implementation of the Worker class.

You should now be able to pass the WorkerTest.

Execute the following parallel selection query on the 10% sample of the IMDB database.
select * from Actor where id < 1000; 

Please report the execution time when using 1, 2, and 4 workers.

To change the number of worker processes, you need to edit the file conf/workers.conf. Notice that the runtime will be different the first time you execute a query (cold cache) vs the subsequent times that you execute the same query (warm cache). Feel free to report either number or both. Comment on the SimpleDB performance. Do you see a linear speed-up or something else? Feel free to try other queries and other numbers of workers. Do not worry about having good or bad performance. Simply tell us what you see and reflect on what you see in your lab write-up.

 

2.3. Shuffle Operator and Parallel Join Queries

In order to execute join queries in parallel, workers need the ability to exchange data using a Shuffle operator as illustrated in Figure 2.

In this assignment, we will only implement parallel equijoins. To execute an equijoin in parallel, SimpleDB must ensure that tuples from the two relations that we want to join are sent to the same worker. For example, if we want to join Actor and Casts using the predicate Actor.id = Casts.pid, we must ensure that tuples from Actor and tuples from Casts that share the same value of an actor ID are sent to the same worker. The shuffle operator performs this data re-partitioning.

The shuffle operator has two components: a ShuffleProducer and a ShuffleConsumer.

The ShuffleProducer pulls data from the operator below it in the query plan. It applies a hash function to the value of the join attribute of each input tuple. The computed hash value determines the worker where the tuple should be routed. The ShuffleProducer maintains one output queue and one TCP connection for each worker.

The ShuffleConsumer unions all the tuples received from upstream ShuffleProducers and makes them available to the join operator using the standard Iterator interface.
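
The heart of the ShuffleProducer is the routing decision for each tuple. Here is a minimal sketch of that logic, using hypothetical names (joinFieldIndex, numWorkers, and the per-worker list of output buffers called buffers are assumptions, not fields from the provided code):

    // route each input tuple by hashing its join attribute
    Field joinKey = tuple.getField(joinFieldIndex);
    int workerIndex = (joinKey.hashCode() & Integer.MAX_VALUE) % numWorkers; // non-negative bucket
    buffers.get(workerIndex).add(tuple); // queued on the TCP connection to that worker

Because every worker uses the same hash function, tuples from Actor and tuples from Casts that share the same actor ID always end up on the same worker.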

 

Exercise 2: Complete the implementation of the ShuffleProducer and ShuffleConsumer classes.

You should now be able to pass the ShuffleTest.

Execute the following parallel query on the 10% sample of the IMDB database.
  select m.name,m.year,g.genre
  from Movie m,Director d,Genre g,Movie_Director md
  where d.fname='Steven' and d.lname='Spielberg'
    and d.id=md.did and md.mid=m.id
    and g.mid=m.id; 
  

Please report the execution time when using 1, 2, and 4 workers. To change the number of worker processes, you need to edit the file conf/workers.conf. Do not worry about having good or bad performance. Simply tell us what you see and reflect on what you see in your lab write-up.

 

2.4. Optimizing the Parallel Aggregation Operator

The computation of aggregates has two different cases: aggregation without a group by clause and aggregation with one. In the basic implementation, both cases ship raw tuples between workers before the aggregate is computed.

These basic implementations leave significant room for improvement. For example, when executing count(*) on Actor, each worker can compute the local count and then send this partial count to the collector worker. The same mechanism can be applied in aggregates with group by. For example, if we want to compute the count of all actors that appeared in each movie, we can scan the Casts relation in parallel on all the workers. Each worker can group the local data on the movie ID (mid) and can count the total number of actors for each movie locally. It can then re-shuffle the resulting partial aggregates such that all partial results for the same movie land on the same worker, which will perform the final aggregate computation by summing up the intermediate counts.

The above optimization works for all algebraic aggregates including min, max, count, and average (note that for average, the partial aggregate takes the form of a sum and count). It does not work for holistic aggregates such as median.

In this last exercise, we ask you to implement this optimization for the following standard algebraic aggregates: min, count, sum, and average. The implementation for max is already provided as an example. Note that for avg, what you are required to implement is not the optimization code but the SC_AVG aggregate function.
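
The key point for SC_AVG is that each worker reports a partial sum and a partial count, and the final average is the total sum divided by the total count, not the average of the per-worker averages. Here is a minimal sketch of the combining step, using hypothetical variable names (partialSums and partialCounts stand for the values received from the workers):

    // combine per-worker partial aggregates into the final average
    long totalSum = 0, totalCount = 0;
    for (int i = 0; i < partialSums.length; i++) {
        totalSum += partialSums[i];
        totalCount += partialCounts[i];
    }
    int finalAvg = (int) (totalSum / totalCount); // SimpleDB aggregate values are integers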

If the aggregate is accompanied by a group by, every worker receives the same query plan and directly reports its results to the coordinator. If the query has no group by, we need one worker to collect the partial aggregate results from all the workers (including itself) and then compute the final aggregate. The coordinator picks this collector worker (randomly in the current implementation) before the query plans are dispatched. We call the selected worker the Master Worker. In this case, the query plan of the master worker differs from that of the other workers: the plan of every other worker ends by sending its partial aggregate results to the master worker, while the master's plan computes both the partial aggregate (over its own partition of the database) and the final aggregate over the partial results. The master worker finally sends the final aggregate result to the coordinator.

 

Exercise 3: Complete the implementation of simpledb.parallel.AggregateOptimizer.java and the SC_AVG aggregate function in Aggregate.java and Aggregator.java.

You should now be able to pass the ParallelAggregateTest.

Execute the following parallel query on the 10% sample of the IMDB database.
  select m.name,count(a.id)
  from Movie m,Director d,Movie_Director md, Actor a,Casts c
  where d.fname='Steven' and d.lname='Spielberg'
    and d.id=md.did and md.mid=m.id
    and c.mid=m.id
    and c.pid=a.id
    group by m.name;
    

Please report the execution time when using 1, 2, and 4 workers. To change the number of worker processes, you need to edit the file conf/workers.conf. Again, do not worry about having good or bad performance. Simply tell us what you see and reflect on it in your lab write-up. You may find it interesting to run the query with and without this optimization.

 

3. Logistics

You must submit your code (see below) as well as a short (2 pages, maximum) write-up describing your approach. This write-up should:

 

3.1. Collaboration

All CSE 444 labs are to be completed INDIVIDUALLY! However, you may discuss your high-level approach to solving each lab with other students in the class.

 

3.2. Submitting your assignment

To submit your code, please create a CSE444-lab6.tar.gz tarball (such that, untarred, it creates a CSE444-lab6 directory with your source code at CSE444-lab6/src/java/simpledb). Include your individual writeup in the tarball that you submit:

 

$ cp lab6_writeup.pdf CSE444-lab6

$ tar -cvzf CSE444-lab6.tar.gz CSE444-lab6

Please do not use the ant handin target to create your submission archive.

Submit your tarball for the Lab 6 assignment to the dropbox. You may submit your code multiple times; we will use the latest version you submit that arrives before the deadline (before 11:59pm on the due date). Please submit your writeup as a PDF, plain text file, or Word document (.doc or .docx).

3.3. Submitting a bug

SimpleDB is a relatively complex piece of code. It is very possible that you will find bugs, inconsistencies, and bad, outdated, or incorrect documentation.

We ask you, therefore, to do this lab with an adventurous mindset. Don't get mad if something is not clear, or even wrong; rather, try to figure it out yourself or send us a friendly email. Please submit (friendly!) bug reports to the course staff. When you do, please try to include:

You can also post on the class message board if you feel you have run into a bug.

3.4 Grading

50% of your grade will be based on whether or not your code passes the test suite we will run over it. These tests will be a superset of the tests we have provided. Before handing in your code, you should make sure it produces no errors (passes all of the tests) from both ant test and ant systemtest.

Important: before testing, we will replace your build.xml, HeapFileEncoder.java, and the entire contents of the test/ directory with our versions of these files! This means you cannot change the format of .dat files! You should therefore be careful when changing our APIs. This also means you need to test whether your code compiles with our test programs. In other words, we will untar your tarball, replace the files mentioned above, compile it, and then grade it. It will look roughly like this:

$ gunzip CSE444-lab6.tar.gz
$ tar xvf CSE444-lab6.tar
$ cd CSE444-lab6
[replace build.xml, HeapFileEncoder.java, and test]
$ ant test
$ ant systemtest
[additional tests]
    

If any of these commands fail, we'll be unhappy, and, therefore, so will your grade.

An additional 50% of your grade will be based on the quality of your writeup and our subjective evaluation of your code.

We've had a lot of fun designing this assignment, and we hope you enjoy hacking on it!