Use Apache Kafka with Python
There are many Kafka clients for Python; a list of recommended options can be found here. In this example we’ll be using the kafka-python client.
Dependencies
Add kafka-python to your requirements.txt file or install it manually with pip install kafka-python.
Using client ⇆ broker encryption (SSL)
If you have chosen to enable client ⇆ broker encryption on your Kafka cluster, see here for information on the certificates required to establish an SSL connection to your Kafka cluster.
Producing Messages
Messages are produced to Kafka using a Producer object. In this example we provide only the required properties for the producer. See here for the full list of configuration options.
```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='54.162.160.113:9092,52.2.170.90:9092,44.216.58.106:9092',
    ssl_cafile='cluster-ca-certificate.pem',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='[USER NAME]',
    sasl_plain_password='[USER PASSWORD]',
)
```
Ensure the IP addresses, cluster certificate location and password are correct. If your Kafka cluster does not have client ⇆ broker encryption enabled your configuration options should look like this:
```python
bootstrap_servers='54.162.160.113:9092,52.2.170.90:9092,44.216.58.106:9092',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='[USER NAME]',
sasl_plain_password='[USER PASSWORD]',
```
Make sure the IP addresses and password are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
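Because the producer and consumer share these SASL options, one way to avoid repeating them is a small helper that builds the keyword arguments once. This is a hypothetical sketch for illustration (the helper name and its behaviour are not part of kafka-python):

```python
def sasl_scram_config(username, password, ca_file=None):
    """Build the shared SASL/SCRAM options as a dict of keyword arguments.

    Hypothetical helper: if ca_file is given, client-broker encryption is
    assumed (SASL_SSL); otherwise SASL_PLAINTEXT is used.
    """
    config = {
        'sasl_mechanism': 'SCRAM-SHA-256',
        'sasl_plain_username': username,
        'sasl_plain_password': password,
    }
    if ca_file:
        config['security_protocol'] = 'SASL_SSL'
        config['ssl_cafile'] = ca_file
    else:
        config['security_protocol'] = 'SASL_PLAINTEXT'
    return config

# The dict can then be splatted into either client constructor, e.g.
# KafkaProducer(bootstrap_servers='...', **sasl_scram_config(user, pw))
print(sasl_scram_config('[USER NAME]', '[USER PASSWORD]')['security_protocol'])
```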
Now that we have a Producer, sending a message is trivial:
```python
producer.send('my-topic', b'test')
producer.flush()
print('Published message')
```
Note: We use the producer’s flush method here to ensure the message gets sent before the program exits. In normal operation the producer will send messages in batches when it has either accumulated a certain number of messages, or has waited a certain amount of time.
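The two flush triggers the note describes (in kafka-python they are governed by the `batch_size` and `linger_ms` producer settings) can be illustrated with a toy model. This is not kafka-python's implementation, just a sketch of the idea that a batch is sent when it is full or when it has lingered long enough:

```python
import time

class ToyBatcher:
    """Toy sketch of producer batching: flush on size or time threshold."""

    def __init__(self, max_records=3, linger_s=0.05):
        self.max_records = max_records  # analogous to batch_size
        self.linger_s = linger_s        # analogous to linger_ms
        self.batch = []
        self.oldest = None
        self.sent = []                  # batches "sent" to the broker

    def send(self, record):
        if self.oldest is None:
            self.oldest = time.monotonic()
        self.batch.append(record)
        self._maybe_flush()

    def _maybe_flush(self, force=False):
        expired = (self.oldest is not None
                   and time.monotonic() - self.oldest >= self.linger_s)
        if force or len(self.batch) >= self.max_records or expired:
            if self.batch:
                self.sent.append(list(self.batch))
            self.batch = []
            self.oldest = None

    def flush(self):
        # Like producer.flush(): force out whatever is still buffered.
        self._maybe_flush(force=True)

b = ToyBatcher()
for i in range(4):
    b.send(i)
b.flush()
print(b.sent)  # [[0, 1, 2], [3]]
```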
Consuming Messages
Messages from Kafka are consumed using a Consumer object. In this example we provide only the required properties for the consumer. See here for the full list of configuration options.
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='54.162.160.113:9092,52.2.170.90:9092,44.216.58.106:9092',
    ssl_cafile='cluster-ca-certificate.pem',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='[USER NAME]',
    sasl_plain_password='[USER PASSWORD]',
    auto_offset_reset='earliest',
    group_id='my-group'
)
```
Ensure the IP addresses, cluster certificate location and password are correct. If your Kafka cluster does not have client ⇆ broker encryption enabled your configuration options should look like this:
```python
'my-topic',
bootstrap_servers='54.162.160.113:9092,52.2.170.90:9092,44.216.58.106:9092',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='[USER NAME]',
sasl_plain_password='[USER PASSWORD]',
auto_offset_reset='earliest',
group_id='my-group'
```
Make sure the IP addresses and password are correct.
Note: To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
Then we can continually consume messages from Kafka as they are produced:
```python
try:
    for message in consumer:
        if message:
            print(f"Received message: {message.value.decode('utf-8')}")
except Exception as e:
    print(f"An exception occurred: {e}")
finally:
    consumer.close()
```
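Message values arrive as raw bytes, which is why the loop above decodes them. If your producers publish JSON, you can centralise decoding by passing kafka-python's `value_deserializer` option when creating the consumer; the helper function below is illustrative:

```python
import json

def decode_json(raw: bytes):
    """Decode a UTF-8 JSON message body into a Python object."""
    return json.loads(raw.decode('utf-8'))

# Passing this as KafkaConsumer(..., value_deserializer=decode_json)
# means message.value is already a Python dict inside the consume loop.
print(decode_json(b'{"event": "signup", "user": 42}'))  # {'event': 'signup', 'user': 42}
```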
Client ⇄ Broker Encryption & Mutual Authentication (mTLS)
If your Kafka cluster has mTLS enabled, most of the steps are the same as above; only the configuration options differ, as described below.
Your producer configuration options should look like this:
```python
bootstrap_servers='54.162.160.113:9082,52.2.170.90:9082,44.216.58.106:9082',
security_protocol='SSL',
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem',
ssl_password='<key-password>',
```
The consumer configuration should look like this:
```python
'my-topic',
bootstrap_servers='34.192.31.54:9082,52.5.101.12:9082,3.95.89.64:9082',
security_protocol='SSL',
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem',
ssl_password='<key-password>',
auto_offset_reset='earliest',
group_id='my-group'
```
kafka-python uses .pem files to connect to Kafka, so you will need to convert the JKS files to PEM using the keytool and openssl commands.
See here for details on configuring a keystore for mTLS authentication with clients.
Once you have the keystore file, you can generate the key and certificate files from it; read this document for more details.
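The conversion can be sketched with keytool and openssl as follows; the keystore/truststore file names and the CARoot alias are assumptions, so adjust them to match your files:

```shell
# Convert the JKS keystore to PKCS12 format (file names are assumed examples)
keytool -importkeystore -srckeystore kafka.keystore.jks \
  -destkeystore keystore.p12 -deststoretype PKCS12

# Extract the client certificate and (passphrase-protected) private key
openssl pkcs12 -in keystore.p12 -nokeys -out certificate.pem
openssl pkcs12 -in keystore.p12 -nocerts -out key.pem

# Export the CA certificate from the truststore (alias assumed to be CARoot)
keytool -exportcert -alias CARoot -keystore kafka.truststore.jks \
  -rfc -file CARoot.pem
```

The passphrase you set when exporting key.pem is the value to supply as ssl_password in the configuration above.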
Note: You can use either cluster-ca-certificate.pem (downloaded from the “Connection Info” page) or CARoot.pem for the ssl_cafile parameter in the configuration.
Putting Them Together
Now that we have a consumer and a producer set up, it’s time to combine them.
Start the consumer
Start the consumer before starting the producer because by default consumers only consume messages that were produced after the consumer started.
Start the producer
Now that the consumer is set up and ready to consume messages, you can start your producer.
If the consumer and producer are set up correctly, the consumer should output the message sent by the producer shortly after it was produced:
```
Received message: test
```