Speculative query execution in Cassandra is a useful technique for dealing with faulty, slow, or unresponsive nodes and with network issues. It lets a client send the same DB request to multiple endpoints at the same time and lets those requests compete for the winning response. This only works if the query is idempotent, that is, it produces the same result no matter how many times it is run. We’ve written about idempotence and speculative query execution before, so head over to https://www.instaclustr.com/query-idempotence-in-gocql-driver/ to refresh your memory if needed.
In most cases, speculative query execution will not improve performance during normal operation (although in some edge cases it might); what it improves is the reliability of getting a response from the server. In other cases, it can reduce overall execution time, since taking the response from the fastest node saves time. Remember, though, that any gain in reliability or overall execution time comes at the cost of using more resources, mostly CPU and network.
A few use cases for using speculative queries:
- A node you are querying is down and your SLAs require a response from the server within a timeframe that is shorter than the Cassandra timeout.
- A node is flaky, leading to inconsistent response times or dropped queries.
- A node is returning timeout errors requiring a client application to retry the query on another node.
Speculative query execution has been a feature of the Java driver for quite some time now, and we have recently added similar functionality to the GoCql driver. As mentioned above, it allows a user to run the same query on multiple hosts at the same time, letting the executions compete for the winning spot. The first execution to get a response wins, and that response is returned to the client.
In order to use the speculative query execution with this change, one must define the query as Idempotent and provide an execution policy:
    ...
    cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2", "192.168.1.3")
    sp := &gocql.SimpleSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)
As can be seen from the example above, we’ve used the SimpleSpeculativeExecution policy, which is the one implemented in the driver. It is a very simple policy that defines the number of additional executions (that is, in addition to the original request) and a constant delay between these executions. It is easy to implement your own policy. For example, to have a policy that pauses incrementally longer between additional executions, one could build the following:
    type IncreasingSpeculativeExecution struct {
        NumAttempts  int
        TimeoutDelay time.Duration
    }

    func (sp *IncreasingSpeculativeExecution) Attempts() int { return sp.NumAttempts }

    func (sp *IncreasingSpeculativeExecution) Delay() time.Duration {
        sp.TimeoutDelay += 50 * time.Millisecond
        return sp.TimeoutDelay
    }
And then use it in the query execution:
    ...
    cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2", "192.168.1.3")
    sp := &IncreasingSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
    session, err := cluster.CreateSession()
    // Build the query
    qry := session.Query("speculative").SetSpeculativeExecutionPolicy(sp).Idempotent(true)
    ...
To show speculative query execution in action, we’ll use a 3-node Cassandra cluster. The use case we’re going to explore is a slow node, which we’ll simulate using tc, a simple tool that comes as part of the iproute2 package. Our example is a bit extreme, but hopefully it conveys the idea of when speculative queries might be useful.
To simulate a slow node, run the following command on one of the nodes:
    sudo tc qdisc add dev eth0 root netem delay 250ms
