Use Apache Kafka with Ruby
There are many Kafka clients for Ruby, a list of some recommended options can be found here. In this example we’ll be using Zendesk’s ruby-kafka client.
Imports
Add the ruby-kafka package to your application, either by adding gem ‘ruby-kafka’ to your Gemfile or installing it manually with gem install ruby-kafka.
Using client ⇆ broker encryption (SSL)
If you have chosen to enable client ⇆ broker encryption, see here for information on the certificates required to establish an SSL connection to your cluster.
Producing Messages
Messages are produced to Kafka using a Kafka client Producer object. In this example we provide only the required properties for the Kafka client. See here for more details on configuration options.
1 2 3 4 5 6 7 8 9 |
require "kafka" kafka = Kafka.new( ["18.204.134.49:9092","18.208.108.53:9092","34.194.230.138:9092"], ssl_ca_cert: File.read('cluster-ca-certificate.pem'), sasl_scram_mechanism: 'sha256', sasl_scram_username: 'ickafka', sasl_scram_password: '361d4871ff1a5ef58deaf3b887b4898029faee9690e62c549078a1f51f18f755' ) producer = kafka.producer |
Ensure the IP addresses, cluster certificate location and SCRAM password are correct. If your Kafka cluster does not have client ⇆ broker encryption enable, replace the ssl_ca_cert line with sasl_over_ssl: false.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
Now that we have a Producer, sending a message is trivial:
1 2 |
producer.produce("test",topic: "my-topic") producer.deliver_messages |
Consuming Messages
Messages from Kafka are consumed using a Kafka client Consumer object. In this example we provide only the required properties for the Kafka client and consumer. See here for the full list of configuration options.
1 2 3 4 5 6 7 8 9 |
require "kafka" kafka = Kafka.new( ["18.204.134.49:9092","18.208.108.53:9092","34.194.230.138:9092"], ssl_ca_cert: File.read('cluster-ca-certificate.pem'),, sasl_scram_mechanism: 'sha256', sasl_scram_username: 'ickafka', sasl_scram_password: '361d4871ff1a5ef58deaf3b887b4898029faee9690e62c549078a1f51f18f755' ) consumer = kafka.consumer(group_id: "my-group") |
Ensure the IP addresses, cluster certificate location and SCRAM password are correct. If your Kafka cluster does not have client ⇆ broker encryption enable, replace the ssl_ca_cert line with sasl_over_ssl: false.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
Before we can consume messages, we need to subscribe to the topics we wish to receive messages from:
1 |
consumer.subscribe("my-topic") |
Now we are ready to consume messages from Kafka. We use the consumer’s each_message method to output every consumed message:
1 2 3 |
consumer.each_message do |message| puts "#{message.topic}, #{message.partition}, #{message.offset}, #{message.key}, #{message.value}" end |
Putting Them Together
Now that we have a consumer and producer setup, it’s time to combine them.
Start the consumer
Start the consumer before starting the producer because by default consumers only consume messages that were produced after the consumer started.
Start the producer
Now that the consumer is setup and ready to consume messages, you can now start your producer.
If the consumer and producer are setup correctly the consumer should output the message sent by the producer shortly after it was produced:
1 |
my-topic, 5, 0, , test |