Dr Black has been murdered in the Billiard Room with a Candlestick! Whodunnit?!

In this blog, we’ll have a look at some simple Kafka Streams examples using the murder mystery game Cluedo (Clue in the US) as a simple problem domain.  There are six suspects and a mansion with multiple rooms.

Kongo 5.2 Cluedo Image

The suspects are:

  • Miss Scarlet
  • Professor Plum
  • Mrs. Peacock
  • Mr Green
  • Colonel Mustard
  • Mrs. White

The rooms are:

  • Kitchen
  • Conservatory
  • Dining Room
  • Billiard Room
  • Library
  • Lounge

Assuming everyone has a means and a motive for murder, we’ll focus on who has an alibi as they move around the mansion.

Example 1: Who’s in the same location at the same time?

So you can imagine what the rooms look like, here’s the Cluedo mansion in The Sims.

Kongo 5.2 Cluedo Mansion

The input stream will be <room, person> records giving the new location of a person. We’ll send them using the kafka-console-producer program as follows:

./kafka-console-producer.sh –broker-list localhost:9092 –topic cluedo-topic –property “parse.key=true” –property “key.separator=:”

>room1:person1
>room2:person2

From this event stream let’s find out which people are in the same room as someone else, which gives everyone in a room with at least one other person an alibi (unless they all did it). This example illustrates Kafka streams configuration properties, topology building, reading from a topic, a windowed (self) streams join, a filter, and print (for tracing).  The self join will find all pairs of people who are in the same location at the “same time”, in a 30s sliding window in this case. However, we don’t want anyone providing themselves an alibi so if the leftValue and rightValue names match from the join we produce a “” joined value and filter it out. Here’s the initial streams code:

Let’s try it out. These are the initial locations of the suspects:

After 30 seconds has elapsed (the window duration) let’s move people around to these locations:

We get the following printed out showing all the pairs of people in the same locations. They all appear to have alibis, except Mrs White who can move around to other empty rooms without anyone else knowing. Note that the order of printing events isn’t always strictly preserved:

Note that no one has an alibi at the start as everyone is in different rooms.

Example 2: How many people are in each room?

This is possibly the oddest version of Cluedo set in some sort of cartoon alternative reality:

Kongo 5.2 Clue - Rick and Morty Board Game

Let’s try an extension of example 1 with a KTable keeping track of some state. We’ll count the number of people in each room as they move around. If someone is in a room by themselves then they don’t have an alibi and are a prime suspect! How do we get from the <room, person> stream to a <room, count> table?

The final count table will have <room, count> data, but in order to count the number of people per room we need to know which rooms people are in. This is also state information so we need an intermediate table, created from the input topic, with <person, room> state data. Once we have this we groupBy the reverse key/value pair, <room, person> and count the number of people per room.   However, the input topic has <room, person> records so we need to reverse these. One way to do this is to read the topic into a stream, swap the key/value pairs, and write back to a topic. We can’t directly turn the stream to a table, as there’s no operator to transform a KStream to a KTable (see the table in previous blog).

Here’s the output with same input as before:

Dr Black should stick to the Conservatory or Library to avoid being bumped off.

Notice the delay with the KTable output. This is due to the default caching time for state stores (30s).

Example 3: Who doesn’t have an Alibi?

Kongo 5.2 Image

It would be useful to make the people with no alibis explicit and create a stream of potential suspects. This example illustrates a stream-table join with a filter. We’ll join the <room, person> stream with the <room, count> table giving a <room, person, count> join, which we filter for people with no alibi:

How well did this work? Well, as noted above, no one has an initial alibi, so I changed things a bit to prevent spurious no alibi messages. Everyone now starts out in the Hall, and we filter out the Hall to prevent “no alibi” messages.  

According to this version, everyone is under suspicion as they don’t have an alibi. Why? It appears that the KTables are not being updated until after the “no alibi” messages are produced. I guess this behaviour is due to the state store caching, so I turned off the state store cache to see what happens. The properties setting is:

config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

ProfessorPlum, ColonelMustard and MrsWhite have no alibis, so this is working better.

But still may not be “correct”. I.e. we are not checking if someone is the last person in a room after everyone has moved out, but how can we given that everything is event driven? We’d need to check when they move out to another room. But what if they don’t move to another room?

Topology

Kongo 5.2 - Topology

Braided sheets (each with a different color and internal structure) slipping over and under each other in higher-dimensional space (From Topology and the Visualization of Space)

