Faust, your Python-based streaming library

Robinhood is a very popular California-based FinTech, included by Forbes in the top 50 FinTechs to watch in 2019. Their primary mission is to bring down stock trading fees for the common Joe/Jane, although their roadmap also includes cryptocurrency trading.

Due to the nature of the bidding market, their data stack probably includes a lot of streaming tooling. Also (probably) prompted by the lack of quick and easy tooling for streaming with Python, combined with the growing demand for Python in the developer community, they launched their own port of Kafka Streams, called Faust.

In this post, we’re going to show how easy it is to bootstrap a project with Faust and put your stream-related business logic into practice very quickly. The demo we prepared is an app that filters words from a Kafka topic and keeps a count of how many times it has seen the colors “red”, “green” and “blue”.

In a nutshell, Faust is:

  • an easy-to-bootstrap streaming library for Python that uses (mostly) Apache Kafka as the source;
  • relies on Python’s asyncio to split the work into agents;
  • distributes the workload between agents living in the same cluster;
  • provides stateful processing with support for RocksDB (same as Apache Flink);
  • the agents can use whatever Python code/modules/libraries you are accustomed to for the transformation or sinking of data (numpy, pandas, boto3, whatever you so desire).

The original developers of Faust describe it as falling under the use case of any of the following domains:

  • Event Processing
  • Distributed Joins & Aggregations
  • Machine Learning
  • Asynchronous Tasks
  • Distributed Computing
  • Data Denormalization
  • Intrusion Detection
  • Realtime Web & Web Sockets.
  • and much more…

Also, luckily, Faust’s documentation is already at a mature stage, even including instructions on how to test your applications, which helps in case you are not yet familiarised with Python’s asyncio.

Distributed

It’s also worth noticing that Faust is tightly coupled to core Kafka concepts. More concretely, it relies on the way Kafka manages consumer groups to identify whether any topic/partition is not being attended to, and to assign that partition to an agent living on an instance of the app that Kafka still identifies as an active member of the consumer group.

[Figure: a single Kafka topic with four partitions consumed by two instances of the same Faust app, before and after one instance goes down]

In the image above, we have a single Kafka topic consisting of four partitions, and two instances of the same Faust app running simultaneously. In Diagram 1 both are working, so the elected Faust leader will decide which partitions it will take for itself and which partitions the other consumers in the same consumer group will get. Since four partitions are split across two instances, it attributes two partitions to each instance.

In Diagram 2, the first instance goes down. After a certain timeout (which can also be parameterised in Faust’s settings), the consumer group informs the leader that one of the consumers is down and forces a re-distribution of the topic/partition assignments. In this case, all four partitions end up assigned to the remaining instance.

The reason this is important to mention is that the maximum parallelisation you can get out of any given topic is bounded by its number of partitions. A topic with eight partitions will, at most, be spread across eight instances of the app.
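For example, and assuming the demo app below is importable as src.app (this invocation is illustrative and not part of the repo’s Makefile), you could start two workers of the same app and let Kafka spread the partitions between them; the --web-port flag just avoids a clash between the local web servers each worker starts:

faust -A src.app worker -l info --web-port 6066   # first instance
faust -A src.app worker -l info --web-port 6067   # second instance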

It’s all about the topics

Streams

When developing with Faust, you are creating agents, which are basically async functions that look at every event passing through a stream and apply some logic to it. A stream is the basic abstraction for the communication link between sources/agents/sinks, and you can usually think of one as a Kafka topic.
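As a minimal, hedged sketch (the app and topic names here are illustrative, not the ones used in the demo repository), an agent is just a decorated async function iterating over a stream:

import faust

# Illustrative names only; the demo app further below uses different topics
app = faust.App('hello-streams', broker='kafka://localhost:9092')
words_topic = app.topic('words', value_type=str)

@app.agent(words_topic)
async def print_words(stream):
    # The agent sees every event flowing through the stream
    async for word in stream:
        print(f'received: {word}')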

If you have a simple workflow, like counting words, you would have a single agent that:

  • reads from a Kafka topic (not managed by Faust);
  • updates the word count table;
  • sends the result to another stream/topic.

However, you could instead have one agent filtering the words, selecting only the three primary colors, and sending the “filtered” results to another Faust agent that keeps the actual word count.

So how do the agents communicate with each other? We’ll illustrate this in the example later on, but it’s important to know that if we want agents to talk to each other we can declare “internal” topics, which are managed by Faust. Faust will take care of creating them with the default (8) or an explicitly specified number of partitions.
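As a quick, hedged sketch before we get to the actual demo code (the names are again illustrative), an internal topic and two agents chained through it could look like this:

import faust

app = faust.App('chained-agents', broker='kafka://localhost:9092')
words_topic = app.topic('words', value_type=str)

# Internal topic: Faust creates and manages it for us
colors_topic = app.topic('colors', internal=True, partitions=1, value_type=str)

@app.agent(words_topic)
async def filter_colors(words):
    async for word in words:
        if word in ('red', 'green', 'blue'):
            # Hand the colors over to the second agent via the internal topic
            await colors_topic.send(value=word)

@app.agent(colors_topic)
async def handle_colors(colors):
    async for color in colors:
        print(f'saw a color: {color}')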

Stateful / Managed state

Managed state is a whole different discussion, so, to keep this article focused, we’ll just mention that Faust supports it in a rather peculiar way. The concept is that we declare a table for each object we want to keep track of. Each table is basically a special Faust class that behaves very similarly to a Python dictionary.

  • for persistence: it supports both RocksDB, meant for production (same as Flink, not that they tackle the same problems at the same level), and in-memory storage (useful for development);
  • for recovery: it uses Kafka topics to keep track of each table’s changelog. This is done for you.
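A hedged sketch of what this looks like in code (the store URL picks the persistence backend; the names are illustrative):

import faust

app = faust.App(
    'stateful-example',
    broker='kafka://localhost:9092',
    store='memory://',  # 'rocksdb://' would be the production choice
)
colors_topic = app.topic('colors', internal=True, value_type=str)

# A Table behaves like a Python dict, but Faust persists it and keeps a
# changelog topic behind the scenes so the state can be recovered
color_counts = app.Table('color_counts', default=int)

@app.agent(colors_topic)
async def count_colors(colors):
    async for color in colors:
        color_counts[color] += 1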

Basis for demo

In order to create sharable and demonstrable code, let’s look at the repository’s structure, which you can find here.

├── Makefile
├── README.md
├── requirements.txt
├── docker
│  ├── docker-compose-local.yml
│  └── kafka-docker
└── src
    └── app.py
  • We’ll need a handy Kafka cluster running locally. For ease of reproducibility, we’re using the popular wurstmeister’s kafka-docker.
  • There is a Makefile which eases launching the individual components (Kafka+Zookeeper or the sample Faust app).
  • The Faust app is stored in src/app.py for simplicity, but the Faust documentation has instructions on how to organize larger projects.

We will be running our Python code from outside Docker, so we need a virtual environment. We suggest using conda, so… after cloning the repository, you can follow the steps below to set up a proper environment:

conda create -n faust-test python=3.6
conda activate faust-test
pip install -r requirements.txt
make kafka

The last command will launch Zookeeper and Kafka in separate containers and will create 2 topics with 1 partition each: input and output. These are the source and sink topics we’ll use in our Faust app.

Finally, because we will be sending messages to and reading them from Kafka topics, we need the Kafka command-line tools. You can download them here. The bash scripts inside the bin directory you get after decompressing the tarball are what we need to interact with Kafka as producers or consumers of a given topic (the input and output topics mentioned earlier).

Quick code review

This Faust app is relatively simple. Its purpose is to read all words from the input topic and keep a count of how many times the colors red, green and blue have been sent. Each time any of them appears, it sends an update to the output topic.

After creating an app object from the faust.App class:

  • we declare two external topics with 1 partition each: input and output (which are created automatically by the provided Docker code when running make kafka);
  • we declare an internal topic colors_topic, which is the means of communication between our 2 agents;
  • we also declare a Table object so Faust keeps the running count of the colors as managed state, in case the app fails;
  • we create 2 agents: one filters the colors out of all the words and sends them to the internal topic; the other keeps a count of the filtered colors coming in from the internal topic and reports its running count.
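Putting it all together, a sketch along the lines of what src/app.py does (the exact names and details may differ from the repository, so treat this as an approximation):

import faust

app = faust.App('faust-test', broker='kafka://localhost:9092', store='memory://')

# External topics, created beforehand by `make kafka`
input_topic = app.topic('input', value_type=str)
output_topic = app.topic('output', value_type=str)

# Internal topic used for communication between the two agents
colors_topic = app.topic('colors_topic', internal=True, partitions=1, value_type=str)

# Managed state: running count per color
color_counts = app.Table('color_counts', default=int, partitions=1)

@app.agent(input_topic)
async def filter_colors(words):
    async for word in words:
        if word in ('red', 'green', 'blue'):
            await colors_topic.send(value=word)

@app.agent(colors_topic)
async def count_colors(colors):
    async for color in colors:
        color_counts[color] += 1
        await output_topic.send(value=f'{color} seen {color_counts[color]} times')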

Running the sample code

Assuming you already ran the commands in the Basis for demo section above, Kafka should be running in the background and the conda environment should be activated. You should also have downloaded the Kafka command-line tools.

Now, I suggest you split your terminal in two:

  1. One tab located at the repo’s root
  2. Another tab at the bin directory with Kafka’s command-line utility scripts.

On the repo tab, run make run-app which will start the app we described above.

In Kafka’s bin directory, run the following to start a Kafka console producer:

./kafka-console-producer.sh --broker-list localhost:9092 --topic input

Now you can start writing individual words, including colors. A word of advice: since we haven’t defined a concrete type (with a schema) for the values expected in the input topic, we need to wrap the words we send to the input topic in double quotes.
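If you also want to watch the updates the counting agent emits, you can open a third tab in Kafka’s bin directory and attach a console consumer to the output topic:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output --from-beginning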

You should now have something like this:

[Screenshot: the Faust worker and the Kafka console producer running side by side]

Where to go from here

You can now start bootstrapping your own streaming project with Python and Kafka. My suggestion for your further exploration of Faust would be to try and see how state is managed when using topics with more than 1 partition and when using more than 1 Faust app instance. You’ll understand how the Table gets sharded across Faust/Kafka consumers.

Another neat feature we did not explore is the type safety you can impose on topics and tables: when declaring a topic, you can specify the value_type (and even the key_type) you are expecting to exist in that topic.
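As a hedged sketch (the Word model is made up purely for illustration), a typed topic is declared by passing a faust.Record subclass:

import faust

class Word(faust.Record):
    text: str

app = faust.App('typed-example', broker='kafka://localhost:9092')

# Faust will serialise/deserialise values on this topic as Word records
words_topic = app.topic('words', key_type=str, value_type=Word)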

Found an error or need clarification on some topic? Leave a comment down below.
