This blog provides an overview of the two fundamental concepts in Apache Kafka: Topics and Partitions. While developing and scaling our Anomalia Machina application we have discovered that distributed applications using Apache Kafka and Cassandra clusters require careful tuning to achieve close to linear scalability, and critical variables included the number of Apache Kafka topics and partitions. In this blog, we test that theory and answer questions like “What impact does increasing partitions have on throughput?” and “Is there an optimal number of partitions for a cluster to maximize write throughput?” And more!
1. Introduction to Kafka Partitions
Two fundamental concepts in Apache Kafka are Topics and Partitions.
Topics enable Kafka producers and Kafka consumers to be loosely coupled (isolated from each other), and are the mechanism that Apache Kafka uses to filter and deliver messages to specific consumers. Consumers subscribe to 1 or more topics of interest and receive messages that are sent to those topics by producers.
Partitions are the main concurrency mechanism in Kafka. A topic is divided into 1 or more partitions, enabling producer and consumer loads to be scaled. Specifically, a consumer group supports as many consumers as partitions for a topic. The consumers are shared evenly across the partitions, allowing for the consumer load to be linearly scaled by increasing both consumers and partitions. You can have fewer consumers than partitions (in which case consumers get messages from multiple partitions), but if you have more consumers than partitions some of the consumers will be “starved” and not receive any messages until the number of consumers drops to (or below) the number of partitions. i.e. consumers don’t share partitions (unless they are in different consumer groups).
Here are some useful partition facts:
- Each topic can have 1 or more partitions. There is no theoretical upper limit. You can request as many partitions as you like, but there are practical limits.
- The size (in terms of messages stored) of partitions is limited to what can fit on a single node. If you have more data in a topic than can fit on a single node you must increase the number of partitions. Partitions are spread across the nodes in a Kafka cluster.
- Message ordering in Kafka is per partition only.
- If you are using an (optional) message key (required for event ordering within partitions, otherwise events are round-robin load balanced across the partitions – and therefore not ordered), then you need to ensure you have many more distinct keys (> 20 is a good start) than partitions otherwise partitions may get unbalanced, and in some cases may not even have any messages (due to hash collisions).
- Partitions can have copies to increase durability and availability and enable Kafka to failover to a broker with a replica of the partition if the broker with the leader partition fails. This is called the Replication Factor and can be 1 or more.
The total number of copies of a partition is the replication factor. i.e. RF=1 means that the leader has the sole copy of the partition (there are no followers); 2 means there are 2 copies of the partition (the leader and a follower), and 3 means there are 3 copies (1 leader and 2 followers). Note that the partition leader handles all writes and reads, as followers are purely for failover. Cleverly, followers just run Consumers to poll the data from the leaders. Partitions and Replication Factor can be configured cluster-wide or set/checked per topic (with the ic-Kafka-topics command for Instaclustr managed Kafka clusters).
The following diagrams (from the insidebigdata series we published last year on Apache Kafka architecture) illustrate how Kafka partitions and leaders/followers work for a simple example (1 topic and 4 partitions), enable Kafka write scalability (including replication), and read scalability:
2. Kafka Partitions and Replication Factor
We were curious to better understand the relationship between the number of partitions and the throughput of Kafka clusters. While developing and scaling our Anomalia Machina application we have discovered that distributed applications using Apache Kafka and Cassandra clusters require careful tuning to achieve close to linear scalability, and critical variables included the number of topics and partitions. We had also noticed that even without a load on the Kafka cluster (writes or reads), there was measurable CPU utilization which appeared to be correlated with having more partitions. We had a theory that the overhead was due to (attempted) message replication – i.e. the polling of the leader partitions by the followers. If this is true then for a replication factor of 1 (leaders only) there would be no CPU overhead with increasing partitions as there are no followers polling the leaders. Conversely, increasing the replication factor will result in increased overhead. Our methodology to test this theory was simply to measure the CPU utilization while increasing the number of partitions gradually for different replication factors.
The test setup used a small production Instaclustr managed Apache Kafka cluster as follows:
3 nodes x r5.xlarge (4 cores, 32GB RAM) Instaclustr managed Kafka cluster (12 cores in total)
This graph shows the CPU overhead on the Kafka cluster with partitions increasing from 1 to 20,000, with replication factor 1 (blue), 2 (orange), and 3 (grey), for 1 topic. We also tried 100 topics (yellow, RF=3) with increasing partitions for each topic giving the same number of total partitions.
This graph confirms that CPU overhead increases due to increasing replication factor and partitions, as CPU with RF=1 is constant (blue). It also demonstrates that overhead is higher with increasing topics (but the same number of total partitions, yellow), i.e. 100 topics with 200 partitions each have more overhead than 1 topic with 20,000 partitions.
Note that we used up to 20,000 partitions purely to check our theory. In practice, too many partitions can cause long periods of unavailability if a broker fails. If there are many partitions it takes a long time (potentially 10s of seconds) to elect new leaders for all the partitions with leaders that are on the failed broker. Also note that If the partitions are increased (e.g. using the ic-Kafka-topics command) too fast, or to a value that is too large, then the clusters can be overloaded and may become unresponsive. It pays to increase the number of Kafka partitions in small increments and wait until the CPU utilization has dropped back again.
Transparent, fair, and flexible pricing for your data infrastructure: See Instaclustr Pricing Here
3. Partitions and Producer Throughput
Next, we wanted to find out a couple of things with more practical applications: What impact does increasing Kafka partitions have on throughput? And is there is an optimal number of partitions for a cluster (of this size) to maximize write throughput?
Our methodology was to initially deploy the Kafka producer from our Anomalia Machina application as a load generator on another EC2 instance as follows:
1 x m4.4xlarge (16 core, 64GB RAM) EC2 instance
This isn’t a particularly large EC2 instance, but Kafka producers are very lightweight and the CPU utilization was consistently under 70% on this instance. We ran a series of load tests with a multi-threaded producer, gradually increasing the number of threads and therefore increasing the arrival rate until an obvious peak was found. We repeated this test for different numbers of partitions. The replication factor was 3, and the message size was 80 bytes. Here’s a graph showing one run for 3 partitions showing producer threads vs. arrival rate, with a peak at 4 threads.
Repeating this process for 3 to 5,000 partitions we recorded the maximum arrival rate for each number of partitions resulting in this graph (note that the x-axis, partitions, is logarithmic), which shows that the optimal write throughput is reached at 12 partitions, dropping substantially above 100 partitions. The throughput at 5,000 partitions is only 28% of the maximum throughput. There is however only a 7% variation in throughput between 3 and 100 partitions, showing that the number of partitions isn’t really critical until exceeding more than 100.
Twelve partitions also correspond to the total number of CPU cores in the Kafka cluster (3 nodes with 4 CPU cores each). Note that the total number of followers is (RF-1) x partitions = (3-1) x 12 = 24 which is higher but still in the “sweet spot” between 12 and 100 on the graph, and maximizes the utilization of the available 12 CPU cores.
4. End-to-end Throughput and Latency Experiment
Real Kafka clusters naturally have messages going in and out, so for the next experiment, we deployed a complete application using both the Anomalia Machine Kafka producers and consumers (with the anomaly detector pipeline disabled as we are only interested in Kafka message throughput). We used a single topic with 12 partitions, a producer with multiple threads, and 12 consumers. We monitored the producer and consumer message rates (to ensure the consumers were keeping up), and the total end-to-end latency (time from a message sent to message receive).