Are you navigating the complexities of streaming data from Kafka to OpenSearch? Whether you’re dealing with default configurations or data stream capabilities, this post explores the process and guides you toward efficient data flow.
Why Kafka to OpenSearch integration matters
When integrating two powerhouse tools like Apache Kafka® and OpenSearch®, structuring your data properly is critical. For developers and engineers, this process ensures seamless data flow, optimal search capabilities, and robust real-time analysis. However, without a clear structure, you might encounter issues with data integrity, inefficient querying, and challenges with scalability.
While exploring the configuration documentation for the open source Apache Kafka OpenSearch sink connector, I came across something called “Data Stream”. This looked interesting, as that’s what I’m doing—streaming data from Kafka to OpenSearch. In this blog, we’ll take a closer look at OpenSearch insertion semantics, add (and eventually remove) Kafka keys, and test out the data stream.
By the end, you’ll understand how default indices mimic Kafka’s compacted topics, while data stream indices enforce immutability. Whether you’re building a simple integration or complex data pipelines, this blog will help you on your journey to streaming success.
Starting with insertion semantics
The growing number of satellites and debris in low Earth orbit makes it more difficult to insert new satellites into orbit.
(Source: Adobe Stock)
Let’s revisit our approach from ‘How to configure Apache Kafka for OpenSearch sink connector’. Under the “Data Conversion” section of the configuration documentation, I noticed that the default method used by this sink connector to write data into the OpenSearch index is “insert”, which means that records with the same document ID will be replaced! So, let’s run a new experiment to check what is happening. This time around I’ll use synthetic time series JSON data with sensor name and temperature fields, sending three records to a new topic (test4): two with the same sensor name (here) but different temperatures, and one with a different sensor (there):
{"sensor":"here", "temp":"20.1"} {"sensor":"there", "temp":"22.7"} {"sensor":"here", "temp":"20.9"} |
What ends up in the index?
curl -XGET -u OS_User "OS_URL:9200/test4/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
'
{"took":1,"timed_out":false,"_shards": {"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total": {"value":3,"relation":"eq"},"max_score":1.0,"hits": [{"_index":"test4","_id":"test4+1+0","_score":1.0,"_source": {"temp":"20.1","sensor":"here"}}, {"_index":"test4","_id":"test4+1+1","_score":1.0,"_source": {"temp":"22.7","sensor":"there"}}, {"_index":"test4","_id":"test4+1+2","_score":1.0,"_source": {"temp":"20.9","sensor":"here"}}]}}% |
We see that there is a new document for every record, with document IDs of “test4+1+0” and so on. This is because of the document ID generation strategy used by default; from the documentation:
key.ignore.id.strategy
Specifies the strategy to generate the Document ID. Only applicable when key.ignore is true or specific topics are configured using topic.key.ignore. Available strategies:
- none: No Doc ID is added
- record.key: Generated from the record’s key
- topic.partition.offset: Generated as record’s topic+partition+offset
If not specified, the default generation strategy is topic.partition.offset.
- Type: string
- Default: topic.partition.offset
- Valid values: [none, record.key, topic.partition.offset]
- Importance: low
Given these settings, every new Kafka record will have a unique document ID and be uniquely inserted into the index. So, the default behaviour is to keep all the records as unique documents, which for streamed time series data makes sense.
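You can confirm this by fetching an individual document directly by its ID; for example (depending on your client you may need to URL-encode the + characters as %2B):

curl -XGET -u OS_User "OS_URL:9200/test4/_doc/test4+1+0"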
What if we try no document ID at all, i.e. key.ignore.id.strategy=none? After repeating the send-and-search steps on a new topic (test5), here’s the resulting data:
{"took":73,"timed_out":false,"_shards": {"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total": {"value":3,"relation":"eq"},"max_score":1.0,"hits": [{"_index":"test5","_id":"KXMO9JUBbKqBPrAOPWlX","_score":1.0,"_source": {"temp":"20.1","sensor":"here"}}, {"_index":"test5","_id":"J3MO9JUBbKqBPrAOPWlX","_score":1.0,"_source": {"temp":"22.7","sensor":"there"}}, {"_index":"test5","_id":"KHMO9JUBbKqBPrAOPWlX","_score":1.0,"_source": {"temp":"20.9","sensor":"here"}}]}}% |
All the records are still stored in the index; the only difference is that the document IDs are now randomly generated by OpenSearch, so they will never conflict, and every record we send is stored as a separate document.
Adding Kafka keys
Franz Kafka with keys!
(Source: AI generated with ChatGPT)
Our example so far doesn’t use Kafka keys, and even if we sent records with keys, the current OpenSearch sink connector configuration would ignore them. To have more control over the document ID, and to better understand the default indexing behavior, let’s use explicit Kafka keys. A little-known “trick” with the Kafka CLI producer allows keys to be added to the records with this addition (I initially used ‘:’ as the separator, but that failed because ‘:’ appears in the JSON, hence the choice of ‘=’):
--property "parse.key=true" --property "key.separator==" |
And turning keys “on” in the configuration with:
"key.ignore": "false" |
Our producer data now looks like this, which is interpreted as the record key first followed by the value:
{"sensor":"here"}={"temp":"20.1"} {"sensor":"there"}={"temp":"22.7"} {"sensor":"here"}={"temp":"20.9"} |
Uh oh! The tasks failed with the following error message in the log file:
ERROR Error encountered in task pauls_os_sink_test6-1. Executing stage 'KEY_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter'. (org.apache.kafka.connect.runtime.errors.LogReporter:70)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Well, that’s maybe a helpful error message, so I tried again after adding this to the configuration:
"key.converter.schemas.enable":"false" |
But still no good: the tasks failed again, this time with a new error message:
org.apache.kafka.connect.errors.DataException: MAP is not supported as the document id. Supported are: [Schema{INT8}, Schema{INT16}, Schema{INT32}, Schema{INT64}, Schema{STRING}]
ERROR Can't convert record from topic test6 with partition 1 and offset 1. Reason: (io.aiven.kafka.connect.opensearch.OpensearchSinkTask:141)
So, it looks like it doesn’t like JSON keys, which is “odd” given that it accepts this configuration option:
"key.converter": "org.apache.kafka.connect.json.JsonConverter |
The MAP error gives a hint that it doesn’t expect a JSON name/value pair format for a key. Well, how about a simple String key?
"key.converter": "org.apache.kafka.connect.storage.StringConverter" |
After resending the keyed records, the index now contains:
{"took":960,"timed_out":false,"_shards": {"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total": {"value":2,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test6","_id":" {\"sensor\":\"there\"}","_score":1.0,"_source":{"temp":"22.7"}},{"_index":"test6","_id":" {\"sensor\":\"here\"}","_score":1.0,"_source":{"temp":"20.9"}}]}}% |
This seems to be working OK, and the document IDs are the String Kafka keys, which means the default index insert behavior is visible for the first time. Existing documents with the same ID are replaced by newer documents: after sending 3 records with 2 distinct keys to the index, only 2 documents exist, and the document ID {"sensor":"here"} has the latest value {"temp":"20.9"}. But then I realized that a plain String is also valid JSON, so I reverted to the JSON key converter with:
"key.converter": "org.apache.kafka.connect.json.JsonConverter |
But with the following key=value data records:
"here"={"temp":"20.1"} "there"={"temp":"22.7"} "here"={"temp":"20.9"} |
This worked as expected, with document IDs of here and there, and the same insert index behavior (each document holds the latest value):
{"took":81,"timed_out":false,"_shards": {"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total": {"value":2,"relation":"eq"},"max_score":1.0,"hits": [{"_index":"test7","_id":"there","_score":1.0,"_source":{"temp":"22.7"}}, {"_index":"test7","_id":"here","_score":1.0,"_source":{"temp":"20.9"}}]}}% |
It’s interesting to note that this behavior is the same as that of compacted Kafka topics, which retain only the latest record for each key.
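For comparison, a compacted topic is created by setting cleanup.policy=compact; here’s a sketch (hypothetical topic name, local broker assumed):

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic sensor-latest \
  --partitions 1 --replication-factor 1 --config cleanup.policy=compact

Once compaction runs, only the most recent record for each key is retained, which is exactly what the default index insert behavior gives us in OpenSearch.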
Enter the data stream in OpenSearch
Matrix-style data streams!
(Source: AI generated with ChatGPT)
Finally, we are ready to try the OpenSearch data stream option we mentioned at the start. The relevant configuration options are:
data.stream.enabled
Enable use of data streams. If set to true the connector will write to data streams instead of regular indices. Default is false.
- Type: boolean
- Default: false
- Importance: medium

data.stream.prefix
Generic data stream name to write into. If set, it will be used to construct the final data stream name in the form of {data.stream.prefix}-{topic}.
- Type: string
- Default: null
- Valid values: non-empty string
- Importance: medium

data.stream.timestamp.field
The Kafka record field to use as the timestamp for the @timestamp field in documents sent to a data stream. The default is @timestamp.
- Type: string
- Default: @timestamp
- Valid values: non-empty string
- Importance: medium
The default, which uses the Kafka record timestamp metadata field, is fine, and it’s hard to see what else could sensibly be used instead. Here’s the full sink connector configuration for my first experiment with data streams:
{ "name": "pauls_os_sink_test8", "config": { "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "topics": "test8", "connection.url": "OS_URL", "connection.username": "OS_User", "connection.password": "OS_Password", "tasks.max": "3", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "errors.deadletterqueue.topic.name": "dead1", "key.ignore": "false", "schema.ignore": "true", "drop.invalid.message": "true", "behavior.on.malformed.documents": "ignore", "errors.log.enable": "true", "errors.log.include.message": "true", "errors.tolerance": "all", "value.converter.schemas.enable": "false", "data.stream.enabled": "true", "data.stream.prefix": "datastream", "key.converter.schemas.enable": "false" } } |
And sending the revised JSON key=value data to topic test8:
"here"={"temp":"20.1"} "there"={"temp":"22.7"} "here"={"temp":"20.9"} |
This time the connector tasks hit a version conflict error:
ERROR Encountered a version conflict when executing batch 9 of 1 records. Error was [.ds-datastream-test8-000001/kF-LX8LfRk2GeBJeMf_0dg][[.ds-datastream-test8-000001][0]] OpenSearchException[OpenSearch exception [type=version_conflict_engine_exception, reason=[here]: version conflict, document already exists (current version [1])]] (to ignore version conflicts you may consider changing the configuration property 'behavior.on.version.conflict' from 'fail' to 'ignore').
Let’s see what the index contains (note the need to change the index name to include the data stream prefix):
curl -XGET -u OS_User:OS_Password "OS_URL:9200/datastream-test8/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
'
{"took":22,"timed_out":false,"_shards": {"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total": {"value":2,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".ds-datastream-test8- 000001","_id":"there","_score":1.0,"_source": {"temp":"22.7","@timestamp":1743573117018}},{"_index":".ds-datastream-test8- 000001","_id":"here","_score":1.0,"_source": {"temp":"20.1","@timestamp":1743573117013}}]}}% |
The index now has a timestamp field, but the “here” temp isn’t the latest value, just the first one. So maybe that’s what an OpenSearch data stream index does: it only allows one (the first) document for any given document ID?
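You can also check that a data stream (rather than a plain index) was created, and see its backing indices, with something like:

curl -XGET -u OS_User:OS_Password "OS_URL:9200/_data_stream/datastream-test8"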
Let’s try changing behavior.on.version.conflict to ignore, as the error message suggested. The connector tasks no longer fail, but we still get a warning about a version conflict. So, it looks like we have to make a final change and drop the Kafka key, as data streams enforce immutable documents for each document ID.
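As an aside, if the connector runs on a self-managed Kafka Connect cluster, a change like this can be applied by PUT-ting the connector’s config object back with the extra property; a sketch, abridged and with an assumed REST endpoint:

# PUT replaces the whole connector config, so include every existing
# property plus the new one (most properties omitted here for brevity)
curl -X PUT "http://localhost:8083/connectors/pauls_os_sink_test8/config" \
  -H "Content-Type: application/json" \
  -d '{
        "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
        "topics": "test8",
        "data.stream.enabled": "true",
        "data.stream.prefix": "datastream",
        "behavior.on.version.conflict": "ignore"
      }'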
We now change the configuration to:
key.ignore=true
key.ignore.id.strategy=none
With the final configuration as follows:
{ "name": "pauls_os_sink_test9", "config": { "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", "topics": "test9", "connection.url": "OS_URL:9200", "connection.username": "OS_User", "connection.password": "OS_Password", "tasks.max": "3", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "errors.deadletterqueue.topic.name": "dead1", "key.ignore": "true", "key.ignore.id.strategy": "none", "schema.ignore": "true", "drop.invalid.message": "true", "behavior.on.malformed.documents": "ignore", "errors.log.enable": "true", "errors.log.include.message": "true", "errors.tolerance": "all", "value.converter.schemas.enable": "false", "data.stream.enabled": "true", "data.stream.prefix": "datastream", "behavior.on.version.conflict": "warn", "key.converter.schemas.enable": "false" } } |
And sending the original test data (with no keys):
{"sensor":"here", "temp":"20.1"} {"sensor":"there", "temp":"22.7"} {"sensor":"here", "temp":"20.9"} |
And checking the result:
curl -XGET -u OS_User:OS_Password "OS_URL:9200/datastream-test9/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
'
{"took":4,"timed_out":false,"_shards": {"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total": {"value":3,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".ds-datastream-test9- 000001","_id":"F3Mq9ZUBbKqBPrAOj3Q6","_score":1.0,"_source": {"temp":"22.7","sensor":"there","@timestamp":1743574960108}}, {"_index":".ds-datastream-test9-000001","_id":"GHMq9ZUBbKqBPrAOj3Q6","_score":1.0,"_source": {"temp":"20.9","sensor":"here","@timestamp":1743574960108}}, {"_index":".ds-datastream-test9-000001","_id":"FnMq9ZUBbKqBPrAOj3Q2","_score":1.0,"_source": {"temp":"20.1","sensor":"here","@timestamp":1743574960104}}]}}% |
We see that each record has been stored as a separate document, now with a timestamp. We seem to have come full circle: this is the same behaviour as the non-data stream indices, with the addition of timestamps. However, data stream indices are optimized for append-only, timestamped data. One of our OpenSearch gurus suggests that data stream indices are likely to simplify operational complexity, but may not perform better than just using the Index State Management Plugin, which is one of several OpenSearch plugins supported on our platform.
With this particular OpenSearch sink connector, it also seems that the only way to get a repeating Kafka key into OpenSearch is to use the default indexing strategy, but the key is then only available as the document ID, which may be a limitation for some use cases. This was also an issue with some other technologies I’ve explored recently, including RisingWave: you often have to copy the key into a field in the value part of the record in advance.
If the Kafka Key never repeats (which may be the case if it’s generated as a random UUID for example), or you can safely ignore the Kafka key, then you can also use data streams, with the added benefit that timestamps are automatically added. A potential future enhancement to this connector could be a new configuration option to allow the Kafka key to be treated similarly to the timestamp metadata field and also be automatically added to the document.
What did we discover at a high level? The default indices behave like Kafka compacted topics (retaining the latest value for each document ID), and the data stream indices enforce immutability—once created, the documents associated with each document ID are unchangeable.
Conclusion
Data stream indices store each record as a separate document with timestamps, offering similar behaviour to non-data stream indices but optimized for append-only data. While data streams can simplify operational complexity, they may not outperform solutions like the Index State Management Plugin, which is supported on our platform.
Currently, the OpenSearch sink connector requires default indexing to use Kafka keys as document IDs. This approach can be limiting, as Kafka keys are unavailable as separate fields unless manually included in record values. This was also an issue with some other technologies that I’ve explored recently, including RisingWave—you often have to copy the key into a field in the value part of the record in advance. If the Kafka key never repeats (which may be the case if it’s generated as a random UUID for example) you can also use data streams. Future enhancements could expand the connector’s functionality by integrating Kafka keys directly into documents, much like the timestamp metadata field.
At a high level, the default indices behave like Kafka compacted topics, retaining only the latest value for each document ID, whereas data stream indices enforce immutability to maintain all records tied to their unique document IDs.
Take the next step in mastering Kafka to OpenSearch integration by exploring our managed services and expert resources. Whether you’re building robust data pipelines or seeking operational simplicity, we provide the tools and support to help you succeed.