I was curious to see what the Topology of the final example looked like. Here’s the code to print it out (simple) and the result (not so simple, but probably simpler than the above illustration of braided higher dimensional sheets):

final Topology top = builder.build();
System.out.println(top.describe());

Things to Watch Out For (Lucky they are only tiny)

Kongo 5.2 Cluedo WeaponsWeapons from the original Cluedo Patent

While playing around with Kafka Streams I made a list of things that caught me out. Here they are:

  1. Your streams application needs the latest RocksDB jar file (look for error “java.lang.UnsupportedOperationException”)
  2. When debugging/testing streams applications, ensure you don’t have any other instances of them running (with the same APPLICATION_ID_CONFIG, which I guess is the consumer group name) that you’ve forgotten about. It’s best to get the code to close() the application after it’s finished each time.
  3. If you transform a KTable to a KStream to a Kafka topic, ensure that the sink topic is compacted so that it doesn’t grow indefinitely.
  4. You can’t explicitly access the record timestamp from the DSL, so you can’t do things like compare the order of events. This seems like an odd constraint, but you can access the timestamp from the Processor API.  You could cheat by putting a copy of the timestamp in the record value.
  5. When testing/debugging remember that Kafka remembers everything! You may need to reset both the state stores and the streams.
    1. There’s 2 things to reset, streams and state. You can use cleanUp() in your code to reset state stores, and the reset tool to reset streams (however, this may not be the behaviour you want). For the Cluedo application I wanted to reset the state stores but not the streams (as I didn’t want the streams data replayed each time).
    2. A useful trick was to use input data to put the state stores back to the start state that I wanted each time. I did this by introducing an extra room, a Train, and moving everyone to and from the Train at the start and end of each test scenario.  
    3. Here’s documentation on resetting the local environment, and a more comprehensive explanation of resetting in the context of reprocessing.
  6. print() is your friend. Print everything, but remember it’s terminal (just like a candlestick is if used as a blunt instrument).
  7. Only put DSL code inline with the DSL code. I.e. don’t put any non-DSL code inline with DSL code (or something bad may happen).
  8. Changing and reversing keys and values is tricky, but appears to be normal. How do know which operators do this? E.g. Join can only change the value not the key.
  9. There’s a trade-off between using KStreams and KTables. KStreams are immediate (i.e. an incoming event immediately triggers processing), but some operations require the use of time Windows to limit the amount of data included. KTables keep track of state, which can be from an arbitrarily long series of events/time, but if caching is turned on then the results may not be immediate. I.e. use KStreams (where possible) if you care about low latency and using fresh data, and KTables when you don’t mind if the results are a bit fuzzy.
  10. It’s useful to have some knowledge of Java 8 Streams, Lambdas and Functional Programming to use get the most out of Kafka Streams DSL (see resources below).

Finally, Kafka Streams DSL is highly extensible and composable. It should be a good choice for application development in complex and rapidly moving environments as it enables loosely coupled code to be written which:

  • doesn’t need to know about other applications that may also be using the same data, and
  • can build new functionality upon new data/topics as they come into being.

Some Kafka Streams Resources

Apache Kafka Streams Documentation

Data Stream Management with Apache Kafka Streams (good overview of Kafka streams deployment)

Mastering Kafka Streams (online book, incomplete but good overview and detail)

Simple Spatio-Temporal Windowing with Kafka Streams with code example

Apache Kafka DSL API documentation

Apache Kafka KStream API

Apache Kafka KTable API

Kafka Streams examples

Kafka Streams DSL vs Processor API vs KSQL

Timestamps in streams

Inspecting streams and tables

Converting KStream to KTable

Crossing the streams – Joins in Apache Kafka

Java 8 Concepts: FP, Streams, and Lambda Expressions (useful background to the Java 8 syntax of Streams and Lambdas used in Kafka Streams), and Java 8 in Action: Chapter 4. Introducing streams (free chapter).

Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application

Application Reset Tool

Geomesa (an open source large-scale geospatial tool) integrates with Kafka and would enable more powerful Open Geospatial Consortium (OGC) spatio-temporal queries.

Spark streaming vs Kafka Streams (note may be better comparisons this is just random)

Describe and execute Kafka stream topologies with Lenses SQL (in a previous blog we used the Landoop Kafka Cassandra connector, they also have an open source GUI for Kafka streams).

Kafka Streams Examples

E.g. Join between KStream and KTable

Site by Swell Design Group