Overview
Recently, I needed to work through the details of how batch transactions are processed in Cassandra, and how they affect exposed metrics. This article summarises the workflow Cassandra uses to action a BATCH statement (either logged or unlogged), starting from submission via the Java Cassandra driver, and details how BATCH transactions affect exposed metrics (e.g. the WriteLatency count). Hopefully it will be of use to others interested in this process.
Workflow
- User submits a BATCH transaction to a coordinator node e.g.
```sql
BEGIN BATCH
INSERT INTO test1.test1 (id, time, message) VALUES (2, '2016-09-14 10:00:00', 'logged batch test - single partition');
INSERT INTO test1.test1 (id, time, message) VALUES (2, '2016-09-14 10:05:00', 'logged batch test - single partition 2');
APPLY BATCH;
```
If performed via CQLSH, the coordinator is whichever node CQLSH connects to. With the Java driver, it depends on how the batch is built: if SimpleStatement objects are used to populate the batch, a routing key cannot be determined automatically and must be calculated and set manually; if PreparedStatements are used, then “the first non-null keyspace is used as the keyspace of the batch, and the first non-null routing key as its routing key” (i.e. the driver assumes all statements in the BATCH affect a single partition).
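As a minimal sketch (assuming the 3.x Java driver and a contact point reachable at 127.0.0.11), a logged batch built from a PreparedStatement might look like the following; the driver can then infer the batch's keyspace and routing key from the first bound statement:

```java
import java.text.SimpleDateFormat;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class LoggedBatchExample {
    public static void main(String[] args) throws Exception {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.11").build();
             Session session = cluster.connect()) {

            PreparedStatement insert = session.prepare(
                "INSERT INTO test1.test1 (id, time, message) VALUES (?, ?, ?)");

            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            // Both statements share the partition key (id = 2), so the driver
            // can derive the batch's routing key from the first bound statement.
            BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);
            batch.add(insert.bind(2, fmt.parse("2016-09-14 10:00:00"), "logged batch test - single partition"));
            batch.add(insert.bind(2, fmt.parse("2016-09-14 10:05:00"), "logged batch test - single partition 2"));

            session.execute(batch);
        }
    }
}
```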
- Batch statements converted to Mutations
Within a batch, queries for the same partition key are rolled up into a single Mutation (e.g. in the example above, assuming id is the partition key and time is the clustering key, both statements would be combined into a single Mutation). This also provides row-level isolation for multiple queries affecting the same partition. For metrics, this means that each replica node records a single local write per partition-level Mutation (for the partitions it manages).
- Logged batch
Logged batches provide atomicity: all Mutations in the batch will be run (and re-run if necessary) until the entire batch has completed successfully. This has a performance cost on the coordinator (and potentially on the backup batchlog nodes).
- Coordinator sends blob (group of Mutations) to up to 2 other nodes
Once the query statements have been parsed into Mutations, the coordinator sends that blob as a batchlog record to up to 2 other nodes. Cassandra prefers these nodes to be in different racks from the coordinator, but within the same DC. These batchlog records are written to the system.batchlog or system.batches table (batchlog is the legacy name) on those nodes. The system keyspace uses LocalStrategy, so its contents are individual to each node.
In my testing, for a given coordinator, Cassandra always picked the same nodes to store the batchlog backup. The selection process is randomised (see EndpointFilter in BatchlogManager); however, across 6 BATCHes, the same node was chosen each time.
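Since the system keyspace is local to each node, inspecting batchlog records means querying the specific node that holds them (and the record is deleted once the batch completes, so it is normally only visible mid-flight). A rough sketch, assuming Cassandra 3.x (where the table is system.batches) and using a whitelist policy to pin queries to one candidate node:

```java
import java.net.InetSocketAddress;
import java.util.Collections;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.WhiteListPolicy;

public class BatchlogInspector {
    public static void main(String[] args) {
        // Pin all requests to a single node: system.* uses LocalStrategy,
        // so each node only sees its own batchlog records.
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.11")
                .withLoadBalancingPolicy(new WhiteListPolicy(new RoundRobinPolicy(),
                    Collections.singletonList(new InetSocketAddress("127.0.0.11", 9042))))
                .build();
             Session session = cluster.connect()) {
            for (Row row : session.execute("SELECT * FROM system.batches")) {
                System.out.println(row);
            }
        }
    }
}
```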
- Coordinator processes each Mutation
Once the backup nodes have received the batchlog, the coordinator actually starts processing each Mutation.
- Coordinator deletes batchlog record
Finally, the coordinator deletes the batchlog entry once all the Mutations have been successfully processed (i.e. either the replicas have acknowledged the writes or hints have been written).
- If a statement in the BATCH fails
A hint is written, but the BATCH itself is not “failed”.
- If the coordinator goes down, after writing batchlog
Because up to 2 other nodes have the batchlog, they will run the Mutations contained in the batchlog record 10 seconds after it was created (i.e. wait long enough that the coordinator should have actioned the batch; if the coordinator hasn't deleted the batchlog record within this time, something must have gone wrong, so re-run it). This setup relies on the statements within the BATCH being idempotent (timestamps are critical to this).
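The idempotency point is easiest to see with explicit write timestamps: re-applying a mutation with the same timestamp converges to the same end state, which is what makes batchlog replay safe. A small sketch (client-side timestamps require native protocol v3+; `session` is assumed from the earlier example):

```java
import com.datastax.driver.core.SimpleStatement;

SimpleStatement stmt = new SimpleStatement(
    "INSERT INTO test1.test1 (id, time, message) VALUES (2, '2016-09-14 10:00:00', 'replay-safe write')");
// Fix the write timestamp (microseconds since epoch) so that a replayed
// execution produces exactly the same cell as the original.
stmt.setDefaultTimestamp(1473847200000000L);
session.execute(stmt);
session.execute(stmt); // replay: same timestamp, same data, same end state
```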
- Unlogged batch
- Coordinator processes each Mutation
In contrast to logged batches, the coordinator skips writing backup batchlogs to other nodes and moves directly to processing each Mutation.
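For completeness, issuing an unlogged batch from the Java driver only requires changing the batch type; a sketch reusing the `session`, `insert` and `fmt` variables from the logged-batch example above:

```java
// Unlogged: no batchlog is written, so there is no atomicity guarantee
// across partitions; the coordinator simply applies each Mutation directly.
BatchStatement unlogged = new BatchStatement(BatchStatement.Type.UNLOGGED);
unlogged.add(insert.bind(2, fmt.parse("2016-09-14 10:00:00"), "unlogged batch test"));
unlogged.add(insert.bind(2, fmt.parse("2016-09-14 10:05:00"), "unlogged batch test 2"));
session.execute(unlogged);
```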
Metrics
As of version 3.7, the only batch-specific metric made available is org.apache.cassandra.db:type=BatchlogManager/TotalBatchesReplayed/Count.
However, when a BATCH is processed, other metrics are also affected:
- The coordinator node will increment org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency/Count by 1, regardless of the number of Mutations actually generated by the BATCH or how many nodes the coordinator has to coordinate.
- Each replica node (including the coordinator, if relevant) will increment org.apache.cassandra.metrics:type=Table,keyspace=<keyspace>,scope=<table>,name=WriteLatency for each Mutation that it processes (i.e. each partition that that replica is responsible for).
- For a logged batch, there will also be corresponding local WriteLatency increments for the batches table, on each node that stores a copy of the batchlog.
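These counters can be read directly over JMX. A sketch, assuming JMX is reachable on 127.0.0.12:7199 (ccm assigns each local node its own JMX port, so adjust as needed); the MBean names are the ones quoted above:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BatchMetricsReader {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://127.0.0.12:7199/jmxrmi");
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = jmxc.getMBeanServerConnection();

            ObjectName writeLatency = new ObjectName(
                "org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency");
            ObjectName tableWriteLatency = new ObjectName(
                "org.apache.cassandra.metrics:type=Table,keyspace=test1,scope=test1,name=WriteLatency");
            ObjectName batchlogManager = new ObjectName(
                "org.apache.cassandra.db:type=BatchlogManager");

            System.out.println("ClientRequest Write Latency count: "
                + mbs.getAttribute(writeLatency, "Count"));
            System.out.println("test1.test1 WriteLatency count: "
                + mbs.getAttribute(tableWriteLatency, "Count"));
            System.out.println("Total batches replayed: "
                + mbs.getAttribute(batchlogManager, "TotalBatchesReplayed"));
        }
    }
}
```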
Example
Given the following schema:
```sql
CREATE KEYSPACE test1 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '1', 'dc2': '1'} AND durable_writes = true;

CREATE TABLE test1.test1 (
    id int,
    time timestamp,
    message text,
    PRIMARY KEY (id, time)
) WITH CLUSTERING ORDER BY (time ASC);
```
and the following cluster:
- DC1
- 127.0.0.11
- 127.0.0.12 (will use this as the coordinator)
- DC2
- 127.0.0.13
- 127.0.0.14
and the following replica assignments for each partition key (retrieved by running ccm node1 nodetool getendpoints test1 test1 <partition key>):
- 1 = 127.0.0.12, 127.0.0.13
- 2 = 127.0.0.11, 127.0.0.14
- 3 = 127.0.0.12, 127.0.0.13
- 4 = 127.0.0.11, 127.0.0.13
- 5 = 127.0.0.11, 127.0.0.14
Running a batch (logged or unlogged) for a single partition key on the coordinator will increment the following metrics:
```sql
BEGIN BATCH
INSERT INTO test1.test1 (id, time, message) VALUES (1, '2016-09-14 10:00:00', 'logged batch test - single partition');
INSERT INTO test1.test1 (id, time, message) VALUES (1, '2016-09-14 10:05:00', 'logged batch test - single partition 2');
APPLY BATCH;
```
| Metric | 127.0.0.12 | 127.0.0.13 |
|---|---|---|
| org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency | +1 | 0 |
| org.apache.cassandra.metrics:type=Table,keyspace=test1,scope=test1,name=WriteLatency | +1 | +1 |
Running a batch for multiple partition keys, some managed by the coordinator and some by other nodes:
```sql
BEGIN BATCH
INSERT INTO test1.test1 (id, time, message) VALUES (1, '2016-09-14 10:00:00', 'logged batch test - single partition');
INSERT INTO test1.test1 (id, time, message) VALUES (1, '2016-09-14 10:05:00', 'logged batch test - single partition 2');
INSERT INTO test1.test1 (id, time, message) VALUES (2, '2016-09-14 10:00:00', 'logged batch test - single partition');
INSERT INTO test1.test1 (id, time, message) VALUES (2, '2016-09-14 10:05:00', 'logged batch test - single partition 2');
INSERT INTO test1.test1 (id, time, message) VALUES (4, '2016-09-14 10:00:00', 'logged batch test - single partition');
INSERT INTO test1.test1 (id, time, message) VALUES (4, '2016-09-14 10:05:00', 'logged batch test - single partition 2');
APPLY BATCH;
```
| Metric | 127.0.0.11 | 127.0.0.12 | 127.0.0.13 | 127.0.0.14 |
|---|---|---|---|---|
| org.apache.cassandra.metrics:type=ClientRequest,scope=Write,name=Latency | 0 | +1 | 0 | 0 |
| org.apache.cassandra.metrics:type=Table,keyspace=test1,scope=test1,name=WriteLatency | +2 | +1 | +2 | +1 |
Use Cases
The general consensus is that unlogged batches of multiple queries that affect the same partition AND are routed to a replica node as the coordinator may provide performance gains over individual asynchronous queries.
For this reason, Cassandra does not apply BATCH size warning and failure thresholds (batch_size_warn_threshold_in_kb / batch_size_fail_threshold_in_kb) to batches that evaluate to one mutation.
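To get that routing behaviour from the Java driver, a token-aware load-balancing policy can be combined with PreparedStatements, so the single-partition batch is coordinated by one of its replicas. A sketch; the policy wiring here is my assumption rather than something from the original testing:

```java
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;

public class SinglePartitionUnloggedBatch {
    public static void main(String[] args) throws Exception {
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.11")
                // Route each request to a replica of its routing key where possible.
                .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
                .build();
             Session session = cluster.connect()) {

            PreparedStatement insert = session.prepare(
                "INSERT INTO test1.test1 (id, time, message) VALUES (?, ?, ?)");

            // All statements hit partition id = 1, so the batch inherits that
            // routing key and should be coordinated by one of its replicas.
            BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
            batch.add(insert.bind(1, new java.util.Date(), "unlogged single-partition 1"));
            batch.add(insert.bind(1, new java.util.Date(), "unlogged single-partition 2"));
            session.execute(batch);
        }
    }
}
```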