Want to use joins with Cassandra? Or help people already familiar with SQL leverage your Spark infrastructure with the SQL-based tools they already know, such as Tableau or even MS Excel? Maybe both? This post shows how, by configuring the Apache Spark Thrift Server with Cassandra.
First, some quick background on Apache Spark Thrift Server.
Apache Spark Thrift Server is a port of Apache HiveServer2 which allows JDBC/ODBC clients to execute SQL queries. In my experience, these “clients” are typically business intelligence (BI) tools such as Tableau or even MS Excel, or direct SQL access through a query tool of choice such as Toad, DbVisualizer or SQuirreL SQL Client.
Ok, great, but why do we care?
The Spark Thrift Server allows clients to utilize the in-memory, distributed architecture of Spark SQL. In addition, the Thrift Server may be deployed as a Spark application, which allows clients to share the same SparkContext and take advantage of caching across different JDBC/ODBC requests. Finally, through the Thrift Server and Spark SQL, we may be able to provide relational database concepts such as SQL JOINs in environments where they are not supported, such as Cassandra.
APACHE SPARK THRIFT SERVER with Cassandra Setup
- Start Cassandra and Load Sample Data
- Start Spark cluster (optional)
- Start Spark Thrift Server with Cassandra configuration
- Configure Hive Metastore
- Verify our environment
- Run example SQL
- Check out the Spark UI
- Small celebration
I demo all these steps in a screencast in case it helps. See the Screencast section below to view.
If you have a Spark cluster or have downloaded Apache Spark to your laptop, you already have everything you need to run Spark Thrift Server. Thrift Server is included with Spark.
Also, I assume you are familiar with the setup of Spark and Cassandra. I don’t walk through these steps. Instead, I just run through the steps to start everything up in my local environment. You will need to make some adjustments to the following to best match your environment. For example, you will need to modify any references to $SPARK_HOME in the steps below to match the appropriate directory for your setup. Ok, enough chit chat, let’s go…
1. START CASSANDRA and load sample database
This tutorial assumes you have Cassandra running locally, but it isn’t a requirement. Your Cassandra cluster can be running someplace other than local. If Cassandra is running locally, the next step is to load a sample keyspace and load up some data. In this tutorial, we’re going to use the CDM tool found at https://github.com/riptano/cdm-java. Using CDM is really simple. Just download the pre-built binary, update the permissions to make it executable (i.e. chmod 755 cdm) and run cdm install killrweather. I show an example of running it in the screencast below.
2. START YOUR SPARK MASTER AND AT LEAST ONE WORKER (OPTIONAL)
A running Spark cluster is optional in this Spark Thrift tutorial. If you want to run a minimal cluster with one worker on your laptop, you can start a standalone master and a single worker using the start scripts under $SPARK_HOME/sbin.
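A minimal sketch of a one-worker standalone cluster, assuming a Spark 2.x layout (matching the connector version used in this post) where the standalone scripts live under $SPARK_HOME/sbin. The function names, the /opt/spark fallback and binding the master to localhost are assumptions for illustration. On Spark 3.1 and later the worker script is named start-worker.sh.

```shell
# Assumptions: Spark 2.x standalone scripts, master on this machine,
# default master port 7077. /opt/spark is only a placeholder path.
SPARK_HOME="${SPARK_HOME:-/opt/spark}"
MASTER_URL="spark://localhost:7077"

start_local_cluster() {
  # Start the standalone master bound to localhost so MASTER_URL matches
  SPARK_MASTER_HOST=localhost "$SPARK_HOME/sbin/start-master.sh"
  # Start one worker and register it with the master
  "$SPARK_HOME/sbin/start-slave.sh" "$MASTER_URL"
}

stop_local_cluster() {
  # Stop the worker first, then the master
  "$SPARK_HOME/sbin/stop-slave.sh"
  "$SPARK_HOME/sbin/stop-master.sh"
}
```

With these defined, run start_local_cluster and then open http://localhost:8080 to confirm the worker has registered with the master.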
I bet you already knew this. Anyhow, movin on…
3. START THE THRIFT SERVER AND SET CONFIGURATION FOR CASSANDRA
If you did not perform step 2 or do not have an available Spark cluster, then run
$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1
This will run Thrift Server in local[*] mode which is fine for this quick start.
Alternatively, if you do have a Spark cluster, you can also pass in the --master argument
$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1 --master spark://<spark-master>:7077
Notice how we are passing in the Spark Cassandra Connector and the spark.cassandra.connection.host config here? Of course you do, because you do not let details like this get past you. You are a stickler for details, as they say.
4. Configure Hive Metastore with beeline client
Next, we’re going to update the Hive metastore. The Hive metastore is what Spark Thrift Server uses to know the connection parameters of registered data sources. In this example, the data source is Cassandra, of course.
To update the metastore, we’re going to use the Apache Beeline client. Beeline is a command shell that works with HiveServer2 using JDBC. It is based on the SQLLine CLI.
beeline> !connect jdbc:hive2://localhost:10000
In my setup, I can disregard the username and password prompts… just press Enter to answer both prompts.
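As an alternative to the interactive !connect step, beeline can take the JDBC URL on the command line. This is a sketch, assuming the beeline binary shipped under $SPARK_HOME/bin, the Thrift Server on port 10000 as above, and no authentication configured. The helper names and the /opt/spark fallback are hypothetical.

```shell
# Assumptions: beeline from the Spark distribution, Thrift Server on
# localhost:10000, no username/password required.
SPARK_HOME="${SPARK_HOME:-/opt/spark}"
JDBC_URL="jdbc:hive2://localhost:10000"

# Open an interactive session that is already connected
open_beeline() {
  "$SPARK_HOME/bin/beeline" -u "$JDBC_URL"
}

# Run a single SQL statement and exit; useful for scripting
run_sql() {
  "$SPARK_HOME/bin/beeline" -u "$JDBC_URL" -e "$1"
}
```

For example, run_sql "SHOW TABLES;" should list whatever tables are currently registered in the metastore.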
jdbc:hive2://localhost:10000> CREATE TABLE raw_weather_data USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'raw_weather_data');
jdbc:hive2://localhost:10000> CREATE TABLE weather_station USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'weather_station');
We registered two tables because we are going to use SQL JOINs in later sections of this Spark Thrift tutorial. It’s probably worth mentioning that these tables may be cached as well. I’m not going to show how to do that in this example, but leave a comment if you have any questions.
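For the curious, caching could look something like the sketch below. It assumes Spark SQL's CACHE TABLE statement issued through beeline against the same JDBC URL. The helper names and the /opt/spark fallback are hypothetical. Note that CACHE TABLE is eager by default, so caching raw_weather_data would read the whole Cassandra table into Spark.

```shell
# Assumptions: same beeline binary and JDBC URL as the connection step.
SPARK_HOME="${SPARK_HOME:-/opt/spark}"
JDBC_URL="jdbc:hive2://localhost:10000"

# Build the Spark SQL statement that caches one registered table
cache_stmt() {
  printf 'CACHE TABLE %s;' "$1"
}

# Cache every table named on the command line via the Thrift Server
cache_tables() {
  for table in "$@"; do
    "$SPARK_HOME/bin/beeline" -u "$JDBC_URL" -e "$(cache_stmt "$table")"
  done
}
```

A call such as cache_tables weather_station would cache the smaller table; UNCACHE TABLE weather_station; releases it again.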
5. VERIFY OUR SPARK, CASSANDRA AND THRIFT SERVER SETUP
Before running some more complex SQL, let’s just verify our setup with a smoke test. Still within the connected beeline client, issue a simple SQL statement:
jdbc:hive2://localhost:10000> select * from raw_weather_data limit 10;
You should see some results. 10 rows to be precise you cheeky bastard.
6. SQL with Spark Examples
Let’s take things another step with some more complex SQL examples. I’m going to use the SQuirreL SQL client, but the concepts apply to any SQL client. If you want more detail on setting up SQuirreL with Thrift, see the Reference section below.
Joins? Joins you say! No problem
SELECT ws.name, raw.temperature
FROM raw_weather_data raw
JOIN weather_station ws ON raw.wsid = ws.id
WHERE raw.wsid = '725030:14732'
AND raw.year = 2008 AND raw.month = 12 AND raw.day = 31;
Well, well, special, special. Let’s turn it to 11…
SELECT *
FROM (
SELECT wsid, year, month, day, daily.avg_temp,
dense_rank() OVER (PARTITION BY wsid order by avg_temp desc) as rank
FROM (select wsid, year, month, day, avg(temperature) as avg_temp
FROM raw_weather_data
group by wsid, year, month, day) daily
) ranked_temp
JOIN weather_station on ranked_temp.wsid = weather_station.id
where rank <= 5;
7. JDBC / ODBC server tab in SPARK UI
Once again, the Spark UI is a valuable resource for us. Now that we’ve run a few queries, let’s take a look at what we can see…
Open http://localhost:4040/ in your browser. You should be redirected to /jobs
If you started Spark Thrift Server with a --master argument and pointed it at a cluster, then you can open http://localhost:8080 and get to this screen as well.
Notice the JDBC/ODBC tab. That’s the Thrift Server part. You did that. Check things out. You can see the query plans and details of the Spark jobs such as stages and tasks, how the shuffling went, etc.
8. Small Celebration
“Let’s celebrate, it’s all right… we gonna have a good time tonight” — Kool and the Gang
Sing it with me.
Hopefully, this Apache Spark Thrift Server tutorial helps get you started. I’ve also put together a quick screencast of me going through these steps above in case it helps. See the Screencast section below.
If you have any questions or ideas for corrections, let me know in the comments below.
Update November 2017
I intertwined two different data sources in the above post. When I originally wrote this, I had a keyspace called ‘isd_weather_data’ from the killrweather reference application. But in this tutorial, I used cdm to create and load a keyspace called killrweather. You can see this in how the tables are created above, which reference isd_weather_data. This shouldn’t have been there. When using cdm as described in this tutorial, replace isd_weather_data with killrweather. And not only that… the data appears to differ between the two sources. So the join examples produce results when using isd_weather_data and none when using killrweather. I’m presuming differences between https://github.com/killrweather/killrweather/blob/master/data/weather_stations.csv and https://github.com/killrweather/killrweather-data/blob/master/data/weather_station.csv are the root cause of no results with joins. For example, CSV contains weather_station id 725030:14732. Sorry if this causes any confusion.
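Following the note above, the table registration for a cdm-loaded keyspace might look like this sketch. It assumes the cdm dataset creates tables with the same names (raw_weather_data and weather_station) in the killrweather keyspace, which is worth checking with cqlsh first. The helper names and the /opt/spark fallback are hypothetical.

```shell
# Assumptions: keyspace created by `cdm install killrweather`, table names
# unchanged from the earlier examples, Thrift Server on localhost:10000.
SPARK_HOME="${SPARK_HOME:-/opt/spark}"
JDBC_URL="jdbc:hive2://localhost:10000"
KEYSPACE="killrweather"

# Build the CREATE TABLE statement that maps one Cassandra table into Spark SQL
create_table_stmt() {
  printf "CREATE TABLE %s USING org.apache.spark.sql.cassandra OPTIONS (keyspace '%s', table '%s');" \
    "$1" "$KEYSPACE" "$1"
}

# Register both tables used by the join examples
register_tables() {
  for table in raw_weather_data weather_station; do
    "$SPARK_HOME/bin/beeline" -u "$JDBC_URL" -e "$(create_table_stmt "$table")"
  done
}
```

Running create_table_stmt weather_station on its own prints the statement, so you can paste it at the beeline prompt instead of calling register_tables.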
References
- Spark Thrift Server https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server
- CDM https://github.com/riptano/cdm-java
- Apache Beeline https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients
- Killrweather Sample query inspiration https://github.com/killrweather/killrweather/wiki/7.-Spark-and-Cassandra-Exercises-for-KillrWeather-data
- SQuirreL SQL client setup with Thrift https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-thrift-server.html#SQuirreL-SQL-Client
Image credit https://flic.kr/p/4ykKnp