How to use non-default profile in boto3

Given an AWS credentials file that looks like this:

[default]
aws_access_key_id = DEFAULT
aws_secret_access_key = SECRET1
 
[dev]
aws_access_key_id = DEV
aws_secret_access_key = SECRET2
 
[prod]
aws_access_key_id = PROD
aws_secret_access_key = SECRET3

You can use any profile, say dev, like this in Python:

import boto3.session
 
dev = boto3.session.Session(profile_name='dev')
 
s3 = dev.resource('s3')
for bucket in s3.buckets.all():
    print(bucket.name)
print('')

Twitter HyperLogLog monoids in Spark

Want to count unique elements in a stream without blowing up memory? In more specific words, do you want to use a HyperLogLog counter in Spark? Until today, I’d never heard the word “monoid” before. However, Twitter Algebird is a project that contains a collection of monoids including a HyperLogLog monoid, which can be used to aggregate a stream into unique elements. The code looks like this:

import com.twitter.algebird._
val aggregator = new HyperLogLogMonoid(12)
inputData.reduceByKey(aggregator.plus(_, _))

This young man tells you all about it, and then some:

The video also mentions another Twitter project, the Storehaus project, which can be used to integrate Spark with a lot of NoSQL databases like DynamoDB. Looks very useful indeed.

And just to go completely crazy with the Twitter project references, the talk also brings on Summingbird. The Twitter team has a separate blog post
about using Summingbird with Spark Streaming.

Word-count exercise with Spark on Amazon EMR

This is a mini-workshop that shows you how to work with Spark on Amazon Elastic Map-Reduce; It’s a kind of hello world of Spark on EMR. We will solve a simple problem, namely use Spark and Amazon EMR to count the words in a text file stored in S3.

To follow along you will need the following:

Create some test data in S3

We will count the words in the U.S. constitution, more specifically count the words in a text file that I have found online. Step one is to upload this file to Amazon S3, so that the Spark cluster (created in next section) can access it.

Download the file locally first:

wget http://www.usconstitution.net/const.txt

Create a bucket to hold the data on S3:

aws s3 mb s3://[your-bucket-name]

Finally, upload the file to S3:

aws s3 mv const.txt s3://[your-bucket-name]/us-constitution.txt

Create Spark cluster on AWS EMR

To create a Spark cluster on Amazon EMR, we need to pick an instance type for the machines. For this small toy example we will use three m3.xlarge instances. You can consult the Amazon EMR price list for an overview of all supported instance types on Amazon EMR.

Launch a Spark 0.8.1 cluster with three m3.xlarge instances on Amazon EMR:

elastic-mapreduce --create --alive --name "Spark/Shark Cluster"  \
--bootstrap-action s3://elasticmapreduce/samples/spark/0.8.1/install-spark-shark.sh \
--bootstrap-name "Spark/Shark"  --instance-type m3.xlarge --instance-count 3

If everything worked, the command returns a job flow ID, e.g. a message saying something like “Created job flow j-1R2OWN88UD8ZC”.

It will take a few minutes before the cluster is in the “WAITING” state, which means that it is ready to accept queries. We can check that cluster is “WAITING” state using the –list option to elastic-mapreduce:

elastic-mapreduce --list j-1R2OWN88UD8ZC
# replace j-1R2OWN88UD8ZC with the ID you got when launching the cluster

When cluster has status “WAITING”, connect to the master node of the Spark Cluster using SSH:

elastic-mapreduce --ssh j-1R2OWN88UD8ZC
# replace j-1R2OWN88UD8ZC with the ID you got when launching the cluster

You should now be connected to the master node of your Spark cluster…

Run query in spark shell

To run the word-count query, we will enter the Spark shell installed on the master node. Since the text file is really unstructured, it is perfect for a map-reduce type query. Once in the shell, we will express the word-count query in the Scala programming language.

Enter spark shell:

SPARK_MEM="2g" /home/hadoop/spark/spark-shell

(In Spark shell) load U.S. constitution text file:

val file = sc.textFile("s3://[your-bucket-name]/us-constitution.txt")

(In Spark shell) count words in file, replacing dots and commas with space:

// remove linebreaks before pasting...
val counts = file
  .flatMap(line => line
    .toLowerCase()
    .replace(".", " ")
    .replace(",", " ")
    .split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)

(In Spark shell) Inspect ten most prominent words (using unary minus to invert sort-order, i.e. descending):

val sorted_counts = counts.collect().sortBy(wc => -wc._2)
sorted_counts.take(10).foreach(println)
# prints lines containing (word, count) pairs

Save the sorted counts in S3:

sc.parallelize(sorted_counts).saveAsTextFile("s3://[your-bucket-name]/wordcount-us-consitution")

(Back on local machine) remember to terminate cluster when done:

elastic-mapreduce --terminate j-1R2OWN88UD8ZC
# replace j-1R2OWN88UD8ZC with the ID you got when launching the cluster

If you’ve forgotten the Cluster ID, you can get a list of active clusters using the –list command:

elastic-mapreduce --list --active

Caveats

When first drafting this example, I was tempted to use a cheaper instance, i.e. m1.small. While Amazon EMR officially supports this instance type (tagged as “General Purpose – Previous Generation”), the word-count example didn’t work for me using this instance type. When I switched to the more recent and “beefier” instance type, m3.xlarge, everything worked out fine.

I also tried to bootstrap the instances with the latest version of Spark (1.0.0 at time of writing). This failed to even launch on the m1.small instance. Note that the install script in 1.0.0 is a ruby-script (s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb) instead of the 0.8.1 shell-script (s3://elasticmapreduce/samples/spark/0.8.1/install-spark-shark.sh). It is worth trying the example above, with Spark 1.0.0, and using a current instance, e.g. m3.xlarge.

For more examples, check the Spark examples section, which includes the wordcount example that I’ve adapted a bit.