Quick introduction to RabbitMQ and Celery

I like to code in Python. I also like the concept of asynchronous workers to build loosely coupled applications. Luckily, RabbitMQ and Celery can help me do exactly that.

This post is based on a very nice YouTube video by Max Mautner.

For easy repeatability, I have transcribed the video in this article, with some minor changes for Python 3.

Install RabbitMQ and Celery

I will assume that you are on a Mac. If not, you will have to think a bit to make the instructions work.

First, let us install RabbitMQ if you don’t already have it.

brew install rabbitmq
# add the RabbitMQ tools to your path, however you want to do that.
# The tools are in /usr/local/sbin
echo 'export PATH="/usr/local/sbin:$PATH"' >> ~/.bash_profile

Next, I recommend that you create a virtual environment for this project and install Celery in there.

# create virtual environment
virtualenv -p python3 p3env
# activate environment
source p3env/bin/activate
# install celery in the environment
pip install celery
# Test that it worked
celery -h

Example application

This example is the simplest possible. It assumes the defaults for Celery, such as using a local RabbitMQ as the message broker. The application will distribute 10000 addition tasks to be executed by Celery.

We will need two files. One file that defines the Celery task (tasks.py) and one for the driver program (driver.py). Of course, you can call these files anything you want. Also, the driver program is just a simple way to push tasks to RabbitMQ (the Celery default), which will later be dequeued by the Celery workers.

First, let’s create tasks.py:

from celery import Celery
app = Celery()
# The decorator registers add as a Celery task and gives it a delay() method
@app.task
def add(x, y):
    return x + y

Next, let’s create driver.py:

from tasks import add
for i in range(10000):
    # The delay function was added by Celery, when we decorated the add function
    add.delay(i, 1)

As you can see, the driver program consists of a loop that calls add.delay(i, 1) 10000 times. We did not explicitly define the delay function. It was added automatically when we decorated the add function with the @app.task decorator. Calling delay does not run the function locally. Instead, the call is pushed to the message broker and executed asynchronously on the workers.
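To make the role of the decorator less magical, here is a toy sketch of the idea. It is not Celery's actual implementation: the names toy_task, broker and worker are made up for illustration, and an in-process queue.Queue stands in for RabbitMQ.

```python
import queue
import threading

# Stand-in for the message broker (RabbitMQ in the real setup).
broker = queue.Queue()
results = []


def toy_task(func):
    """Hypothetical decorator: attach a delay() that enqueues the call."""
    def delay(*args, **kwargs):
        broker.put((func, args, kwargs))
    func.delay = delay
    return func


@toy_task
def add(x, y):
    return x + y


def worker():
    """Dequeue calls and run them until a None sentinel arrives."""
    while True:
        item = broker.get()
        if item is None:
            break
        func, args, kwargs = item
        results.append(func(*args, **kwargs))


t = threading.Thread(target=worker)
t.start()
for i in range(5):
    add.delay(i, 1)   # returns immediately; the worker does the addition
broker.put(None)      # tell the worker to stop
t.join()
print(results)
```

The point of the sketch is only the division of labour: delay records what should be called and with which arguments, and a separate worker picks the call up later. Celery does the same across processes and machines.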

Run example

To run the example, first start the local RabbitMQ server in a new Terminal window:

# Start message broker
rabbitmq-server
# check that /usr/local/sbin is in path 
# if this does not work

In another Terminal window, start the Celery workers:

# activate the virtual env in the new window
source p3env/bin/activate
# start the workers
celery -A tasks worker -l INFO

Finally, run the driver program to create 10000 tasks:

# activate the virtual env in the new window, if needed
source p3env/bin/activate
# run the driver program
python driver.py

Now, in the Terminal window that is running the workers, you should see lines fly by as tasks are being executed asynchronously.

What to do next?

Celery fits a lot of use cases, such as web scraping, API consumption, and long-running web application tasks. The follow-up video by Max demonstrates a simplified web scraping use case. Like the first video, it is succinct and sufficient for a basic understanding.

How to Become a Web Scraping Pro with Python pt. 1

Scrapy is an excellent Python library for web scraping. For example, you could create an API with data that is populated via web scraping. This article covers some basic scrapy features, such as the shell and selectors.

Install scrapy in a virtual environment on your machine:

$ virtualenv venv
$ source venv/bin/activate
$ pip install scrapy

To learn about scrapy, the shell is a good place to start, because it offers an interactive environment where you can try selectors on a concrete web page. Here is how to start the scrapy shell:

$ scrapy shell http://doc.scrapy.org/en/latest/topics/selectors.html


Now, try out different selections.

You can select elements on a page with CSS and XPath, and these selectors can be chained together. For example, use css to select the a tags and xpath to select the href attribute of those tags:

>>> for link in response.css('a').xpath('@href').extract():
...     print(link)
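If you want to see what that selector chain computes without starting the scrapy shell, here is a rough standard-library equivalent. It does not use scrapy at all; the HrefCollector class and the sample HTML are made up for illustration.

```python
from html.parser import HTMLParser


class HrefCollector(HTMLParser):
    """Collect the href attribute of every <a> tag."""

    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value is not None:
                    self.hrefs.append(value)


# Made-up page fragment; the last <a> has no href and is skipped.
sample_html = (
    '<p><a href="/intro">Intro</a> and '
    '<a href="https://example.com">Example</a> <a>no link</a></p>'
)
collector = HrefCollector()
collector.feed(sample_html)
print(collector.hrefs)
```

The scrapy version is much shorter, which is a good part of the reason to use it.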


Now you are ready to head over to the documentation to read more about how to become great at using scrapy. Another tip is to follow the scrapinghub blog.

How to export CSV file to database with Python

Pandas and SQLAlchemy offer powerful conversions between CSV files and tables in databases. Here is a small example:

import pandas as pd
from sqlalchemy import create_engine
df = pd.read_csv('mydata.csv')
engine = create_engine('sqlite:///mydata.db')
df.to_sql('mytable', engine)
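If pandas and SQLAlchemy are not available, roughly the same result can be had with the standard library only. This is a minimal sketch, assuming a small CSV with a header row. The CSV content is made up, and an in-memory database is used so the snippet leaves no file behind.

```python
import csv
import io
import sqlite3

# Made-up CSV content; in practice you would open('mydata.csv', newline='').
csv_text = "name,age\nalice,30\nbob,25\n"

reader = csv.reader(io.StringIO(csv_text))
header = next(reader)   # first row holds the column names
rows = list(reader)

conn = sqlite3.connect(":memory:")  # use 'mydata.db' for a file on disk
columns = ", ".join('"{}"'.format(col) for col in header)
placeholders = ", ".join("?" for _ in header)
conn.execute("CREATE TABLE mytable ({})".format(columns))
conn.executemany("INSERT INTO mytable VALUES ({})".format(placeholders), rows)
conn.commit()

loaded = conn.execute("SELECT name, age FROM mytable ORDER BY name").fetchall()
print(loaded)
```

Note one difference: this sketch stores every value as text, whereas pandas infers column types before writing the table.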


How to use non-default profile in boto3

Given an AWS credentials file that looks like this:

[default]
aws_access_key_id = DEFAULT
aws_secret_access_key = SECRET1
[dev]
aws_access_key_id = DEV
aws_secret_access_key = SECRET2
[prod]
aws_access_key_id = PROD
aws_secret_access_key = SECRET3

You can use any profile, say dev, like this in Python:

import boto3.session
dev = boto3.session.Session(profile_name='dev')
s3 = dev.resource('s3')
for bucket in s3.buckets.all():
    print(bucket.name)
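To check which profile names you can pass as profile_name, you can parse the credentials file yourself. The sketch below uses only the standard library and a made-up sample string, assuming the usual layout where each profile is a section header in square brackets.

```python
import configparser

# Made-up sample in the usual AWS credentials layout.
sample_credentials = """
[default]
aws_access_key_id = DEFAULT
aws_secret_access_key = SECRET1

[dev]
aws_access_key_id = DEV
aws_secret_access_key = SECRET2
"""

parser = configparser.ConfigParser()
parser.read_string(sample_credentials)

profiles = parser.sections()          # one section per profile
dev_key_id = parser["dev"]["aws_access_key_id"]
print(profiles)
```

To inspect your real file, replace read_string with parser.read(os.path.expanduser('~/.aws/credentials')), after importing os.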

How to work with spatial data in Amazon Redshift

While Redshift does not offer native support for spatial data, indexes and functions, there exists a partial workaround. Redshift supports Python UDFs and can also load custom Python libraries. Out of the box, Redshift has numpy, scipy, pandas and many other useful Python libraries. For spatial functionality, one saving grace is the high quality spatial libraries that exist for Python, such as shapely. Of course, the alternative is to simply implement useful spatial functions in Python directly, which we will do here. The drawback is that this does not provide the means for spatial indexes or native spatial types in Redshift. As long as you are working mainly with point data, this should not be a huge obstacle. While polygons and operations on them are useful in many cases, a properly utilized GeoHash can usually do the trick.

So, let’s get into it! Connect to your Redshift cluster using a client of your choosing. I prefer SQLWorkbench/J. Once connected, create the following UDF in Python, which implements the haversine formula using NumPy (thanks to jterrace for the solution).

CREATE OR REPLACE FUNCTION haversine (lat1 float, lon1 float, lat2 float, lon2 float)
RETURNS float IMMUTABLE AS $$
    from math import radians, sin, cos, asin, sqrt, pi, atan2
    import numpy as np
    earth_radius_miles = 3956.0
    def haversine(lat1, lon1, lat2, lon2):
        """Gives the distance between two points on earth."""
        lat1, lon1 = radians(lat1), radians(lon1)
        lat2, lon2 = radians(lat2), radians(lon2)
        dlat, dlon = (lat2 - lat1, lon2 - lon1)
        a = sin(dlat/2.0)**2 + cos(lat1) * cos(lat2) * sin(dlon/2.0)**2
        great_circle_distance = 2 * asin(min(1,sqrt(a)))
        return earth_radius_miles * great_circle_distance
    return haversine(lat1, lon1, lat2, lon2)
$$ LANGUAGE plpythonu;

Now, let’s use our new UDF to calculate the great-circle distance between a pair of points.

SELECT haversine(37.160316546736745, -78.75, 39.095962936305476, -121.2890625)
-- 2293.1324218790523
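Before creating the UDF, it can be handy to run the same formula locally and compare the numbers. This is a plain Python sketch of the function body above, without Redshift or NumPy involved.

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_MILES = 3956.0  # same constant as in the UDF


def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two (lat, lon) points in degrees."""
    lat1, lon1 = radians(lat1), radians(lon1)
    lat2, lon2 = radians(lat2), radians(lon2)
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = sin(dlat / 2.0) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2.0) ** 2
    # min() guards against rounding errors pushing sqrt(a) above 1
    central_angle = 2 * asin(min(1.0, sqrt(a)))
    return EARTH_RADIUS_MILES * central_angle


distance = haversine(37.160316546736745, -78.75,
                     39.095962936305476, -121.2890625)
print(distance)
```

The result should be close to the value returned by the UDF in the query above, roughly 2293 miles.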

One very big drawback is that it is incredibly slow (an understatement). The following query computes the function just 100 times, which on my cluster took over 17.21 seconds (jeez!):

SELECT COUNT(haversine(37.160316546736745, -78.75, 39.095962936305476, lon2 % 360 - 180)) FROM generate_series(1, 100) lon2

Because the speed is so slow, I will investigate another way to achieve this goal with Redshift. Expect updates to this post.

How to randomly sample k lines from a file in *nix

You can use the shell to extract a random sample of lines from a file in *nix. The two commands you need are “shuf” and “head” (+ “tail” for CSV files with a header). The shuf command will randomly shuffle all the lines of its input. The head command will cut off the input after the first k lines. Examples for both general files and CSV files are given below.

General pattern

To randomly sample 100 lines from any file in *nix:

shuf INPUT_FILE | head -n 100 > SAMPLE_FILE

Pattern for CSV

If your file is a CSV file, you probably want to extract the header and only sample the body. You can use the head and tail commands, respectively, to extract the header and sample the contents of the CSV file.

Extract the header of the CSV file:

head -n 1 INPUT_FILE.csv > SAMPLE_FILE.csv

Sample 100 lines from the body of the CSV file and append to sample file (notice “>” above versus “>>” below):

tail -n +2 INPUT_FILE.csv | shuf | head -n 100 >> SAMPLE_FILE.csv
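If you would rather do the same thing from Python, the pattern is short there too. This is a minimal sketch, assuming the file fits in memory. The function name sample_csv_lines and the generated data are made up for illustration.

```python
import random


def sample_csv_lines(lines, k, seed=None):
    """Return the header line plus k randomly chosen body lines."""
    rng = random.Random(seed)           # pass a seed for a repeatable sample
    header, body = lines[0], lines[1:]
    return [header] + rng.sample(body, min(k, len(body)))


# Made-up CSV content: a header followed by 1000 rows.
lines = ["id,value"] + ["{},{}".format(i, i * i) for i in range(1000)]
sample = sample_csv_lines(lines, 100, seed=42)
print(len(sample), sample[0])
```

For a real file you would read the lines with open('INPUT_FILE.csv').read().splitlines() and write the sample back out joined by newlines.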

Install dependencies on Mac

On Mac, the shuf command is not shipped with the OS. You can get it via brew. It will be named “gshuf”:

brew install coreutils

So, on Mac you should replace shuf with gshuf in the examples above.

Apache Zeppelin (incubator) rocks!

At Spark Summit Europe 2015, several presenters made use of Apache Zeppelin, which is a notebook (a la IPython) for Spark.

I immediately wanted to try it out myself, and I highly recommend that you download and try it too if you like Spark. One note: download Zeppelin from GitHub rather than from the Apache homepage. The GitHub version is significantly more up to date (as of today). You do not need to preinstall Spark (but you can if you want), because Zeppelin comes with a stand-alone installation of Spark.

PyBrain quickstart and beyond

After pip install pybrain, the PyBrain quick start essentially goes as follows:

from pybrain.tools.shortcuts import buildNetwork
from pybrain.structure import TanhLayer
from pybrain.datasets import SupervisedDataSet
from pybrain.supervised.trainers import BackpropTrainer
# Create a neural network with two inputs, three hidden, and one output
net = buildNetwork(2, 3, 1, bias=True, hiddenclass=TanhLayer)
# Create a dataset that matches NN input/output sizes:
xor = SupervisedDataSet(2, 1)
# Add input and target values to dataset
# Values correspond to XOR truth table
xor.addSample((0, 0), (0,))
xor.addSample((0, 1), (1,))
xor.addSample((1, 0), (1,))
xor.addSample((1, 1), (0,))
trainer = BackpropTrainer(net, xor)
for epoch in range(1000):
    trainer.train()

However, it does not work reliably, which can be seen by running the following test:

testdata = xor
trainer.testOnData(testdata, verbose = True)  # Works if you are lucky!

Kristina Striegnitz has written and published an XOR example that works more reliably. The code is effectively reproduced below, in case the original should disappear:

# ... continued from above
# Create a recurrent neural network with four hidden nodes (default is SigmoidLayer) 
net = buildNetwork(2, 4, 1, recurrent = True)
# Train the network using arguments for learningrate and momentum
trainer = BackpropTrainer(net, xor, learningrate = 0.01, momentum = 0.99, verbose = True)
for epoch in range(1000):
    trainer.train()
# This should work every time...
trainer.testOnData(testdata, verbose = True)