Python AMQPlib Common Idioms

November 12th, 2009 jeremy

I’ve been looking at implementing AMQP for quite some time now to replace our globs of ad-hoc system communication, and so far I’ve enjoyed learning the technology. I’ve been going over the AMQP spec and RabbitMQ’s documentation, and have read Rabbits and Warrens a handful of times.

One thing I have yet to run across is a set of examples of the common patterns you can build with AMQP, and how to write them with amqplib. I’d like this page to help answer the question “so what do I do with it?” after someone has learned about channels, exchanges, queues and routing keys. When looking at the following examples, you will notice a lot of similarities in the code. The small differences, and the nuances they cause, are what have given me the most insight.

Set up

Below is the channel setup (and Python script initialization) that all of these examples use. This configuration worked fine for me on Ubuntu after installing the .deb file from the RabbitMQ website.

#!/usr/bin/env python
from amqplib import client_0_8 as amqp
# The host string must not contain stray whitespace
conn = amqp.Connection(host="localhost:5672", userid="guest",
    password="guest", virtual_host="/", insist=False)
chan = conn.channel()

Broadcast

When broadcasting, a producer sends messages to an exchange and does not care who receives them, or whether anyone is at the other end to receive them at all. Consumers join and leave the exchange at their leisure. Providing sports score updates is an example where broadcasting would be appropriate. Every time a team scores, a message is sent out to all interested parties with what happened and the new score. Many clients only want to know about scores as they happen in real time. Scores from the past are of no consequence.

When broadcasting, the exchange name alone identifies the destination of the message.

chan.exchange_declare(exchange="score_updates", type="fanout", durable=False,
                      auto_delete=True)
 
# Produce Script
msg = amqp.Message("AL 24: FL 7\nTouch down by Tim Tebow")
chan.basic_publish(msg, exchange="score_updates")
 
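# Consume Script
# An unnamed, exclusive queue: the broker generates the name, and the
# queue goes away when this consumer disconnects.
queue_name, _, _ = chan.queue_declare("", durable=False, exclusive=True,
                                      auto_delete=True)
# Bind the queue to the exchange; an unbound queue receives nothing.
chan.queue_bind(queue=queue_name, exchange="score_updates")
def score_update_callback(msg):
    # tell person using this device
    pass
chan.basic_consume(queue=queue_name, callback=score_update_callback, no_ack=True)
# Process deliveries as they arrive
while True:
    chan.wait()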

Notify

This idiom allows consumers to subscribe to events and be sure that messages are not lost, even when the consumer is not running. The setup is similar to broadcast, with the following exceptions:

- The queues are given explicit names that relate to the consumer. The names must be coordinated across all consumers so they don’t overlap.
- The queues are made durable and the messages persistent (delivery_mode=2), giving them permanency.

In this scheme, a consumer must be run at least once to register its queue. After that initial run, messages sent to the exchange pile up in the queue for the consumer to process, whether or not it is running.
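Why the consumer has to run once before it can receive anything can be shown with another toy Python 3 model, again hypothetical and not amqplib. The Broker class keeps named queues that outlive their consumers. It does not model broker restarts, which is what durable queues plus persistent messages protect against in a real broker.

```python
from collections import deque


class Broker:
    """Toy model: named queues that outlive their consumers."""

    def __init__(self):
        self.bindings = {}   # exchange name -> list of bound queue names
        self.queues = {}     # queue name -> deque of pending messages

    def declare_and_bind(self, queue, exchange):
        # Idempotent, like re-declaring and re-binding an existing queue.
        self.queues.setdefault(queue, deque())
        names = self.bindings.setdefault(exchange, [])
        if queue not in names:
            names.append(queue)

    def publish(self, exchange, body):
        for name in self.bindings.get(exchange, []):
            self.queues[name].append(body)

    def drain(self, queue):
        pending = list(self.queues[queue])
        self.queues[queue].clear()
        return pending


broker = Broker()
broker.publish("notify.patient.add", "patient 1")   # no queue yet: lost

# First run of the scheduling consumer registers its queue, then exits.
broker.declare_and_bind("notify.patient.add.scheduling", "notify.patient.add")

# While the consumer is down, messages pile up in its queue.
broker.publish("notify.patient.add", "patient 2")
broker.publish("notify.patient.add", "patient 3")

# The next time it starts, the backlog is waiting for it.
backlog = broker.drain("notify.patient.add.scheduling")
print(backlog)  # ['patient 2', 'patient 3']
```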

Let’s say that data records entered in one system should be sent to numerous other systems across an enterprise. The notify idiom is the way to go.

When notifying, the exchange name alone identifies the destination of the message.

chan.exchange_declare(exchange="notify.patient.add", type="fanout", durable=True,
                      auto_delete=False)
 
# Producer
msg = amqp.Message("given:John\nsurname:Smith", delivery_mode=2)
chan.basic_publish(msg, exchange="notify.patient.add")
 
# Consumer for Scheduling System (run as its own script)
chan.queue_declare("notify.patient.add.scheduling", durable=True,
                   auto_delete=False, exclusive=False)
chan.queue_bind(queue="notify.patient.add.scheduling",
                exchange="notify.patient.add")
def scheduling_patient_add(msg):
    # add to local database maybe?
    # Ack only after the work is done, so an unacknowledged message is
    # redelivered if this consumer dies first.
    chan.basic_ack(msg.delivery_tag)

chan.basic_consume(queue="notify.patient.add.scheduling",
                   callback=scheduling_patient_add)

# Consumer for Reporting System (run as its own script)
chan.queue_declare("notify.patient.add.reporting", durable=True,
                   auto_delete=False, exclusive=False)
chan.queue_bind(queue="notify.patient.add.reporting",
                exchange="notify.patient.add")
def reporting_patient_add(msg):
    # add to local database maybe?
    chan.basic_ack(msg.delivery_tag)

chan.basic_consume(queue="notify.patient.add.reporting",
                   callback=reporting_patient_add)

# Each consumer script then waits for deliveries
while True:
    chan.wait()

It’s important that each consumer has its own uniquely named queue. We don’t want different consumers gobbling up each other’s messages.
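Here is what "gobbling up each other’s messages" looks like. When several consumers read from one queue, the broker hands each message to only one of them, roughly round-robin. This toy Python 3 model uses a hypothetical deliver function and ignores prefetch and acknowledgement details. It compares two systems sharing one queue with each system having its own.

```python
from collections import deque
from itertools import cycle


def deliver(queue, consumers):
    """Toy model of one queue with several consumers: each message goes
    to the next consumer in turn (round-robin), never to all of them."""
    received = {name: [] for name in consumers}
    turn = cycle(consumers)
    while queue:
        received[next(turn)].append(queue.popleft())
    return received


events = ["patient 1", "patient 2", "patient 3", "patient 4"]

# Wrong: scheduling and reporting share one queue and split the messages.
shared = deliver(deque(events), ["scheduling", "reporting"])

# Right: each system has its own queue, so each sees every message.
separate = {
    "scheduling": deliver(deque(events), ["scheduling"])["scheduling"],
    "reporting": deliver(deque(events), ["reporting"])["reporting"],
}

print(shared["scheduling"])    # ['patient 1', 'patient 3']
print(separate["scheduling"])  # ['patient 1', 'patient 2', 'patient 3', 'patient 4']
```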

Request/Response

This one took me the longest to figure out. It’s an implementation of the standard request/response idiom found ubiquitously in client/server computing. This is also the underlying mechanism for RPC, so if you’re looking to build your own RPC protocol on top of AMQP, this is the way to go.

I learned the proper way to implement this by reading the source code of RabbitMQ’s Java client library along with the specification, so if anyone has pointers to make this code better, I’d love to hear them!

The idea is to use two queues: one for sending the request and one for the response.

chan.exchange_declare(exchange="api", type="direct", durable=False,
                      auto_delete=True)
 
# Responder/Callee/Server
chan.queue_declare(queue="add_patient", durable=False,
    exclusive=False, auto_delete=True)
chan.queue_bind(exchange="api", queue="add_patient", routing_key="add_patient")
def add_patient(msg):
    reply = amqp.Message("Return Value")
    # exchange="" is the default exchange, which routes straight to the
    # queue named by the routing key (here, the caller's reply queue)
    chan.basic_publish(reply, exchange="", routing_key=msg.reply_to)
chan.basic_consume(queue="add_patient", callback=add_patient, no_ack=True)

# Requester/Caller/Client
# queue_declare returns (queue_name, message_count, consumer_count)
reply_queue, _, _ = chan.queue_declare("", durable=False, exclusive=True,
                                       auto_delete=True)
msg = amqp.Message("Test message!", reply_to=reply_queue)
chan.basic_publish(msg, exchange="api", routing_key="add_patient")
responses = []
def handle_response(msg):
    responses.append(msg.body)
    print "Response:", msg.body
chan.basic_consume(queue=reply_queue, callback=handle_response, no_ack=True)
# Keep dispatching until the response shows up
while not responses:
    chan.wait()

There is a lot going on in this example, so I’ll try to go through it step by step. In this scenario the “api” exchange acts as a container, while each queue (here “add_patient”) is an application end point. The caller creates a temporary, exclusive queue to serve as a response channel for the responder. AMQP messages have a standard reply_to property for just this purpose.

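In this case, the response queue lives on the default exchange. This is the nameless direct exchange to which every queue is automatically bound under its own name. The reply queue is only temporary, and its name is passed to the responder in reply_to. The responder can therefore publish the answer with an empty exchange name and the routing key set to that name.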
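The round trip can be traced with a toy Python 3 model of the default exchange. It is a sketch under stated assumptions, not amqplib. The DefaultExchange class is hypothetical, and the "api" direct exchange is folded into the same lookup because its binding uses the queue name as the routing key. The "amq.gen-" prefix mimics the names RabbitMQ generates for unnamed queues. The correlation_id is an addition not in the code above. It is a standard AMQP message property that lets a client match responses to requests.

```python
import uuid
from collections import deque


class DefaultExchange:
    """Toy model of AMQP's nameless default exchange: every queue is
    reachable by using its own name as the routing key."""

    def __init__(self):
        self.queues = {}

    def declare(self, name=None):
        # No name asks the "broker" to generate one, as queue_declare("") does.
        name = name or "amq.gen-" + uuid.uuid4().hex[:8]
        self.queues.setdefault(name, deque())
        return name

    def publish(self, routing_key, message):
        self.queues[routing_key].append(message)


broker = DefaultExchange()
broker.declare("add_patient")                   # the responder's queue

# Client: a private reply queue, named in the request's reply_to property.
reply_queue = broker.declare()
request_id = str(uuid.uuid4())
broker.publish("add_patient", {"body": "Test message!",
                               "reply_to": reply_queue,
                               "correlation_id": request_id})

# Server: take one request and answer on whatever queue reply_to names.
request = broker.queues["add_patient"].popleft()
broker.publish(request["reply_to"], {"body": "Return Value",
                                     "correlation_id": request["correlation_id"]})

# Client: match the response to the request it sent.
response = broker.queues[reply_queue].popleft()
assert response["correlation_id"] == request_id
print(response["body"])  # Return Value
```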

Something that I didn’t get much time to go into is the choice of which queues and exchanges to make durable and which to set to auto delete. In request/response, you generally do not want messages to persist and pile up in the queue if no responder is running. A timeout mechanism therefore has to be implemented in the application layer. I’m sure there are many other considerations that I haven’t thought about, but I’ll hopefully cross those bridges when I come to them.
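One way to think about that application-layer timeout: the caller waits on its reply queue for a bounded time and gives up if nothing arrives. How to bound chan.wait() depends on the amqplib version, so this Python 3 sketch swaps the broker for the standard library's queue.Queue, whose get(timeout=...) raises queue.Empty. A thread stands in for the responder. The function names are hypothetical.

```python
import queue
import threading


def call_with_timeout(request, responder, timeout):
    """Send request to responder (run in its own thread) and wait up to
    timeout seconds for an answer on a private reply queue."""
    replies = queue.Queue()                  # stands in for the reply queue
    worker = threading.Thread(target=responder, args=(request, replies))
    worker.daemon = True                     # don't block interpreter exit
    worker.start()
    try:
        return replies.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError("no response within %s seconds" % timeout)


def working_responder(request, replies):
    replies.put("added " + request)


def missing_responder(request, replies):
    pass                                     # like a server that never answers


print(call_with_timeout("John Smith", working_responder, timeout=1.0))
try:
    call_with_timeout("John Smith", missing_responder, timeout=0.1)
except TimeoutError as exc:
    print(exc)
```

Because the reply queue in the example above is exclusive and auto-deleted, a late response to a caller that has given up goes nowhere once that caller disconnects.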

Categories: Python

WSGI DIY Application

October 4th, 2009 jeremy

# feed me to mod_wsgi
import os

from beaker.middleware import SessionMiddleware
from paste import httpexceptions
from paste.exceptions.errormiddleware import ErrorMiddleware
from paste.urlparser import make_url_parser

site_dir = os.path.abspath(os.path.dirname(__file__))
app_dir = os.path.join(site_dir, 'apps')

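# Configuration for the apps, passed through the process environment
os.environ.update({
    'DBURI': 'postgres://XX:XXXX@localhost/XX',
    'MAKO_CACHE_DIR': '/tmp/XX_mako',
    'MAKO_TEMPLATE_DIR': '%s/mako' % app_dir
})

# Dispatch each URL to the matching module under apps/
application = make_url_parser({}, app_dir, 'XX.apps')
# Beaker file-backed sessions, tracked by the '_sid' cookie
application = SessionMiddleware(application, {
    'session.cookie_expires': 30000,
    'session.type': 'file',
    'session.data_dir': '/tmp/XX_session',
    'session.key': '_sid'
})
# Turn raised paste.httpexceptions into proper HTTP responses
application = httpexceptions.HTTPExceptionHandler(application)
# Catch everything else; debug=True shows tracebacks, so turn it off in production
application = ErrorMiddleware(application, debug=True)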
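Each `application = Wrapper(application, ...)` line above wraps the previous application in one more layer. A minimal Python 3 sketch of what one such layer looks like: HeaderMiddleware is hypothetical, not part of Paste or Beaker. It is an ordinary WSGI callable that holds the inner app and intercepts start_response on the way out. The test harness uses wsgiref from the standard library.

```python
from wsgiref.util import setup_testing_defaults


class HeaderMiddleware:
    """Hypothetical middleware: wraps any WSGI app and adds one header."""

    def __init__(self, app, name, value):
        self.app = app
        self.header = (name, value)

    def __call__(self, environ, start_response):
        def add_header(status, headers, exc_info=None):
            # Intercept start_response to append our header.
            return start_response(status, headers + [self.header], exc_info)
        return self.app(environ, add_header)


def hello_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello"]


# Same layering pattern as the config above: each line wraps the last.
application = hello_app
application = HeaderMiddleware(application, "X-Powered-By", "DIY")

environ = {}
setup_testing_defaults(environ)
captured = {}


def start_response(status, headers, exc_info=None):
    captured["status"], captured["headers"] = status, headers


body = b"".join(application(environ, start_response))
print(captured["status"], captured["headers"], body)
```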
Categories: Python, WSGI

First Thoughts on WSGI and Paste

July 27th, 2009 jeremy

I’ve finally had some time to look into WSGI, and I’m starting my first project using it. I made the jump because of how easy mod_wsgi was to get working. I particularly like daemon mode, where multiple processes can be configured to run the same application. If a WSGI application goes horribly wrong in one of those processes (say, it calls into a shared object that segfaults), mod_wsgi will happily replace the process with a new copy. I find process isolation in a production environment to be mandatory. So go mod_wsgi!

I’ve found Ian Bicking’s work on Python Paste to be most illuminating and good brain food to chew on. I particularly enjoyed his article on a do-it-yourself framework.

The smorgasbord approach to pure WSGI development is very appealing: simple interfaces, low coupling, transparent code, small contexts. It really embodies the values I’ve developed over the years writing Python. Middleware is absolutely scrumptious, and I hope to have some packages suitable for release to the rest of the community after I finish this first project.

So far, I’m using:

I have to say that so far, I’ve been very impressed with paste.urlparser. I’ve written and used so many web dispatch systems over the years, and this one really seems to get it right.

The fact that the dispatch sends to the module level is perfect. Other systems, like CherryPy, require you to explicitly import every module with controller code, which usually results in many import statements in __init__ modules. paste.urlparser automatically locates the modules with WSGI applications.

Each module, and in turn each file, is its own application. Simple, clean and to the point. Convention over Configuration.
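The "one module, one application" idea can be modeled in a few lines of Python 3. This is a toy dispatcher, not paste.urlparser's code. As I understand it, URLParser looks for a module-level `application` in the module that a path segment names. Here a dict of hand-built modules stands in for files like apps/patients.py, and the module names are hypothetical. The first path segment is peeled off with wsgiref's shift_path_info.

```python
import types
from wsgiref.util import shift_path_info


def make_module(name, text):
    """Build a stand-in for a file like apps/<name>.py that defines a
    module-level WSGI `application`."""
    module = types.ModuleType(name)

    def application(environ, start_response):
        start_response("200 OK", [("Content-Type", "text/plain")])
        return [text.encode("utf-8")]

    module.application = application
    return module


MODULES = {"patients": make_module("patients", "patient list"),
           "reports": make_module("reports", "report list")}


def dispatch(environ, start_response):
    """Toy dispatcher: the first path segment picks a module, and that
    module's `application` handles the rest of the request."""
    name = shift_path_info(environ)
    module = MODULES.get(name)
    if module is None:
        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return [b"not found"]
    return module.application(environ, start_response)


def get(path):
    environ = {"SCRIPT_NAME": "", "PATH_INFO": path}
    status = []
    body = b"".join(dispatch(environ, lambda s, h, e=None: status.append(s)))
    return status[0], body


print(get("/patients"))   # ('200 OK', b'patient list')
print(get("/missing"))    # ('404 Not Found', b'not found')
```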

Categories: Python, WSGI

Automated Ignore Files with Subversion

May 10th, 2007 jeremy

We always have tons of garbage files lying around that we don’t want going into version control. Subversion’s svn:ignore property doesn’t have the best documentation around, so, in order to help out, here are two little scripts that work in tandem to ignore whatever you want.

The easiest way is to place a .svnignore file in every directory where you wish to ignore files. (.svnignore is just a convention these scripts use; Subversion itself only reads the svn:ignore property.) The file lists patterns or file names, one per line, that should be ignored in that directory.

For example, to ignore foo.py and bar.py the file would have something like this:

   foo.py
   bar.py

Now, what if we have a set of files that we want to ignore across a project? The easiest way is to write scripts that automatically manage these .svnignore files.

In one project, we have template directories named ‘t’ with compiled Cheetah files that we do not want in Subversion. However, we need the __init__.py files in those directories to be in Subversion.

Here is a python script to manage those ignore files:

#!/usr/bin/env python
""" Adds all python files in directories named t except
for __init__.py to the .svnignore files in those
directories recursively from the current directory.
"""
import os

for root, dirs, files in os.walk(os.curdir):
    if os.path.basename(root) != 't':
        continue
    # Compiled Cheetah templates: every .py file except the package marker
    new_ignores = [f for f in files
                   if f.endswith('.py') and f != '__init__.py']
    ignore_path = os.path.join(root, '.svnignore')
    # Keep whatever is already in the file, including hand-written entries,
    # but skip blank lines
    ignores = set()
    if os.path.exists(ignore_path):
        igh = open(ignore_path)
        ignores.update(line.strip() for line in igh if line.strip())
        igh.close()
    ignores.update(new_ignores)

    igh = open(ignore_path, 'w')
    for ignore in sorted(ignores):
        igh.write("%s\n" % ignore)
    igh.close()

If an ignore file exists, we pull it in and update it with what we want to ignore. It’s always important that scripts do not break any previous work. If I manually put an entry in the .svnignore file, the script had better not erase it.
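The "don’t erase manual entries" rule is easiest to check when the merge step is pulled out into a pure function. This is a Python 3 sketch, and merge_ignores is a hypothetical helper. It takes the lines of the existing file plus the newly found names and returns the sorted union, without blank lines.

```python
def merge_ignores(existing_lines, new_names):
    """Hypothetical helper: combine current .svnignore contents with newly
    found names, keeping hand-written entries and dropping blank lines."""
    merged = set(line.strip() for line in existing_lines if line.strip())
    merged.update(new_names)
    return sorted(merged)


existing = ["notes.txt\n", "\n", "index.py\n"]   # one manual entry, one blank line
found = ["index.py", "detail.py"]                 # compiled templates on disk
print(merge_ignores(existing, found))             # ['detail.py', 'index.py', 'notes.txt']
```

Using a set makes re-running the script idempotent: names already present are not duplicated, and the manual "notes.txt" entry survives.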

Alright, so there we go. We now have ignore files everywhere. The other piece of the puzzle is a script that uses these ignore files and sets the svn properties on the directories.

So here is the last piece of the puzzle:

#!/usr/bin/env python
""" Sets the contents of all .svnignore files to svn:ignore properties
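recursively in the current directory. Also attempts to add the
.svnignore file to subversion.
"""
import os
import subprocess

for root, dirs, files in os.walk(os.curdir):
    # Don't descend into Subversion's administrative directories
    if '.svn' in dirs:
        dirs.remove('.svn')
    if '.svnignore' not in files:
        continue
    path = os.path.join(root, '.svnignore')
    # Argument lists avoid shell quoting problems with unusual path names
    subprocess.call(['svn', 'propset', 'svn:ignore', '-F', path, root])
    # svn only warns if the file is already under version control
    subprocess.call(['svn', 'add', path])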
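Before pointing a script like this at a real working copy, a dry run helps. Here is a Python 3 sketch with a hypothetical propset_commands helper. It walks the tree the same way but only collects the svn commands it would run. The temporary directory layout is made up for the demonstration.

```python
import os
import tempfile


def propset_commands(top):
    """Hypothetical dry-run helper: list the svn commands the script above
    would run for every directory under `top` holding a .svnignore file."""
    commands = []
    for root, dirs, files in os.walk(top):
        # Skip svn admin dirs and walk the rest in a stable order.
        dirs[:] = sorted(d for d in dirs if d != '.svn')
        if '.svnignore' in files:
            path = os.path.join(root, '.svnignore')
            commands.append(['svn', 'propset', 'svn:ignore', '-F', path, root])
            commands.append(['svn', 'add', path])
    return commands


# A throwaway tree: t/ has an ignore file, docs/ does not.
with tempfile.TemporaryDirectory() as top:
    os.makedirs(os.path.join(top, 't'))
    os.makedirs(os.path.join(top, 'docs'))
    with open(os.path.join(top, 't', '.svnignore'), 'w') as fh:
        fh.write("index.py\n")
    commands = propset_commands(top)
    expected_root = os.path.join(top, 't')

for cmd in commands:
    print(' '.join(cmd))
```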
Categories: Python, Sysadmin