Faust, your Python-based streaming library

Robinhood is a very popular California-based FinTech, included by Forbes in the top 50 FinTechs to watch in 2019. Their primary mission is to bring down stock trading fees for the common Joe/Jane, although their roadmap also includes cryptocurrency trading.

Due to the nature of the bidding market, their data stack probably includes a lot of streaming tooling. Also, (probably) due to the lack of quick and easy tooling for streaming with Python, combined with the growing demand for Python in the developer community, they launched their own port of Kafka Streams, called Faust.

In this post, we're going to show how easy it is to bootstrap a project with Faust and put your stream-related business logic into practice very quickly. The demo we prepared is of an app which filters words from a Kafka topic and keeps a count of how many times it has seen the colors "red", "green" and "blue".

In a nutshell, Faust is:

  • an easy-to-bootstrap streaming library for Python that uses (mostly) Apache Kafka as the source;
  • relies on Python's asyncio to split the tasks into agents;
  • distributes the workload between agents living in the same cluster;
  • provides stateful processing with support for RocksDB (same as Apache Flink);
  • the agents can use whatever Python code/modules/libraries you are accustomed to using for the transformation or sinking of data (numpy, pandas, boto3, whatever you so desire).
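To give a feel for how little boilerplate is involved, here is a minimal, hypothetical "hello world" agent, closely following the quickstart in Faust's documentation (the app, broker and topic names are made up for illustration):

import faust

# A tiny app: one agent that echoes every message it sees on a topic.
app = faust.App("hello-world", broker="kafka://localhost:9092")
greetings_topic = app.topic("greetings", value_type=str)

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(f"received: {greeting}")

You would then start a worker with something like faust -A <your_module> worker -l info.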

The original developers of Faust describe it as falling under the use case of any of the following domains:

  • Event Processing
  • Distributed Joins & Aggregations
  • Machine Learning
  • Asynchronous Tasks
  • Distributed Computing
  • Data Denormalization
  • Intrusion Detection
  • Realtime Web & Web Sockets.
  • and much more…

Also, luckily, Faust's documentation is already at a mature stage, even including instructions on how to test your applications, in case you are not yet familiar with Python's asyncio.

Distributed

It's also worth noting that Faust is tightly coupled to core Kafka concepts. More concretely, it relies on the way Kafka manages consumer groups to detect whether any topic/partition is not being attended to and, in that case, to assign that partition to an agent living on an instance of the app that Kafka still identifies as active in the consumer group.

[Image: basic_faust.png — Diagrams 1 and 2 showing how the four partitions of one topic are assigned across two instances of the same Faust app]

In the image above, we have a single Kafka topic consisting of four partitions, and two instances of the same Faust app running simultaneously. In Diagram 1 both are working, so the elected leader of that consumer group decides which partitions it keeps and which partitions the other consumers in the same consumer group get. Since four partitions are split across two instances, each instance is assigned two partitions.

In Diagram 2, the first instance goes down. Based on a certain timeout (which can also be parameterised in Faust's settings), the consumer group informs the leader that one of the consumers is down and forces a redistribution of topic/partition assignments. In this case, all four partitions get assigned to the remaining instance.
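As a hedged illustration of that timeout: if I read the Faust settings reference correctly, the relevant knobs are broker_session_timeout and broker_heartbeat_interval, which can be passed straight to the App constructor (double-check the names against the Faust version you are running):

import faust

# Sketch: shorten how long Kafka waits before declaring a silent worker dead,
# so its partitions get reassigned to the surviving instances sooner.
app = faust.App(
    "test",
    broker="kafka://localhost:9092",
    broker_session_timeout=30.0,    # seconds of silence before a consumer is evicted
    broker_heartbeat_interval=3.0,  # how often each worker heartbeats to the group
)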

The reason this is important to mention is that the maximum parallelisation you are able to get out of any given topic is as high as its number of partitions. A topic with eight partitions will, at maximum, be assigned to eight instances of the app.

It’s all about the topics

Streams

When developing with Faust, you are creating agents: basically async functions that look at every event passing through a stream and apply some logic to it. A stream is the basic abstraction for the communication link between sources/agents/sinks, and you can usually think of one as a Kafka topic.

If you have a simple workflow, like counting words, you would have:

  • a single agent that reads from a Kafka topic (not managed by Faust)
  • updates the word count table
  • sends the result to another stream/topic.

However, you could instead have one agent filtering the words, selecting only the three primary colors and sending the "filtered" results to another Faust agent that keeps the actual relevant word count.

So how do agents communicate with each other? We'll illustrate this in the example later, but it's important to know that if we want agents to communicate with each other we can declare "internal" topics, which are managed by Faust. It will take care of creating them with either the default (8) or an explicitly requested number of partitions.

Stateful / Managed state

Managed state is a whole different discussion, so in order to keep this article focused we'll just mention that Faust supports it in a very peculiar way. The concept is that we declare a table for each object that we want to keep track of. Each table is basically a special Faust class that behaves very similarly to a Python dictionary.

  • for persistence: supports both RocksDB for production (the same store Apache Flink uses, not that they tackle the same problems at the same level) and in-memory storage (useful for development);
  • for recovery: uses changelog topics to keep track of each table's changes. This is done for you.
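As a rough sketch of what that looks like in code (the names below are illustrative; the rocksdb:// store needs the RocksDB extra, installed with something like pip install faust[rocksdb]):

import faust

# Use RocksDB for production-grade persistence; swap for "memory://" during development.
app = faust.App("word-counter", broker="kafka://localhost:9092", store="rocksdb://")

# Behaves like a dict; missing keys default to int() == 0.
word_counts = app.Table("word_counts", key_type=str, value_type=int, default=int)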

Basis for demo

In order to create sharable and demonstrable code, let’s look at the repository’s structure, which you can find here.

├── Makefile
├── README.md
├── requirements.txt
├── docker
│  ├── docker-compose-local.yml
│  └── kafka-docker
└── src
    └── app.py
  • We'll need a handy Kafka cluster running locally. For ease of reproducibility, we're using the popular wurstmeister's kafka-docker.
  • There is a Makefile which eases launching the individual components (Kafka + Zookeeper or the sample Faust app).
  • The Faust app is stored in src/app.py for simplicity, but the Faust docs have instructions on how to organize larger projects.

We will be running our Python code from outside Docker, so we need a virtual environment. We suggest using conda, so… after cloning the repository, you can follow the steps below to set up your environment:

conda create -n faust-test python=3.6
conda activate faust-test
pip install -r requirements.txt
make kafka

The last command will launch Zookeeper and Kafka in separate containers and will create 2 topics with 1 partition each: input and output. These are the source and sink topics we'll use in our Faust app.

Finally, because we will be sending messages to and reading them from Kafka topics, we need the Kafka command-line tools for that purpose. You can download them here. The bash scripts inside the bin directory you get after decompressing the tarball are what we need to interact with Kafka as producers or consumers of a given topic (the input and output topics mentioned earlier).
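As a quick sanity check, you can list the topics that make kafka created from Kafka's bin directory (assuming a Kafka release recent enough to accept --bootstrap-server; older releases used --zookeeper localhost:2181 instead):

# Should print at least the input and output topics.
./kafka-topics.sh --bootstrap-server localhost:9092 --list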

Quick code review

import faust

app = faust.App(id="test", broker="kafka://localhost:9092", store="memory://")


# convenience func for launching the app
def main() -> None:
    app.main()


# Input topic, NOT managed by Faust (marked with internal=False).
input_topic = app.topic('input', internal=False, partitions=1, value_type=str)
# Faust will create this topic for us.
colors_topic = app.topic('colors_topic', internal=True, partitions=1, value_type=str)
# Output, also NOT managed by Faust.
output_topic = app.topic('output', internal=False, partitions=1, value_type=str)

# Let's define a table to keep the count of valid RGB colors.
colors_count_table = app.Table('colors-count', key_type=str, value_type=int, partitions=1, default=int)

VALID_WORDS = ["red", "green", "blue"]


@app.agent(input_topic)
async def filter_colors(words):
    # Forward only the words that are one of the three colors we care about.
    async for word in words:
        print(word)
        if word in VALID_WORDS:
            await colors_topic.send(value=word)


@app.agent(colors_topic)
async def colors_count(colors):
    # Update the managed table and publish the running count downstream.
    async for color in colors:
        colors_count_table[color] += 1
        print(f'{color} has now been seen {colors_count_table[color]} times')
        await output_topic.send(value=colors_count_table[color])


This Faust app is relatively simple. Its purpose is to read all the words from the input topic and keep a count of how many times the colors red, green and blue have been sent. Each time any of them is seen, it sends an update to the output topic.

After creating an app object from the faust.App class:

  • we declare two external topics with 1 partition each: input and output (these are created automatically by the provided docker code when running make kafka);
  • we declare an internal topic colors_topic, which is the means of communication between our 2 agents;
  • we also declare a Table object so Faust knows to keep the running count of the colors in managed state, in case the app fails;
  • we create 2 agents: one filters colors from all the words and sends each color to the internal topic. The other agent keeps a count of the filtered colors that come in from the internal topic and reports its running count.

Running the sample code

Assuming you already ran the commands in the Basis for demo section above, Kafka should be running in the background and the conda environment should be activated. You should also have downloaded the Kafka command-line tools.

Now, I suggest you split your terminal in 2:

  1. One tab located at the repo's root
  2. Another tab at the bin directory with Kafka's command-line utility scripts.

In the repo tab, run make run-app, which will start the app we described above.

In Kafka's bin directory, run the following to start a Kafka terminal producer: ./kafka-console-producer.sh --broker-list localhost:9092 --topic input

Now you can start writing individual words, including colors. A word of advice: since we haven't defined a concrete type (with a schema) for the values expected in the input topic, the words we send need to include double quotes (e.g. "red") so they deserialise as JSON strings.
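To watch the results, you can also open a terminal consumer on the output topic (again from Kafka's bin directory):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output --from-beginning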

You should now have something like this:

[Screenshot: running_faust — the Faust worker printing the running color counts as words are produced to the input topic]

Where to go from here

You can now start bootstrapping your own streaming project with Python and Kafka. My suggestion for your further exploration of Faust would be to try and see how state is managed when using topics with more than 1 partition and when using more than 1 Faust app instance. You’ll understand how the Table gets sharded across Faust/Kafka consumers.
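A hedged way to experiment with that, borrowed from the discussion in the comments below: give each worker its own data directory and web port (the module path -A app assumes you launch from the src directory):

# Start two workers of the same app; Kafka splits the topic's partitions between them.
faust --datadir=./worker1 -A app -l info worker --web-port=6066
faust --datadir=./worker2 -A app -l info worker --web-port=6067

Kill one of them and watch the partitions (and the corresponding Table shards) get reassigned to the survivor.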

Another neat feature we did not explore was the type safety you can impose on topics and tables: when declaring a topic you can specify the value_type (and even the key_type) you are expecting to exist in that topic.
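For example, here is a minimal sketch with a typed model (the Word class and topic name are made up for illustration):

import faust

# A typed value: Faust will (de)serialise messages on the topic into Word instances.
class Word(faust.Record):
    text: str
    language: str = "en"

app = faust.App("typed-demo", broker="kafka://localhost:9092")
words_topic = app.topic("words", value_type=Word)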

Found an error or need clarification on some topic? Leave a comment down below.

12 thoughts on “Faust, your Python-based streaming library”

    1. Thanks for taking the time to read the article.
      Could you be more specific about what type of information you are looking for? Is it clarification on how those 3 work in Kafka, or on how Faust deals with them?
      The diagram I have in the article shows how multiple instances of a given Faust consumer app listening to one topic would split the work between themselves. This is usually automated, and it's possible because 2x instances of the same "Faust app" belong to the same Kafka consumer group. In this sense, they know that if a topic has 8 partitions, each of them will get 4. If one of the instances crashes, then those 4 unattended partitions get reassigned by Kafka to the instance that is left alive.

      1. I created a 3-partition "input" topic and a 3-partition "output" topic, and 3 Faust app instances. How many instances of "colors_topic" and of the colors-count table are required? I tried creating 3 instances of the Faust app with 3 partitions each for "input", "output", colors_topic and the colors-count table, but each was maintaining a local "colors-count" table instance. Also, when I tried to bring one of the Faust app instances down it was not load sharing across the other 2 instances. (Am I missing something here?)
        Further, I am trying to build a low-latency system. I want to set "max.poll.interval.ms = 0" and the same for the producer, "linger.ms = 0". Also I don't want batching of records. How do I set these configuration parameters?

      2. The behaviour you describe with all 3x instances "maintaining" a local colors-count instance is the intended behaviour: the concept of a Table in Kafka is the same as a topic – each "row" of the table (a.k.a. message) is keyed and belongs to a partition. In this sense, each Faust instance will keep track of its own "Table" partitions. Actually, the input and output topics of any given Faust app instance need to have the same number of partitions, otherwise the app fails to start. There needs to be a sync between the input, state and output number of partitions.
        The post uses only 1x partition per topic to focus on Faust usability, hence the topics have only 1x partition.

      3. I created a 3-partition "input" topic and a 3-partition "output" topic, and 3 Faust app instances. How many instances of "colors_topic" and of the colors-count table are required? I tried creating 3 instances of the Faust app with 3 partitions each for "input", "output", colors_topic and the colors-count table, but each was maintaining a local "colors-count" table instance.
        Also, when I tried to bring one of the Faust app instances down, the other instances were not resuming consumption
        across the other 2 partitions. (Am I missing something here?)

  1. Further, I am trying to build a low-latency system. I want to set "max.poll.interval.ms = 0" and the same for the producer, "linger.ms = 0". Also I don't want batching of records. How do I set these configuration parameters to minimize the delay?

      1. I have not set the value in the Faust consumer. My question is: how do I set the configuration? I have the default configuration for now. With the default configuration, I do not see the rebalancing.

      2. I’m sorry to hear that. If you are using 3x instances of the exact same app (this is what makes them part of the same consumer group) then it should happen.

        Also, make sure you are sending messages to the 3x different partitions so you can debug which partitions are still being listened to.

        I also recommend using docker to ensure a clean Kafka slate for all your runs.

  2. I changed all the partitions to 3, incl. tables.
    Started 3 instances by:
    faust --datadir=./worker1 -A app -l info worker --web-port=6066
    faust --datadir=./worker2 -A app -l info worker --web-port=6067
    faust --datadir=./worker3 -A app -l info worker --web-port=6068

    I did
    ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input

    and the output was working fine.

    However, I killed worker 1.
    Now, I again sent input via
    ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input

    but I don't see the output in the "output" topic.

    I tried checking the Slack link/Google Groups link, but both seem to be broken. Is there an email address where I could discuss this, or is this medium fine?

    Further, I am trying to build a low-latency system. I want to set "max.poll.interval.ms = 0" and the same for the producer, "linger.ms = 0". Also I don't want batching of records. How do I set these configuration parameters to minimize the delay?

    1. It is difficult to give you more help without seeing the exact code being run inside the Python app and the setup of all the components.
      I already mentioned that setting max.poll to 0 is probably the culprit for no longer receiving new messages after killing the worker – the documentation link I sent you is very clear that it is not an acceptable value.
      How are you defining low latency? Why do you feel this setting is the one that will get you to "low latency"? This setting will only have a high impact on that if you plan on having workers die every minute.
      I can't provide you with an official Faust support channel; I'm not part of their team, I just liked their stack and decided to write about their software.
      I am however a consultant, so if you need professional help, I would be glad to arrange a meeting.

      1. I am trying to use the article you wrote with multiple partitions. I have not changed much apart from the number of partitions and the number of workers. I wanted to check the rebalancing aspect before thinking about using it in our product.
