Python AMQPlib Common Idioms
I’ve been looking at implementing AMQP for quite some time now to replace our globs of ad-hoc system communication, and so far I’ve been enjoying learning the technology. I’ve been going over the AMQP spec, RabbitMQ’s documentation, and have read Rabbits and Warrens a handful of times.
One thing I’ve yet to run across is a set of examples of the common patterns you can build with AMQP and how to write them with amqplib. I’d like this page to help answer the question, “so what do I do with it?” once someone has learned about channels, exchanges, queues, and routing keys. Looking at the following examples, you will notice a lot of similarities in the code, but it’s the small differences between them that have given me the most insight.
Below is the channel setup (and Python script initialization) that all of these examples will use. This configuration worked fine for me on Ubuntu after installing the .deb file from the RabbitMQ website.
#!/usr/bin/env python
from amqplib import client_0_8 as amqp

conn = amqp.Connection(host="localhost:5672", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
Broadcast

When broadcasting, a producer sends messages to the exchange and does not care who receives them, or whether there is anyone at the other end to receive them at all. Consumers join and leave the exchange at their leisure. Providing sports score updates is an example where broadcasting would be appropriate: every time a team scores, a message is sent out to all interested parties with what happened and the new score. Most clients only want to know scores as they happen, in real time; scores from the past are of no consequence.
When broadcasting, the exchange name alone identifies the destination of the message.
chan.exchange_declare(exchange="score_updates", type="fanout",
                      durable=False, auto_delete=True)

# Producer Script
msg = amqp.Message("AL 24: FL 7\nTouch down by Tim Tebow")
chan.basic_publish(msg, exchange="score_updates")

# Consumer Script
# queue_declare returns a (name, message_count, consumer_count) tuple;
# an empty queue name asks the server to generate a unique one
qname, _, _ = chan.queue_declare("", durable=False,
                                 exclusive=True, auto_delete=True)
chan.queue_bind(queue=qname, exchange="score_updates")

def score_update_callback(msg):
    # tell person using this device
    pass

chan.basic_consume(queue=qname, callback=score_update_callback, no_ack=True)
while True:
    chan.wait()
Notifier

This idiom lets consumers subscribe to events with a guarantee that all messages get delivered, even while a consumer is not running. The setup is similar to broadcast, with the following exceptions:

- The queues are given explicit names that relate to the consumer. Name selection must be coordinated across all consumers to be sure they don’t overlap.
- Queues are made durable and messages persistent, giving them permanency.

In this scheme a consumer should be run at least once to register its queue. After that initial run, messages sent to the exchange will start to pile up in the queue for the consumer to process.
Let’s say that data records entered in one system should be sent to numerous other systems across an enterprise. The Notifier is the way to go.
When notifying, the exchange name alone identifies the destination of the message.
chan.exchange_declare(exchange="notify.patient.add", type="fanout",
                      durable=True, auto_delete=False)

# Producer
msg = amqp.Message("given:John\nsurname:Smith", delivery_mode=2)
chan.basic_publish(msg, exchange="notify.patient.add")

# Consumer for Scheduling System
chan.queue_declare("notify.patient.add.scheduling", durable=True,
                   auto_delete=False, exclusive=False)
chan.queue_bind(queue="notify.patient.add.scheduling",
                exchange="notify.patient.add")

def scheduling_patient_add(msg):
    # add to local database maybe?
    chan.basic_ack(msg.delivery_tag)

chan.basic_consume(queue="notify.patient.add.scheduling",
                   callback=scheduling_patient_add)

# Consumer for Reporting System
chan.queue_declare("notify.patient.add.reporting", durable=True,
                   auto_delete=False, exclusive=False)
chan.queue_bind(queue="notify.patient.add.reporting",
                exchange="notify.patient.add")

def reporting_patient_add(msg):
    # add to local database maybe?
    chan.basic_ack(msg.delivery_tag)

chan.basic_consume(queue="notify.patient.add.reporting",
                   callback=reporting_patient_add)
It’s important that each consumer has its own uniquely named queue. We don’t want different consumers gobbling up each other’s messages.
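To see why, consider how the broker routes: a fanout exchange puts an independent copy of each message into every bound queue, while consumers attached to the same queue split that queue’s messages between them. Here is a toy, broker-free model of the difference (the function names are illustrative, not part of amqplib):

```python
from collections import defaultdict

queues = defaultdict(list)
bindings = []

def bind(queue):
    # bind a named queue to our pretend fanout exchange
    bindings.append(queue)

def publish(body):
    # fanout: every bound queue gets its own copy of the message
    for q in bindings:
        queues[q].append(body)

def consume(queue):
    # a consumer removes a message from its queue (None if empty)
    return queues[queue].pop(0) if queues[queue] else None

bind("scheduling")
bind("reporting")
publish("patient added")
print(consume("scheduling"))  # 'patient added' -- scheduling's own copy
print(consume("reporting"))   # 'patient added' -- reporting's own copy
print(consume("scheduling"))  # None -- a second reader of the SAME queue
                              # would find the message already gone
```

With separate queues each system sees every notification; if both systems read one shared queue, each message would reach only whichever consumer grabbed it first.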
Request and Response

This one took me the longest to figure out. It’s an implementation of the standard request/response idiom found ubiquitously in client/server computing. It is also the mechanism underlying RPC, so if you’re looking to build your own RPC protocol on top of AMQP, this is the way to go.
I learned the proper way to implement this by reading the specification and the source code of RabbitMQ’s Java client library, so if anyone has any pointers to make this code better, I’d love to hear them!
The idea is to use two queues: one for the request and one for the response.
chan.exchange_declare(exchange="api", type="direct",
                      durable=False, auto_delete=True)

# Responder/Callee/Server
chan.queue_declare(queue="add_patient", durable=False,
                   exclusive=False, auto_delete=True)
chan.queue_bind(exchange="api", queue="add_patient", routing_key="add_patient")

def add_patient(msg):
    reply = amqp.Message("Return Value")
    # publish to the default exchange; the routing key is the reply queue's name
    chan.basic_publish(reply, routing_key=msg.reply_to)

chan.basic_consume(queue="add_patient", callback=add_patient, no_ack=True)

# Requester/Caller/Client
# queue_declare returns a (name, message_count, consumer_count) tuple
reply_queue, _, _ = chan.queue_declare("", durable=False,
                                       exclusive=True, auto_delete=True)

msg = amqp.Message("Test message!")
msg.properties["reply_to"] = reply_queue
chan.basic_publish(msg, exchange="api", routing_key="add_patient")

def handle_response(msg):
    print "Response:", msg.body

chan.basic_consume(queue=reply_queue, callback=handle_response, no_ack=True)
chan.wait()
There is a lot going on in this example, so I’ll try to go through it step by step. In this scenario the exchange acts as a container, while the queue is the application endpoint. The caller creates a temporary, exclusive queue to serve as a response channel from the responder; AMQP messages have a standard reply_to property for just this purpose.
In this case, the response queue is reachable through the default (unnamed) exchange, since it is only temporary, and it is passed by name to the responder in the reply_to field.
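One wrinkle the example glosses over: with a single request in flight, any message arriving on the reply queue must be the answer. If a caller issues several concurrent requests over one reply queue, the standard fix is the correlation_id message property: tag each request with a unique id, have the responder copy it onto the reply, and match replies to requests by that tag. Below is a broker-free sketch of the caller-side bookkeeping (the class and method names are my own invention, not amqplib’s):

```python
import uuid

class PendingRequests(object):
    """Tracks in-flight RPC calls, matching replies by correlation id."""

    def __init__(self):
        self._pending = {}  # correlation_id -> response body (None = waiting)

    def register(self):
        # called just before publishing a request; the returned id would be
        # set as the message's correlation_id property
        corr_id = str(uuid.uuid4())
        self._pending[corr_id] = None
        return corr_id

    def on_reply(self, corr_id, body):
        # called from the reply-queue consumer callback; ignores stray
        # replies whose id we never issued (or already collected)
        if corr_id in self._pending:
            self._pending[corr_id] = body

    def take(self, corr_id):
        # collect and forget a response; None if nothing has arrived
        return self._pending.pop(corr_id, None)

reqs = PendingRequests()
cid = reqs.register()
reqs.on_reply(cid, "Return Value")
print(reqs.take(cid))  # Return Value
```

The responder side only needs one extra line: copy the incoming message’s correlation_id onto the reply before publishing it back.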
Something I didn’t get much time to go into is the choice of which queues and exchanges to make durable and which to auto-delete. In request and response, you generally do not want messages to persist and pile up in the queue when no responder is running, so a timeout mechanism will have to be implemented in the application layer. I’m sure there are many other considerations that I haven’t thought about, but I’ll hopefully cross those paths when I come to them.
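For instance, a caller could wrap its wait in a deadline check. amqplib itself doesn’t provide one, so here is a minimal sketch of the idea; the names are illustrative, and `poll` stands in for whatever non-blocking check of the reply queue the application uses (e.g. basic_get):

```python
import time

class RpcTimeout(Exception):
    """Raised when no response arrives before the deadline."""
    pass

def wait_with_deadline(poll, timeout=5.0, interval=0.1):
    """Call poll() repeatedly until it returns a non-None response,
    or raise RpcTimeout once `timeout` seconds have elapsed."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = poll()
        if response is not None:
            return response
        time.sleep(interval)
    raise RpcTimeout("no response within %.1fs" % timeout)

# Simulated poll that answers on the third attempt:
attempts = {"n": 0}
def fake_poll():
    attempts["n"] += 1
    return "pong" if attempts["n"] >= 3 else None

print(wait_with_deadline(fake_poll, timeout=2.0, interval=0.01))  # pong
```

A real caller would also want to discard any response that finally arrives after the timeout fired, which is exactly the kind of bookkeeping the correlation-id matching above helps with.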