Robinhood is a very popular California-based FinTech, included by Forbes in its list of the top 50 FinTechs to watch in 2019. Their primary mission is to bring down stock trading fees for the common Joe/Jane, although their roadmap also includes cryptocurrency trading.
Due to the nature of the bidding market, their data stack probably includes a lot of streaming tooling. Also (probably) due to the lack of quick and easy tooling for streaming with Python, supported by the growing demand for Python in the developer community, they launched their own port of Kafka Streams, called Faust.
In this post, we’re going to show how easy it is to bootstrap a project with Faust and put your stream-related business logic into practice very quickly. The demo we prepared is an app which filters words from a Kafka topic and keeps a count of how many times it has seen the colors “red”, “green” and “blue”.
In a nutshell, Faust:
- is an easy-to-bootstrap streaming library for Python that uses (mostly) Apache Kafka as the source;
- relies on Python’s asyncio to split the work into agents, distributing the workload between agents living in the same cluster;
- provides stateful processing with support for rocksdb (same as Apache Flink);
- lets the agents use whatever Python code/modules/libraries you are accustomed to for the transformation or sinking of data (numpy, pandas, boto3, whatever you so desire).
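Before diving into the demo, here is a minimal sketch of what a Faust app looks like (the app id, broker address and topic name below are illustrative placeholders, not part of the demo):

import faust

# Illustrative app: a single agent that echoes every event it sees on a topic.
app = faust.App("hello-world", broker="kafka://localhost:9092")

greetings_topic = app.topic("greetings", value_type=str)

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        # Any Python logic can go here; we simply print the event.
        print(greeting)

Running it is then a matter of pointing the faust CLI at the module, e.g. faust -A your_module worker -l info.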
The original developers of Faust describe it as fitting a wide range of use-case domains, which are listed in their documentation.
Also, luckily, Faust’s documentation is already at a mature stage, even including instructions on how to test your applications, in case you are not familiar with Python’s asyncio.
Distributed
It’s also worth noting that Faust is tightly coupled to core Kafka concepts. More concretely, it relies on the way Kafka manages consumer groups to identify whether any topic/partition is not being attended to, and then launches an agent on an instance where the application is still reported as functioning. Or, more correctly, it assigns that partition to an agent living on an instance/app that Kafka still identifies as active in the consumer group.
In the image above, we have a single Kafka topic consisting of four partitions, and two instances of the same Faust app running simultaneously. In Diagram 1 both are working, so the elected Faust leader for that topic decides which partitions it will take and which partitions the other consumers in the same consumer group will get. Since there is a 4-to-2 distribution, it attributes 2 partitions to each instance.
In Diagram 2, the first instance goes down. Based on a certain timeout (which can also be parameterised in Faust’s settings), the consumer group will inform the leader that one of the consumers is down and will force a re-distribution of the topic/partition assignment. In this case, it assigns all four partitions to the remaining instance.
The reason this is important to mention is that the maximum parallelisation you are able to get out of any given topic is as high as its number of partitions. A topic with eight partitions will, at maximum, be assigned to eight instances of the app.
It’s all about the topics
Streams
When developing with Faust, you create Agents, which are basically async functions that look at every event passing through a stream and apply some logic to it. A stream is the basic abstraction for the communication link between sources/agents/sinks and you can, usually, look at it as a Kafka topic.
If you have a simple workflow, like counting words, you would have a single agent that:
- reads from a Kafka topic (not managed by Faust)
- updates the word count table
- sends the result to another stream/topic.
However, you could instead have one agent filtering the words, selecting only the 3 primary colors, and sending the “filtered” results to another Faust agent that keeps the actual relevant word count.
So how do the agents communicate with each other? We’ll illustrate this later in the example, but it’s important to know that if we want agents to communicate with each other we can declare “internal” topics, which are managed by Faust. It will take care of creating them with the default (8) or an explicitly specified number of partitions.
Stateful / Managed state
Because managed state is a whole different discussion, and in order to keep this article focused, we’ll simply mention that Faust supports it in a very particular way. The concept is that we declare a table for each object we want to keep track of. Each table is basically a special Faust class that behaves very similarly to a Python dictionary.
- for persistence: it supports both RocksDB, used in production (same as Flink, not that they tackle the same problems at the same level), and memory (useful for development);
- for recovery: it uses changelog topics to keep track of each table’s changes. This is done for you.
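As a hedged sketch of what declaring managed state might look like (the app id and table name below are illustrative only, not the demo code):

import faust

# "rocksdb://" is the production-grade store; "memory://" is handy while developing.
# The RocksDB backend typically needs extra dependencies (e.g. pip install faust[rocksdb]).
app = faust.App("word-counter", broker="kafka://localhost:9092", store="rocksdb://")

# Behaves like a dict inside agents (word_counts[word] += 1); every update is
# also written to a changelog topic, which is what makes recovery possible.
word_counts = app.Table("word-counts", default=int, key_type=str, value_type=int)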
Basis for demo
In order to create sharable and demonstrable code, let’s look at the repository’s structure, which you can find here.
├── Makefile
├── README.md
├── requirements.txt
├── docker
│   ├── docker-compose-local.yml
│   └── kafka-docker
└── src
    └── app.py
- We’ll need a handy Kafka cluster running locally. For ease of reproducibility, we’re using the popular wurstmeister’s kafka-docker.
- There is a Makefile which eases launching the individual components (Kafka+Zookeeper or the sample Faust app).
- The Faust app is stored in src/app.py for simplicity, but the Faust documentation has instructions on how to organise larger projects.
We will be running our Python code from outside Docker, so we need a virtual environment. We suggest using conda, so, after cloning the repository, you can follow the steps below to set up your environment:
conda create -n faust-test python=3.6
conda activate faust-test
pip install -r requirements.txt
make kafka
The last command will launch Zookeeper and Kafka in separate containers and will create 2 topics with 1 partition each: input and output. These are the source and sink topics we’ll use in our Faust app.
Finally, because we will be sending stuff to and from Kafka topics, we need the Kafka executables for that purpose. You can download them here. The bash scripts inside the bin directory you get after decompressing the tarball are what we need to interact with Kafka as producers or consumers of a certain topic (the input and output topics mentioned earlier).
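As a quick sanity check (the exact flags can vary slightly between Kafka versions; older releases expect --zookeeper localhost:2181 instead of --bootstrap-server), you can list the topics that make kafka created from that same bin directory:

./kafka-topics.sh --list --bootstrap-server localhost:9092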
Quick code review
import faust

app = faust.App(id="test", broker="kafka://localhost:9092", store="memory://")

# convenience func for launching the app
def main() -> None:
    app.main()

# Input topic, NOT managed by Faust (marked internal=False).
input_topic = app.topic('input', internal=False, partitions=1, value_type=str)

# Faust will create this topic for us.
colors_topic = app.topic('colors_topic', internal=True, partitions=1, value_type=str)

# Output, also NOT managed by Faust.
output_topic = app.topic('output', internal=False, partitions=1, value_type=str)

# Let's define a table to keep the count of valid RGB colors.
colors_count_table = app.Table('colors-count', key_type=str, value_type=int, partitions=1, default=int)

VALID_WORDS = ["red", "green", "blue"]

@app.agent(input_topic)
async def filter_colors(words):
    async for word in words:
        print(word)
        if word in VALID_WORDS:
            await colors_topic.send(value=word)

@app.agent(colors_topic)
async def colors_count(colors):
    async for color in colors:
        colors_count_table[color] += 1
        print(f'{color} has now been seen {colors_count_table[color]} times')
        await output_topic.send(value=colors_count_table[color])
This Faust app is relatively simple. Its purpose is to read all words from the input topic and keep a count of how many times the colors red, green and blue have been sent. Each time one of them is seen, it sends an update to the output topic.
After creating an app object from the faust.App class:
- we declare two external topics with 1 partition each: input and output (which are created automatically by the provided docker code when running make kafka);
- we declare an internal topic, colors_topic, which is the means of communication between our 2 agents;
- we also declare a Table object so Faust knows to keep the running count of the colors in a managed state, in case the app fails;
- we create 2 agents: one filters colors from all the words and sends the colors to the internal topic; the other keeps a count of the filtered colors that come in from the internal topic and reports its running count.
Running the sample code
Assuming you already ran the commands in the Basis for demo section above, Kafka should already be running in the background and the conda environment should be activated. You should also have already downloaded the Kafka executables.
Now, I suggest you split your terminal in 2:
- one tab located at the repo’s root
- another tab at the bin directory with Kafka’s command line utility scripts.
On the repo tab, run make run-app, which will start the app we described above.
On Kafka’s bin directory, run the following to start a Kafka console producer:
./kafka-console-producer.sh --broker-list localhost:9092 --topic input
Now you can start writing individual words, including colors. A word of advice: since we haven’t defined a concrete type (with a schema) for the values expected in the input topic, we need to wrap the words we send in double quotes so they are parsed as valid JSON strings.
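To watch the results, you can open a third tab in Kafka’s bin directory and start a console consumer on the output topic; typing, for example, "red", "blue" and "red" (quotes included) into the producer should then yield three count updates here:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output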
You should now have something like this:
Where to go from here
You can now start bootstrapping your own streaming project with Python and Kafka. My suggestion for your further exploration of Faust would be to try and see how state is managed when using topics with more than 1 partition and when using more than 1 Faust app instance. You’ll understand how the Table gets sharded across Faust/Kafka consumers.
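A possible sketch of that experiment (the data directories and web ports here are arbitrary choices, and the topics would need to be recreated with more partitions): from the directory containing app.py, start two workers of the same app and watch how the partitions, and the table’s changelog, get rebalanced as you kill and restart them.

faust --datadir=./worker1 -A app -l info worker --web-port=6066
faust --datadir=./worker2 -A app -l info worker --web-port=6067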
Another neat feature we did not explore is the type safety you can impose on topics and tables: when declaring a topic you can specify the value_type (and even the key_type) you are expecting to exist in that topic.
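As a hedged sketch of what that could look like (the Word model and topic name here are hypothetical, not part of the demo code):

import faust

# A typed model: Faust will serialise/deserialise events on the topic as Word records.
class Word(faust.Record):
    text: str

app = faust.App("typed-example", broker="kafka://localhost:9092")

typed_topic = app.topic("typed_input", key_type=str, value_type=Word)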
Found an error or need clarification on some topic? Leave a comment down below.
Can you please provide more references on multiple topic partitions, tables, and multiple instances?
Thanks for taking the time to read the article.
Could you be more specific about what type of information you are looking for? Is it clarification on how those 3 work in Kafka or on how Faust deals with them?
The diagram I have in the article shows how multiple instances of a given Faust consumer app listening to one topic would split the work between themselves. This is usually automated and it’s possible because 2x instances of the same “Faust app” would belong to the same Kafka Consumer Group. In this sense, they know that if a topic has 8 partitions, each of them will get 4. If one of the instances crashes, then those 4 unattended partitions get reassigned by Kafka to the instance that is left alive.
I created a 3-partition “input” topic and a 3-partition “output” topic, and 3 Faust app instances. How many instances of “colors_topic” and the colors-count table are required? I tried creating 3 instances of the Faust app with 3 partitions for “input”, “output”, colors_topic and the colors-count table, but each was maintaining a local “colors-count” table instance. Also, when I tried to bring one of the Faust app instances down, the load was not shared across the other 2 instances. (Am I missing something here?)
Further, I am trying to build a low-latency system. I want to set “max.poll.interval.ms = 0” and, for the producer, “linger.ms = 0”. Also, I don’t want batching of records. How do I set these configuration parameters?
The behaviour you describe, with all 3x instances “maintaining” a local colors-count instance, is the intended behaviour: the concept of a Table in Kafka is the same as a topic – each “row” of the table (a.k.a. message) is keyed and belongs to a partition. In this sense, each Faust instance will keep track of its own “Table” partitions. Actually, the input and output topics of any given Faust app instance need to have the same number of partitions, otherwise the app fails to start. There needs to be a sync between the input, state and output number of partitions.
The post uses only 1x partition per topic in order to keep the focus on Faust usability.
I created a 3-partition “input” topic and a 3-partition “output” topic, and 3 Faust app instances. How many instances of “colors_topic” and the colors-count table are required? I tried creating 3 instances of the Faust app with 3 partitions for “input”, “output”, colors_topic and the colors-count table, but each was maintaining a local “colors-count” table instance.
Also, when I try to bring one of the Faust app instances down, the other instances are not resuming consumption across the other 2 partitions. (Am I missing something here?)
Further, I am trying to build a low-latency system. I want to set “max.poll.interval.ms = 0” and, for the producer, “linger.ms = 0”. Also, I don’t want batching of records. How do I set these configuration parameters to minimize the delay?
May I ask why you specifically chose “max.poll.interval.ms”? You mentioned before that your Faust app instances were not rebalancing within the consumer group when you took one down. Setting this to 0 is probably the reason why – https://kafka.apache.org/documentation/#max.poll.interval.ms
I have not set that value in the Faust consumer. My question is how do I set the configuration? I have the default configuration for now. With the default configuration, I do not see the rebalancing.
I’m sorry to hear that. If you are using 3x instances of the exact same app (this is what makes them part of the same consumer group) then it should happen.
Also, make sure you are sending messages to the 3x different partitions to help debug which partitions are still being listened to.
I also recommend using docker to ensure a kafka clean slate for all your runs.
I changed all the partitions to 3, incl. tables.
Started 3 instances with:
faust --datadir=./worker1 -A app -l info worker --web-port=6066
faust --datadir=./worker2 -A app -l info worker --web-port=6067
faust --datadir=./worker3 -A app -l info worker --web-port=6068
I did
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input
and the output was working fine.
However, I killed worker 1.
Now, I again sent input via
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input
but I don’t see the output in the “output” topic.
I tried checking the Slack link/Google Groups link, but both seem to not be working. Is there an email where I could discuss this, or is this medium fine?
Further, I am trying to build a low-latency system. I want to set “max.poll.interval.ms = 0” and, for the producer, “linger.ms = 0”. Also, I don’t want batching of records. How do I set these configuration parameters to minimize the delay?
It is difficult to give you more help without seeing the exact code that is being run inside Python and the setup of all the components.
I already mentioned that setting max.poll.interval.ms = 0 is probably the culprit for no longer receiving new messages after killing the worker – the documentation link I sent you is very clear that it is not an acceptable value.
How are you defining low latency? Why do you feel this setting is the one that will make you reach “low latency”? This setting will only have a high impact on that if you plan on having workers die every minute.
I can’t provide you with an official Faust support channel, I’m not part of their team, I just liked their stack and decided to write about their software.
I am however a consultant, so if you need professional help, I would be glad to arrange a meeting.
I am trying to use the article you have written with multiple partitions. I have not changed much apart from the number of partitions and the number of workers. I wanted to check the rebalancing aspect before considering using it in our product.