Project 4: Hadoop and PIG

ESTIMATED TIME: 18 hours.

DUE DATE: Wednesday, 12/03, 11:59pm

TURN IN INSTRUCTIONS: Turn in eight files (seven .txt files and one .png plot; details below) to the Catalyst drop box.

HADOOP: Hadoop is a software platform that lets one easily write and run applications that process vast amounts of data. Hadoop implements MapReduce, using the Hadoop Distributed File System (HDFS). MapReduce divides applications into many small blocks of work. HDFS creates multiple replicas of data blocks for reliability, placing them on compute nodes around the cluster. MapReduce can then process the data where it is located.

PIG: Pig Latin is a declarative, SQL-style language for ad-hoc analysis of extremely large datasets. The motivation for Pig Latin is that many people who analyze extremely large datasets are entrenched procedural programmers who are comfortable with MapReduce. The MapReduce paradigm, however, is low-level and rigid, and leads to a great deal of custom user code that is hard to maintain and reuse. Pig Latin is implemented in PIG, a system that compiles Pig Latin into physical plans that are then executed over Hadoop.
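
As a taste of the language, here is a tiny, purely illustrative Pig Latin fragment (the file name and its fields are made up for this example) that groups a log by user and counts records per user; PIG typically compiles such a LOAD/GROUP/FOREACH/STORE pipeline into a single MapReduce job:

-- Illustrative example only; 'visits.txt' and its fields are hypothetical.
visits  = LOAD 'visits.txt' USING PigStorage('\t') AS (user, url, time);
by_user = GROUP visits BY user;
counts  = FOREACH by_user GENERATE group AS user, COUNT(visits) AS num_visits;
STORE counts INTO 'visit_counts';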

To get more familiar with MapReduce and PIG, we recommend that you first skim through the following two research papers: the original MapReduce paper (Dean and Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters", OSDI 2004) and the Pig Latin paper (Olston et al., "Pig Latin: A Not-So-Foreign Language for Data Processing", SIGMOD 2008).

Preliminaries

To do most of the exercises in this tutorial, you will need access to a Hadoop cluster. Follow the instructions here to get access to our department's IBM/Google cluster.

You should also set up PIG locally on your machine. Follow the instructions here to set up PIG. Afterwards, copy your PIG folder to the cluster.

Problem 1: Getting started with PIG

After you have set up your PIG folder on the cluster, go through the PIG tutorial and read the PIG paper carefully!

Run the first two example scripts in the tutorial on the cluster (script1-hadoop.pig and script2-hadoop.pig), and then answer the following questions:

1.1 How many MapReduce jobs are generated by script1 and script2?

1.2 For each of the two scripts, how many map tasks are in the first MapReduce job? How many map tasks are in the later MapReduce jobs? Why do you think that could be? (Note: the number of reduce tasks is always set to the Hadoop default, currently 30.)

1.3 How long does each job take? How long does the entire script take?

1.4 What do tuples look like after the command uniq_frequency1 = ... in script1? What do tuples look like after the command same = ... in script2?

Hint: Use the cluster web tool here to see the number of map and reduce tasks for your MapReduce jobs.
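
For question 1.4, one way (a suggestion, not a requirement) to inspect an intermediate relation is to run the script on a small input in PIG's interactive Grunt shell (for example in local mode) and use DESCRIBE and DUMP after the relation has been defined:

-- In the Grunt shell, after the line defining uniq_frequency1 has been entered.
-- DESCRIBE prints the relation's schema; DUMP prints its tuples and triggers
-- execution of the script so far, so only DUMP on a small input.
DESCRIBE uniq_frequency1;
DUMP uniq_frequency1;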

What you need to turn in: To simplify grading, turn in your answers to the above questions in a file named problem1-answers.txt.

Problem 2: Simple tweak to tutorial

Write a Pig script that creates a histogram showing the distribution of user activity levels.

Use gnuplot to plot the results.

That is, a point (x, y) in the plot means that y users each performed a total of x queries. You can run the org.apache.pig.tutorial.NonURLDetector(query) filter on the data to remove queries that are empty or contain only URLs.

A few comments to help you get started:
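One possible skeleton is sketched below. It is only a sketch, not the required solution: it assumes the Excite query log and tutorial.jar from the PIG tutorial (tab-separated (user, time, query) records), and you may well structure your script differently. The idea is to count queries per user, then count users per activity level.

-- Sketch only: histogram of user activity levels.
-- Assumes the tutorial's Excite log and tutorial.jar in the working directory.
REGISTER ./tutorial.jar;
raw       = LOAD 'excite.log.bz2' USING PigStorage('\t') AS (user, time, query);
clean     = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
by_user   = GROUP clean BY user;
activity  = FOREACH by_user GENERATE group AS user, COUNT(clean) AS num_queries;
by_level  = GROUP activity BY num_queries;
histogram = FOREACH by_level GENERATE group AS num_queries, COUNT(activity) AS num_users;
STORE histogram INTO 'problem2-results' USING PigStorage();

The stored (num_queries, num_users) pairs correspond to the (x, y) points described above and can be plotted directly with gnuplot.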

What you need to turn in: Turn in your Pig program (named problem2.txt), your computed result file (named problem2-results.txt), and your PNG plot (named problem2-results.png).

Problem 3: Script on the much larger astronomy data

The astro dataset contains snapshots of a cosmological simulation that follows the evolution of structure within a volume of 150 million light years, from shortly after the Big Bang until the present day. The simulation had 512 timesteps, and snapshots were taken every even timestep. Each snapshot consists of the state of each particle in the simulation.

The following is an example of what the data looks like:

snapshot, #Time   , partId, #mass,        position0, position1, position2, velocity0, velocity1, velocity2, eps, phi,        rho,     temp,    hsmooth, metals
2,        0.290315, 0,      2.09808e-08, -0.489263,  -0.47879,  -0.477001, 0.1433,    0.0721365, 0.265767,     , -0.0263865, 0.38737, 48417.3, 9.6e-06, 0
2,        0.290315, 1,      2.09808e-08, -0.48225,   -0.481107, -0.480114, 0.0703595, 0.142529,  0.0118989,    , -0.0269804, 1.79008, 662495,  9.6e-06, 0

Relevant to us are snapshot, partId, and velocity0-2. Each particle has a unique id (partId) and the data tracks the state of particles over time.

We created three files of such data with different sizes. tiny.txt has only a couple of rows and might be useful when writing the script. medium.txt is about 1.3 MB and might be useful when testing your script locally. Finally, large.txt is about 6 GB and is only available on the cluster. All files contain data from 11 snapshots (2, 12, 22, ..., 102). On the Hadoop file system, the files are available in the folder /tmp/cse444/.

3.1 Write a script that counts the number of particles per snapshot. How many particles are contained in each of the 11 snapshots?
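
A minimal sketch of one way to do this is below (assuming the astro files are comma-separated with the column order shown above; adjust the input path and delimiter if they differ on the cluster):

-- Sketch only: count particles per snapshot.
astro   = LOAD '/tmp/cse444/medium.txt' USING PigStorage(',')
          AS (snapshot, time, partId, mass, pos0, pos1, pos2,
              vel0, vel1, vel2, eps, phi, rho, temp, hsmooth, metals);
by_snap = GROUP astro BY snapshot;
counts  = FOREACH by_snap GENERATE group AS snapshot, COUNT(astro) AS num_particles;
STORE counts INTO 'problem3.1-results' USING PigStorage(',');

For the parallelism measurements below, swap in the large-*.txt files as input.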

For each of the following datasets, what is the level of parallelism when running your script? (Use the web interface to determine the number of concurrently executing map tasks for each dataset; this is the level of parallelism.) How long does the script take on each dataset (note: times may vary based on other factors such as cluster utilization)? What can you say about the scalability of the system?

Run the script on these files (available on the cluster):
large-2.txt (contains only snapshot at timestep 2)
large-2-12.txt (contains only snapshots at timesteps 2 and 12)
large-2-12-22.txt (contains only snapshots at timesteps 2, 12, 22)
large-2-12-22-32.txt (contains only snapshots at timesteps 2, 12, 22, 32)

For each of the 4 datasets, launch your script on the cluster and check in the web interface how many map tasks are created. You can then immediately cancel your script.

I ran these before and got the following running times:

large-2.txt 2:04 minutes
large-2-12.txt 1:57 minutes
large-2-12-22.txt 2:02 minutes
large-2-12-22-32.txt 2:02 minutes

(Note: The running times above may be way smaller than what you experienced on the cluster. I ran this on a different cluster.)

What you need to turn in: Turn in your program (named problem3.1.txt), and answers to the above questions (in a file problem3.1-answers.txt).

3.2 For each pair of subsequent snapshots (2, 12), (12, 22), (22, 32), ..., (92, 102), compute the number of particles that increased their velocity (accelerated) and the number of particles that reduced their velocity (decelerated).

Your output should have the following format:
2.0, accelerate, 2
2.0, decelerate, 3
12.0, decelerate, 16
22.0, accelerate, 2
...

The first column denotes the start snapshot (int or float), the second column is either accelerate or decelerate, and the third column is the number of particles. It is OK to leave out rows with 0 counts. The results you turn in should be based on the medium.txt dataset.

Note: If you run your script on the large.txt dataset, you will earn 5 points extra credit.

Hints:
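One possible way to structure the script is sketched below. It is only a sketch, not the official solution, and it makes several assumptions: comma-separated input with the column order shown above, consecutive snapshots that always differ by exactly 10, and a PIG release that supports JOIN and the ?: conditional (if yours does not, two FILTER/GROUP passes work as well). The idea: compute a per-particle squared speed for every snapshot, join the data with itself on partId, keep only pairs of consecutive snapshots, label each pair accelerate or decelerate, and count per (start snapshot, label).

-- Sketch only (not the official solution); see the assumptions above.
a  = LOAD '/tmp/cse444/medium.txt' USING PigStorage(',')
     AS (snapshot, time, partId, mass, pos0, pos1, pos2,
         vel0, vel1, vel2, eps, phi, rho, temp, hsmooth, metals);
b  = LOAD '/tmp/cse444/medium.txt' USING PigStorage(',')
     AS (snapshot, time, partId, mass, pos0, pos1, pos2,
         vel0, vel1, vel2, eps, phi, rho, temp, hsmooth, metals);
-- Squared speed is enough for comparing velocities (no square root needed).
sa = FOREACH a GENERATE snapshot AS snap1, partId AS id1,
     (vel0*vel0 + vel1*vel1 + vel2*vel2) AS spd1;
sb = FOREACH b GENERATE snapshot AS snap2, partId AS id2,
     (vel0*vel0 + vel1*vel1 + vel2*vel2) AS spd2;
-- Pair each particle's state with its state in the next snapshot.
j     = JOIN sa BY id1, sb BY id2;
pairs = FILTER j BY snap2 == snap1 + 10.0;
-- Ties (unchanged speed) are counted as decelerate here; adjust if needed.
labeled = FOREACH pairs GENERATE snap1 AS start,
          ((spd2 > spd1) ? 'accelerate' : 'decelerate') AS change;
g      = GROUP labeled BY (start, change);
result = FOREACH g GENERATE FLATTEN(group), COUNT(labeled);
STORE result INTO 'problem3.2-results' USING PigStorage(',');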

What you need to turn in: Turn in your program (named problem3.2.txt), and the computed result file (named problem3.2-results.txt).