ClickHouse Kafka Connect Sink

The ClickHouse Kafka Connect Sink allows you to forward messages from a defined Kafka topic into a specified ClickHouse table for querying and later use. It is based on the official open source ClickHouse connector, which is released under the Apache 2.0 license; the code repository is publicly available.

Cluster Requirements

  • Kafka cluster of version 2.7+

Note: This guide assumes that the Kafka messages forwarded to your ClickHouse cluster are in JSONEachRow format. If you would like to use Avro format instead, you will also need to enable Karapace Schema Registry when provisioning your Kafka cluster.

  • Kafka Connect cluster of version 2.7+, with the Kafka cluster above as its target
  • ClickHouse cluster of version 23.3+

You can refer to this guide for instructions on creating a Kafka cluster with Instaclustr, this guide on creating a Kafka Connect cluster, and this guide on creating a ClickHouse cluster. If you have an older version of Kafka, Kafka Connect, or ClickHouse, reach out to the Instaclustr Support Team to upgrade your clusters so that you can use this connector.

Step 1: Create the ClickHouse table where the Kafka messages will be forwarded

A simple example might include the following:
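For instance, a minimal table might look like the following (the table name, column names, and types here are purely illustrative; use whatever matches your data):

```sql
CREATE TABLE default.kafka_events
(
    id         UInt32,
    name       String,
    created_at DateTime
)
ENGINE = MergeTree
ORDER BY id;
```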

Make sure the field names and data types correspond to the key-value pairs in the messages you expect to be forwarding to the table.

Step 2: Configure the ClickHouse cluster firewall to give access to the Kafka Connect cluster

The Kafka Connect public IPs need to be added to the firewall allow list for the ClickHouse cluster. See this guide for detailed instructions on how to do this. If the ClickHouse cluster is part of the Instaclustr-managed service, you can also use the Connected Clusters page on the Kafka Connect cluster to add the IPs with the click of a button as shown below:

At the end of this step, the ClickHouse firewall rules should list the Kafka Connect IPs:

Step 3: Manually create the Kafka topic in which messages will be sent to the ClickHouse table

This step is only required if:

  • You do not already have a topic on your Kafka cluster you want to use for forwarding messages to your ClickHouse table AND
  • You did not enable “Topic Auto-Create” when provisioning your Kafka cluster; OR you did, but want to override the default number of partitions per topic or the topic replication factor.

Create the topic by sending a POST API request to https://api.instaclustr.com/cluster-management/v2/resources/applications/kafka/topics/v3 with the following payload:
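A sketch of such a payload is shown below. The field names are assumptions based on the Instaclustr Kafka topic API, and the cluster ID, topic name, and counts are placeholders; consult the Instaclustr API documentation for the exact schema:

```json
{
  "clusterId": "<KAFKA CLUSTER ID>",
  "topic": "clickhouse-events",
  "partitions": 3,
  "replicationFactor": 3
}
```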

Note: Make sure that the desired topic name is not already in use by another topic on your Kafka cluster.

Step 4: Connect the Kafka topic with the ClickHouse table

Send a POST API request to https://<KAFKA CONNECT NODE 1 IP>:8083/connectors with the following credentials and payload (the example below uses curl). Note that this example includes only the minimal required parameters; a full list of configuration options can be found in the official documentation, which can be accessed via the project repository:
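A minimal sketch of such a request, using the official connector class name; the connector name, credentials, IPs, topic, and table names below are placeholders to substitute with your own values:

```shell
curl -u <KAFKA CONNECT USERNAME>:<KAFKA CONNECT PASSWORD> \
  -X POST "https://<KAFKA CONNECT NODE 1 IP>:8083/connectors" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clickhouse-sink",
    "config": {
      "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
      "tasks.max": "1",
      "topics": "clickhouse-events",
      "hostname": "<CLICKHOUSE NODE IP>",
      "port": "8443",
      "database": "default",
      "username": "<CLICKHOUSE USERNAME>",
      "password": "<CLICKHOUSE PASSWORD>",
      "ssl": "true",
      "topic2TableMap": "clickhouse-events=kafka_events",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false"
    }
  }'
```

Here “topic2TableMap” maps each topic to the ClickHouse table its messages should be inserted into.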

  • If the topic name you specify in the payload already exists on your Kafka cluster, the connector will use that preexisting topic instead of creating a new one for you. If the topic already contains messages used for other purposes, the connector can fail when those messages are not in the format specified in the payload (see note below), or erroneous data can be inserted when the messages are in the correct format but were never intended for your ClickHouse table. This is not an issue for any topics you created in Step 3, as those were newly created specifically for use with the connector.
  • Additionally, keep in mind that any future messages sent to the topic specified in the payload will be forwarded to your ClickHouse table for insertion, so they should always be correctly formatted and intended for the table.
  • The above configuration example uses JsonConverter as the value converter, meaning that the Kafka messages being forwarded to ClickHouse must be in JSONEachRow format. If your Kafka cluster includes Karapace Schema Registry and you would like to use AvroConverter, modify the payload by changing the value of value.converter to “io.confluent.connect.avro.AvroConverter”, changing the value of “value.converter.schemas.enable” to “false”, and adding “value.converter.schema.registry.url”: “<SCHEMA REGISTRY HOST>:PORT”. Additionally, see the official documentation (which can be accessed via the project repository) for a reference to how Kafka Connect data types are mapped to ClickHouse data types.
  • Multiple topic-table mappings can be configured by delimiting the values in “topics” and “topic2TableMap” with commas. Topics and tables can occur multiple times in “topic2TableMap”, so it is possible to map the same topic to multiple tables, or multiple topics to the same table. Make sure to repeat Step 3 for however many topics you want to manually create.
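For example, the Avro variant of the converter settings described above would look like this within the connector config (the schema registry host and port are placeholders):

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schemas.enable": "false",
  "value.converter.schema.registry.url": "<SCHEMA REGISTRY HOST>:PORT"
}
```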

Upon successfully configuring the connector, you will receive a 2xx response containing the configuration. You can also verify the connector’s status on the Active Connectors page of the Kafka Connect cluster:

Step 5: Post messages to the Kafka topic and ensure they are forwarded to the ClickHouse table

At this stage the connector is active, and any messages in the Kafka topic (preexisting and new) will start to be inserted into the connected ClickHouse table as new rows. For sample instructions on how to produce messages to the topic, see the “Producer / Consumer” examples at the bottom of the Connection Info page for the Kafka cluster:
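As one hedged example, messages can be produced with the console producer that ships with Kafka; the broker address and topic name below are placeholders, and producer.properties is assumed to contain the security settings shown on the Connection Info page:

```shell
kafka-console-producer.sh \
  --bootstrap-server <KAFKA NODE IP>:9092 \
  --producer.config producer.properties \
  --topic clickhouse-events
# Then type one JSON object per line, for example:
# {"id": 1, "name": "first"}
```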

If a topic message does not match the schema for the ClickHouse table exactly, the data record will be created as follows:

  • Any key-value pairs present in both the topic message and table schema are included in the new record
  • Any key-value pairs present in the topic message but not the table schema are ignored
  • Any key-value pairs present in the table schema but not the topic message are set to their column default values in the new record
  • If there is no overlap in key-value pairs, no data record is created
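As a hypothetical illustration, suppose the table has columns id, name, and created_at (names illustrative), and the topic receives:

```json
{"id": 42, "name": "example", "extra": "ignored"}
```

Here id and name are inserted, extra is discarded, and created_at takes its column default value. If the message shared no keys at all with the table schema, no row would be created.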

Note: Topic messages that are not in the format specified in the payload will cause the connector to fail; recovering requires deleting the topic (see Instaclustr’s API documentation) and repeating Step 3 onwards.

Troubleshooting

If messages are not being inserted as new entries into the ClickHouse table, first check the Active Connectors page for the Kafka Connect cluster to see if the connector has failed. If so, you can review the application logs to identify any configuration issues (see this guide for instructions on how to do this). If you require further assistance, contact Instaclustr Support.