Instaclustr Spark with an SSL-Configured Apache Cassandra Cluster

A common setup for a Cassandra cluster is to enable client encryption. To use Spark with such a cluster, you must take a few extra steps when submitting jobs so that the Spark Cassandra Connector connects over SSL. This guide walks through those steps and explains the configuration properties involved.

As a prerequisite for this guide, you should have provisioned and configured a cluster with both Cassandra and Spark. Sections 1, 2 and 3 of Getting Started with Instaclustr Spark & Cassandra describe how to do this. When creating the cluster, remember to select Client ⇄ Node Encryption to enable client encryption. This option is not available for the Developer node size, so you must select a Production node size.


Download Truststore File

Download the certificates for the cluster from the cluster's Connection Info page.

In the downloaded zip, you will find a Java Key Store file called truststore.jks. This file needs to be included as a resource in the assembled jar in a later step.
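Before you copy truststore.jks into the project, you can confirm that it opens with the password you plan to put in the job configuration. The following is a minimal, JDK-only sketch. The class name TrustStoreInspector is hypothetical and not part of any Instaclustr tooling. It assumes the file is a JKS key store, as its name suggests, and prints each alias with the subject of its certificate. With a wrong password, KeyStore.load throws an IOException.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not from the original guide): lists the entries of a JKS trust store.
final class TrustStoreInspector {

    // Loads a JKS key store from the stream; a wrong password makes load() throw an IOException.
    static KeyStore load(InputStream in, char[] password) throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance("JKS");
        keyStore.load(in, password);
        return keyStore;
    }

    // Returns one line per entry: the alias and, for X.509 certificates, the certificate subject.
    static List<String> describe(KeyStore keyStore) throws GeneralSecurityException {
        List<String> lines = new ArrayList<>();
        for (String alias : Collections.list(keyStore.aliases())) {
            Certificate cert = keyStore.getCertificate(alias);
            String subject = (cert instanceof X509Certificate)
                    ? ((X509Certificate) cert).getSubjectX500Principal().getName()
                    : "(no X.509 certificate)";
            lines.add(alias + " -> " + subject);
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: java TrustStoreInspector <truststore.jks> <password>");
            return;
        }
        try (InputStream in = new FileInputStream(args[0])) {
            for (String line : describe(load(in, args[1].toCharArray()))) {
                System.out.println(line);
            }
        }
    }
}
```

For example, `java TrustStoreInspector truststore.jks <trust store password>` should print one line per certificate in the downloaded trust store.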

Creating and Submitting a Scala Job with SSL Cassandra Connection

In this step of the tutorial, we demonstrate how to build and submit a Scala job. This approach is useful when you want to build a job once and submit it multiple times.

  1. Log in to your Spark client machine
  2. Create required directories for your project:
    mkdir ~/cassandra-count
    cd ~/cassandra-count
    mkdir -p src/main/scala src/main/java src/main/resources project
  3. Create a file called build.sbt in the cassandra-count directory with the following contents (note: the blank lines are important):
    name := "cassandra-count"

    version := "1.0"

    scalaVersion := "2.11.8"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1" % "provided"

    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1" % "provided"

    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", "") => MergeStrategy.last
      case x =>
        val old = (assemblyMergeStrategy in assembly).value
        old(x)
    }
  4. Create a file called assembly.sbt in the cassandra-count/project directory with the following contents. This adds the sbt-assembly plugin, which bundles the required dependencies into the output jar:
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
  5. Create a file called cassandra-count.scala in the cassandra-count/src/main/scala directory with the following contents:
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import com.datastax.spark.connector._

    object cassandraCount {
      def main(args: Array[String]): Unit = {
        // 1. Create a conf for the Spark context.
        // In this example, the Spark master and Cassandra node details are provided in a separate cassandra-count.conf file.
        val conf = new SparkConf().setAppName("Counting rows of a cassandra table")
        // 2. Create a Spark context.
        val sc = new SparkContext(conf)
        // 3. Create an RDD that reads the Cassandra table "keyspaces" of the keyspace "system_schema".
        val rdd = sc.cassandraTable("system_schema", "keyspaces")
        // 4. Count the number of rows.
        val num_row = rdd.count()
        println("\n\nNumber of rows in system_schema.keyspaces: " + num_row + "\n\n")
        // 5. Stop the Spark context.
        sc.stop()
      }
    }
  6. For Spark to connect to Cassandra using SSL, an SSL context must be created on the Spark driver and on every executor. You do this by passing SSL-specific properties to the Spark Cassandra Connector. With the default connection factory, the trust store path must be a valid file path on the driver and on every executor, which can be restrictive. An alternative is a custom connection factory. Here we create a custom Cassandra connection factory that treats the trust store path property as a classpath resource path rather than a file path, so the trust store can be read from a resource inside the assembled jar. Create a file called CustomCassandraConnectionFactory.java in the cassandra-count/src/main/java directory with the following contents:
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.JdkSSLOptions;
    import com.datastax.driver.core.SSLOptions;
    import com.datastax.driver.core.SocketOptions;
    import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
    import com.datastax.spark.connector.cql.CassandraConnectionFactory;
    import com.datastax.spark.connector.cql.CassandraConnectorConf;
    import com.datastax.spark.connector.cql.DefaultScanner;
    import com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy;
    import com.datastax.spark.connector.cql.MultipleRetryPolicy;
    import com.datastax.spark.connector.cql.Scanner;
    import com.datastax.spark.connector.rdd.ReadConf;
    import scala.collection.IndexedSeq;
    import scala.collection.immutable.HashSet;
    import scala.collection.immutable.Set;
    import scala.reflect.ClassTag;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManagerFactory;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.InetAddress;
    import java.security.GeneralSecurityException;
    import java.security.KeyStore;
    import java.security.SecureRandom;
    import java.util.ArrayList;
    import java.util.List;

    public class CustomCassandraConnectionFactory implements CassandraConnectionFactory {

        public Cluster createCluster(CassandraConnectorConf conf) {
            try {
                return clusterBuilder(conf).build();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        // This factory defines no additional connector properties.
        public Set<String> properties() {
            return new HashSet<String>();
        }

        // Keep the connector's default scanner.
        public Scanner getScanner(ReadConf readConf, CassandraConnectorConf connConf, IndexedSeq<String> columnNames) {
            return new DefaultScanner(readConf, connConf, columnNames);
        }

        private Cluster.Builder clusterBuilder(CassandraConnectorConf conf) throws IOException, GeneralSecurityException {
            SocketOptions socketOptions = new SocketOptions()
                    .setConnectTimeoutMillis(conf.connectTimeoutMillis())
                    .setReadTimeoutMillis(conf.readTimeoutMillis());

            // Copy the Scala set of contact points into a Java list (IPv4 and IPv6 both allowed).
            List<InetAddress> hosts = new ArrayList<InetAddress>();
            scala.collection.Iterator<InetAddress> iter = conf.hosts().toIterator();
            while (iter.hasNext()) {
                hosts.add(iter.next());
            }

            Cluster.Builder builder = Cluster.builder()
                    .addContactPoints(hosts.toArray(new InetAddress[0]))
                    .withPort(conf.port())
                    .withRetryPolicy(
                            new MultipleRetryPolicy(conf.queryRetryCount()))
                    .withReconnectionPolicy(
                            new ExponentialReconnectionPolicy(conf.minReconnectionDelayMillis(), conf.maxReconnectionDelayMillis()))
                    .withLoadBalancingPolicy(
                            new LocalNodeFirstLoadBalancingPolicy(conf.hosts(), conf.localDC(), true))
                    .withAuthProvider(conf.authConf().authProvider())
                    .withSocketOptions(socketOptions);

            if (conf.cassandraSSLConf().enabled()) {
                SSLOptions options = createSSLOptions(conf.cassandraSSLConf());
                builder = (options != null) ? builder.withSSL(options) : builder.withSSL();
            }
            return builder;
        }

        // Reads the trust store as a classpath resource (e.g. from inside the assembled jar), not as a file.
        private SSLOptions createSSLOptions(CassandraConnectorConf.CassandraSSLConf conf) throws IOException, GeneralSecurityException {
            if (conf.trustStorePath().isEmpty()) {
                return null;
            }
            String resource = conf.trustStorePath().get();
            try (InputStream trustStore = getClass().getClassLoader().getResourceAsStream(resource)) {
                if (trustStore == null) {
                    throw new IOException("Trust store resource not found on classpath: " + resource);
                }
                KeyStore keyStore = KeyStore.getInstance(conf.trustStoreType());
                keyStore.load(trustStore, conf.trustStorePassword().isDefined() ? conf.trustStorePassword().get().toCharArray() : null);
                TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                tmf.init(keyStore);
                SSLContext context = SSLContext.getInstance(conf.protocol());
                context.init(null, tmf.getTrustManagers(), new SecureRandom());
                ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
                return JdkSSLOptions.builder()
                        .withSSLContext(context)
                        .withCipherSuites((String[]) conf.enabledAlgorithms().toArray(tag))
                        .build();
            }
        }
    }
  7. Copy the trust store file downloaded in the earlier step to the cassandra-count/src/main/resources directory.
  8. The SSL connection to Cassandra requires the following additional properties:
    spark.cassandra.connection.ssl.enabled: A boolean switch that indicates whether the connection to Cassandra should use SSL.
    spark.cassandra.connection.ssl.trustStore.password: The password for the trust store.
    spark.cassandra.connection.ssl.trustStore.path: The path to the trust store file. With the custom factory in this example, this is a classpath resource path instead of a file path.
    spark.cassandra.connection.factory: Overrides the behaviour of the default Spark Cassandra Connector connection factory. When used, it should be the name of the class that implements CassandraConnectionFactory. Details of this interface can be found on the DataStax Spark Cassandra Connector page on GitHub.

    Create a file called cassandra-count.conf in the cassandra-count directory (this file contains the configuration that will be used when we submit the job):

    spark.master spark://<spark_master_private_IP1>:7077,<spark_master_private_IP2>:7077,<spark_master_private_IP3>:7077
    spark.executor.memory 1g <private IP of a cassandra node>
    spark.cassandra.auth.username iccassandra
    spark.cassandra.auth.password <iccassandra password>
    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.eventLog.enabled true
    spark.eventLog.dir .
    spark.cassandra.connection.ssl.enabled true
    spark.cassandra.connection.ssl.trustStore.password <trust store password>
    spark.cassandra.connection.ssl.trustStore.path truststore.jks
    spark.cassandra.connection.factory CustomCassandraConnectionFactory
  9. Build the job (from cassandra-count directory):
    sbt assembly
  10. Submit the job (from cassandra-count directory):
    ~/spark-2.1.1-bin-hadoop2.6/bin/spark-submit --properties-file cassandra-count.conf --class cassandraCount target/scala-2.11/cassandra-count-assembly-1.0.jar
  11. You should see a lot of log messages. About 15 lines from the end, the job prints the row count in the form "Number of rows in system_schema.keyspaces: <count>".
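The key idea in step 6 is that the trust store path is resolved with ClassLoader.getResourceAsStream instead of being opened as a file, so the same assembled jar works on the driver and on every executor. The sketch below isolates that technique using only the JDK, without any Spark or Cassandra classes. The class and method names are hypothetical. One detail matters here: getResourceAsStream returns null when the resource is missing, and KeyStore.load(null, password) silently creates an empty key store. A wrong resource path would then show up only later, typically as a failed TLS handshake, so the sketch fails fast instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper mirroring the trust store handling of CustomCassandraConnectionFactory.
final class ResourceTrustStore {

    // Resolves the path against the classpath (for example, inside the assembled jar).
    static SSLContext fromResource(String resourcePath, char[] password, String storeType, String protocol)
            throws IOException, GeneralSecurityException {
        try (InputStream in = ResourceTrustStore.class.getClassLoader().getResourceAsStream(resourcePath)) {
            if (in == null) {
                // Fail fast: KeyStore.load(null, ...) would otherwise create an empty trust store.
                throw new IOException("Trust store resource not found on classpath: " + resourcePath);
            }
            return fromStream(in, password, storeType, protocol);
        }
    }

    // Builds an SSLContext whose trust managers trust the certificates in the key store stream.
    static SSLContext fromStream(InputStream in, char[] password, String storeType, String protocol)
            throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance(storeType);
        keyStore.load(in, password);
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(keyStore);
        SSLContext context = SSLContext.getInstance(protocol);
        // As in step 6, no key managers are supplied: the client only verifies the server.
        context.init(null, tmf.getTrustManagers(), new SecureRandom());
        return context;
    }

    public static void main(String[] args) throws Exception {
        // Run with the assembled jar on the classpath; the optional argument is the trust store password.
        char[] password = args.length > 0 ? args[0].toCharArray() : null;
        SSLContext context = fromResource("truststore.jks", password, "JKS", "TLS");
        System.out.println("Loaded SSL context using protocol " + context.getProtocol());
    }
}
```

The store type "JKS" and protocol "TLS" here are assumptions for illustration. In the factory, they come from the connector's SSL configuration (conf.trustStoreType() and conf.protocol()).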

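The factory in step 6 passes conf.enabledAlgorithms() to JdkSSLOptions.withCipherSuites. As far as we know, the driver hands these names to SSLEngine.setEnabledCipherSuites, which throws IllegalArgumentException for any suite the JVM does not support. A suite list that works on one JVM can therefore fail on another. The list is controlled by the connector's spark.cassandra.connection.ssl.enabledAlgorithms property, which this guide leaves at its default. The JDK-only sketch below (class name hypothetical) reports which requested suite names the local JVM does not support.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.net.ssl.SSLContext;

// Hypothetical helper: checks cipher suite names against those supported by the local JVM.
final class CipherSuiteCheck {

    // Returns the requested suites that the default SSLContext does not support, in request order.
    static List<String> unsupported(List<String> requested) throws NoSuchAlgorithmException {
        Set<String> supported = new HashSet<>(Arrays.asList(
                SSLContext.getDefault().getSupportedSSLParameters().getCipherSuites()));
        List<String> result = new ArrayList<>();
        for (String suite : requested) {
            if (!supported.contains(suite)) {
                result.add(suite);
            }
        }
        return result;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Pass a comma-separated list of cipher suite names as the first argument.
        List<String> requested = args.length > 0 ? Arrays.asList(args[0].split(",")) : new ArrayList<String>();
        System.out.println("Unsupported on this JVM: " + unsupported(requested));
    }
}
```

Run it on the driver and executor hosts with the same JVM that Spark uses, since supported suites depend on the Java version and security settings.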
Using Spark Shell

Connecting to Cassandra over SSL from the Spark Shell works the same way as with spark-submit. The jar containing the custom connection factory and the trust store resource must be added to the --jars list. The same SSL configuration properties must also be passed with --conf. Below is an example Spark Shell command:

cd ~/spark-2.1.1-bin-hadoop2.6/bin
./spark-shell --master spark://<spark_master_IP1>:7077,<spark_master_IP2>:7077,<spark_master_IP3>:7077 \
  --conf <private IP of a cassandra node> \
  --conf spark.cassandra.auth.username=iccassandra \
  --conf spark.cassandra.auth.password=<iccassandra password> \
  --jars ~/spark-cassandra-connector-assembly-2.0.2.jar,$HOME/cassandra-count/target/scala-2.11/cassandra-count-assembly-1.0.jar \
  --conf spark.cassandra.connection.ssl.enabled=true \
  --conf spark.cassandra.connection.ssl.trustStore.password=<trust store password> \
  --conf spark.cassandra.connection.ssl.trustStore.path=truststore.jks \
  --conf spark.cassandra.connection.factory=CustomCassandraConnectionFactory
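The cassandra-count.conf file used with spark-submit and the --conf flags used with spark-shell carry the same key/value pairs. As far as we know, Spark reads a --properties-file with java.util.Properties, which accepts whitespace as the key/value separator. The sketch below uses the same class to turn a properties file into spark-shell arguments and to check that the SSL properties from step 8 are present. The class name and the list of required keys are assumptions for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper: converts a Spark properties file into spark-shell arguments.
final class ConfToShellArgs {

    // SSL-related keys from step 8 that this setup relies on (assumed list).
    static final String[] REQUIRED_SSL_KEYS = {
        "spark.cassandra.connection.ssl.enabled",
        "spark.cassandra.connection.ssl.trustStore.path",
        "spark.cassandra.connection.ssl.trustStore.password",
        "spark.cassandra.connection.factory"
    };

    // Properties.load splits each line at the first unescaped whitespace, '=' or ':'.
    static Properties parse(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return props;
    }

    // Returns the required SSL keys that are absent or blank, in the order listed above.
    static List<String> missingSslKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_SSL_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Emits arguments in sorted key order: spark.master becomes --master, everything else --conf key=value.
    static List<String> toShellArgs(Properties props) {
        List<String> shellArgs = new ArrayList<>();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            String value = props.getProperty(key).trim();
            if (key.equals("spark.master")) {
                shellArgs.add("--master");
                shellArgs.add(value);
            } else {
                shellArgs.add("--conf");
                shellArgs.add(key + "=" + value);
            }
        }
        return shellArgs;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "cassandra-count.conf";
        try (Reader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            Properties props = parse(reader);
            List<String> missing = missingSslKeys(props);
            if (!missing.isEmpty()) {
                System.err.println("Missing SSL properties: " + missing);
            }
            System.out.println(String.join(" ", toShellArgs(props)));
        }
    }
}
```

The printed arguments are not shell-quoted. Before pasting them, quote any value that contains spaces or shell metacharacters such as < and >.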

Further Resources

You can find the source code used in this guide at this GitHub page.

By Instaclustr Support