This is a mini-workshop that shows you how to work with Spark on Amazon Elastic MapReduce (EMR); it's a kind of hello world of Spark on EMR. We will solve a simple problem: use Spark and Amazon EMR to count the words in a text file stored in S3.
To follow along you will need the following:
- An account on Amazon Web Services (AWS)
- The AWS CLI tools installed (a quick sanity check is sketched below)
- The AWS EMR CLI tools installed
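If you are unsure whether the AWS CLI is set up correctly, the following quick check verifies the installation and configures your credentials (a minimal sketch; the exact version output will differ on your machine):
aws --version   # verify the AWS CLI is installed and on your PATH
aws configure   # interactively set access key, secret key and default region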
Create some test data in S3
We will count the words in the U.S. Constitution, or more specifically, in a text file of it that I found online. Step one is to upload this file to Amazon S3, so that the Spark cluster (created in the next section) can access it.
Download the file locally first:
wget http://www.usconstitution.net/const.txt
Create a bucket to hold the data on S3:
aws s3 mb s3://[your-bucket-name]
Finally, upload the file to S3 (note that mv removes the local copy; use cp instead if you want to keep it):
aws s3 mv const.txt s3://[your-bucket-name]/us-constitution.txt
Create Spark cluster on AWS EMR
To create a Spark cluster on Amazon EMR, we need to pick an instance type for the machines. For this small toy example we will use three m3.xlarge instances. You can consult the Amazon EMR price list for an overview of all supported instance types.
Launch a Spark 0.8.1 cluster with three m3.xlarge instances on Amazon EMR:
elastic-mapreduce --create --alive --name "Spark/Shark Cluster" \
  --bootstrap-action s3://elasticmapreduce/samples/spark/0.8.1/install-spark-shark.sh \
  --bootstrap-name "Spark/Shark" --instance-type m3.xlarge --instance-count 3
If everything worked, the command returns a job flow ID in a message like "Created job flow j-1R2OWN88UD8ZC".
It will take a few minutes before the cluster is in the "WAITING" state, which means that it is ready to accept queries. We can check whether the cluster is in the "WAITING" state using the --list option to elastic-mapreduce:
elastic-mapreduce --list j-1R2OWN88UD8ZC # replace j-1R2OWN88UD8ZC with the ID you got when launching the cluster
When the cluster has status "WAITING", connect to the master node of the Spark cluster using SSH:
elastic-mapreduce --ssh j-1R2OWN88UD8ZC # replace j-1R2OWN88UD8ZC with the ID you got when launching the cluster
You should now be connected to the master node of your Spark cluster...
Run query in Spark shell
To run the word-count query, we will enter the Spark shell installed on the master node. Since the text file is completely unstructured, it is a good fit for a map-reduce style query. Once in the shell, we will express the word-count query in the Scala programming language.
Enter spark shell:
SPARK_MEM="2g" /home/hadoop/spark/spark-shell
(In Spark shell) load the U.S. Constitution text file:
val file = sc.textFile("s3://[your-bucket-name]/us-constitution.txt")
(In Spark shell) count words in file, replacing dots and commas with space:
// remove linebreaks before pasting...
val counts = file
  .flatMap(line => line
    .toLowerCase()
    .replace(".", " ")
    .replace(",", " ")
    .split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)
(In Spark shell) inspect the ten most frequent words (using unary minus to invert the sort order, i.e. descending):
val sorted_counts = counts.collect().sortBy(wc => -wc._2)
sorted_counts.take(10).foreach(println) // prints lines containing (word, count) pairs
Save the sorted counts in S3:
sc.parallelize(sorted_counts).saveAsTextFile("s3://[your-bucket-name]/wordcount-us-constitution")
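(Optional, back on the local machine) you can verify that the output landed in S3 by listing the bucket; this sketch assumes the bucket name and output prefix used above:
aws s3 ls s3://[your-bucket-name]/wordcount-us-constitution/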
(Back on local machine) remember to terminate cluster when done:
elastic-mapreduce --terminate j-1R2OWN88UD8ZC # replace j-1R2OWN88UD8ZC with the ID you got when launching the cluster
If you've forgotten the Cluster ID, you can get a list of active clusters using the --list command:
elastic-mapreduce --list --active
Caveats
When first drafting this example, I was tempted to use a cheaper instance type, namely m1.small. While Amazon EMR officially supports this instance type (tagged as "General Purpose - Previous Generation"), the word-count example didn't work for me on it. When I switched to the more recent and beefier m3.xlarge, everything worked fine.
I also tried to bootstrap the instances with the latest version of Spark (1.0.0 at the time of writing). This failed to even launch on the m1.small instance. Note that the 1.0.0 install script is a Ruby script (s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb), whereas the 0.8.1 install script is a shell script (s3://elasticmapreduce/samples/spark/0.8.1/install-spark-shark.sh). It is worth trying the example above with Spark 1.0.0 on a current instance type, e.g. m3.xlarge; a rough, untested sketch of the adjusted launch command follows.
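This is the same launch command as above with the 1.0.0 bootstrap script path swapped in; I have not verified it myself, so treat it as a starting point:
elastic-mapreduce --create --alive --name "Spark 1.0.0 Cluster" \
  --bootstrap-action s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb \
  --bootstrap-name "Spark 1.0.0" --instance-type m3.xlarge --instance-count 3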
For more examples, check the Spark examples section, which includes the wordcount example that I've adapted a bit.