Using Kafka with MQTT

This article contains two examples of using Instaclustr Kafka with MQTT:

  • Kafka Connect with an MQTT Sink
  • Kafka Connect with an MQTT Source

Prerequisites

This example uses Apache Kafka and a Kafka Connect MQTT plugin, both of which require Java 8. The Apache Kafka package comes bundled with a number of Kafka tools; this example uses the connect-standalone.sh tool. To get this tool you will need to download and install a Kafka release from here. This example has been tested with Kafka 1.1.0.

The example also uses a third-party plugin, which can be found here, to connect Kafka to MQTT brokers. Once downloaded, extract the archive and copy the stream-reactor/kafka-connect-mqtt/ folder to a plugins/ folder somewhere on your computer.
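A minimal sketch of the copy step, assuming the plugin archive was extracted into the current directory and that $HOME/plugins is the plugins folder (both locations are placeholders chosen for illustration):

```shell
# Assumed locations; adjust both to match your machine.
SRC_DIR="stream-reactor/kafka-connect-mqtt"   # folder extracted from the plugin archive
PLUGIN_DIR="$HOME/plugins"                    # hypothetical plugins folder

mkdir -p "$PLUGIN_DIR"
if [ -d "$SRC_DIR" ]; then
  # Copy the whole connector folder, including its jar files.
  cp -r "$SRC_DIR" "$PLUGIN_DIR/"
else
  echo "Not found: $SRC_DIR (extract the plugin archive first)" >&2
fi
```

The plugin.path setting used later should point at the plugins folder itself, not at the kafka-connect-mqtt folder inside it.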

The examples will also use the Eclipse Mosquitto MQTT clients for publishing and reading test messages. The Mosquitto clients can be downloaded here or via the command line, e.g.:
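As a rough illustration: on Debian or Ubuntu the clients are typically packaged as mosquitto-clients, and Homebrew on macOS ships them in the mosquitto formula. These package names are assumptions about your platform, so the install commands are left as comments. The loop then checks whether both tools are on the PATH:

```shell
# Install commands (run the one that matches your platform):
#   sudo apt-get install mosquitto-clients   # Debian/Ubuntu (assumed package name)
#   brew install mosquitto                   # macOS with Homebrew (assumed formula name)

# Report whether each client used in the examples is available.
for tool in mosquitto_pub mosquitto_sub; do
  if command -v "$tool" >/dev/null 2>&1; then
    echo "$tool: found"
  else
    echo "$tool: missing"
  fi
done
```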

Kafka Connect with an MQTT Sink

This example demonstrates how to use Kafka Connect to send messages from a Kafka topic to an MQTT topic.

Kafka Connect Configuration

Before you can use Kafka Connect you need to configure a number of things. Basic configuration requires the following options; see here for a full list. Make a file named connect.properties with the following content:
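A minimal sketch of such a file, written from the shell. The converter choice (JSON without schemas), the offsets file location, and the placeholder address and path are assumptions for illustration; with the JSON converter, messages on the Kafka topic are expected to be valid JSON.

```shell
# Placeholders (assumptions): replace the IP address and plugin path with your own.
cat > connect.properties <<'EOF'
bootstrap.servers=<node ip address>:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/path/to/plugins
EOF
```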

Make sure to replace the bootstrap.servers value with the IP address of at least one node in your cluster, and the plugin.path value with the path to your plugins directory.

Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.

In order to use Kafka Connect with Instaclustr Kafka you also need to provide authentication credentials. Add the following to your connect.properties file, ensuring the password is correct:
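A sketch of the credentials block, assuming the cluster authenticates clients with SASL using the SCRAM-SHA-256 mechanism. The mechanism, <username> and <password> are assumptions or placeholders; check your cluster's connection details. The producer. and consumer. prefixes repeat the settings for the clients that Kafka Connect creates on behalf of source and sink connectors.

```shell
# Append SASL credentials for the worker and for its producer and consumer clients.
# <username> and <password> are placeholders, not real values.
cat >> connect.properties <<'EOF'
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
producer.sasl.mechanism=SCRAM-SHA-256
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
consumer.sasl.mechanism=SCRAM-SHA-256
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";
EOF
```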

If your cluster does not have client ⇆ broker encryption enabled, add the following to your connect.properties file:
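For the unencrypted case, a sketch that assumes SASL authentication over a plaintext connection (the protocol value is an assumption that follows from the authentication step above):

```shell
# Authentication without TLS: SASL over a plaintext connection.
cat >> connect.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
producer.security.protocol=SASL_PLAINTEXT
consumer.security.protocol=SASL_PLAINTEXT
EOF
```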

If your cluster has client ⇆ broker encryption enabled you will also need to provide encryption information. For more information on using certificates with Kafka and where to find them see here. Add the following to your connect.properties file, ensuring the truststore location is correct:
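For the encrypted case, a sketch that assumes SASL authentication over TLS and a JKS truststore; the truststore path and password are placeholders. Use this in place of the settings for an unencrypted cluster, not in addition to them.

```shell
# Authentication over TLS. Replace the truststore path and password with your own.
cat >> connect.properties <<'EOF'
security.protocol=SASL_SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=<truststore password>
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=/path/to/truststore.jks
producer.ssl.truststore.password=<truststore password>
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/path/to/truststore.jks
consumer.ssl.truststore.password=<truststore password>
EOF
```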

Create Kafka Topic

For Kafka Connect to work, sources and sinks must refer to specific Kafka topics. Before you can run Kafka Connect you need to create the topic that the sink will read its messages from. Use the guide here to create a new topic called mqtt-sink.

MQTT Sink Configuration

Now that Kafka Connect is configured, you need to configure the sink for your data. This example uses the MQTT Sink from the stream reactor project. For more information on the MQTT Sink, including more configuration options, see here. The MQTT Sink takes all messages from a Kafka topic and publishes them to an MQTT topic. Make a file named mqtt-sink.properties with the following content:
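A sketch of a sink configuration. The name, connector.class, tasks.max and topics keys are standard Kafka Connect settings; the connector class name and the connect.mqtt.* property names are written from memory of the stream reactor documentation and should be checked against the plugin version you installed. The MQTT topic name <your prefix>/mqtt-sink is an assumption.

```shell
# Connector config: read from Kafka topic mqtt-sink, publish to an MQTT topic.
# connect.mqtt.* names and the KCQL statement are assumptions; verify them for your plugin version.
cat > mqtt-sink.properties <<'EOF'
name=mqtt-sink
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=mqtt-sink
connect.mqtt.hosts=tcp://test.mosquitto.org:1883
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO <your prefix>/mqtt-sink SELECT * FROM mqtt-sink
EOF
```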

Make sure to replace <your prefix> with a prefix to differentiate your topic from other topics, as this example uses a publicly available MQTT broker (test.mosquitto.org) for demonstration purposes.

Start Kafka Connect

Now that all the Kafka Connect components in this example are configured, you can start Kafka Connect from the command line like so:
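A sketch of the start command, assuming Kafka was extracted to $HOME/kafka_2.11-1.1.0 (a hypothetical location; set KAFKA_HOME to wherever your release lives) and that both properties files are in the current directory:

```shell
# Hypothetical install location; override by exporting KAFKA_HOME first.
KAFKA_HOME="${KAFKA_HOME:-$HOME/kafka_2.11-1.1.0}"
CONNECT="$KAFKA_HOME/bin/connect-standalone.sh"

if [ -x "$CONNECT" ]; then
  # First argument: worker config. Second argument: connector config.
  # The process keeps running until interrupted (Ctrl+C).
  "$CONNECT" connect.properties mqtt-sink.properties
else
  echo "connect-standalone.sh not found under $KAFKA_HOME" >&2
fi
```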

Test Kafka Connect

Once Kafka Connect has started, it’s time to test the configuration.

First, use the mosquitto_sub command line tool to subscribe to the MQTT topic:
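A sketch of the subscription, assuming the MQTT topic is named <your prefix>/mqtt-sink (an assumption; it must match whatever topic mqtt-sink.properties publishes to):

```shell
MQTT_TOPIC="<your prefix>/mqtt-sink"   # placeholder; use the topic from mqtt-sink.properties

if command -v mosquitto_sub >/dev/null 2>&1; then
  # -h: broker host, -t: topic to subscribe to.
  # Blocks and prints each message as it arrives.
  mosquitto_sub -h test.mosquitto.org -t "$MQTT_TOPIC"
else
  echo "mosquitto_sub is not installed" >&2
fi
```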

Next, follow the guide here to set up a Kafka console producer in a new terminal window, changing the topic name to the mqtt-sink topic.

Once the console producer has started, send some test messages by typing them into its terminal, one message per line.

After a short delay, the text you produced to Kafka should be printed by the mosquitto_sub MQTT subscriber you started earlier.

Kafka Connect with an MQTT Source

This example demonstrates how to use Kafka Connect to read data from an MQTT topic into a Kafka topic.

Kafka Connect Configuration

Before you can use Kafka Connect you need to configure a number of things. Basic configuration requires the following options; see here for a full list. Make a file named connect.properties with the following content:

Make sure to replace the bootstrap.servers value with the IP address of at least one node in your cluster, and the plugin.path value with the path to your plugins directory.

Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.

In order to use Kafka Connect with Instaclustr Kafka you also need to provide authentication credentials. Add the following to your connect.properties file, ensuring the password is correct:

If your cluster does not have client ⇆ broker encryption enabled, add the following to your connect.properties file:

If your cluster has client ⇆ broker encryption enabled you will also need to provide encryption information. For more information on using certificates with Kafka and where to find them see here. Add the following to your connect.properties file, ensuring the truststore location is correct:
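The configuration steps in this section can be combined into one small script. The sketch below defines a hypothetical helper, write_connect_properties, that writes the base options and picks the security protocol from a yes/no encryption flag. The converter settings, SASL-based protocol names, and truststore path are assumptions, and the credentials still have to be added by hand.

```shell
# Hypothetical helper, not part of Kafka.
# Usage: write_connect_properties <bootstrap host:port> <plugins dir> <yes|no>
# The third argument says whether client-broker encryption is enabled.
write_connect_properties() {
  bootstrap="$1"
  plugin_path="$2"
  if [ "$3" = "yes" ]; then
    protocol="SASL_SSL"
  else
    protocol="SASL_PLAINTEXT"
  fi

  cat > connect.properties <<EOF
bootstrap.servers=$bootstrap
plugin.path=$plugin_path
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
security.protocol=$protocol
producer.security.protocol=$protocol
EOF

  # Only the encrypted case needs a truststore (placeholder path).
  if [ "$protocol" = "SASL_SSL" ]; then
    cat >> connect.properties <<EOF
ssl.truststore.location=/path/to/truststore.jks
producer.ssl.truststore.location=/path/to/truststore.jks
EOF
  fi
  # Credentials (sasl.mechanism, sasl.jaas.config) and the truststore
  # password are deliberately left out; append them yourself.
}

# Example call for a cluster without client-broker encryption.
write_connect_properties "<node ip address>:9092" "/path/to/plugins" no
```

Only the producer. prefix is set here because a source connector writes to Kafka; a sink connector would need the matching consumer. settings instead.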

Create Kafka Topic

For Kafka Connect to work, sources and sinks must refer to specific Kafka topics. Before you can run Kafka Connect you need to create a topic to be used for storing the messages produced by Kafka Connect. Use the guide here to create a new topic called mqtt-source.

MQTT Source Configuration

Now that Kafka Connect is configured, you need to configure the source for your data. This example uses the MQTT Source from the stream reactor project. For more information on the MQTT Source, including more configuration options, see here. The MQTT Source reads all messages from an MQTT topic and writes them to a specified Kafka topic. Make a file named mqtt-source.properties with the following content:
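A sketch of a source configuration. As with any stream reactor connector, the class name and the connect.mqtt.* property names here are written from memory of the project's documentation and should be verified against the installed plugin version. The MQTT topic name <your prefix>/mqtt-source is an assumption.

```shell
# Connector config: subscribe to an MQTT topic, write to Kafka topic mqtt-source.
# connect.mqtt.* names and the KCQL statement are assumptions; verify them for your plugin version.
cat > mqtt-source.properties <<'EOF'
name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.hosts=tcp://test.mosquitto.org:1883
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO mqtt-source SELECT * FROM <your prefix>/mqtt-source
EOF
```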

Make sure to replace <your prefix> with a prefix to differentiate your topic from other topics, as this example uses a publicly available MQTT broker (test.mosquitto.org) for demonstration purposes.

Start Kafka Connect

Now that all the Kafka Connect components in this example are configured, you can start Kafka Connect from the command line like so:
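A sketch of the start command for the source example, assuming a hypothetical Kafka location of $HOME/kafka_2.11-1.1.0 and both properties files in the current directory:

```shell
# Hypothetical install location; override by exporting KAFKA_HOME first.
KAFKA_HOME="${KAFKA_HOME:-$HOME/kafka_2.11-1.1.0}"
CONNECT="$KAFKA_HOME/bin/connect-standalone.sh"

if [ -x "$CONNECT" ]; then
  # Worker config first, then the source connector config.
  "$CONNECT" connect.properties mqtt-source.properties
else
  echo "connect-standalone.sh not found under $KAFKA_HOME" >&2
fi
```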

Test Kafka Connect

Once Kafka Connect has started, it’s time to test the configuration.

First, follow the guide here to set up a Kafka console consumer, changing the topic name to the mqtt-source topic.

Next, use the mosquitto_pub command line tool to produce some messages to the MQTT topic:
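A sketch of publishing one test message, assuming the MQTT topic is named <your prefix>/mqtt-source (an assumption) and using an arbitrary message body:

```shell
MQTT_TOPIC="<your prefix>/mqtt-source"   # placeholder topic name

if command -v mosquitto_pub >/dev/null 2>&1; then
  # -h: broker host, -t: topic to publish to, -m: message payload.
  mosquitto_pub -h test.mosquitto.org -t "$MQTT_TOPIC" -m "Hello from MQTT"
else
  echo "mosquitto_pub is not installed" >&2
fi
```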

Make sure to replace <your prefix> with the same prefix you chose in mqtt-source.properties.

After a short delay, the text you produced to the MQTT topic should be printed by the console consumer.
