Twitter HyperLogLog monoids in Spark

Want to count unique elements in a stream without blowing up memory? In other words, do you want to use a HyperLogLog counter in Spark? Until today, I’d never heard the word “monoid”. However, Twitter’s Algebird project contains a collection of monoids, including a HyperLogLog monoid, which can be used to approximately count the unique elements in a stream. (A monoid is just an associative combine operation with an identity element, which is exactly what lets partial counts be merged in any order.) The code looks something like this:

import com.twitter.algebird._
val hll = new HyperLogLogMonoid(12) // 12 bits => 2^12 registers, standard error ≈ 1.04/sqrt(2^12) ≈ 1.6%
// assuming inputData is an RDD of (key, String) pairs:
inputData
  .mapValues(v => hll.create(v.getBytes)) // wrap each value in a HyperLogLog sketch
  .reduceByKey(hll.plus(_, _))            // merge the sketches per key

This young man tells you all about it, and then some:

The video also mentions another Twitter project, Storehaus, which can be used to integrate Spark with a lot of NoSQL databases, like DynamoDB. Looks very useful indeed.

And just to go completely crazy with the Twitter project references, the talk also brings up Summingbird. The Twitter team has a separate blog post about using Summingbird with Spark Streaming.

Easiest way to install a PostgreSQL/PostGIS database on Mac

Installing Postgres+PostGIS has never been easier on the Mac. In fact, it is now an app! You download the app file from postgresapp.com, place it in your Applications folder, and you’re done. Really.

If you think that was over too fast

If you think that was over too fast, there is one more thing you can do: add the Postgres.app “bin” directory to your PATH.

vi ~/.bash_profile
 
# add line: export PATH=$PATH:/Applications/Postgres.app/Contents/Versions/9.3/bin

The next time you open a terminal, you will be able to execute all of the following commands:

PostgreSQL:

clusterdb createdb createlang createuser dropdb droplang
dropuser ecpg initdb oid2name pg_archivecleanup 
pg_basebackup pg_config pg_controldata pg_ctl pg_dump 
pg_dumpall pg_receivexlog pg_resetxlog pg_restore 
pg_standby pg_test_fsync pg_test_timing pg_upgrade 
pgbench postgres postmaster psql reindexdb vacuumdb 
vacuumlo

PROJ.4:

cs2cs geod invgeod invproj nad2bin proj

GDAL:

gdal_contour gdal_grid gdal_rasterize gdal_translate 
gdaladdo gdalbuildvrt gdaldem gdalenhance gdalinfo 
gdallocationinfo gdalmanage gdalserver gdalsrsinfo 
gdaltindex gdaltransform gdalwarp nearblack ogr2ogr 
ogrinfo ogrtindex testepsg

PostGIS:

pgsql2shp raster2pgsql shp2pgsql

That is pretty f’ing awesome!!
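
To check that the PATH setup worked, here is a quick sketch in Python (note: shutil.which requires Python 3.3+; pick any of the binaries listed above):

import shutil

# Each tool should resolve to a path under /Applications/Postgres.app
for tool in ('psql', 'shp2pgsql', 'ogr2ogr', 'cs2cs'):
    print(tool, '->', shutil.which(tool))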

Linked Data: First Blood

Knowing a lot about something makes me more prone to appreciating its value. I unfortunately know very little about Linked Data. For this reason, I’ve had a very biased and shamefully low opinion of the concept of Linked Data. I’ve decided to change this.

A repository of linked data that I’ve recently taken an interest in is DBPedia. DBPedia is a project that extracts structured data (linked data) from Wikipedia and exposes it via a SPARQL endpoint. With the interest in DBPedia come the first sparks (pun intended) of interest in RDF endpoints and in particular SPARQL.

The brilliant thing about DBPedia (and SPARQL) is that it makes it possible to query a vast repository of information, originally in raw text form, using a proper query language. It’s Wikipedia with a nerd boner on.

So what can you do with SPARQL and DBPedia? There are several examples on the DBPedia homepage.

Here is one (slightly modified): find all people born in Copenhagen before 1900 (the link points to a page that executes the query):

PREFIX dbo: <http://dbpedia.org/ontology/>
 
SELECT ?name ?birth ?death ?person WHERE {
     ?person dbo:birthPlace :Copenhagen .
     ?person dbo:birthDate ?birth .
     ?person foaf:name ?name .
     # note: requiring a death date restricts results to people with one recorded
     ?person dbo:deathDate ?death .
     FILTER (?birth < "1900-01-01"^^xsd:date) .
}
ORDER BY ?name

Looking at the names returned, I believe those are indeed people born in Copenhagen before 1900. A spot check on one of them confirms it: according to Wikipedia, Agnes Charlotte Dagmar Adler was a pianist born in Copenhagen in 1865.
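
If you’d rather run the query from code than from a browser, the SPARQL endpoint is plain HTTP. Here is a minimal Python sketch (assuming the requests package is installed; the DBPedia endpoint accepts a query parameter and can return JSON):

import requests

query = '''
PREFIX dbo: <http://dbpedia.org/ontology/>
SELECT ?name ?birth WHERE {
  ?person dbo:birthPlace :Copenhagen .
  ?person dbo:birthDate ?birth .
  ?person foaf:name ?name .
  FILTER (?birth < "1900-01-01"^^xsd:date) .
}
ORDER BY ?name
'''

resp = requests.get('http://dbpedia.org/sparql',
                    params={'query': query,
                            'format': 'application/sparql-results+json'})
# Each binding maps a variable name to a {'type': ..., 'value': ...} dict.
for row in resp.json()['results']['bindings']:
    print(row['name']['value'], row['birth']['value'])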

Ok, the hello world of Linked Data has been committed to this blog. This will NOT be the last thing I write about Linked Data… I’ve seen the light.

This blog post is dedicated to Anders Friis-Christensen, who tried (without luck) to get me interested in Linked Data two years ago. I might be a bit slow, but I eventually get it 🙂

Geocoding Python function for PostgreSQL

Gratefully making use of what others have provided, namely geopy, Google and plpythonu.

Type to hold result of geocoding:

CREATE TYPE geocoding AS (
  place text,
  latitude DOUBLE PRECISION,
  longitude DOUBLE PRECISION
);

A function that does the actual geocoding (to be extended with more vendors; hint: look at the geopy wiki). It takes an (arbitrary) input string to be geocoded:

CREATE OR REPLACE FUNCTION python_geocode
(
  input text,
  vendor text DEFAULT 'google'
) RETURNS SETOF geocoding AS
$$
  import time
  from geopy import geocoders
  # https://code.google.com/p/geopy/wiki/GettingStarted

  # Be nice to the service: throttle to at most ~5 requests per second.
  time.sleep(0.2)
  # TODO: Add other available vendors, e.g. Yahoo.
  if vendor.lower() == 'google':
    geocoder = geocoders.GoogleV3()
  else:
    raise ValueError("Invalid geocoder: %s" % vendor)
  try:
    for res in geocoder.geocode(input, exactly_one=False):
      yield {'place': res[0], 'latitude': res[1][0], 'longitude': res[1][1]}
  except Exception:
    # Swallow lookup failures (e.g. no result found) and return zero rows.
    pass
$$ LANGUAGE plpythonu VOLATILE;

Example:

SELECT place, ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)
FROM python_geocode('Kostas');

Things related to Docker

Docker is a cool idea and open-source product that seems to be taking the tech community by storm. Wired will tell you why it is cool in a story titled The Man Who Built a Computer the Size of the Internet.

The short version goes: Docker is a way to deploy and move applications with dependencies between Linux servers, using a container concept. The idea is similar to how applications are installed on a Mac, i.e. “everything in a single package”.

There are a number of supporting and related technologies, which I will now list:

  • Google Borg/Apache Mesos are related technologies, and perhaps Borg is the original role model for Docker. Borg is apparently being replaced by a new system codenamed Omega (video). According to a Wired story, Borg influenced Twitter to develop Mesos (originally developed by researchers at the University of California at Berkeley), now Apache Mesos, to do something similar. It might be fair to say that Docker is an easy version of Borg/Mesos/Omega for non-geniuses (geniuses being the sort of people Google, Twitter etc. generally hire).
  • CoreOS is a supporting technology: an OS designed for deploying containers such as Docker’s. As mentioned in Wired, the project is based on Google’s ChromeOS. According to the operating system’s website, CoreOS is “Linux kernel + systemd. That’s about it.”

This is it for now about Docker. I just heard about it a few hours ago in an email from a friend and supervisor.

Watched the RAMCloud video

Today I watched a video on RAMCloud. I have made an index of the various sections of the video, with direct links. You’ll find this index at the bottom of this post.

“The RAMCloud project is creating a new class of storage, based entirely in DRAM, that is 2-3 orders of magnitude faster than existing storage systems”

Notable features (in my opinion): a fast, DRAM-backed key-value interface with durability, fast recovery, and the potential for adding transactions.

Also, RAMCloud is open source.

How many requests per second can I get out of Redis?

Warning: This is not a very interesting post. I’m toying around with the Redis benchmarking tool. What would be significantly more interesting would be to toy around with the Lua API in Redis, which I’ll do in a subsequent post.

In this post, I’ll try to squeeze as many get/set requests out of Redis as I can. I’ll use the redis-benchmark tool to test just the set and get commands. This is not meant to be a benchmark, but a learning experience to see “what works”.

I’m testing the current stable version of Redis: 2.6.15.

Basic testing approach

First, compile Redis from source (it should “just work”) and place the binaries somewhere useful. Next, start the Redis server (I use port 7777 for no specific reason):

redis-server --port 7777

To test (set and get):

redis-benchmark -p 7777 -t set,get -q

You should use the redis-benchmark tool to benchmark Redis, for exactly the reasons mentioned in the pitfalls and misconceptions section of the Redis benchmarking page. The primary reason is that the tool uses multiple connections, and easily enables commands to be pipelined.

The command above uses the -p switch to set the port, the -t switch to limit the commands we test, and finally the -q switch to limit the output to just the throughput.

Additional notes

Redis is a single-threaded server. Unfortunately, it does not seem possible to use the benchmark tool to load-balance over several Redis instances, say running on different ports on the same machine. I guess nothing is keeping me from using consistent hashing (or another partitioning technique) with Redis, but the benchmarking tool does not seem to support any kind of partitioning.
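
To illustrate the idea, client-side partitioning could look something like this Python sketch (hypothetical ports; uses the redis-py package and simple modulo hashing rather than true consistent hashing):

import zlib
import redis

# Three hypothetical Redis instances on different ports of the same machine.
nodes = [redis.Redis(port=p) for p in (7777, 7778, 7779)]

def node_for(key):
    # Simple modulo partitioning; true consistent hashing would minimize
    # the number of keys that move when nodes are added or removed.
    return nodes[zlib.crc32(key.encode()) % len(nodes)]

node_for('user:42').set('user:42', 'hello')
print(node_for('user:42').get('user:42'))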

Antirez has a blog post about using a Redis proxy called Twemproxy for doing partitioning with Redis. It can potentially increase throughput. Unfortunately, the proxy uses the epoll system call in Linux, which does not exist on Mac OS X (where kqueue is used instead), so I cannot try it.

All in all, I’ll be evaluating Redis in a purely single-node setup, using a TCP loopback connection to the Redis server running on my laptop.

A further thing that is noted on the benchmarking page is that:

Finally, when very efficient servers are benchmarked (and stores like Redis or memcached definitely fall in this category), it may be difficult to saturate the server. Sometimes, the performance bottleneck is on client side, and not server-side. In that case, the client (i.e. the benchmark program itself) must be fixed, or perhaps scaled out, in order to reach the maximum throughput.

Another reason that Redis may not be saturated by the benchmark is that Redis throughput may be limited by the network well before being limited by the CPU. As I’m running on a local machine, I assume this is not the case, but I’m not entirely sure that there are no other bottlenecks in the OS regarding communication between the benchmark process and the redis-server process. As noted on the benchmarking page: when client and server run on the same box, the CPU is the limiting factor with redis-benchmark.

Let’s keep all that in mind.

1: Running the Redis server on my slightly old MacBook Pro

This is the 100% lazy installation. I compiled Redis from source on my laptop, using all defaults, and simply started it.

Hardware: 2.66 GHz Intel Core 2 Duo, 4 GB 1067 MHz DDR3
OS: Mac OS X 10.6.8 (Snow Leopard)

The result is roughly 37K requests per second for both set and get:

$ redis-benchmark -p 7777 -t set,get -q
SET: 37174.72 requests per second
GET: 37313.43 requests per second

The standard test uses just a single key. To increase the number of expected cache misses, I’ll run the same test using a million random keys (the -r switch sets the number of keys) to see if it makes a huge difference:

redis-benchmark -p 7777 -t set,get -r 1000000 -q

The difference is roughly 2.8% for set and 3% for get. Nothing dramatic. Overall, though, the performance is not great for this initial setup running unmodified on my laptop.

2: Using pipelining

Now I’ll read the fucking manual. Maybe it helps. Redis has a page about benchmarking Redis. The first suggestion is to use pipelining, which is enabled with the -P switch, whose argument is the number of commands to bunch together in each request. I’ll try 16 as suggested on the page.

$ redis-benchmark -p 7777 -t set,get -P 16 -q
SET: 222222.22 requests per second
GET: 256410.25 requests per second

Actually, the throughput varies a lot between runs of this test, much more than for the non-pipelined test. With that in mind, a pipeline level of 100 seems better than 16, giving about 30% higher throughput:

$ redis-benchmark -p 7777 -t set,get -P 100 -q
SET: 312500.00 requests per second
GET: 333333.34 requests per second

But using a pipeline level of 1000 is worse. Again there is a lot of variance, so I’d need a proper statistical analysis to say anything definitive. Here I’m doing a rough estimation: a pipeline level of 100 dominates 1000, and that is all I care about.

Bottom line: you can get one order of magnitude improvement in throughput by using pipelining, at least on my old MacBook Pro. Maybe it will be more or less on a “proper” server.
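
For reference, pipelining is not just a benchmark-tool feature; from application code it looks something like this redis-py sketch (assumes pip install redis and the server from above on port 7777):

import redis

r = redis.Redis(port=7777)

# Queue 100 SET commands client-side, then send them in a single round trip.
pipe = r.pipeline(transaction=False)
for i in range(100):
    pipe.set('key:%d' % i, i)
results = pipe.execute()  # one result per queued command
print(len(results))       # 100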

The question is, can we do better?

3: Using Lua scripting

Redis supports Lua scripts that are evaluated server-side. This can improve throughput in situations where a read is followed by a computation followed by, say, a write. Without scripting, even with pipelining, there would be a round trip after the initial read in order to do the computation. The benefits of scripting are really application specific, and I’ll not benchmark them here.
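
Just to give the flavor, a read-compute-write round trip collapsed into a single server-side script might look like this redis-py sketch (hypothetical counter key; not benchmarked):

import redis

r = redis.Redis(port=7777)

# Read a value, compute on it, and write it back, all in one server-side call.
incr_script = r.register_script("""
local v = tonumber(redis.call('GET', KEYS[1]) or '0')
return redis.call('SET', KEYS[1], v + 1)
""")
incr_script(keys=['counter'])
print(r.get('counter'))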

4: Various potential optimizations

  • Use another memory-allocation library. The default is libc. Unlikely to have any dramatic effect on this test.
  • Other things to consider?

I have not tried any of these optimizations.

5: Givin’er all she’s got!

On the Redis page there are results posted for a high-end server, using TCP loopback (as I am) and without pipelining.

Here are the results for a 2 x Intel X5670 @ 2.93 GHz (without pipelining):

SET: 142653.36 requests per second
GET: 142450.14 requests per second

For an Intel(R) Xeon(R) CPU E5520 @ 2.27GHz (with pipelining):

SET: 552028.75 requests per second
GET: 707463.75 requests per second

Note that these are not the same machines.

That is roughly a 3.8x increase in throughput (compared to my laptop) in the non-pipelining case (run on the high-end server), and roughly 2x in the pipelining case (run on the not-quite-as-high-end server). Again, take the numbers with a big grain of salt; they say nothing wildly interesting. The main conclusion is that pipelining, and perhaps Lua scripting, is a good idea. Also, partitioning may improve throughput, in which case you could try Twemproxy if you’re on Linux.

Conclusion and next steps

Using a single-node instance of Redis running on my laptop, I managed to get about 300K requests per second (both get and set). This was achieved only when using pipelining (100 commands at a time). On a high-end machine, someone got 700K get requests per second using pipelining, i.e. a bit more than twice the throughput.

My goal is to squeeze 1 million get requests per second out of Redis, for a “realistic workload”. For this I’ll use a partitioning approach: Twemproxy running on a multi-core Linux machine in front of several Redis instances. The exact setup will take some experimenting to get right.

Out of the box, both pipelining and Lua scripting are good avenues for improving performance with Redis. I saw one order of magnitude improvement in throughput when using pipelining. Both approaches are quite application specific, perhaps Lua scripting more so than pipelining. I did not experiment with Lua scripting; that would also be very interesting to try.

A stop watch for Postgres

To time the execution of various stages of a long transaction, I’m using the following function:

CREATE OR REPLACE FUNCTION CVL_TimerLap() RETURNS double precision AS $$
import time
# SD is PL/Python's session-level storage dictionary; it persists between
# calls, so we can remember when the function was last invoked.
now = time.time()
if 't_last' not in SD:
  SD['t_last'] = now  # first call in a session returns 0.0
elapsed = now - SD['t_last']
SD['t_last'] = now
return elapsed
$$ LANGUAGE plpythonu;

The “lap times” are returned using:

SELECT CVL_TimerLap();

This will return the number of seconds (wall-clock time) that have passed since the function was last invoked.

Running LP-solver in Postgres

Having reinstalled PostgreSQL with support for Python, pointing at my non-system Python, it is time to test whether I can use the convex optimization library I’ve installed in my Python 2.7 (pip install cvxopt).

Install PL/Python if not already installed

-- if not already installed. Doesn't hurt.
CREATE EXTENSION plpythonu;

Create a function that imports cvxopt:

CREATE OR REPLACE FUNCTION hello_cvxopt()
  RETURNS text
AS $$
  import cvxopt
  return cvxopt.__doc__
$$ LANGUAGE plpythonu IMMUTABLE;

See if it works:

SELECT hello_cvxopt();
-- should return a documentation string

Try the linear programming example:

CREATE OR REPLACE FUNCTION cvxopt_lp_example()
  RETURNS FLOAT[]
AS $$
  from cvxopt import matrix, solvers
  A = matrix([ [-1.0, -1.0, 0.0, 1.0], [1.0, -1.0, -1.0, -2.0] ])
  b = matrix([ 1.0, -2.0, 0.0, 4.0 ])
  c = matrix([ 2.0, 1.0 ])
  solvers.options['show_progress'] = False
  sol = solvers.lp(c, A, b)
  return list(sol['x'])
$$ LANGUAGE plpythonu IMMUTABLE;
 
SELECT cvxopt_lp_example();
-- should return something like "{0.499999995215,1.49999999912}"

Try