Use Kafka with C#
There are many Kafka clients for C#, a list of some recommended options to use Kafka with C# can be found here. In this example, we’ll be using Confluent’s kafka-dotnet client.
Dependencies
Add the Confluent.Kafka package to your application. This package is available via NuGet. You can install Confluent. Kafka from within Visual Studio by searching for Confluent.Kafka in the NuGet UI, or by running this command in the Package Manager Console:
Install-Package Confluent.Kafka -Version 0.11.4
Using client ⇆ broker encryption (SSL)
If you have chosen to enable client ⇆ broker encryption on your Kafka cluster, see here for information on the certificates required to establish an SSL connection to your Kafka cluster.
Producing Messages
Messages are produced to Kafka using a Producer Builder. In this example, we provide only the required properties for the producer. See here for the full list of configuration options.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | var config = new Dictionary<string, object> { using System; using System.Threading.Tasks; using Confluent.Kafka; var config = new ProducerConfig { BootstrapServers = "34.196.151.95:9092,3.213.212.236:9092,18.213.93.46:9092", SslCaLocation = "/Path-to/cluster-ca-certificate.pem", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.ScramSha256, SaslUsername = "ickafka", SaslPassword = "493f586cc469a59987c8a9148669b9ecc570bb031bf3fa639e894be185331dce", }; |
Ensure the IP addresses, cluster certificate location and password are correct. If your Kafka cluster does not have client ⇆ broker encryption enabled your configuration options should look like this:
1 2 3 4 5 6 | ]BootstrapServers = "52.205.78.117:9092,3.83.28.35:9092,54.147.176.197:9092", SecurityProtocol = SecurityProtocol.SaslPlaintext, SaslMechanism = SaslMechanism.ScramSha256, SaslUsername = "ickafka", SaslPassword = "d54bcd975a5281876958820d206232a9b99dc750d4e25bb53ce269970fb", AutoOffsetReset = AutoOffsetReset.Earliest, |
Make sure the IP addresses and passwords are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
Now that we have setup the configuration Dictionary, we can create a Producer object:
1 | using (var p = new ProducerBuilder<Null, string>(config).Build()) |
Once we have a Producer object we can use it to send a message to Kafka:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | using (var p = new ProducerBuilder<Null, string>(config).Build()) { try { var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value="test" }); Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'"); } catch (ProduceException<Null, string> e) { Console.WriteLine($"Delivery failed: {e.Error.Reason}"); } } } } |
Note: we move the producer declaration into a using block here so it will be automatically closed cleanly once we’re finished with it.
Consuming Messages
Messages from Kafka are consumed using a Consumer object. In this example we provide only the required properties for the consumer. See here for the full list of configuration options.
First, create the configuration Dictionary:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | using System; using System.Threading.Tasks; using Confluent.Kafka; { var config = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "34.196.151.95:9092,3.213.212.236:9092,18.213.93.46:9092", SslCaLocation = "/PathTO/cluster-ca-certificate.pem", SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.ScramSha256, SaslUsername = "ickafka", SaslPassword = "493f586cc469a59987c8a9148669b9ecc570bb031bf3fa639e894be185331dce", AutoOffsetReset = AutoOffsetReset.Earliest, } } |
Ensure the IP addresses, cluster certificate location and password are correct. If your Kafka cluster does not have client ⇆ broker encryption enabled your configuration options should look like this:
1 2 3 4 5 6 | BootstrapServers = "52.205.78.117:9092,3.83.28.35:9092,54.147.176.197:9092", SecurityProtocol = SecurityProtocol.SaslPlaintext, SaslMechanism = SaslMechanism.ScramSha256, SaslUsername = "ickafka", SaslPassword = "d54bcd975a5281876958820d206232a9b99dc750d9035b4e25bb53ce269970fb", AutoOffsetReset = AutoOffsetReset.Earliest, |
Make sure the IP addresses and password are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
Now that we have the configuration Dictionary we can create a Consumer object:
1 | using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) |
Before we can consume messages, we need to subscribe to the topics we wish to receive messages from:
1 | c.Subscribe("my-topic"); |
Combined with a loop, we can continually consume and output messages from Kafka as they are produced:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | using (var c = new ConsumerBuilder<Ignore, string>(config).Build()) { c.Subscribe("my-topic"); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occurred: {e.Error.Reason}"); } } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. c.Close(); } |
Note: we move the consumer declaration into a using block here so it will be automatically closed cleanly once we’re finished with it.
Putting Them Together
Now that we have a consumer and producer setup, it’s time to combine them.
Start the consumer
Start the consumer before starting the producer because by default consumers only consume messages that were produced after the consumer started.
Start the producer
Now that the consumer is setup and ready to consume messages, you can now start your producer.
If the consumer and producer are setup correctly the consumer should output the message sent by the producer shortly after it was produced.
1 | Consumed message 'test' at: 'my-topic [[0]] @0'. |
How would you use “Producer” method in your assembly as it’s “Internal” class..?
Hi Arun The code above works as-is with the version of the C# Kafka client mentioned (0.11.4). Since we published this article, the client library has changed and you would need to use the ProducerBuilder to instantiate a Producer. Thanks for bringing this to our attention we will update this article in the future.
Our Kafka Service using SSL and the admin give as 3 files:
1. ca.perm
2. service.cert
3. service.key
I try to set up the Consumer.Config but I could not talk to the server. Do you have any experience to connect to Kafka using SSL?
Thanks,
Jdang