Python script for geocoding a text file

Assume that you have a text file with some locations, one location per line.

For example, here are some school names in Copenhagen, Denmark, stored in schools.csv:

Hyltebjerg Skole
Heibergskolen
Ellebjerg Skole
Katrinedals Skole
Peder Lykke Skolen
Amager Fælled Skole
Tingbjerg Heldagsskole
Øster Farimagsgades Skole
Sankt Annæ Gymnasiums Grundskole
Lykkebo Skole
Randersgades Skole
Strandvejsskolen
Sortedamskolen
Grøndalsvængets Skole
Sølvgades Skole
Skolen ved Sundet
Hanssted Skole
Holbergskolen
Den Classenske Legatskole
Tove Ditlevsens Skole
Lergravsparkens Skole
Vigerslev Allés Skole
Bavnehøj Skole
Ålholm Skole
Langelinieskolen
Guldberg Skole
Husum Skole
Nyboder Skole
Vanløse Skole
Kirkebjerg Skole
Christianshavns Skole
Bellahøj Skole
Kildevældsskolen
Korsager Skole
Nørrebro Park Skole
Utterslev Skole
Skolen på Islands Brygge
Brønshøj Skole
Kirsebærhavens Skole
Rødkilde Skole
Vesterbro Ny Skole
Blågård Skole
Sønderbro Skole
Højdevangens Skole
Oehlenschlægersgades Skole
Vibenshus Skole
Valby Skole
Rådmandsgades Skole
Lundehusskolen
Tagensbo Skole

Here is a script, geocode.py, that will attempt to geocode each location in an input stream. It prints delimited output (pipe-separated by default) to stdout with the fields input_line, input_line_no, result_no, place, latitude and longitude:

from geopy import geocoders
import sys
import time
 
geocoder = geocoders.GoogleV3()
 
SEPARATOR = '|'  # could also use tab. Comma is bad, since the place will most likely contain a comma.
dummy = ['', ['', '']]  # placeholder result for lines that could not be geocoded
i = 0
 
header = ['input_line', 'input_line_no', 'result_no', 'place', 'latitude', 'longitude']
print(SEPARATOR.join(header))
 
for line in sys.stdin:
    line = line.strip()
    # geocode the line; fall back to the dummy result if nothing was found
    results = geocoder.geocode(line, exactly_one=False) or [dummy]
    for j, res in enumerate(results):
        place = res[0]
        lat = str(res[1][0])
        lon = str(res[1][1])
        out = SEPARATOR.join([line, str(i), str(j), place, lat, lon])
        print(out)
    time.sleep(0.05)  # be gentle with the geocoding service
    i += 1

Here is how you might use the script:

cat schools.csv | python geocode.py 

Tip: you might want to increase the time.sleep() delay between requests if you run into rate limits with the geocoding service.
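
Note also that, depending on your geopy version and Google's current terms, the GoogleV3 geocoder may require an API key. If so, you can pass one when constructing the geocoder (YOUR_API_KEY below is a placeholder):

geocoder = geocoders.GoogleV3(api_key='YOUR_API_KEY')  # placeholder key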

Easy parallel HTTP requests with Python and asyncio

Python 3.x, and in particular Python 3.5, natively supports asynchronous programming. While asynchronous code can be harder to read than synchronous code, there are many use cases where the added complexity is worthwhile. One such example is executing a batch of HTTP requests in parallel, which I will explore in this post. Additionally, the async-await syntax introduced in Python 3.5 makes the code almost as easy to understand as synchronous code.

Before we look at asynchronous requests, let us look at the sequential case. This will give us something to compare with later. The code listing below is an example of how to make twenty synchronous HTTP requests:

# Example 1: synchronous requests
import requests

num_requests = 20

responses = [
    requests.get('http://example.org/')
    for i in range(num_requests)
]

How does the total completion time develop as a function of num_requests? The chart below shows my measurements. The curve is unsurprisingly linear:

Using synchronous requests, I was able to execute just five requests per second in my experiment.

Next, let us see how we can make asynchronous HTTP requests with the help of asyncio. The code listing below is an example of how to make twenty asynchronous HTTP requests in Python 3.5 or later:

# Example 2: asynchronous requests
import asyncio
import requests

async def main():
    loop = asyncio.get_event_loop()
    futures = [
        loop.run_in_executor(
            None, 
            requests.get, 
            'http://example.org/'
        )
        for i in range(20)
    ]
    for response in await asyncio.gather(*futures):
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

The code is now more convoluted. But is it better? In the chart below, you will see the total completion time in seconds as a function of the number of asynchronous requests made.

The stepwise curve indicates that some requests are being executed in parallel. However, the curve is still asymptotically linear. Let’s find out why.

Increasing the number of threads

Did you notice the step pattern in the chart? The completion time is 1x for 1-5 requests, 2x for 6-10 requests, 3x for 11-15 requests, and so on. The reason we see this step pattern is that the default Executor has an internal pool of five threads that execute work. While five requests can be executed in parallel, any remaining requests will have to wait for a thread to become available.

So here is an idea. To minimize the total completion time, we could increase the size of the thread pool to match the number of requests we have to make. Luckily, this is easy to do as we will see next. The code listing below is an example of how to make twenty asynchronous HTTP requests with a thread pool of twenty worker threads:

# Example 3: asynchronous requests with larger thread pool
import asyncio
import concurrent.futures
import requests

async def main():

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                'http://example.org/'
            )
            for i in range(20)
        ]
        for response in await asyncio.gather(*futures):
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Let us rerun the experiment from before.

Bingo. If the executor has a pool of 20 threads then 20 requests take roughly the same amount of time as 1 request.

The next obvious question to ask is: what is the limit to this approach? How does the total completion time increase for a larger number of asynchronous requests with an equal number of threads in the executor pool, say 100 or 1000? Let’s find out.

We are back to linear – the flat kind. The chart shows that we can easily make 500+ asynchronous HTTP requests in the same time it takes to make 10 synchronous requests – or 300 requests per second. Surely a vast improvement. You might have noticed that the chart only goes to 720 requests, not 1000. The reason is that “something bad” happened when I reached 720 requests/threads and my timing program became very sad and stopped working. On a different computer, this limit will likely be somewhere else.

Conclusion

First, you have seen that with asynchronous requests you can execute more requests per second than with synchronous requests: 300 requests per second compared to just 5 requests per second (in my experiments).

Second, you have seen how to achieve this well-known benefit of async I/O in Python 3.5 using just slightly convoluted code.

I would like to apologize to the people who host example.org for hammering their website during my experiment. I like to think that it was for a good cause.

How to compute the pagerank of almost anything

Whenever two things have a directional relationship to each other, you can compute the pagerank of those things. For example, you can observe directional relationships between web pages that link to each other, scientists that cite each other, and chess players that beat each other. The relationship is directional because it matters in which direction the relationship points, e.g. who lost to whom in chess.

Intuitively, you may think of a directional relationship as a transfer of some abstract value between two parties. For example, when one chess player loses to another, the value (i.e. relative skill level) of the winner increases and the value of the loser decreases. Furthermore, the amount of value that is transferred depends on the starting value of each party. For example, if a master chess player loses to a novice, then the relative skill level of the novice will dramatically increase. Conversely, if a novice loses to a master, that is to be expected, and the relative skill level of each player should remain roughly the same as before – the status quo.

Below you’ll see an illustration of a small graph with seven nodes and seven edges. The pagerank of each node is illustrated by shading it, where a darker color denotes a higher rank.

If you study this figure, you should notice that:

  • Nodes 1 through 4 all have low rank, because no other nodes point to them
  • Node 5 has a medium rank, because a low-rank node points to it
  • Node 6 has high rank, because many low-rank nodes point to it
  • Node 7 has the highest rank, because a high-rank node points to it, while it points to nothing

Compute pagerank with Python

The pageranks of the nodes in the example graph (see figure above) were computed in Python with the help of the networkx library, which can be installed with pip: pip install networkx. The code that creates a graph and computes pagerank is listed below:

import networkx as nx
 
# Initialize directed graph
G = nx.DiGraph()
 
# Add edges (implicitly adds nodes)
G.add_edge(1,6)
G.add_edge(2,6)
G.add_edge(3,6)
G.add_edge(4,6)
G.add_edge(5,6)
G.add_edge(4,5)
G.add_edge(6,7)
 
# Compute pagerank (keys are node IDs, values are pageranks)
pr = nx.pagerank(G)
"""
{
  1: 0.06242340798778012, 
  2: 0.06242340798778012, 
  3: 0.06242340798778012, 
  4: 0.06242340798778012, 
  5: 0.08895357136701444, 
  6: 0.32374552689540625, 
  7: 0.33760726978645894
}
"""

Notice that each node is represented by an integer ID, with no specific semantics tied to the nodes or the edges. In other words, the graph could equally well represent relationships between web pages, scientists or chess players (or something else entirely).
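
In fact, networkx accepts any hashable Python object as a node, so a tiny made-up chess example could look like this, with edges pointing from the loser to the winner (the player names are placeholders):

# Hypothetical example: an edge from A to B means A lost to B
H = nx.DiGraph()
H.add_edge('novice', 'master')
H.add_edge('amateur', 'master')
print(nx.pagerank(H))  # 'master' gets the highest rank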

If your relationships can be assigned weights, e.g. the strength of a victory in chess or the prominence of a link on a web page, then you can add weights to the edges in the graph. Luckily, weighted edges can be easily added in networkx:

G.add_edge(1, 2, weight=0.5)
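
nx.pagerank already takes such weights into account: by default it looks for an edge attribute named weight, so once your edges carry weights you can simply recompute the rank (a minimal sketch):

# pagerank uses the 'weight' edge attribute by default
pr_weighted = nx.pagerank(G, weight='weight')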

Dealing with time

You may ask yourself: should a chess game that took place last year impact a player’s rank as much as a game that was won or lost just last week? In many situations, the most meaningful answer would be no. A good way to represent the passing of time in a relationship graph is to use edge weights that decrease over time by some decay function, e.g. exponentially or as the inverse of the elapsed time. The simple example below uses inverse decay, implemented in Python with the ** operator and a negative exponent:

time_decayed_weight = max(.00001, time_passed) ** -1
G.add_edge(1, 2, weight=time_decayed_weight)

We use the trick max(.00001, time_passed) to ensure that we do not raise zero to the power of a negative number. The unit of time passed depends on the domain, and is not essential to the computation. For example, the unit could be milliseconds, years or millennia.
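
If you prefer a genuinely exponential decay, one option is math.exp with a decay rate of your choosing (the rate 0.1 below is an arbitrary placeholder):

import math
 
decay_rate = 0.1  # arbitrary; tune to your time unit and domain
time_decayed_weight = math.exp(-decay_rate * time_passed)
G.add_edge(1, 2, weight=time_decayed_weight)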

To be continued…

Quick introduction to RabbitMQ and Celery

I like to code in Python. I also like the concept of asynchronous workers to build loosely coupled applications. Luckily, RabbitMQ and Celery can help me do exactly that.

This post is based on a very nice YouTube video by Max Mautner.

For easy repeatability, I have transcribed the video in this article, with some minor changes for Python 3.

Install RabbitMQ and Celery

I will assume that you are on a Mac. If not, you will have to think a bit to make the instructions work.

First, let us install RabbitMQ if you don’t already have it.

brew install rabbitmq
 
# add the RabbitMQ tools to your path, however you want to do that.
# The tools are in /usr/local/sbin
echo 'export PATH="/usr/local/sbin:$PATH"' >> ~/.bash_profile

Next, I recommend that you create a virtual environment for this project and install Celery in there.

# create virtual environment
virtualenv -p python3 p3env
 
# activate environment
source p3env/bin/activate
 
# install celery in the environment
pip install celery
 
# Test that it worked
celery -h

Example application

This example is the simplest possible. It assumes the defaults for Celery, such as using a local RabbitMQ as the message broker. The application will distribute 10000 addition tasks to be executed by Celery.

We will need two files. One file that defines the Celery task (tasks.py) and one for the driver program (driver.py). Of course, you can call these files anything you want. Also, the driver program is just a simple way to push tasks to RabbitMQ (the Celery default), which will later be dequeued by the Celery workers.

First, let’s create tasks.py:

from celery import Celery
 
# With no arguments, Celery uses its defaults, including a local RabbitMQ broker
app = Celery()
 
@app.task
def add(x, y):
    return x + y

Next, let’s create driver.py:

from tasks import add
 
for i in range(10000):
    # The delay function was added by Celery, when we decorated the add function
    add.delay(i, 1)

As you can see, the driver program consists of a loop that calls add.delay(i, 1) 10000 times. We did not explicitly define the delay function. It was added automatically when we decorated the add function with the annotation @app.task. This means that the function call will be pushed to the message broker and executed asynchronously on the workers.
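
Note that add.delay returns an AsyncResult, but with the bare Celery() app above no result backend is configured, so you cannot fetch the return values. If you want the results back, one option is to add a backend when creating the app (a sketch; the rpc backend is just one choice):

# tasks.py with a result backend, so return values can be retrieved
app = Celery('tasks', backend='rpc://')
 
# in the driver, you could then do:
result = add.delay(2, 3)
print(result.get(timeout=10))  # prints 5 once a worker has run the task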

Run example

To run the example, first start the local RabbitMQ server in a new Terminal window:

# Start message broker
rabbitmq-server
 
# if this does not work, check that /usr/local/sbin is in your PATH

In another Terminal window, start the Celery workers:

# activate the virtual env in the new window
source p3env/bin/activate
 
# start the workers
celery -A tasks worker -l INFO

Finally, run the driver program to create 10000 tasks:

# activate the virtual env in the new window, if needed
source p3env/bin/activate
 
# run the driver program
python driver.py

Now, in the Terminal window that is running the workers, you should see lines fly by as tasks are being executed asynchronously.

What to do next?

Celery fits a lot of use cases, from web scraping and API consumption to long-running web application tasks. The follow-up video by Max demonstrates a simplified web scraping use case. Like the first video, it is succinct and sufficient for a basic understanding.

How to export CSV file to database with Python

Pandas and SQLAlchemy offer powerful conversions between CSV files and tables in databases. Here is a small example:

import pandas as pd
from sqlalchemy import create_engine
 
df = pd.read_csv('mydata.csv')
 
engine = create_engine('sqlite:///mydata.db')
df.to_sql('mytable', engine)
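
Going the other way, from a database table back into a DataFrame, is just as short (a sketch using the same engine and table name):

# Read the table back into a DataFrame
df2 = pd.read_sql('mytable', engine)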


How to use non-default profile in boto3

Given an AWS credentials file that looks like this:

[default]
aws_access_key_id = DEFAULT
aws_secret_access_key = SECRET1
 
[dev]
aws_access_key_id = DEV
aws_secret_access_key = SECRET2
 
[prod]
aws_access_key_id = PROD
aws_secret_access_key = SECRET3

You can use any profile, say dev, like this in Python:

import boto3.session
 
dev = boto3.session.Session(profile_name='dev')
 
s3 = dev.resource('s3')
for bucket in s3.buckets.all():
    print(bucket.name)
print('')
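
Alternatively, if you want every module-level boto3 call to use the dev profile, you can set up the default session once (a minimal sketch):

import boto3
 
# Make the 'dev' profile the default for subsequent boto3.resource/boto3.client calls
boto3.setup_default_session(profile_name='dev')
s3 = boto3.resource('s3')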

PyBrain quickstart and beyond

After pip install pybrain, the PyBrain quick start essentially goes as follows:

from pybrain.tools.shortcuts import buildNetwork
from pybrain.structure import TanhLayer
from pybrain.datasets import SupervisedDataSet
from pybrain.supervised.trainers import BackpropTrainer
 
# Create a neural network with two inputs, three hidden, and one output
net = buildNetwork(2, 3, 1, bias=True, hiddenclass=TanhLayer)
 
# Create a dataset that matches NN input/output sizes:
xor = SupervisedDataSet(2, 1)
 
# Add input and target values to dataset
# Values correspond to XOR truth table
xor.addSample((0, 0), (0,))
xor.addSample((0, 1), (1,))
xor.addSample((1, 0), (1,))
xor.addSample((1, 1), (0,))
 
trainer = BackpropTrainer(net, xor)
#trainer.trainUntilConvergence()
for epoch in range(1000):
    trainer.train()

However, it does not work reliably, as you can see by running the following test:

testdata = xor
trainer.testOnData(testdata, verbose = True)  # Works if you are lucky!

Kristina Striegnitz has written and published an XOR example that works more reliably. The code is effectively reproduced below, in case the original should disappear:

# ... continued from above
 
# Create a recurrent neural network with four hidden nodes (default is SigmoidLayer) 
net = buildNetwork(2, 4, 1, recurrent = True)
 
# Train the network using arguments for learningrate and momentum
trainer = BackpropTrainer(net, xor, learningrate = 0.01, momentum = 0.99, verbose = True)
for epoch in range(1000):
    trainer.train()
 
# This should work every time...
trainer.testOnData(testdata, verbose = True)
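
To sanity-check the trained network yourself, you can activate it on each XOR input and compare the outputs to the truth table (a quick sketch continuing from above):

# Print the network's prediction for each XOR input
for inp in [(0, 0), (0, 1), (1, 0), (1, 1)]:
    print(inp, net.activate(inp))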

How to merge two disjoint random samples?

The problem: Given two random samples, s1 and s2, of size k over two disjoint populations, p1 and p2, how to combine the two k-sized random samples into one k-sized random sample over p1 ∪ p2?

The solution: k times, draw an element from s1 ∪ s2: with probability d1 = |p1| / |p1 ∪ p2|, draw the next element from s1; with probability d2 = 1 - d1, draw the next element from s2.

(the solution was found on Stack Overflow)

In Python:

import random
import numpy
 
# population sizes
e1 = 1000
e2 = 1000000
 
# populations (disjoint)
p1 = range(e1)
p2 = range(e1, e2)
 
# sample size
k = 500
 
# random samples
s1 = random.sample(p1, k)
s2 = random.sample(p2, k)
 
# probability of drawing the next element from s1
d1 = len(p1) / (len(p1) + len(p2))
 
# merge samples
merge = []
for i in range(k):
    if s1 and s2:
        merge.append(s1.pop() if random.random() < d1 else s2.pop())
    elif s1:
        merge.append(s1.pop())
    else:
        merge.append(s2.pop())
 
# Validate
hist = numpy.histogram(merge, bins=[0, 500000, 1000000])
# The two bins should be roughly equal, i.e. the error should be small.
print(abs(hist[0][0] - hist[0][1]) / k)
 
# alternatively, count the values below 500K directly
print(abs(sum(1 for x in merge if x < 500000) - 250) / 500.0)

Get Weather using JSON web service and Python

Get the current weather for Copenhagen:

import urllib.request
import json
 
# Get the current weather for Copenhagen
# (note: OpenWeatherMap now requires an API key, passed as an appid query parameter)
url = 'http://api.openweathermap.org/data/2.5/weather?q=Copenhagen,dk'
response = urllib.request.urlopen(url)
 
# Parse the JSON result
data = json.load(response)
print('Weather in Copenhagen:', data['weather'][0]['description'])
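
The response contains more than the textual description. For example, assuming the documented response format, the current temperature is available as well (in Kelvin by default; add &units=metric to the URL for Celsius):

# Print the current temperature (Kelvin unless units=metric is requested)
print('Temperature:', data['main']['temp'])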

Writing a parser in Python

This is my base pattern for writing a parser in Python by using the pyparsing library. It is slightly more complicated than a hello world in pyparsing, but I think it is more useful as a small example of writing a parser for a real grammar.

A base class PNode is used to provide utility functions to classes implementing parse tree nodes, e.g. turning a parse tree back into the original string (except that all whitespace is replaced by a single space). It assumes that tokens in the input were separated by whitespace, and that all whitespace is the same.

For a particular grammar, I use Python classes to represent nodes in the parse tree; these classes get created by calling the setParseAction method on the corresponding BNF element. I like having these classes because it adds a nice structure to the parse tree.

from pyparsing import Literal, Word, ZeroOrMore, StringEnd, nums
 
class PNode(object):
    """Base class for parse tree nodes"""
    def __init__(self, tokens):
        super(PNode, self).__init__()
        self.tokens = tokens
 
    def __str__(self):
        return " ".join(str(x) for x in self.tokens)
 
    def __repr__(self):
        return self.__str__()
 
# Target classes
 
class Integer(PNode):
    def __init__(self, tokens):
        super(Integer, self).__init__(tokens)
        self.value = int(tokens[0])
 
class Comma(PNode):
    def __init__(self, tokens):
        super(Comma, self).__init__(tokens)
 
class IntegerList(PNode):
    def __init__(self, tokens):
        super(IntegerList, self).__init__(tokens)
        self.integers = [t for t in tokens if isinstance(t, Integer)]
 
# BNF
 
comma = Literal(',').setParseAction(Comma)
integer = Word(nums).setParseAction(Integer)
integer_list = (integer + ZeroOrMore(comma + integer)).setParseAction(IntegerList)
 
bnf = integer_list + StringEnd()
 
# Try the parser
 
parsed_list = bnf.parseString('1,2,3')[0]
 
print(parsed_list)
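
Since IntegerList keeps the parsed Integer nodes in its integers attribute, you can also pull out the plain values (continuing the example above):

# Extract the plain integer values from the parse tree
print([i.value for i in parsed_list.integers])  # [1, 2, 3]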