Quick introduction to RabbitMQ and Celery

I like to code in Python. I also like the concept of asynchronous workers to build loosely coupled applications. Luckily, RabbitMQ and Celery can help me do exactly that.

This post is based on a very nice YouTube video by Max Mautner.

For easy repeatability, I have transcribed the video in this article, with some minor changes for Python 3.

Install RabbitMQ and Celery

I will assume that you are on a Mac. If not, you will have to think a bit to make the instructions work.

First, let us install RabbitMQ if you don’t already have it.

brew install rabbitmq
# add the RabbitMQ tools to your path, however you want to do that.
# The tools are in /usr/local/sbin
echo 'export PATH="/usr/local/sbin:$PATH"' >> ~/.bash_profile

Next, I recommend that you create a virtual environment for this project and install Celery in there.

# create virtual environment
virtualenv -p python3 p3env
# activate environment
source p3env/bin/activate
# install celery in the environment
pip install celery
# Test that it worked
celery -h

Example application

This example is the simplest possible. It assumes the defaults for Celery, such as using a local RabbitMQ as the message broker. The application will distribute 10000 addition tasks to be executed by Celery.

We will need two files. One file that defines the Celery task (tasks.py) and one for the driver program (driver.py). Of course, you can call these files anything you want. Also, the driver program is just a simple way to push tasks to RabbitMQ (the Celery default), which will later be dequeued by the Celery workers.

First, let’s create tasks.py:

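from celery import Celery

# No arguments: Celery falls back to its defaults (a local RabbitMQ broker)
app = Celery()

@app.task  # registers add as a task and gives it a delay() method
def add(x, y):
    return x + y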

Next, let’s create driver.py:

from tasks import add
for i in range(10000):
    # The delay function was added by Celery, when we decorated the add function
    add.delay(i, 1)

As you can see, the driver program consists of a loop that calls add.delay(i, 1) 10000 times. We did not explicitly define the delay function. Celery added it automatically when we decorated the add function with the @app.task decorator. Calling add.delay(...) does not run the addition in the driver process. Instead, the call is pushed to the message broker as a message and executed asynchronously by whichever worker picks it up.
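To make the decorator idea more concrete, here is a toy model that uses only the standard library. It is not how Celery is implemented: ToyApp, its broker queue, and its worker method are hypothetical names made up for this sketch, with an in-process queue standing in for RabbitMQ and a single thread standing in for a Celery worker.

```python
import queue
import threading


class ToyApp:
    """Hypothetical stand-in for a Celery app; NOT Celery's real implementation."""

    def __init__(self):
        self.broker = queue.Queue()  # plays the role of RabbitMQ
        self.results = []            # collects what the worker computed

    def task(self, func):
        # Attach a delay() function that enqueues the call instead of running it.
        def delay(*args, **kwargs):
            self.broker.put((func, args, kwargs))

        func.delay = delay
        return func

    def worker(self):
        # Dequeue calls and execute them until a None sentinel arrives.
        while True:
            message = self.broker.get()
            if message is None:
                break
            func, args, kwargs = message
            self.results.append(func(*args, **kwargs))


app = ToyApp()


@app.task
def add(x, y):
    return x + y


for i in range(5):
    add.delay(i, 1)       # returns immediately; nothing has been computed yet
app.broker.put(None)      # tell the worker to stop after the queued calls

thread = threading.Thread(target=app.worker)
thread.start()
thread.join()
print(app.results)
```

The point of the sketch is the split between the two halves: delay only records the call, and a separate worker decides when to run it. Celery does the same across processes and machines, with RabbitMQ carrying the messages.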

Run example

To run the example, first start the local RabbitMQ server in a new Terminal window:

# Start message broker
rabbitmq-server
# if this does not work,
# check that /usr/local/sbin is in your path

In another Terminal window, start the Celery workers:

# activate the virtual env in the new window
source p3env/bin/activate
# start the workers
celery -A tasks worker -l INFO

Finally, run the driver program to create 10000 tasks:

# activate the virtual env in the new window, if needed
source p3env/bin/activate
# run the driver program
python driver.py

Now, in the Terminal window that is running the workers, you should see lines fly by as tasks are being executed asynchronously.

What to do next?

Celery fits a lot of use cases, from web scraping and API consumption to long-running web application tasks. The follow-up video by Max demonstrates a simplified web scraping use case. Like the first video, it is succinct and sufficient for a basic understanding.

Using the Python debugger

A few days ago I found out that using the Python debugger is so easy, I can’t believe I haven’t used it before.

Import the module:

import pdb

Set a breakpoint somewhere in your code:

def some_function(self, x, y, z):
    pdb.set_trace()

Run your program. Now every time ‘some_function’ is called, the Python interpreter will break. At this point you could:

  • type p x to print the argument passed to the x parameter
  • type n to execute the next line of code
  • type c to resume the program
  • type h to get help

Easy enough?
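If you want to see what a session looks like without typing anything, here is a small sketch that feeds the debugger its commands from a string instead of the keyboard. The scripted input and the captured transcript are only there so the example runs unattended; in normal use you would call pdb.set_trace() and type at the (Pdb) prompt. The function drops the self parameter to keep the example standalone, and the argument values are made up.

```python
import io
import pdb


def some_function(x, y, z):
    # Scripted commands: print x, then continue. Interactively you would type these.
    commands = io.StringIO("p x\nc\n")
    transcript = io.StringIO()
    debugger = pdb.Pdb(stdin=commands, stdout=transcript, readrc=False)
    debugger.set_trace()  # the debugger takes over here and reads the commands
    total = x + y + z
    return total, transcript.getvalue()


total, transcript = some_function(4242, 1, 1)
print(transcript)  # what the debugger wrote, including the value of x
print(total)
```

The transcript shows the line where execution paused, followed by the value printed by p x. After the c command the function simply runs to the end.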