Working with Kafka Streams API


In this example we will be using the Java Kafka Streams API to count the number of times different words occur in a topic.

Dependencies

Add the kafka_2.12 package to your application. This package is available from Maven:

Kafka Streams Configuration

Before we can use the Streams API, we need to set a few basic configuration options. Create a file named streams.properties with the following content, making sure to replace the bootstrap.servers list with the IP addresses of your cluster:
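As a minimal sketch of what such a file might contain, the Java snippet below writes a streams.properties file with four basic options. The application id, the 192.0.2.x broker addresses, and the choice of string serdes are placeholder assumptions, not values from your cluster. The default.key.serde and default.value.serde keys assume a reasonably recent Kafka Streams client.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public class BasicStreamsConfig {

    // Returns the lines of a basic streams.properties file.
    // Every value here is a placeholder assumption; adjust it for your cluster.
    static List<String> basicConfigLines(String bootstrapServers) {
        return Arrays.asList(
            // Unique name for this Streams application (hypothetical value).
            "application.id=wordcount-example",
            // Comma-separated host:port list of the brokers in your cluster.
            "bootstrap.servers=" + bootstrapServers,
            // Treat record keys and values as strings by default.
            "default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
            "default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde");
    }

    public static void main(String[] args) throws IOException {
        // 192.0.2.x addresses are documentation placeholders, not real brokers.
        Path target = Paths.get("streams.properties");
        Files.write(target,
            basicConfigLines("192.0.2.1:9092,192.0.2.2:9092,192.0.2.3:9092"));
        System.out.println("Wrote " + target.toAbsolutePath());
    }
}
```

Writing the file by hand works just as well; the sketch only makes the expected keys explicit.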

Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.

In order to use the Streams API with Instaclustr Kafka we also need to provide authentication credentials. If your cluster has client ⇆ broker encryption enabled you will also need to provide encryption information. For more information on using certificates with Kafka and where to find them see here. Add the following to your streams.properties file, ensuring the password and truststore location are correct:

If your cluster does not have client ⇆ broker encryption enabled, instead add the following to your streams.properties file, ensuring the password is correct.
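The two cases above differ mainly in the security protocol and in whether truststore settings are needed. The sketch below prints the properties for either case. It assumes SASL authentication with the SCRAM-SHA-256 mechanism, which may not match your cluster, and the username, passwords, and truststore path are hypothetical placeholders; check your cluster's connection details for the real values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SecurityConfig {

    // Builds the security-related properties for a Kafka client.
    // Assumption: the cluster uses SASL with the SCRAM-SHA-256 mechanism.
    static Map<String, String> securityConfig(boolean encryptionEnabled,
                                              String username,
                                              String password,
                                              String truststorePath,
                                              String truststorePassword) {
        Map<String, String> cfg = new LinkedHashMap<>();
        // SASL_SSL encrypts client-broker traffic; SASL_PLAINTEXT does not.
        cfg.put("security.protocol", encryptionEnabled ? "SASL_SSL" : "SASL_PLAINTEXT");
        cfg.put("sasl.mechanism", "SCRAM-SHA-256"); // assumed mechanism
        cfg.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        if (encryptionEnabled) {
            // The truststore holds the certificate used to verify the brokers.
            cfg.put("ssl.truststore.location", truststorePath);
            cfg.put("ssl.truststore.password", truststorePassword);
        }
        return cfg;
    }

    public static void main(String[] args) {
        // All arguments are hypothetical placeholders.
        securityConfig(true, "example-user", "example-password",
                "/path/to/truststore.jks", "example-truststore-password")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The printed lines can be appended to streams.properties; passing false as the first argument omits the truststore entries.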

Create the Streams application

First, load the properties we defined earlier:
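One way to do this, sketched with only the Java standard library, is to read the file into a java.util.Properties object. The class name and the default path streams.properties in the working directory are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

public class LoadStreamsProperties {

    // Reads a .properties file into a Properties object.
    static Properties load(Path path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Assumed location: streams.properties in the working directory,
        // unless a path is given as the first argument.
        Path path = Paths.get(args.length > 0 ? args[0] : "streams.properties");
        Properties props = load(path);
        // Print only the count: the values may include passwords.
        System.out.println("Loaded " + props.size() + " properties from " + path);
    }
}
```

The resulting Properties object is what gets passed to the Streams application when it is created.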

Create a new input KStream object on the wordcount-input topic:

Create the word count KStream that will calculate the number of times each word occurs:
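To make the transformation concrete, here is a plain-Java sketch of the counting logic for a single line of input: lowercase the text, split it into words, group identical words, and count each group. It is independent of the Streams API and is not the Streams code itself. Splitting on non-word characters is an assumption about how words are delimited.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {

    // Counts how often each word occurs in one line of text.
    // Assumption: words are separated by runs of non-word characters.
    static Map<String, Long> countWords(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
            .filter(word -> !word.isEmpty()) // drop the empty token a leading separator produces
            .collect(Collectors.groupingBy(
                Function.identity(),  // group identical words together
                TreeMap::new,         // sorted keys, for readable output
                Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countWords("the quick fox jumps over the lazy fox"));
        // prints {fox=2, jumps=1, lazy=1, over=1, quick=1, the=2}
    }
}
```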

Direct the output from the word count KStream to a topic named wordcount-output:

Finally, create and start the KafkaStreams object:

Our Streams application is now ready, but we need to do some more setup before we can use it.

Create Input Topic

In this example we are going to use the Streams API to count the occurrences of words in a Kafka topic. Before we can run the Streams application, we need to create the topic to read input from. Use the guide here to create a new topic called wordcount-input with 1 partition and a replication factor of 1.

Produce Some Input

First, follow the guide here to set up a Kafka console producer, changing the topic name to the wordcount-input topic.

Once you’ve set up a console producer, send some input to Kafka (preferably with duplicated words so we get word counts greater than 1):

Start the Streams application

Run the Java Streams application we created earlier. If set up correctly, the application won’t print any output to the console and will keep running until you stop it.

Consume the Output

Now that the Streams application has begun calculating the word counts of our input text, we would like to view the output. To do so, follow the guide here to set up a Kafka console consumer, changing the topic name to the wordcount-output topic.

After some delay (it could take up to a minute for the first output to come through), the consumer should output the word counts produced by our Streams application:

Feel free to produce some more text to the wordcount-input topic; after some delay, the consumer should output a new list of word counts. For example, if you produced the input “chuck the woodchuck chucked wood”, the console consumer should output:

Note: the Streams application will only output the counts for words contained in the most recent message.
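The behaviour described in this note can be modelled in plain Java: running totals are kept for every word seen so far, but each new message only emits the updated totals for the words it contains. This is a simplified sketch that ignores the Streams API, caching, and commit intervals. The earlier message "wood chuck" is a hypothetical input chosen for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class RunningWordCount {

    // Running totals for every word seen so far.
    private final Map<String, Long> totals = new HashMap<>();

    // Processes one message and returns the updated totals for only the
    // words that message contains, in order of first appearance.
    Map<String, Long> process(String message) {
        Map<String, Long> updated = new LinkedHashMap<>();
        for (String word : message.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue; // skip the empty token a leading separator produces
            }
            long newTotal = totals.merge(word, 1L, Long::sum);
            updated.put(word, newTotal);
        }
        return updated;
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        counter.process("wood chuck"); // hypothetical earlier message
        System.out.println(counter.process("chuck the woodchuck chucked wood"));
        // prints {chuck=2, the=1, woodchuck=1, chucked=1, wood=2}
    }
}
```

Words from earlier messages that do not appear in the latest one keep their totals internally but are not emitted again.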