This adds a 250ms delay to all outbound packets on the given network device (eth0 in the above case). Then we use the following client code to run the tests:
    /* Before you execute the program, launch `cqlsh` and execute:
    create keyspace example with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
    create table example.tweet(timeline text, id UUID, data int, PRIMARY KEY(id));
    create index on example.tweet(timeline);
    */
    package main

    import (
        "context"
        "flag"
        "fmt"
        "log"
        "math/rand"
        "time"

        "github.com/gocql/gocql"
    )

    // Per-host counters: number of attempts and accumulated latency in ms
    type hostMetrics struct {
        attempts int
        latency  int
    }

    // The observer type to watch the queries data
    type testQueryObserver struct {
        metrics map[string]*hostMetrics
        verbose bool
    }

    func (o *testQueryObserver) ObserveQuery(ctx context.Context, q gocql.ObservedQuery) {
        host := q.Host.ConnectAddress().String()
        curMetric := o.metrics[host]
        curAttempts := 0
        curLatency := 0
        if curMetric != nil {
            curAttempts = curMetric.attempts
            curLatency = curMetric.latency
        }
        // Only successful queries are counted; latency is converted from ns to ms
        if q.Err == nil {
            o.metrics[host] = &hostMetrics{attempts: q.Metrics.Attempts + curAttempts, latency: curLatency + int(q.Metrics.TotalLatency/1000000)}
        }
        if o.verbose {
            fmt.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n",
                q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err)
        }
    }

    func (o *testQueryObserver) GetMetrics() {
        for h, m := range o.metrics {
            fmt.Printf("Host: %s, Attempts: %v, Avg Latency: %vms\n", h, m.attempts, m.latency/m.attempts)
        }
    }

    // Simple retry policy for attempting the connection to 1 host only per query
    type RT struct {
        num int
    }

    func (rt *RT) Attempt(q gocql.RetryableQuery) bool {
        return q.Attempts() <= rt.num
    }

    func (rt *RT) GetRetryType(err error) gocql.RetryType {
        return gocql.Rethrow
    }

    func main() {
        specExec := flag.Bool("specExec", false, "Speculative execution")
        flag.Parse()

        // the number of entries to insert
        cycles := 10000

        // connect to the cluster
        cluster := gocql.NewCluster("...")
        cluster.Keyspace = "example"

        // the timeout of one of the nodes is very high, so let’s make sure we wait long enough
        cluster.Timeout = 10 * time.Second
        cluster.RetryPolicy = &RT{num: 3}
        session, err := cluster.CreateSession()
        if err != nil {
            log.Fatal(err)
        }
        defer session.Close()

        observer := &testQueryObserver{metrics: make(map[string]*hostMetrics), verbose: false}
        rand.Seed(time.Now().UnixNano())
        for i := 0; i < cycles; i = i + 1 {
            r := rand.Intn(10000)
            u, _ := gocql.RandomUUID()
            query := session.Query(`INSERT INTO example.tweet (id, timeline, data) VALUES (?, 'me', ?)`, u, r).Observer(observer)
            // Create speculative execution policy with the timeout delay between following executions set to 10ms
            sp := &gocql.SimpleSpeculativeExecution{NumAttempts: 2, TimeoutDelay: 10 * time.Millisecond}
            // Specifically set Idempotence to either true or false to control normal/speculative execution
            query.SetSpeculativeExecutionPolicy(sp).Idempotent(*specExec)
            query.Exec()
        }

        // wait a sec before everything finishes
        <-time.After(1 * time.Second)

        // Print results
        fmt.Println("\n==========\n")
        observer.GetMetrics()
    }
This code is also available at https://github.com/instaclustr/sample-GoCql-Speculative-Execution.
This client code will insert 10000 entries into the cluster. As we’re using random UUIDs for the key column (id), all the queries are expected to be distributed more or less evenly among the nodes. Now, when we start the cluster and execute the client, we notice the following:
    admin@ip-10-0-17-222:~/go$ time go run spectest.go

    ==========

    Host1: <ip>, Attempts: 3334, Avg Latency: 502ms
    Host2: <ip>, Attempts: 3333, Avg Latency: 2ms
    Host3: <ip>, Attempts: 3333, Avg Latency: 2ms

    real    28m21.859s
    user    0m2.920s
    sys     0m1.828s
So it takes about half an hour to run the queries because one of the nodes has a constant delay of about half a second. When running with speculative execution mode, we get:
    admin@ip-10-0-17-222:~/go$ time go run spectest.go --specExec

    ==========

    Host2: <ip>, Attempts: 5000, Avg Latency: 1ms
    Host3: <ip>, Attempts: 4999, Avg Latency: 2ms

    real    1m24.493s
    user    0m3.900s
    sys     0m3.072s
Not only do we never see the “problematic” node responding to a query, but all queries are also split between the 2 other “fast” nodes, and the run takes only about 1.5 minutes to complete.
As for the overhead of this improvement, there is a jump in open sockets when running in speculative mode compared with non-speculative mode: the number of open sockets rises from about 147 to 153, an increase of roughly 4%. There are no significant increases in CPU, memory, or I/O utilisation.
This might be considered a good trade-off, but remember that this is what happens with a single, very simple client on a performant host (m4.2xlarge) that ran only this test. In other situations, such as a smaller instance, a busy client, or a client running many queries in parallel, speculative execution may have a more significant effect on system resource utilisation. As a result, it is highly recommended to test the benefits of speculative queries in your pre-production environment before deploying to production.
After this feature was released, a few issues were identified and have already been fixed, thanks to the community of contributors. It is in good shape now and is ready for general use. Enhancements planned for this feature include running the query observer (useful for monitoring query executions) in the main goroutine, which will make it more convenient to debug failures in the client code.