Machine Learning Over Streaming Kafka® Data—Part 5: Incremental TensorFlow Training With Kafka Data

In the “Machine Learning over Streaming Kafka Data” blog series we’ve been learning all about machine learning over Kafka, incrementally! In the previous part, we explored incremental training with TensorFlow, but without the complication of using Kafka. In this part, we connect TensorFlow to Kafka and explore how incremental learning works in practice with moving data (albeit very gently moving data to start with).

Bitter Springs (Mataranka, NT, Australia) flow at under 1 cubic metre per second, generating a short stream popular for tourists floating at a leisurely rate on pool noodles (Source: Paul Brebner) 

1. Incremental Kafka TensorFlow Code 

My goal at the start of this blog series was to try out the tutorial “Robust machine learning on streaming data using Kafka and Tensorflow-IO” on streaming data from my Drone Delivery application. I did give it a go, but soon realized I didn’t understand what was really going on, or whether it was even working. So, it’s time to revisit it now that we have some basic TensorFlow experience.

The TensorFlow Kafka tutorial has two parts: the first deals with batch training from Kafka, and the second with online training from Kafka. Given that we want to explore incremental/online learning, let’s focus on the second half.

1.1 Write Data to Kafka 

Here’s the new Python code to write my CSV data into Kafka topics.   

I’ve kept some of the design of the tutorial code but modified it slightly for my drone data. This includes my drone data-specific columns, and I’ve also decided not to follow the original approach of one topic for training data and one for testing. Instead, I plan to use the training topic for both training and testing (hence most of the data is written to the training topic only). The other slightly odd design choice was to use the class label (‘0’ or ‘1’) as the Kafka message key. I’ve kept this feature for the time being, although in practice I think it may be better to use the shop ID as the message key.
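Since the original listing isn’t reproduced here, here’s a minimal, broker-free sketch of the idea: each CSV row becomes a Kafka message, with the class label as the key and the remaining columns as the value. The column names and the `StubProducer` are my own illustrative stand-ins (real code would use something like kafka-python’s `KafkaProducer`):

```python
import csv
import io

# Hypothetical sample rows: shop_id, hour, shop_type, and the class label (busy).
SAMPLE_CSV = """shop_id,hour,shop_type,busy
1,9,cafe,1
2,23,florist,0
3,12,cafe,1
"""

def rows_to_messages(csv_text, label_column="busy"):
    """Turn CSV rows into (key, value) pairs: the class label ('0'/'1')
    becomes the Kafka message key, the remaining columns the value."""
    reader = csv.DictReader(io.StringIO(csv_text))
    messages = []
    for row in reader:
        key = row.pop(label_column).encode("utf-8")
        value = ",".join(
            row[c] for c in reader.fieldnames if c != label_column
        ).encode("utf-8")
        messages.append((key, value))
    return messages

class StubProducer:
    """Stand-in for a real Kafka producer so the sketch runs without a broker."""
    def __init__(self):
        self.sent = []
    def send(self, topic, key=None, value=None):
        self.sent.append((topic, key, value))
    def flush(self):
        pass

producer = StubProducer()
for key, value in rows_to_messages(SAMPLE_CSV):
    producer.send("drone-train", key=key, value=value)
producer.flush()
print(producer.sent[0])  # → ('drone-train', b'1', b'1,9,cafe')
```

Swapping `StubProducer` for a real producer (and most of the rows into the training topic only) gives the shape of the actual writer.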

1.2 Create the Model 

This code is identical to the previous blog. 

1.3 Read the Data From Kafka 

“Franz Kafka reading data in the style of Rembrandt” (Source: Paul Brebner, DALL·E 2)

Here’s where things get a bit trickier, and Kafka-specific. In incremental/online learning, the assumption is that the data, once consumed and used to train the model incrementally, may not be available for training again (after all, streams are infinite; you just can’t keep all the data in RAM).

The above code uses a new data type, tfio.experimental.streaming.KafkaBatchIODataset, which represents a streaming batch dataset obtained using a Kafka consumer group (“batch” is a slightly confusing idea in the context of streaming data: it just fetches batches from the stream). Each batch is of type tf.data.Dataset, which is also new. It allows a dataset to be created from streaming input data, many transformations to be applied to pre-process the data, and the dataset to be iterated over in a streaming fashion, so that the full dataset never has to fit into RAM.

The above code connects a consumer group to the specified Kafka cluster and topic (I’m just using Kafka version 3.3.1 with KRaft on my Mac for the initial experiments). The consumer timeout can be infinite (-1) or a maximum value, after which the consumer terminates and no more data is returned from the topic.

The configuration option “batch.num.messages” (which corresponds to Kafka’s max.poll.records) turned out to be important for my example data, even though it is an undocumented setting. Because I don’t have massive amounts of data, the default Kafka consumer buffer size meant that all the data was being returned in a single poll, so the code did not appear to be providing continuous streaming data. The simplest solution I eventually found (after trying a few hacks, including windowing) was to set batch.num.messages to 100, the value I had determined in the previous blog to provide optimal incremental learning from this data.
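The dataset creation discussed above looks roughly like this (adapted from the tensorflow-io tutorial; the topic and group names are hypothetical, and it needs the tensorflow-io package plus a broker actually running on localhost to do anything):

```python
import tensorflow_io as tfio

# Connection/configuration sketch: a consumer group reading the training topic.
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["drone-train"],
    group_id="cgonline",
    servers="localhost:9092",
    stream_timeout=30000,          # ms to wait for new messages; -1 waits forever
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
        "batch.num.messages=100",  # cap the messages returned per fetched mini-batch
    ],
)
```

This is a configuration fragment rather than a runnable example; the timeout and session values shown are illustrative.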

The second function handles decoding the CSV-formatted record values.
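As a pure-Python illustration of what that decoding does (the real pipeline works on tensors, using calls like tf.io.decode_csv and tf.strings.to_number; the feature values here are made up):

```python
def decode_record(raw_value: bytes, raw_key: bytes):
    """Decode one Kafka record: the CSV-formatted value becomes a list of
    feature floats, and the key ('0' or '1') becomes the numeric class label.
    A plain-Python stand-in for the tensor-based decode step in the tutorial."""
    features = [float(field) for field in raw_value.decode("utf-8").split(",")]
    label = float(raw_key.decode("utf-8"))
    return features, label

features, label = decode_record(b"3.0,12.5,1.0", b"1")
print(features, label)  # → [3.0, 12.5, 1.0] 1.0
```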

1.4 Incremental Training 

Here’s some simple code for incremental training: 

There are a few different batch values, which can be confusing. In the training loop, every mini_ds is a tf.data.Dataset of (at most) batch.num.messages = 100 records, but this batch size is only relevant for fetching the data from Kafka. For training, you must batch again with mini_ds.batch(): the value 1 worked for me, and 32 worked for the fit() batch_size value. Note that you must set a batch() size; otherwise the fit() method returns an error about the training data having an incorrect shape.
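The interplay between the two batch sizes can be illustrated without a broker or TensorFlow at all. This sketch (my own stand-in, not the tutorial code) simulates fetches of up to 100 messages being re-batched into training-sized chunks:

```python
from itertools import islice

def kafka_fetches(records, fetch_size=100):
    """Simulate KafkaBatchIODataset: yield mini-batches of up to
    fetch_size records from the (conceptually unbounded) stream."""
    it = iter(records)
    while True:
        mini = list(islice(it, fetch_size))
        if not mini:
            return
        yield mini

def training_batches(mini_batch, batch_size=32):
    """Re-batch one fetched mini-batch into training-sized batches,
    as mini_ds.batch(batch_size) would."""
    for i in range(0, len(mini_batch), batch_size):
        yield mini_batch[i:i + batch_size]

stream = range(250)  # stand-in for 250 Kafka messages
for step, mini in enumerate(kafka_fetches(stream, fetch_size=100)):
    sizes = [len(b) for b in training_batches(mini, batch_size=32)]
    print(f"fetch {step}: {len(mini)} records -> training batches of {sizes}")
```

The fetch size controls how much data each poll returns; the training batch size controls how many records the model sees per gradient update within that fetch.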

2. Some Refinements 

Metal refining is very hot work (Source: Shutterstock)

The above code worked OK, but I could think of several refinements to make it more useful before running the experiments. For example, given that I was using all the data for training, I was worried about overfitting, and also about how to evaluate the model. The solution I came up with was to use 80% of each batch for training and 20% for evaluation, keeping up to the most recent 1,000 observations for evaluation. I also wanted to track the evaluation accuracy over time, so I kept the results in a list. Here’s something close to the final code, with these and a few more metrics added:
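The bookkeeping for the 80/20 split and the rolling 1,000-observation evaluation window can be sketched in plain Python. Here `train_on` and `evaluate_on` are stand-ins for model.fit and model.evaluate (the "accuracy" returned is a placeholder, not real data):

```python
import random
from collections import deque

random.seed(42)

eval_window = deque(maxlen=1000)   # keep only the most recent 1,000 eval observations
accuracy_history = []              # evaluation accuracy tracked over time

def train_on(observations):
    """Stand-in for model.fit on the training split."""
    pass

def evaluate_on(observations):
    """Stand-in for model.evaluate; returns a placeholder accuracy."""
    return random.random()

def incremental_step(batch):
    split = int(len(batch) * 0.8)        # 80% of the batch for training...
    train_on(batch[:split])
    eval_window.extend(batch[split:])    # ...20% into the rolling evaluation set
    accuracy_history.append(evaluate_on(eval_window))

# Simulate ten fetched mini-batches of 100 observations each.
for _ in range(10):
    incremental_step([random.random() for _ in range(100)])

print(len(accuracy_history), len(eval_window))  # → 10 200
```

The deque’s maxlen means old evaluation observations are silently dropped once the window exceeds 1,000, so the evaluation set stays recent without growing unboundedly.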

3. Initial Results 

I also ended up with a simplified training data set, as I realized that the initial rules I used for shops being busy or not busy relied too much on the day of the week. This is potentially confusing for incremental learning, as the training data is presented strictly in time order, so the later days of the week (with rule changes) come as a surprise! To keep things simple, I therefore made the rules depend only on the shop type, the hour of day (which is also a time-based feature), and the shop location.

Here’s the result of a run over one week of the new data using the incremental algorithm. The best accuracy is 0.72. The x-axis is just the loop number (multiply by 100 to get the number of records processed so far).

Now compare that to what we observed in the previous blog:

As we saw there, the accuracy of incremental learning tends to oscillate wildly, and this run is no exception. Just for comparison, I reran the previous batch training on this data, which gave a training accuracy of 0.94 but a lower evaluation accuracy of 0.8 (on 20% of the data).

In the final part of this series, we’ll introduce some concept drift and see how the incremental approach copes with it.

Follow the series: Machine Learning Over Streaming Kafka® Data

Part 1: Introduction

Part 2: Introduction to Batch Training and TensorFlow

Part 3: Introduction to Batch Training and TensorFlow Results

Part 4: Introduction to Incremental Training With TensorFlow

Part 5: Incremental TensorFlow Training With Kafka Data

Part 6: Incremental TensorFlow Training With Kafka Data and Concept Drift