When you need to process a large amount of data, chaining Python generators
is a nice way to set up processing pipelines. The advantages of this approach
are that you have fine-grained control over memory usage and that it gives
you an easy way of defining the pipelines.
At the end of this post we will be able to define pipelines as follows
(a sketch; chain_nodes and the node functions are developed below):
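```python
pipeline = chain_nodes(
    nodes=[
        debug_node,          # print items as they stream past
        make_chunk_node(3),  # group them into lists of three
    ],
    source=range(10),
)
for chunk in pipeline:
    pass  # iterating the result drives the whole pipeline
```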
The code
Let’s walk through the different parts of the code. It is assumed you are
familiar with Python generators.
We start by importing csv and itertools. The itertools module is
not used in the example code in this post, but it provides some
very nice utilities.
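```python
import csv
import itertools  # not used below, but full of pipeline-friendly helpers
```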
A basic processing node
I chose to call each of the processing steps a Node. We start by
implementing a no-operation node to demonstrate the simplest possible node.
The generator iterates over the input data and yields each item
unmodified.
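A minimal sketch (I’ll call it no_op_node):

```python
def no_op_node(data):
    """Pass every item through unmodified."""
    for item in data:
        yield item
```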
Running this:
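```python
for item in no_op_node([1, 2, 3]):
    print(item)
# 1
# 2
# 3
```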
Debug printing is usually annoying and inefficient, but we all do it
from time to time, right? In any case, it makes a nice example of a
processing node that writes to stdout as a side effect.
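A sketch; what exactly gets printed is a guess, here the source iterable
followed by the item:

```python
def debug_node(data):
    """Pass items through unchanged, printing source and item to stdout."""
    for item in data:
        print("{!r} -> {!r}".format(data, item))
        yield item
```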
Running this:
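```python
for item in debug_node(["a", "b"]):
    pass  # we only iterate for the side effect
# ['a', 'b'] -> 'a'
# ['a', 'b'] -> 'b'
```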
A node to split an iterable into chunks
When inserting data into a database, it is efficient to insert multiple
rows at a time. Let’s make a node that produces chunks of size 3. Larger
chunks are more realistic, but for demonstration purposes we use a small
size.
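A sketch with the chunk size hard-coded:

```python
def chunk_node(data):
    """Collect items into lists of (at most) 3 items."""
    chunk = []
    for item in data:
        chunk.append(item)
        if len(chunk) == 3:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # the last chunk may be smaller

# list(chunk_node(range(7))) -> [[0, 1, 2], [3, 4, 5], [6]]
```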
Static config values on a node
How cool would it be if we could configure these nodes at pipeline definition
time? We would need some place to store this information. Maybe a class could
work, with configuration values passed to the constructor and a method for the
actual generator. I haven’t tried that; another option is to take advantage
of closures, sketched here with a configurable chunk size:
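```python
def make_chunk_node(chunk_size):
    """Return a chunking generator function with chunk_size in its closure."""
    def node(data):
        chunk = []
        for item in data:
            chunk.append(item)
            if len(chunk) == chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk  # the last chunk may be smaller
    return node
```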
This allows us to make the following construction: we pass the config
value to the function that returns the actual generator:
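```python
node = make_chunk_node(3)     # configure at definition time
for chunk in node(range(7)):  # then hand it the source iterable
    print(chunk)
# [0, 1, 2]
# [3, 4, 5]
# [6]
```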
More nodes
Other examples of nodes, sketched below, are:
open_file_node, which takes an iterable of filenames and yields file-like objects
parse_csv_node, which takes an iterable of CSV text lines and yields data rows
make_upper_node, which takes an iterable of iterables and yields a list for each
inner iterable with its items uppercased
print_line_node, the same as the debug node but without printing the source
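Rough sketches, with the interfaces guessed from the descriptions above:

```python
def open_file_node(filenames):
    """Yield an open file object for each filename."""
    for filename in filenames:
        with open(filename, newline="") as f:
            yield f  # closed again once the pipeline asks for the next file


def parse_csv_node(lines):
    """Parse an iterable of CSV text lines into data rows."""
    for row in csv.reader(lines):
        yield row


def make_upper_node(rows):
    """Yield a list for each inner iterable, with its items uppercased."""
    for row in rows:
        yield [item.upper() for item in row]


def print_line_node(data):
    """Print each item bare, then pass it through."""
    for item in data:
        print(item)
        yield item
```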
Chaining generators
Now that we have some nodes, we need a way to chain them all together.
The following function does that by accepting a list of nodes and a source
iterable. The source is passed to the first node in the list. The
generator returned by the first node is then passed as an argument to the next
node, and so on until there are no more nodes in the list. Finally, the generator
returned by the last node is returned. Iterating over this last generator sets the
whole pipeline in motion.
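A sketch of that function, here called chain_nodes:

```python
def chain_nodes(nodes, source):
    """Wire nodes together: the generator returned by one node
    becomes the input iterable of the next."""
    generator = source
    for node in nodes:
        generator = node(generator)
    return generator
```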
Defining pipelines
Putting it all together we get something like this, sketched for a single CSV
file that is passed in already opened (parse_csv_node expects text lines, and
people.csv is just an example name):
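```python
with open("people.csv", newline="") as source:
    pipeline = chain_nodes(
        nodes=[
            parse_csv_node,      # text lines -> data rows
            make_upper_node,     # rows -> uppercased rows
            make_chunk_node(2),  # rows -> chunks of two rows
            print_line_node,     # print every chunk as it passes
        ],
        source=source,
    )
    for chunk in pipeline:
        pass  # consuming the last generator drives the whole pipeline
```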
Conclusion and Future
I like the result of this experiment. Although I didn’t take care of any
exception handling, nor did I try to use it for anything serious, I think this
can work. One thing I am not sure about is progress reporting.
If each node knows beforehand how many items it will be processing, it can
know its own percentage, but how should it communicate that? A callable that is
passed in with the source iterable, perhaps…
Nodes can also be used to partition data, like make_chunk_node does, and fire
off tasks to be processed in parallel. As jobs complete, the results can
be yielded.