We have just hit full release of our Cassandra + Spark managed service offering after 4 months in preview release. During this time, we have had lots of opportunity to get in-depth with using the Cassandra connector for Spark, both with our own Instametrics application and assisting customers with developing and troubleshooting. In that process, we’ve learnt a few key lessons about how to get the best out of the Cassandra connector for Spark. This post captures some of those key lessons.
Tip 1: Filter early
A fairly obvious but important tip to start with – the more you can push filters down to Cassandra, and particularly where you can limit queries by partition key, the better. Include your filters as early as possible in your Spark statements and, if possible, specify all the components of the partition key in the filter statement. Also, if you are going to use a set of data from Cassandra more than once, make sure to use cache() to keep it in Spark memory rather than reading from Cassandra each time.
Tip 2: Be aware of your partitions
Once you start working with Spark and Cassandra together, there are two sets of partitions you need to be very aware of:
- Cassandra partitions – These are the unit at which Cassandra splits data across nodes and determine which Cassandra node your data is stored on. The Cassandra partition key is set when you define the schema for your Cassandra table. Each partition is contained on a single node (per replica). The number of partitions is determined by the cardinality of your partition key.
- Spark partitions – These are the unit at which Spark splits data (in memory) across workers. Spark partitions also determine the degree of parallelism that Spark can apply in processing data (each partition can be processed in parallel). The number of partitions and partition key can either be determined automatically by Spark or set explicitly (more below).
A fundamental principle in getting good performance with Spark is to align distribution of your Spark partitions with your Cassandra partitions and to have an appropriate number of Spark partitions to allow Spark to efficiently parallel process calculations.
In almost all cases, there will be a lot less Spark partitions than Cassandra partitions (i.e. a one to many mapping from Spark partitions to Cassandra partitions) but, as far as possible, you want Cassandra data to be read and written to/from the Cassandra node where the Spark partition resides. You also want enough Spark partitions that each partition will fit in available work memory and so that each processing a step for a partition is not excessively long running but not so many that each step is tiny, resulting in excessive overhead. The right number is highly dependant on the exact processing scenario but we’ve found around 30 partitions per worker to be a good ballpark for some of our scenarios.
The good news is that in many cases the Cassandra connector will take care of this for you automatically. When you use the Cassandra Spark connector’s cassandraTable() function to load data from Cassandra to Spark it will automatically create Spark partitions aligned to the Cassandra partition key. It will try to create an appropriate number of partitions by estimating the size of the table and dividing this by the parameter spark.cassandra.input.split.size_in_mb (64mb by default). (One instance where the default will need changing is if you have a small source table – in that case use withReadConf() to override the parameter.)
Where you need to be extra careful is when you are joining with a Cassandra table using a different partition key or doing multi-step processing. In this scenario you can start off with an appropriately sized set of partitions but then greatly change the size of your data, resulting in an inappropriate number of partitions.
The way to address this problem is to use repartitionByCassandraReplica() to resize and/or redistribute the data in the Spark partition. This will redistribute Spark’s in-memory copy of the the data to match the distribution of a specified Cassandra table and with a specified number of Spark partitions per worker (it requires that the Spark RDD has columns that match the specified Cassandra table).
Tip 3: Control Spark write speed for any bulk writes
By default, Spark will write to Cassandra as fast as it possibly can. For small bursts of writes, this can work OK but for extended periods (say more than 5 minutes or so and getting more significant after a couple of hours) this can result in the Cassandra nodes being overwhelmed and crashing.
The Spark Cassandra Connector provides three settings to control this behaviour:
- cassandra.output.batch.size.bytes (default: 1024): total data per batch
- cassandra.output.batch.size.rows (default: auto – batch size determined by size.byts): number of rows per batch
- cassandra.output.concurrent.writes (default: 5): maximum concurrent writes per Spark task
- cassandra.output.throughput_mb_per_sec (default: unlimited): maximum write throughput per core
Ideal tuning of these parameters will depend on your schema design and other factors. However, as a general guide we suggest starting with throughput_mb_per_sec set to 5 (mb per sec per core) on our m4.xl size nodes.
Tip 4: joinWithCassandraTable is a powerful tool
joinWithCassandraTable() is one of the most powerful functions provided by the Spark Cassandra connector. It allows you to perform a join between a Spark RDD (including a Cassandra table) and a Cassandra table based on the key of the Cassandra table. The Connector can execute this efficiently, pushing down selection to the Cassandra nodes.
As well as using this function to join data sets, it can also be used as an efficient way to filter a significant set of records from a Cassandra table. To do this, build an RDD with the keys of the records that you want to extract and then use joinWithCassandraTable() to join the RDD to the Cassandra table and retrieve the data from Cassandra. We have found this to be a very efficient way of selecting a set of data from Cassandra.
An example of how we have used this is to select a set of metrics readings for a list of hosts, metrics and time buckets). This is an example where we have found it necessary to explicitly control Spark partition creation – the definition of the partitions to be selected is a much smaller data set than the resulting extracted data. As a result, we use repartitionByCassandraReplica() to ensure the selection list is correctly partitioned to match the location of the source Cassandra data and to create an appropriate number of Spark partitions to receive the data that is to be selected from the source tables.
Tip 5: Use SparkSQL appropriately
SparkSQL is an excellent tool for ad-hoc analysis, giving you the expressiveness of SQL over big data data stores such as Cassandra. However, if you need to optimise for performance of a query (such as where you are running as a regular batch or as part of a Spark Streaming job) then SparkSQL may not provide the level of control that you need for the optimum results. In these cases, getting familiar with the Spark and Cassandra connector APIs is likely to pay dividends in efficiency of execution.
Bonus tip – read the docs and use the UI!
A lot of information is contained in Spark Cassandra Connector documentation located on the Cassandra Connector GitHub page (https://github.com/datastax/spark-cassandra-connector). In particular, the FAQ page at the end of the docs contains a lot of useful information that is easy to overlook.
Also, the Spark UI has a lot of information that can help with the tuning described above. Stay tuned for a future blog post touring the key features of the UI.