Spark and Cassandra Cluster Data Sampling
This tutorial describes how you can use Apache Spark and Zeppelin as part of an Instaclustr-managed cluster to extract and sample data from one cluster and write to another cluster.
Prerequisites
- At least two clusters running in Instaclustr. In this tutorial, the cluster from which we read data is called the “source cluster” and the cluster to which we write data is called the “target cluster”.
- The target cluster is provisioned with Zeppelin and Spark.
- The keyspace of the target table must be identical to that of the source table (the table names can differ).
Configure Network Access
As Spark in your target cluster needs to connect to your source cluster to read data, the public IP addresses of the nodes in your target cluster must be added to the “Cassandra Allowed Addresses” of your source cluster. The detailed steps are as follows:
- Open your source cluster dashboard page.
- Click the “Settings” panel.
- Add the public IP addresses of your target cluster nodes to “Cassandra Allowed Addresses”.
- Click “Save Cluster Settings”.
Create Table Definitions on the Source and Target Cassandra Clusters
- Check the public IP address of your source cluster node.
- Open a terminal.
- Make sure cqlsh is installed on your system.
- Execute:
```
cqlsh <public IP address of source cluster>
```
- Change to the instaclustr keyspace:
```
use instaclustr;
```
- Create a table called “users”:
```
CREATE TABLE users (
    userid text PRIMARY KEY,
    first_name text,
    last_name text,
    emails set<text>,
    top_scores list<int>,
    todo map<timestamp, text>
);
```
- Insert test data:
```
INSERT INTO users(userid, first_name, last_name) VALUES ('1', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('2', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('3', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('4', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('5', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('6', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('7', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('8', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('9', 'f_name_src', 'l_name_src');
INSERT INTO users(userid, first_name, last_name) VALUES ('10', 'f_name_src', 'l_name_src');
```
- Execute “quit” to exit cqlsh.
- Check the public IP addresses of your target cluster nodes.
- Execute:
```
cqlsh <public IP address of target cluster>
```
- Change to the instaclustr keyspace:
```
use instaclustr;
```
- Create a table called “users”:
```
CREATE TABLE users (
    userid text PRIMARY KEY
);
```
- Execute the “select” CQL command:
```
SELECT * FROM users;
```
The result should be empty.
- Keep this window open until the end of the next section; you will run this query again to check the result.
Sample and Load Data
- Open the dashboard page of your target cluster.
- Open the “Details” panel and click the “Zeppelin” button; the Zeppelin web page will open in your browser.
- Create a new notebook by clicking the “Notebook” button on the home page of Zeppelin.
- Put the following code in the first paragraph to load dependencies:
```
%dep
z.load("/opt/zeppelin/interpreter/spark/spark-cassandra-connector-assembly-2.0.2.jar")
```
- Use the following Spark code in the next paragraph to sample and load the data:
```
%spark
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

val sourceCluster = CassandraConnector(
  sc.getConf
    .set("spark.cassandra.connection.host", "<public IP of nodes in source cluster>")
    .set("spark.cassandra.auth.username", "<user name of source cluster>")
    .set("spark.cassandra.auth.password", "<password of source cluster>"))

val rddFromSourceCluster = {
  implicit val c = sourceCluster // connect to the source cluster in this code block
  sc.cassandraTable("<source keyspace>", "<source table>")
    .select("<PK column>")
    .sample(false, 0.1) // sample 10% of the data from the source table
}
rddFromSourceCluster.saveToCassandra("<target keyspace>", "<target table>") // save data to the local Cassandra cluster
```
For a large dataset, extracting the entire dataset into Spark and then sampling it there is very time consuming. The more efficient method used in the example above is to sample only the partition key column and then join the sampled keys back with the source table, which avoids pulling the complete dataset into Spark (see the sketch below).
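The join step is not needed in this tutorial (the target table holds only the partition key), but if you want the full rows for the sampled keys, a minimal sketch looks like the following. It assumes the tutorial's instaclustr.users table with userid as the partition key, and reuses sourceCluster and the imports from the paragraph above; joinWithCassandraTable is the connector call that fetches only the matching rows.
```
%spark
// Sketch: join sampled partition keys back with the source table
// so that only the sampled rows are read in full.
val sampledFullRows = {
  implicit val c = sourceCluster // read from the source cluster in this block
  sc.cassandraTable("instaclustr", "users")
    .select("userid")                               // pull only the partition key column
    .sample(false, 0.1)                             // keep ~10% of the keys
    .map(row => Tuple1(row.getString("userid")))    // wrap keys as tuples for the join
    .joinWithCassandraTable("instaclustr", "users") // fetch full rows for the sampled keys only
    .map { case (_, row) => row }                   // drop the key part of each (key, row) pair
}
```
Saving sampledFullRows to the target cluster would then require a target table with the full users schema, not the single-column table created earlier.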
- Check the result on the target Cassandra cluster.
- Go back to the terminal environment.
- Execute the “select” CQL command again:
```
SELECT * FROM users;
```
The result should now show the sampled rows: roughly 10% of the userids from the source table.
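You can also verify from Zeppelin. This is an optional sketch, assuming the default Spark connector configuration on an Instaclustr-managed cluster points at the local (target) cluster:
```
%spark
// Count the rows that landed in the target table; with a 10% sample
// of the 10 test rows, expect a count of around 1.
sc.cassandraTable("instaclustr", "users").count()
```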
SSL Connection
If encryption is enabled on your source cluster, you will need to contact support@instaclustr.com to load the truststore file of the source cluster onto your target cluster. In addition, the Spark connector should be configured using the following code:
```
val sourceCluster = CassandraConnector(
  sc.getConf
    .set("spark.cassandra.connection.host", "<source cluster IP>")
    .set("spark.cassandra.auth.username", "<user name of source cluster>")
    .set("spark.cassandra.auth.password", "<password of source cluster>")
    .set("spark.cassandra.connection.ssl.trustStore.password", "instaclustr")
    .set("spark.cassandra.connection.ssl.enabled", "true")
    .set("spark.cassandra.connection.ssl.trustStore.type", "jks")
    .set("spark.cassandra.connection.ssl.trustStore.path", "/opt/spark/conf/source_truststore.jks"))
```
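Once the truststore file is in place, a quick connectivity check (illustrative, not part of the original steps) is to open a session through the SSL-enabled connector and run a trivial query:
```
%spark
// Connectivity check: query the source cluster over SSL and print
// the server's release version.
sourceCluster.withSessionDo { session =>
  println(session.execute("SELECT release_version FROM system.local").one.getString(0))
}
```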