ZFlow
After reading J.P. Morrison’s book Flow-Based Programming, I thought it would be interesting to implement a driver in Python to try it out.
I decided against a multi-threaded approach and instead implemented the driver using Python’s generators.
The results were promising, but the implementation lacks a few key features of the standard Flow-Based Programming model, most importantly the ability to create loops in the graph.
Creating graphs by hand by specifying edges in a list is tedious. I suspect a graphical process modeler would be needed for this kind of programming to be truly productive.
Usage
A component is defined by a Python function annotated with the @component decorator. The component can then be attached to other components or used independently.
Arguments to the function specify component ports while arguments to the decorator specify the type of each port.
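To illustrate how decorator arguments can be paired with function parameters, here is a minimal sketch of such a decorator in Python 3. It is not zflow's actual implementation: the `ports` and `controls` attributes are hypothetical, and treating parameters with default values as control values is an assumption based on the control ports described later in this post.

```python
import inspect


def component(*port_types):
    """Hypothetical decorator: pair each port parameter with a port type."""
    def decorate(func):
        params = list(inspect.signature(func).parameters.values())
        # Assumption: parameters without defaults are ports,
        # parameters with defaults are control values.
        ports = [p.name for p in params if p.default is inspect.Parameter.empty]
        if len(ports) != len(port_types):
            raise TypeError("one port type is needed per port argument")
        func.ports = dict(zip(ports, port_types))
        func.controls = {p.name: p.default for p in params
                         if p.default is not inspect.Parameter.empty}
        return func
    return decorate


@component('IN', 'IN', 'OUT')
def add(a, b, out):
    for a1, b2 in zip(a, b):
        yield a1 + b2


print(add.ports)  # {'a': 'IN', 'b': 'IN', 'out': 'OUT'}
```

Recording the mapping on the function object keeps the component usable as an ordinary generator function, which matches the claim that components can be used independently.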
Here is a primitive add component with two IN ports and one OUT port:
import itertools

@component('IN', 'IN', 'OUT')
def add(a, b, out):
    for a1, b2 in itertools.izip(a, b):
        yield a1 + b2
This code is written in the list-processing style common to functional languages. If you prefer a more procedural approach:
@component('IN', 'IN', 'OUT')
def add(a, b, out):
    while True:
        a1 = a.next()
        b2 = b.next()
        yield a1 + b2
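The examples in this post are Python 2 code (`itertools.izip`, `a.next()`). As a hedged aside, here is how the same two styles might look as plain Python 3 generators, without the `@component` decorator or the `out` argument, so this is not zflow code. One difference matters: since PEP 479 (the default from Python 3.7), a `StopIteration` that escapes inside a generator body becomes a `RuntimeError`, so the procedural version has to catch it explicitly.

```python
def add_functional(a, b):
    """List-processing style: zip replaces Python 2's itertools.izip."""
    for a1, b2 in zip(a, b):
        yield a1 + b2


def add_procedural(a, b):
    """Procedural style: pull one value from each input per iteration."""
    a, b = iter(a), iter(b)
    while True:
        try:
            a1 = next(a)  # Python 3 spelling of a.next()
            b2 = next(b)
        except StopIteration:
            return  # end the output stream when either input runs out
        yield a1 + b2


print(list(add_functional([1, 2, 3], [10, 20, 30])))  # [11, 22, 33]
print(list(add_procedural([1, 2, 3], [10, 20, 30])))  # [11, 22, 33]
```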
IN port arguments are given as Python generators. The yield statement is used to send values to OUT ports. When there is only one OUT port, the yielded value is sent to that port directly.
If there is more than one OUT port, the function must yield tuples: the first element is the port argument and the second is the value to send on that port.
@component('IN', 'OUT', 'OUT')
def split(source, out1, out2):
    """ Duplicates a data stream into two streams. """
    for data in source:
        yield (out1, data)
        yield (out2, data)
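To make the (port, value) convention concrete, here is a rough sketch of how a driver could route tagged tuples into per-port streams. It is plain Python 3 and not zflow's actual implementation; the `demux` helper and the use of strings as port tags are hypothetical.

```python
from collections import deque


def split(source, out1, out2):
    """Plain-generator stand-in for the split component above."""
    for data in source:
        yield (out1, data)
        yield (out2, data)


def demux(tagged, ports):
    """Return one lazy iterator per port, all fed from one tagged stream."""
    buffers = {port: deque() for port in ports}
    tagged = iter(tagged)

    def reader(port):
        while True:
            # Pull from the shared stream until this port has a value,
            # buffering values that belong to other ports.
            while not buffers[port]:
                try:
                    tag, value = next(tagged)
                except StopIteration:
                    return
                buffers[tag].append(value)
            yield buffers[port].popleft()

    return {port: reader(port) for port in ports}


streams = demux(split([1, 2, 3], "out1", "out2"), ["out1", "out2"])
print(list(streams["out1"]))  # [1, 2, 3]
print(list(streams["out2"]))  # [1, 2, 3], served from the buffer
```

The buffering is the price of a pull-based design: if one consumer reads far ahead of the other, the values for the slower port accumulate in memory.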
The library also supports ARRAY ports. Declare an IN ARRAY port using “INA”. Here is a component that sums a variable number of data streams.
import itertools, operator

@component('INA', 'OUT')
def sum(ops, out):
    for row in itertools.izip(*ops):
        yield reduce(operator.add, row)
Here is the procedural version:
@component('INA', 'OUT')
def sum(ops, out):
    while True:
        cur_sum = 0
        for op in ops:
            cur_sum = cur_sum + op.next()
        yield cur_sum
OUT ARRAY ports are similar, but require more work. Use “OUTA” in the port specification. The function must first yield the number of elements in the OUT ARRAY; after that, values can be yielded. Here is a split component that duplicates its input a variable number of times.
@component('IN', 'OUTA')
def splitm(source, out, size=3):
    yield (out.size(), size)
    for row in source:
        for i in range(size):
            yield (out(i), row)
In this example we have a control port named size. Any keyword argument of the function is treated as a control port.
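The size-first protocol for OUT ARRAY ports can be sketched in plain Python 3 as follows. This is only an illustration under stated assumptions: `OutArray` is a hypothetical stand-in for whatever object zflow passes as the `out` argument, and `collect` is a hypothetical consumer that reads the size announcement before sorting values into one list per array element.

```python
class OutArray:
    """Hypothetical stand-in for an OUTA port argument."""

    def __init__(self, name):
        self.name = name

    def size(self):
        # Tag used to announce how many elements the array has.
        return (self.name, "size")

    def __call__(self, index):
        # Tag for one element of the array.
        return (self.name, index)


def splitm(source, out, size=3):
    yield (out.size(), size)
    for row in source:
        for i in range(size):
            yield (out(i), row)


def collect(tagged, out):
    """Read the size announcement, then gather values per array element."""
    tagged = iter(tagged)
    tag, size = next(tagged)
    if tag != out.size():
        raise ValueError("expected the array size first")
    elements = [[] for _ in range(size)]
    for (_name, index), value in tagged:
        elements[index].append(value)
    return elements


out = OutArray("out")
print(collect(splitm([1, 2], out, size=3), out))  # [[1, 2], [1, 2], [1, 2]]
```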
Running components is done with a runner object. Provide a component and an optional dictionary of control values to create a runner.
Afterwards, use the connect_in method to attach streams to IN ports and invoke the runner like a procedure to pull values from the OUT ports.
r = runner(add)
r.connect_in('a', [1, 2, 3, 4, 5])
r.connect_in('b', [10, 20, 30, 40, 50])
for total in r('out'):
    print total
# Produces 11 22 33 44 55
Components can be connected together using these library calls. Here is a series of components wired together to match the following diagram.

This machine takes a stream of numbers and computes 3x + 1 for each number x. The inc component takes a stream of numbers and increments each value by 1.
splitm_r = runner(splitm, {'size' : 3})
sum_r = runner(sum)
inc_r = runner(inc)
sum_r.connect_in('ops', splitm_r('out', 0))
sum_r.connect_in('ops', splitm_r('out', 1))
sum_r.connect_in('ops', inc_r('out'))
inc_r.connect_in('value', splitm_r('out', 2))
splitm_r.connect_in('source', range(5))
for x in sum_r('out'):
    print x
Here is an implementation of the inc component for completeness.
@component("IN", "OUT")
def inc(value, out):
    for line in value:
        yield line+1
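As a sanity check on what the wiring above should compute, the same dataflow can be sketched with plain Python 3 generators, using `itertools.tee` in place of splitm. This does not use zflow, and the function names `total` and `triple_plus_one` are illustrative only.

```python
import itertools


def inc(values):
    for v in values:
        yield v + 1


def total(*streams):
    # Sum the streams element-wise, like the sum component.
    for row in zip(*streams):
        yield sum(row)


def triple_plus_one(source):
    # tee plays the role of splitm with size=3.
    s0, s1, s2 = itertools.tee(source, 3)
    return total(s0, s1, inc(s2))


print(list(triple_plus_one(range(5))))  # [1, 4, 7, 10, 13]
```

Each output is x + x + (x + 1), which is where the 3x + 1 comes from.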
Composite components can also be specified in a more declarative way using the network procedure. This procedure takes the following arguments:
- Component name
- Port connections as a list
- The list of ports defined on the composite component which are to be used to connect the component to other components.
- Dictionary of IN paths mapping the composite component’s IN ports to internal component ports.
- Dictionary of OUT paths mapping the composite component’s OUT ports to internal component ports.
- Dictionary of control ports mapping the composite’s control ports to internal ports.
The previously constructed composite that calculates 3x+1 can be declared as follows:
triple1 = network("triple1", [
        ["splitm", splitm, ("out", 0), "sum", sum, ("ops", 0)],
        ["splitm", splitm, ("out", 1), "sum", sum, ("ops", 1)],
        ["splitm", splitm, ("out", 2), "inc", inc, "value"],
        ["inc", inc, "out", "sum", sum, ("ops", 2)]
    ],
    ["source", "out"],
    {"source" : ("splitm", "source")},
    {"out" : ("sum", "out")},
    {"size" : ("splitm", "size")}
)
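The connection list is essentially the edge list of a directed graph. As a hedged sketch of what can be done with such a list, the standard-library `graphlib` module (Python 3.9+) can derive an order in which producers come before consumers, and it rejects graphs that contain loops. This is not how zflow works internally (zflow depends on NetworkX, and its internals are not shown here); the `evaluation_order` helper and the simplified four-element connections, which omit the component objects, are assumptions made for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def evaluation_order(connections):
    """Order component names so that producers come before consumers."""
    sorter = TopologicalSorter()
    for src_name, _src_port, dst_name, _dst_port in connections:
        sorter.add(dst_name, src_name)  # dst depends on src
    return list(sorter.static_order())


connections = [
    ("splitm", ("out", 0), "sum", ("ops", 0)),
    ("splitm", ("out", 1), "sum", ("ops", 1)),
    ("splitm", ("out", 2), "inc", "value"),
    ("inc", "out", "sum", ("ops", 2)),
]
print(evaluation_order(connections))  # ['splitm', 'inc', 'sum']

try:
    # Feeding sum's output back into splitm creates a loop.
    evaluation_order(connections + [("sum", "out", "splitm", "source")])
except CycleError:
    print("loops are rejected")
```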
This component exposes the same interface as simple components.
t_r = runner(triple1, {'size' : 3})
t_r.connect_in('source', range(10))
for value in t_r('out'):
    print value
This is the kind of definition for which a GUI would be dramatically helpful.
Download
With Easy Install, you can install zflow by typing easy_install zflow in your terminal. For manual installation, see below.
Click below to download a zip file containing the zflow package. Also included is a unit test that provides some examples that may be useful.
Note that this module requires the NetworkX package. Install it by running easy_install networkx.
I would like to see something like this developed into a node-based compositor like Nuke. I’m imagining being able to pop open a node and edit Python code to change the image operator.
You might want to check out what’s going on over here: http://bitbucket.org/faide/pyf
They’re working on something that’s going to include a modeler, and zflow is part of the bottom of the stack.