In previous posts, we’ve described our Instametrics solution, which provides the monitoring services for the close to 1,000 instances we manage. We’ve always seen this solution as an important proving ground for new technology that we provide to our customers, so we recently upgraded it from Cassandra 2.1.13 to our Cassandra 3.7 LTS (Long Term Support) release and updated our Spark-based monitoring roll-up jobs to take advantage of the new aggregation features in Cassandra 3.
Overall, the upgrade has been a big success: we have reduced the runtime of our regular roll-up jobs by roughly 50% and decreased disk usage by ~30%. While pre-change testing and planning took considerable work, the changes themselves went smoothly and with zero downtime.
We undertook the upgrade in two phases: firstly, upgrading the cluster from 2.1.13 to 3.7, and then modifying our roll-up jobs to take advantage of the aggregation features of Cassandra 3.0.
Our main concern for the initial upgrade to 3.7 was ensuring that we did not impact the runtime of our regular (5 minute, 1 hour, 1 day) roll-up jobs (which were already taking ~5-6 minutes each) and, of course, verifying stability under a sustained load. To check this, we ran sustained benchmarks in a half-size test environment for each of the following scenarios:
- Running our load on 2.1.13.
- Running our load in a mixed 2.1.13 / 3.7 cluster.
- Running our load in a fully upgraded 3.7 cluster before we ran upgradesstables to convert the files on disk to 3.7 format.
- Running our load while upgradesstables was running on one or more nodes (up to one full rack at a time).
- Running our load after all sstables were upgraded.
Seeing all these scenarios listed out, you can see why the pre-release testing took 2-3 weeks!
The result of this testing was that we didn’t find any significant performance changes for our use case in any of the stages. This was perhaps a little better than we were expecting for the fourth scenario (running upgradesstables) in particular, and a little disappointing for the end result, as we were hoping to see some performance improvement (although read on to the next stage for the good news). However, we did see a reduction in disk usage of ~30% due to the new storage format in Cassandra 3.x, which was definitely welcome.
The actual upgrade of our production cluster, using our Docker-based provisioning system, took less than a couple of hours to change the running version and then a few days to run upgradesstables across all nodes. We did not see any production impact during this time.
The next step for us was to look at our Apache Spark-based roll-up jobs to see what improvements we could make, given that growth in our base of managed nodes was stretching the capacity of the existing cluster. Detailed investigation led us to discover that the job was unexpectedly (to us) shuffling data when aggregating. We spent quite some time investigating methods to ensure the Cassandra partitioner was being passed around with our RDDs (see ‘Spark Cassandra Connector and Spark Partitioners’) before deciding to investigate using Cassandra aggregates.
Using Cassandra aggregates with the Spark Cassandra Connector turned out to be a perfect fit for our use case. We were already using joinWithCassandraTable to select the data from our base table, which results in the connector issuing one query per source (left-hand side) row, and each source row corresponds to the level we want to aggregate to. The built-in Cassandra aggregate functions (which aggregate across all data returned by a query) therefore do what we want, because the connector issues one query for every result row. Each query is executed locally on a node that holds the data to be rolled up, thanks to the repartitionByCassandraReplica call and the fact that we are rolling up to a level that corresponds to the partition key of our raw data table.
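To make that reasoning concrete, here is a minimal plain-Scala sketch (no Spark or Cassandra needed) of why one aggregate query per left-hand row yields exactly one rolled-up row per key. The `Key`, `Sample` and `Rollup` types, the field names and the in-memory `rawTable` are hypothetical stand-ins, since the real schema is not shown in this post.

```scala
// Hypothetical stand-ins for the raw metrics table; the real schema is not shown in the post.
final case class Key(host: String, bucket: String, service: String)
final case class Sample(key: Key, time: Long, metric: Double)
final case class Rollup(key: Key, avg: Double, max: Double, min: Double)

object RollupSketch {
  // Models one aggregate query against a single partition:
  // avg/max/min are computed over every row that the query returns.
  def aggregateQuery(rawTable: Seq[Sample], key: Key): Option[Rollup] = {
    val metrics = rawTable.filter(_.key == key).map(_.metric)
    // Simplification: a key with no raw data is skipped rather than returned as an empty row.
    if (metrics.isEmpty) None
    else Some(Rollup(key, metrics.sum / metrics.size, metrics.max, metrics.min))
  }

  // Models the join: one query per left-hand-side key, hence one result row per key.
  def rollUp(leftKeys: Seq[Key], rawTable: Seq[Sample]): Seq[Rollup] =
    leftKeys.flatMap(k => aggregateQuery(rawTable, k))

  // Made-up sample data, purely for illustration.
  val k1 = Key("host-a", "bucket-1", "cpu")
  val k2 = Key("host-b", "bucket-1", "cpu")
  val rawTable = Seq(
    Sample(k1, 1L, 10.0), Sample(k1, 2L, 20.0), Sample(k1, 3L, 30.0),
    Sample(k2, 1L, 5.0), Sample(k2, 2L, 7.0)
  )
  val result = rollUp(Seq(k1, k2), rawTable)
}
```

Because each simulated query touches only the rows of a single key, no data has to be regrouped across keys afterwards, which is the property that removes the need for a shuffle in the real job.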
We replaced the set of code which previously extracted the raw data from our metrics table (for later roll-up by Spark) with a set of code that called aggregate functions to roll up the data in Cassandra before extracting. This code uses the FunctionCallRef functionality of the Spark Cassandra Connector to call the Cassandra built-in aggregate functions as shown below:
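val RDDJoin = sc.cassandraTable[(String, String)]("instametrics", "service_per_host")
  // keep only rows whose second field matches one of the broadcast regex patterns
  .filter(a => broadcastListEventAll.value.map(r => a._2.matches(r)).foldLeft(false)(_ || _))
  // build the join key from each row plus the current date bucket
  .map(a => (a._1, dateBucket, a._2))
  // place each key on a Spark partition local to a replica that holds the raw data
  .repartitionByCassandraReplica("instametrics", "events_raw_5m", 100)
  // assumed join step (described in the text): one aggregate query per key, evaluated inside Cassandra
  .joinWithCassandraTable("instametrics", "events_raw_5m",
    SomeColumns("time", "state",
      FunctionCallRef("avg", Seq(Right("metric")), Some("avg")),
      FunctionCallRef("max", Seq(Right("metric")), Some("max")),
      // the closing of the min aggregate is assumed to mirror avg and max
      FunctionCallRef("min", Seq(Right("metric")), Some("min"))))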
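For readers more familiar with CQL than with the connector API, the sketch below builds roughly the shape of statement that such a column selection corresponds to for each join key. It is an illustration only: the partition key column names (`host`, `bucket_time`, `service`) are hypothetical because the table schema is not shown in this post, and the exact statement the connector generates may differ.

```scala
object AggregateCqlSketch {
  // Hypothetical partition key columns; the real schema is not shown in the post.
  val partitionKey: Seq[String] = Seq("host", "bucket_time", "service")

  // (function name, column, alias), mirroring the FunctionCallRef entries in the listing.
  val aggregates: Seq[(String, String, String)] =
    Seq(("avg", "metric", "avg"), ("max", "metric", "max"), ("min", "metric", "min"))

  // Wrap an identifier in double quotes, as CQL does for quoted identifiers.
  private def q(name: String): String = "\"" + name + "\""

  // Build a SELECT with plain columns, aggregate calls and a partition-key restriction.
  def buildQuery(keyspace: String, table: String, plainColumns: Seq[String]): String = {
    val selected = plainColumns.map(q) ++
      aggregates.map { case (fn, col, alias) => s"$fn(${q(col)}) AS ${q(alias)}" }
    val where = partitionKey.map(c => s"${q(c)} = ?").mkString(" AND ")
    s"SELECT ${selected.mkString(", ")} FROM ${q(keyspace)}.${q(table)} WHERE $where"
  }

  val query: String = buildQuery("instametrics", "events_raw_5m", Seq("time", "state"))
}
```

The point of the sketch is that the restriction covers the full partition key, so each statement reads a single partition and the aggregates collapse it to one row.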
The impact of this was significant: our jobs went from a typical runtime of 5 to 6 minutes to a typical runtime of 2.5 to 3 minutes – a 50% reduction. At the same time, the average CPU load on the cluster was significantly reduced and our confidence that processing capacity will scale linearly with cluster growth is significantly higher.
Overall, we have found 3.7 to be quite stable in production. Anecdotally, we think we have seen more issues with repairs than we saw with 2.1.13, although we are currently deploying some recently released patches which we expect will improve that. Also, other than aggregates, we don’t have a requirement to use the new 3.x features such as materialized views (although we do have customers using them successfully). We will be transitioning to Time Windowed Compaction Strategy once that is backported to our 3.7 LTS release.