
Technical — Kafka Tuesday 16th March 2021

Apache Kafka MirrorMaker 2 (MM2) Part 1: Theory

By Paul Brebner

In this new two-part blog series we’ll turn our gaze to MirrorMaker 2 (MM2), the newest version of MirrorMaker, Apache Kafka’s cross-cluster mirroring (or replication) technology. MirrorMaker 2 is built on top of the Kafka Connect framework for increased reliability and scalability, and is suitable for more demanding geo-replication use cases, including migration, backup, disaster recovery, and failover. In the first installment in this series, we’ll focus on MirrorMaker 2 theory (Kafka replication, architecture, components, and terminology) and invent some MirrorMaker 2 rules. Part 2 will be more practical: we’ll try out Instaclustr’s managed MirrorMaker 2 service and test the rules with some experiments.

1. Replication—Then and Now

During a recent visit to the National Museum of Australia, I was impressed to see Captain Cook’s logbook on display, but surprised to learn that it was only one of three copies. As ships from this era were prone to vanish without a trace, naval captains had to regularly forward a copy of their log to the Admiralty in case of accidents. The Endeavour had been at sea for two years before an opportunity to send a copy back arose, so that copy was only a partial version of the final log. Cook’s log illustrates some interesting problems with copies: incompleteness, differences between copies (transcription errors, edits, and additions), uncertainty about which is the most “original”, and how many copies even exist (one copy went missing for years, and was eventually found in the Royal Library).

book in the royal library
(Source: Paul Brebner)

Fast forward two centuries, and distributed computing and the Internet (modern circumnavigation!) have picked up where sailing ships and physical logbooks left off. Some examples of replication I’ve come across include broadcast message delivery (reliable message delivery to multiple recipients), mirrored databases (for improved latency and redundancy), caching (e.g. a middle tier with optimistic concurrency control in Enterprise Java, or client-side caching with invalidation messages in Redis), grid computing (solving enormous computing problems with shared computing resources), and sharing geospatial datasets across data custodians with pub-sub (where infinite loops can easily occur).

But replicated systems are tricky to get right. In the 1990s, computer scientist Jim Gray published a famous paper, “The Dangers of Replication and a Solution”. Luckily, his solution—partitioning—was adopted by modern scalable Big Data technologies including Apache Cassandra and Kafka. In this blog, we’ll see how Kafka manages geo-replication at speed and scale.

2. Replication in Kafka

“Head of Franz Kafka”: 42 replicated mobile tiers form the moving face of Kafka
(Source: Shutterstock)

Apache Kafka is basically a distributed stream processing system, designed so that messages can be sent from distributed producers to distributed consumers via a distributed Kafka cluster (see “A visual introduction to Apache Kafka”). Internally, Kafka ensures performance, scalability, availability, and durability through the use of log-based storage, data partitioning, multiple horizontally scalable nodes, and replication from leader to follower partitions/nodes. So, by default, a Kafka cluster replicates messages in several ways: from producers to leader nodes, internally from leader to follower nodes, and finally to consumers, with additional duplication across consumer groups. Even though Kafka supports exactly-once delivery semantics, replication is clearly fundamental to how Kafka works.

It’s interesting to note that even a single Kafka cluster can be stretched across multiple data centers, as long as the latency between them is low enough. For example, a typical Instaclustr managed Kafka cluster has nodes deployed to multiple AWS Availability Zones (discrete data centers) in the same region, for improved availability. Last year I learned that such a “stretched” cluster can span multiple data centers, possibly across multiple regions, as long as the maximum latency is under 100ms.

So, what’s better than a single Kafka cluster? Multiple clusters! Multi-cluster Kafka is a common deployment pattern. For example, Netflix pioneered multi-cluster Kafka patterns for high fan-out architectures. And Kafka applications can be designed to produce and consume to and from topics deployed to multiple Kafka clusters for enhanced flexibility, visibility, and scalability. 
Finally, if you want to run multiple Kafka clusters, potentially in different data centers and/or regions with more than 100ms latency between them, one solution is MirrorMaker. Last year I also learned that MirrorMaker 2 (KIP-382, available since Kafka 2.4.0) had been released, and discovered a few things about it.

3. MirrorMaker 2

Hall of mirrors at Versailles Palace—it’s time to upgrade your mirrors
(Source: Shutterstock)

MirrorMaker 2 (MM2) is designed to make it easier to mirror or replicate topics from one Kafka cluster to another. It uses the Kafka Connect framework to simplify configuration and scaling. It dynamically detects changes to topics and keeps source and target topics synchronized, including their partitions and offsets. It supports remote topic renaming, which enables more complex topologies (bidirectional flows and more than two clusters) while preventing loops. It provides end-to-end metrics, including replication latency across multiple clusters. Finally, to make it easier for Kafka clients to detect and respond to failover and switch clusters (e.g. for disaster recovery), it emits consumer offset checkpoints and provides tooling for offset translation.

The main MM2 components are actually Kafka connectors:

  • The MirrorSourceConnector replicates records from the source cluster to the target cluster and enables offset synchronization.
  • The MirrorCheckpointConnector manages consumer offset synchronization, emits checkpoints, and enables failover.
  • Finally, the MirrorHeartbeatConnector provides heartbeats, monitoring of replication flows, and client discovery of replication topologies (which can be more complex than for the original MirrorMaker).
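To make offset translation a little more concrete, here is a deliberately simplified Python model. It assumes, for illustration only, that MM2 records pairs of (upstream offset, downstream offset) for each replicated partition, called `OffsetSync` here (a hypothetical name, not MM2’s API), and that a consumer group’s committed offset on the source cluster is translated using the most recent pair at or before it. Real MM2 versions differ in the details; this sketch takes the conservative choice of never skipping records, at the cost of possibly re-delivering some after failover.

```python
from bisect import bisect_right
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class OffsetSync:
    """Hypothetical record: source offset `upstream` was written at target offset `downstream`."""
    upstream: int
    downstream: int


def translate_offset(syncs: List[OffsetSync], committed_upstream: int) -> Optional[int]:
    """Translate a consumer group's committed source offset to a target offset.

    `syncs` must be sorted by upstream offset. Returns None when no sync at or
    before the committed offset is known, i.e. there is no safe translation.
    """
    upstream_offsets = [s.upstream for s in syncs]
    i = bisect_right(upstream_offsets, committed_upstream) - 1
    if i < 0:
        return None
    sync = syncs[i]
    if sync.upstream == committed_upstream:
        # Exact match: we know where the next unread record landed on the target.
        return sync.downstream
    # The group is somewhere past sync.upstream, but we don't know exactly where
    # the later records landed, so resume just after the synced record. This can
    # re-deliver some records (at-least-once) but never skips one.
    return sync.downstream + 1
```

For example, with syncs at (0, 100), (50, 140), and (90, 170), a group that committed source offset 60 would resume at target offset 141 in this model.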

Using Kafka connectors for MM2 is a clever idea. Normal Kafka connectors come in two flavors, source and sink connectors, which move data into or out of a Kafka cluster from or to external systems. But MM2 works across multiple Kafka clusters, so how does Kafka Connect help? It turns out that MM2 connectors are special: they have a pair of Kafka clients working in tandem, a consumer (reading from the source Kafka cluster) and a producer (writing to the target Kafka cluster).
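The consumer/producer pairing can be illustrated with a toy, in-memory Python model. Everything in it is hypothetical (`ToyConsumer`, `ToyProducer`, and `mirror_step` are invented names, clusters are plain dictionaries, and each topic has a single partition); it is not how MM2 or Kafka Connect are implemented, just the shape of one mirroring iteration: read new records from the source cluster, then write them to the target cluster.

```python
from collections import defaultdict
from typing import Dict, List


class ToyConsumer:
    """Stands in for the consumer that reads from the source cluster."""

    def __init__(self, cluster: Dict[str, List[str]]):
        self.cluster = cluster
        self.positions: Dict[str, int] = defaultdict(int)  # next record to read, per topic

    def poll(self, topic: str) -> List[str]:
        records = self.cluster.get(topic, [])
        start = self.positions[topic]
        self.positions[topic] = len(records)
        return records[start:]


class ToyProducer:
    """Stands in for the producer that writes to the target cluster."""

    def __init__(self, cluster: Dict[str, List[str]]):
        self.cluster = cluster

    def send(self, topic: str, record: str) -> None:
        self.cluster.setdefault(topic, []).append(record)


def mirror_step(consumer: ToyConsumer, producer: ToyProducer,
                topics: List[str], source_alias: str) -> int:
    """One mirroring iteration: consume new source records, produce them to the target."""
    copied = 0
    for topic in topics:
        for record in consumer.poll(topic):
            # Remote topic naming (source alias + "." + topic) follows RULE 5 below.
            producer.send(f"{source_alias}.{topic}", record)
            copied += 1
    return copied
```

Calling `mirror_step` repeatedly only copies records that arrived since the previous call, because the toy consumer tracks its position; position tracking and spreading work across tasks is the kind of bookkeeping Kafka Connect helps with.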

MM2 is, therefore, a significant improvement on the original MirrorMaker, particularly in its support for more flexible use cases (including geo-replication), scalability, and ease of use.

So why am I blogging about MM2 now? First, I noticed that MM2 was in the news last year: I heard about it at the Kafka Summit, and in September 2020 Instaclustr’s Managed MirrorMaker 2.0 became available. I’ve also just finished a blog series building a data processing pipeline using Apache Kafka Connect, which provides useful background on Instaclustr’s managed Kafka Connect clusters, which are also used by our managed MM2.

Before going further, here’s some basic MM2 terminology:

  • Source/upstream topic/cluster: Where the records are replicated from (the origin).
  • Sink/target/downstream topic/cluster: Where the records are replicated to (the destination).
  • Mirror flow: A unidirectional replication of topics from a source cluster to a target cluster.
  • Local/target cluster: For a Kafka Connect cluster, the Kafka cluster it is directly associated with.
  • Remote topic: The replicated topic on the target cluster (i.e. the copy of a source topic).
  • Remote cluster: Any non-local Kafka cluster used by a Kafka connector (source/upstream clusters). 
  • Active/active clusters: This is a database pattern for high availability (HA) architectures, but it is also applicable to MM2. Each cluster is “active”, in that it has a local application that uses it for both writes and reads. HA is achieved by replicating data between the clusters in both directions, so that both applications receive all messages produced in either cluster.
  • Active/passive clusters: This is a similar pattern, but has only one active application/cluster at a time. Data is replicated from the active to the passive cluster (unidirectional), and the application only switches to the passive cluster if the active cluster fails. This is a good explanation of these patterns for MM2.

For MM2, whether a pattern is active/active or active/passive (or something else) depends on a combination of things, including: how many flows there are; the flow directions; which topics are replicated; and the application design, including client support for failover. For example, for active/active there must be two flows, in opposite directions, mirroring the same topics, and an application with producers and consumers running on each cluster using the mirrored topics.
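The flow-side part of this reasoning can be captured in a small Python helper. It is a sketch under simplifying assumptions (exactly two clusters, and a hypothetical `Flow` type listing a flow’s source, target, and replicated topics), and it deliberately ignores the application side, which, as noted above, also matters.

```python
from typing import FrozenSet, List, NamedTuple


class Flow(NamedTuple):
    source: str
    target: str
    topics: FrozenSet[str]


def flow_topology(flows: List[Flow]) -> str:
    """Classify the flows between two clusters, looking at topology only.

    "bidirectional, same topics" is the flow-side requirement for active/active;
    whether a deployment really is active/active (or active/passive) also depends
    on the applications running on each cluster and their failover support.
    """
    if len(flows) == 1:
        return "unidirectional"
    if len(flows) == 2:
        first, second = flows
        opposite = (first.source, first.target) == (second.target, second.source)
        if opposite and first.topics == second.topics:
            return "bidirectional, same topics"
        if opposite:
            return "bidirectional, different topics"
    return "other"
```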

So, now that we have some basic vocabulary in place, let’s take a closer look at MM2.

4. How Does MM2 Work?

(Source: Shutterstock)

I started by formulating some “rules” to succinctly capture how MM2 works, initially from “first principles” (mainly from reading the Apache documentation and the Instaclustr MM2 support article).

MM2 RULES

RULE 0: Each mirror flow runs on a Kafka Connect cluster associated with a single target Kafka cluster

This rule just sets the context for the subsequent rules: MM2 runs on top of Kafka Connect, so logically you need a Kafka Connect cluster with an associated target Kafka cluster.

RULE 1: Unidirectional copying

Each MM2 replication flow is unidirectional and replicates topics from one source cluster to one target cluster, e.g. from cluster A to cluster B:

This is the basic flow required for an active/passive pattern. But it’s also used for patterns where data is simply being shared from one cluster to another, and both clusters can have applications running at the same time. 
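RULE 0 and RULE 1 together mean that a topology is just a set of directed flows, each hosted by the Connect cluster of its target. The Python sketch below groups flows that way; the `MirrorFlow` type and the `connect-<target>` naming scheme are hypothetical, used only for illustration.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple


class MirrorFlow(NamedTuple):
    source: str  # RULE 1: records only ever move from source...
    target: str  # ...to target, never the other way


def connect_assignments(flows: List[MirrorFlow]) -> Dict[str, List[MirrorFlow]]:
    """Group flows by the Kafka Connect cluster that runs them (RULE 0).

    Assumes one Connect cluster per target Kafka cluster, named "connect-<target>"
    (a hypothetical naming scheme used only for illustration).
    """
    by_connect: Dict[str, List[MirrorFlow]] = defaultdict(list)
    for flow in flows:
        by_connect[f"connect-{flow.target}"].append(flow)
    return dict(by_connect)
```

Under these two rules, replicating in both directions between A and B takes two separate flows, each hosted by the Connect cluster of its own target.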

RULE 2: Copies ONE or more topics

Each MM2 replication flow can be configured to replicate one or more topics. You can specify one or more topic names, or regular expressions for multiple topics, including all topics.
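As far as I know, in MM2 this choice is expressed through the `topics` property, which takes a comma-separated list of topic names and regular expressions. The Python function below is a hedged model of that selection, not MM2’s code; in particular, it assumes each pattern must match the whole topic name, and `select_topics` is a hypothetical name.

```python
import re
from typing import Iterable, List


def select_topics(patterns: str, source_topics: Iterable[str]) -> List[str]:
    """Return the source topics matched by a comma-separated list of names/regexes.

    A plain topic name is also a valid regex that matches itself (note that a
    "." in a name matches any character); ".*" selects every topic.
    """
    regexes = [re.compile(p.strip()) for p in patterns.split(",") if p.strip()]
    return sorted(t for t in source_topics if any(r.fullmatch(t) for r in regexes))
```

For example, `select_topics("topic-1, orders-.*", ...)` picks topic-1 plus every topic whose name starts with `orders-`.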

RULE 3: Each source topic is replicated to exactly one remote topic (one-to-one topic mapping)

Each source topic is replicated to exactly one remote topic (the destination topic on the target cluster). Records are also copied from/to the same partition.

RULE 4: Automatic remote topic creation

If the remote topic doesn’t yet exist on the target cluster, it is created with the same configuration as the source topic (e.g. the same number of partitions). But what is the new topic called?
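RULE 3 and RULE 4 can be modeled together in Python: the remote topic is created on first use with the source topic’s partition count and configuration, and then partition i of the source topic is copied into partition i of the remote topic. The `Topic` class, the helper names, and the example configuration value are all hypothetical; the remote topic name is simply passed in, since naming is the subject of RULE 5.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Topic:
    partitions: int
    configs: Dict[str, str] = field(default_factory=dict)
    data: List[List[str]] = field(default_factory=list)  # one record list per partition

    def __post_init__(self) -> None:
        if not self.data:
            self.data = [[] for _ in range(self.partitions)]


def ensure_remote_topic(target: Dict[str, Topic], remote_name: str, source: Topic) -> Topic:
    """RULE 4: if the remote topic is missing, create it with the source's partitions and configs."""
    if remote_name not in target:
        target[remote_name] = Topic(source.partitions, dict(source.configs))
    return target[remote_name]


def mirror(source: Topic, target: Dict[str, Topic], remote_name: str) -> None:
    """RULE 3: one source topic -> one remote topic, partition i -> partition i.

    For simplicity this re-copies whole partitions rather than tracking offsets.
    """
    remote = ensure_remote_topic(target, remote_name, source)
    for i, records in enumerate(source.data):
        remote.data[i] = list(records)
```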

RULE 5: Remote topic renaming

By default, remote topics are created with a name derived from the source topic name: the name of the source cluster is prepended (with a “.” separator) to the original topic name. For example, a source topic, topic-1, is renamed as sourceCluster.topic-1 on the target cluster. I use this notation to represent this flow (topic-1 on sourceCluster is replicated to sourceCluster.topic-1 on targetCluster):

With replication flows between more than two clusters, e.g.

You can therefore end up with topic names like “clusterA.clusterB.topic-1”:

Also note that technically it is the “source cluster alias” that is used as the prefix, which by default is identical to the actual source cluster name (see below for complications that arise if you change it).
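In Apache Kafka, remote topic naming is handled by a pluggable replication policy (the default is `DefaultReplicationPolicy`, whose separator can be changed with `replication.policy.separator`). The Python below only models the default behavior described in RULE 5; the function names are hypothetical.

```python
from functools import reduce
from typing import List


def remote_topic(source_alias: str, topic: str, separator: str = ".") -> str:
    """Model of RULE 5: prefix the source cluster alias onto the topic name."""
    return f"{source_alias}{separator}{topic}"


def name_after_hops(route: List[str], topic: str, separator: str = ".") -> str:
    """Name of `topic` after being mirrored out of each cluster in `route`, in order."""
    return reduce(lambda name, alias: remote_topic(alias, name, separator), route, topic)
```

For instance, topic-1 mirrored out of clusterB into clusterA, and then out of clusterA into a third cluster, ends up as clusterA.clusterB.topic-1: the most recent hop’s alias is always the leftmost prefix.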

RULE 6: Multiple mirror flows

You can have multiple MM2 mirror flows, enabling complex topologies between more than two clusters, including bidirectional flows, e.g.

Fan out (1 source cluster, multiple target clusters):

Fan in (aggregation, multiple source clusters to 1 target cluster):

Pipe (forwarding from 1 cluster to another and so on):

Bidirectional:

Note that bidirectional flows can form the basis of an active/active pattern, but this requires the same topic to be replicated in both directions. If different topics are replicated in each direction then other patterns such as request/response can be supported (i.e. an application on clusterA sends a message to clusterB on topic toClusterB, and receives a reply with the result from clusterB on the topic toClusterA). 

Complex:

Arbitrary topologies can be created, but as complexity increases, you are likely to need the next rule.
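A quick way to see what names multiple flows produce is to apply every flow repeatedly and collect the resulting topic names per cluster. This Python sketch assumes every flow replicates all topics and uses the RULE 5 naming; because it has no loop filtering, it should only be used with topologies without cycles. `propagate` is a hypothetical helper.

```python
from typing import Dict, List, Set, Tuple


def propagate(flows: List[Tuple[str, str]], local: Dict[str, Set[str]],
              rounds: int) -> Dict[str, Set[str]]:
    """Apply every (source, target) flow `rounds` times, copying ALL topics.

    Uses RULE 5 naming ("<source>.<topic>") but no loop filter, so on a
    topology with a cycle each extra round would create ever-longer names.
    """
    topics = {cluster: set(names) for cluster, names in local.items()}
    for _ in range(rounds):
        for source, target in flows:
            # Snapshot the source's topics before adding to the target.
            for name in list(topics.get(source, ())):
                topics.setdefault(target, set()).add(f"{source}.{name}")
    return topics
```

The fan-in case shows why renaming matters for aggregation: topic-1 from A and topic-1 from B land in two distinct topics on C (A.topic-1 and B.topic-1) instead of being merged. A pipe from A to B to C produces B.A.topic-1 on C.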

RULE 7: No cycles

Because RULE 6 introduces the possibility of bidirectional flows between two clusters, there is a real possibility of infinite “reflections” (cf. the “infinity mirror”: two parallel mirrors, which give the illusion of infinite reflections).

(Source: Shutterstock)

So, to prevent infinite event loops or cycles, MM2 has a trick using topic name filtering:

RULE 7: MM2 does not replicate source topics containing the name of the target cluster.

For example, assume we have two clusters (A and B), each with a topic-1 to which events are produced locally. Consider the following bidirectional flows (without renaming):

An event produced on topic-1 (source cluster A) will be replicated by the first flow to the topic with the same name on cluster B. But the second flow will immediately replicate the event from cluster B back to cluster A. And because the topics have the same name, we are back at the start of the loop, and the event will be copied backwards and forwards between the two clusters forever.
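The loop can be reproduced with a few lines of Python that follow a single event around the two flows. The topic names are kept unchanged (no renaming, no filtering), and `max_hops` is an artificial limit added only so that the sketch terminates; the function name is hypothetical.

```python
from typing import List, Tuple

FLOWS = [("A", "B"), ("B", "A")]  # the bidirectional flows from the example


def trace_without_renaming(cluster: str, topic: str, max_hops: int) -> List[Tuple[str, str]]:
    """Follow one event through FLOWS when topics keep the SAME name on the target.

    The copy is always eligible to be replicated again, so the only thing that
    stops this loop is the artificial max_hops limit.
    """
    path = [(cluster, topic)]
    for _ in range(max_hops):
        # Each cluster in the example has exactly one outgoing flow.
        target = next(t for s, t in FLOWS if s == cluster)
        cluster = target
        path.append((cluster, topic))
    return path
```

However large `max_hops` is, the trace always uses all of it, bouncing between (A, topic-1) and (B, topic-1).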

No cycles
(Source: Shutterstock)

Now consider the case where remote topic rename is turned on (the default), so that we instead have the following bidirectional flows:

This time, an event produced on topic-1 (source cluster A) will be replicated by the first flow to A.topic-1 on cluster B (indicating that it originated remotely, not locally). The second flow is not triggered, as the event didn’t appear in topic-1 on cluster B. However, once an event produced locally on cluster B appears in topic-1 on cluster B, the second flow replicates it to the renamed B.topic-1 on cluster A.

The topic filtering rule prevents X.topic-1 (on any cluster) from being replicated to a target cluster X, as the topic already contains the name of the target cluster, indicating that it originated from the target cluster, and a loop would be introduced if it were replicated back to the originating cluster. 
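Here is the same kind of trace with RULE 5 renaming and the RULE 7 filter switched on. It is a Python sketch, not MM2’s implementation: it interprets “containing the name of the target cluster” as “the target alias appears among the topic’s prefixes”, and it assumes cluster aliases and base topic names contain no “.”. The function names are hypothetical.

```python
from typing import List, Tuple

FLOWS = [("A", "B"), ("B", "A")]  # the bidirectional flows from the example


def remote_name(source: str, topic: str) -> str:
    return f"{source}.{topic}"  # RULE 5 renaming


def is_blocked(topic: str, target: str) -> bool:
    """RULE 7: don't replicate a topic whose alias prefixes include the target cluster."""
    return target in topic.split(".")[:-1]


def trace_with_renaming(cluster: str, topic: str, max_hops: int) -> List[Tuple[str, str]]:
    """Follow one event through FLOWS with renaming and loop filtering."""
    path = [(cluster, topic)]
    for _ in range(max_hops):
        # Each cluster in the example has exactly one outgoing flow.
        target = next(t for s, t in FLOWS if s == cluster)
        if is_blocked(topic, target):
            break  # it would be sent back to a cluster it has already visited
        topic = remote_name(cluster, topic)
        cluster = target
        path.append((cluster, topic))
    return path
```

Starting from cluster A, the trace stops after one hop: A.topic-1 reaches cluster B, and the filter refuses to send it back to A.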

A not-so-obvious corollary of this rule is that automatically created remote topic names will only ever contain each cluster name once. So A.B.A.topic-1 is not a possible topic name (but see below!), but A.B.C.topic-1 is (e.g. with a pipeline).
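The corollary can be checked mechanically. The Python sketch below (the `closure` helper is hypothetical) replicates every eligible topic along every flow until nothing changes, using RULE 5 renaming and the same prefix-based interpretation of the RULE 7 filter, again assuming no “.” inside aliases or base topic names. Because every new name adds a cluster alias that cannot already be present, the process always terminates, even on topologies with cycles.

```python
from typing import Dict, List, Set, Tuple


def closure(flows: List[Tuple[str, str]], local: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Replicate every eligible topic along every flow until nothing changes.

    RULE 5: a copy from `source` is named "<source>.<topic>".
    RULE 7: skip topics whose alias prefixes already include the target.
    Assumes aliases and base topic names contain no ".".
    """
    topics = {cluster: set(names) for cluster, names in local.items()}
    changed = True
    while changed:
        changed = False
        for source, target in flows:
            dest = topics.setdefault(target, set())
            for name in list(topics.get(source, ())):
                if target in name.split(".")[:-1]:
                    continue  # would loop back to a cluster it came from
                remote = f"{source}.{name}"
                if remote not in dest:
                    dest.add(remote)
                    changed = True
    return topics
```

With the flows A↔B, B↔C and A→D, and topic-1 produced only on C, the fixpoint puts C.topic-1 on B, B.C.topic-1 on A, and A.B.C.topic-1 on D (the pipeline C → B → A → D), while nothing with a repeated cluster name, such as A.B.A.topic-1, is ever created.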

So these are my 8 basic MM2 rules (RULE 0 to RULE 7). In Part 2, we’ll try them out and check whether they are correct and complete.