Are you navigating the complexities of streaming data from Kafka to OpenSearch? Whether you’re working with default index configurations or OpenSearch data streams, this post walks through the process and shows how to keep your data flowing efficiently.

Why Kafka to OpenSearch integration matters

When integrating two powerhouse tools like Apache Kafka® and OpenSearch®, structuring your data properly is critical. For developers and engineers, this process ensures seamless data flow, optimal search capabilities, and robust real-time analysis. However, without a clear structure, you might encounter issues with data integrity, inefficient querying, and challenges with scalability.

While exploring the configuration documentation for the open source Apache Kafka OpenSearch sink connector, I came across something called “Data Stream”. This looked interesting, as that’s what I’m doing—streaming data from Kafka to OpenSearch. In this blog, we’ll take a closer look at OpenSearch insertion semantics, add (and eventually remove) Kafka keys, and test out the data stream.

By the end, you’ll understand how default indices mimic Kafka’s compacted topics, while data stream indices enforce immutability. Whether you’re building a simple integration or a complex data pipeline, this blog will help you on your journey to streaming success.

Starting with insertion semantics

[Image: The growing number of satellites and debris in low Earth orbit makes it more difficult to insert new satellites into orbit. (Source: Adobe Stock)]

Let’s revisit our approach from ‘How to configure Apache Kafka for OpenSearch sink connector.’ Under the “Data Conversion” section of the configuration documentation, I noticed that the default method used by this sink connector to write data into the OpenSearch index is “insert”, which means that records with the same document ID will replace each other! So, let’s try a new experiment to check what is happening. This time around I’ll use synthetic time series JSON data with sensor name and temperature fields. I’ll send two records with the same sensor name (here) and different temperatures to a new topic (test4) as follows:
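
With the Kafka CLI producer, that looks something like this (a local broker is assumed, and the temperature values are just illustrative):

```
# Produce two JSON records with the same sensor name but different temperatures
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test4
{"sensor": "here", "temp": "20.1"}
{"sensor": "here", "temp": "20.2"}
```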

What ends up in the index?
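
To check, we can search the index the connector writes to for this topic (the index name defaults to the topic name; the host and credentials below are placeholders for your OpenSearch cluster):

```
# Return all documents in the index created for topic test4
curl -s -u <user>:<password> "https://<opensearch-host>:9200/test4/_search?pretty"
```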

 

We see that there is a new document for every record, with document IDs of “test4+1+0” etc. This is because of the Document Id approach used by default—from the documentation:

key.ignore.id.strategy

Specifies the strategy to generate the Document ID. Only applicable when key.ignore is true or specific topics are configured using topic.key.ignore.

Available strategies:

  • none: No Doc ID is added
  • record.key: Generated from the record’s key
  • topic.partition.offset: Generated as record’s topic+partition+offset

If not specified, the default generation strategy is topic.partition.offset.

  • Type: string
  • Default: topic.partition.offset
  • Valid values: [none, record.key, topic.partition.offset]
  • Importance: low

Given these settings, every new Kafka record will have a unique document ID and be uniquely inserted into the index. So, the default behaviour is to keep all the records as unique documents, which for streamed time series data makes sense.

What if we try no document ID at all, i.e. key.ignore.id.strategy=none? I repeated the send-and-search steps with a new topic.
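
The relevant connector settings for this variation look something like this (key.ignore needs to be true for the ID strategy to apply):

```
# Ignore record keys and don't set an explicit document ID
key.ignore=true
key.ignore.id.strategy=none
```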

All the records are still stored in the index; the only difference is that the document IDs are now randomly generated by OpenSearch, so they will never conflict, and every record we send is stored as a separate document.

Adding Kafka keys

[Image: Franz Kafka with keys! (Source: AI generated with ChatGPT)]

Our example so far doesn’t use Kafka keys, and even if we sent records with keys, the current OpenSearch sink connector configuration would ignore them. To get more control over the document ID, and to better understand the default indexing behavior, let’s try explicit Kafka keys. A little-known “trick” with the Kafka CLI producer allows keys to be added to the records with this addition (I initially used ‘:’ as the separator, but that failed because of the ‘:’ characters in JSON, hence the choice of ‘=’):
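
With the kafka-console-producer, the addition is the parse.key and key.separator properties, something like this (the topic name is a placeholder):

```
# Parse a key from each input line, using '=' as the key/value separator
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic> \
  --property parse.key=true \
  --property key.separator==
```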

And turning keys “on” in the configuration with:
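
For this connector, that is likely just the key.ignore option described above, flipped to false so the record key is used for the document ID:

```
# Use the record key (rather than topic+partition+offset) as the document ID
key.ignore=false
```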

Our producer data now looks like this, which is interpreted as the record key first followed by the value:
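
For example, with the sensor name moved into the key and just the temperature left in the value (the exact values are illustrative):

```
{"sensor": "here"}={"temp": "20.8"}
{"sensor": "here"}={"temp": "20.9"}
{"sensor": "there"}={"temp": "19.5"}
```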

Uh oh! The tasks failed with the following error message in the log file: 

Well, that’s maybe a helpful error message, so I tried again after adding this to the configuration:

But still no good, as the tasks again failed but with a new error message now:

So, it looks like it doesn’t like keys being JSON, which is “odd” as it accepts this configuration option:

The MAP error gives a hint that it doesn’t expect a JSON name/value pair format for a key. Well, how about a simple String key?
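
That’s a one-line change of the key converter, something like:

```
# Treat record keys as plain strings
key.converter=org.apache.kafka.connect.storage.StringConverter
```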

This seems to be working OK, and the document IDs are the String Kafka keys, which means the default index insert behavior is visible for the first time. Existing documents with the same ID are replaced with newer documents: after sending 3 records with 2 distinct keys to the index, only 2 documents exist, and the document ID {"sensor":"here"} has the latest value {"temp":"20.9"}. But then I also realized that a String is actually also valid JSON, so I reverted to a JSON key converter with:
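
The change is something like this (key.converter.schemas.enable=false is my assumption for plain, schemaless JSON keys):

```
# Interpret record keys as schemaless JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
```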

But with the following key=value data records:
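
For example (again, the temperature values are illustrative):

```
here={"temp": "20.5"}
here={"temp": "20.9"}
there={"temp": "19.5"}
```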

This worked as expected, with document IDs of here and there, and the same insert index behavior (each document holds the latest value for its ID).

It’s interesting to note that this behavior is the same as compacted Kafka topics.

Enter the data stream in OpenSearch

[Image: Matrix-style data streams! (Source: AI generated with ChatGPT)]

Finally, we are ready to try the OpenSearch data stream option we mentioned at the start. The relevant configuration options are:

data.stream.enabled

Enable use of data streams. If set to true the connector will write to data streams instead of regular indices. Default is false.

  • Type: boolean
  • Default: false
  • Importance: medium

data.stream.prefix

Generic data stream name to write into. If set, it will be used to construct the final data stream name in the form of {data.stream.prefix}-{topic}.

  • Type: string
  • Default: null
  • Valid values: non-empty string
  • Importance: medium

data.stream.timestamp.field

The Kafka record field to use as the timestamp for the @timestamp field in documents sent to a data stream. The default is @timestamp.

  • Type: string
  • Default: @timestamp
  • Valid values: non-empty string
  • Importance: medium

The default, which uses the Kafka record timestamp metadata field, is fine, and it’s difficult to see what else could be used instead. Here’s the full sink connector configuration for my first experiment with data streams:
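
It looks something like this (the connector class name, connection details, and prefix value are assumptions or placeholders for my test setup; the data.stream.* options are the ones described above):

```
name=opensearch-sink-datastream
connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
topics=test8
connection.url=https://<opensearch-host>:9200
connection.username=<user>
connection.password=<password>
# Keys are still enabled at this point, as (schemaless) JSON
key.ignore=false
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Write to a data stream named {data.stream.prefix}-{topic}, i.e. stream-test8
data.stream.enabled=true
data.stream.prefix=stream
data.stream.timestamp.field=@timestamp
```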

And sending the revised JSON key=value data to topic test8:
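
For example (values illustrative, keys still parsed with the ‘=’ separator):

```
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test8 \
  --property parse.key=true \
  --property key.separator==
here={"temp": "20.5"}
here={"temp": "20.9"}
```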

Let’s see what the index contains (note the need to change the index name to include the data stream prefix):
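
With a prefix of stream (my assumed value), the data stream for topic test8 is stream-test8, so the search becomes something like:

```
# Search the data stream (and its backing indices) by name
curl -s -u <user>:<password> "https://<opensearch-host>:9200/stream-test8/_search?pretty"
```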

This index now has a timestamp field, but the “here” temp isn’t the latest, just the first value. So maybe that’s what an OpenSearch data stream index does: it only allows one (the first) document for any given document ID?

Let’s try changing behavior.on.version.conflict to ignore, as the error message suggested. The connector tasks don’t fail anymore, but we still get a warning about a version conflict. So, it looks like we have to make a final change and drop the Kafka key, as data streams enforce immutable documents for each document ID.

We now change the configuration to:
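
A minimal sketch of that change, assuming we simply tell the connector to ignore record keys and not set an explicit document ID (so OpenSearch generates one):

```
key.ignore=true
key.ignore.id.strategy=none
```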

With the final configuration as follows:
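
Putting it all together, something like this (again, the connector class, connection details, prefix, and topic are assumptions or placeholders):

```
name=opensearch-sink-datastream
connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
topics=<topic>
connection.url=https://<opensearch-host>:9200
connection.username=<user>
connection.password=<password>
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# No Kafka keys: ignore them and let OpenSearch generate document IDs
key.ignore=true
key.ignore.id.strategy=none
# Append everything to the data stream {data.stream.prefix}-{topic}
data.stream.enabled=true
data.stream.prefix=stream
data.stream.timestamp.field=@timestamp
```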

And sending the original test data (with no keys):
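
For example (the same illustrative values as before, no key parsing this time):

```
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic>
{"sensor": "here", "temp": "20.1"}
{"sensor": "here", "temp": "20.2"}
```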

And checking the result:

We see that each record has been stored as a separate document with timestamps. We do seem to have come full circle here, as this is the same behaviour as using non-data stream indices, with the addition of timestamps. However, data stream indices are better optimized for append-only data and have timestamps built in. One of our OpenSearch gurus suggests that data stream indices are likely to simplify operational complexity, but they may not perform better than just using the Index State Management Plugin, which is one of several OpenSearch Plugins supported on our platform.

With this particular OpenSearch sink connector, it also seems that the only way to get a repeating Kafka key into OpenSearch is to use the default indexing strategy, but the key is then only available as the Document ID—this may be a limitation for some use cases. This was also an issue with some other technologies that I’ve explored recently, including RisingWave – you often have to copy the key into a field in the value part of the record in advance.

If the Kafka Key never repeats (which may be the case if it’s generated as a random UUID for example), or you can safely ignore the Kafka key, then you can also use data streams, with the added benefit that timestamps are automatically added. A potential future enhancement to this connector could be a new configuration option to allow the Kafka key to be treated similarly to the timestamp metadata field and also be automatically added to the document.

What did we discover at a high level? The default indices behave like Kafka compacted topics (retaining the latest value for each document ID), and the data stream indices enforce immutability—once created, the documents associated with each document ID are unchangeable.

Conclusion

Data stream indices store each record as a separate document with timestamps, offering similar behaviour to non-data stream indices but optimized for append-only data. While data streams can simplify operational complexity, they may not outperform solutions like the Index State Management Plugin, which is supported on our platform.

Currently, the OpenSearch sink connector requires default indexing to use Kafka keys as document IDs. This approach can be limiting, as Kafka keys are unavailable as separate fields unless manually included in record values. This was also an issue with some other technologies that I’ve explored recently, including RisingWave—you often have to copy the key into a field in the value part of the record in advance. If the Kafka key never repeats (which may be the case if it’s generated as a random UUID for example) you can also use data streams. Future enhancements could expand the connector’s functionality by integrating Kafka keys directly into documents, much like the timestamp metadata field.

At a high level, the default indices behave like Kafka compacted topics, retaining only the latest value for each document ID, whereas data stream indices enforce immutability to maintain all records tied to their unique document IDs.

Take the next step in mastering Kafka to OpenSearch integration by exploring our managed services and expert resources. Whether you’re building robust data pipelines or seeking operational simplicity, we provide the tools and support to help you succeed.