In this post, let’s explore an example of updating an existing Spark Streaming application to newer Spark Structured Streaming. We will start simple and then move to a more advanced Kafka Spark Structured Streaming examples.
My original Kafka Spark Streaming post is three years old now. On the Spark side, the data abstractions have evolved from RDDs to DataFrames and DataSets. RDDs are not the preferred abstraction layer anymore and the previous Spark Streaming with Kafka example utilized DStreams which was the Spark Streaming abstraction over streams of data at the time. Some of you might recall that DStreams was built on the foundation of RDDs.
From the Spark DStream API docs
“A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, etc.) using a StreamingContext or it can be generated by transforming existing DStreams using operations such as
reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates a RDD, either from live data or by transforming the RDD generated by a parent DStream.”
Because we try not to use RDDs anymore, it can be confusing when there are still Spark tutorials, documentation, and code examples that still show RDD examples. I’ve updated the previous Spark Streaming with Kafka example to point to this new Spark Structured Streaming with Kafka Example example to try to help clarify.
Ok, let’s show a demo and look at some code.
All source code is available on Github. See link below.
Spark Structured Streaming with Kafka Examples Overview
We are going to show a couple of demos with Spark Structured Streaming code in Scala reading and writing to Kafka. The Kafka cluster will consist of three multiple brokers (nodes), schema registry, and Zookeeper all wrapped in a convenient docker-compose example. Let me know if you have any ideas to make things easier or more efficient.
The Scala code examples will be shown running within IntelliJ as well as deploying to a Spark cluster.
The source code and docker-compose file are available on Github. See the Resources section below for links.
If you want to run these Kafka Spark Structured Streaming examples exactly as shown below, you will need:
- Docker compose
- IntelliJ Configured for Scala and Spark
Structured Streaming with Kafka Demo
Let’s take a look at a demo.
The following items or concepts were shown in the demo--
- Startup Kafka Cluster with
kafkacatas described in Generate Test Data in Kafka Cluster (used an example from a previous tutorial)
- Run the Spark Kafka example in IntelliJ
- Build a Jar and deploy the Spark Structured Streaming example in a Spark cluster with
This demo assumes you are already familiar with the basics of Spark, so I don’t cover it.
Spark Structured Streaming with Kafka CSV Example
For reading CSV data from Kafka with Spark Structured streaming, these are the steps to perform.
- Loaded CSV data into Kafka with
cat data/cricket.csv | kafkacat -b localhost:19092 -t cricket_csv
- Ran the example in IntelliJ
- The code to highlight is the
inputDFDataFrame and use of the
selectExprfunction where we utilized the
CASTbuilt SparkSQL function to deserialize the Kafka key and value from the INPUT_CSV topic into a new DataFrame called
- We output
inputCSVto the console with
Spark Structured Streaming with Kafka JSON Example
For reading JSON values from Kafka, it is similar to the previous CSV example with a few differences noted in the following steps.
- Load JSON example data into Kafka with
cat data/cricket.json | kafkacat -b localhost:19092 -t cricket_json -J
- Notice the
inputJsonDFDataFrame creation. A couple of noteworthy items are casting to String before using the
from_jsonfunction. We pass the
StructTypeas defined in
- Next, we create a filtered DataFrame called
selectDFand output to the console.
Spark Structured Streaming with Kafka Avro
Reading Avro serialized data from Kafka in Spark Structured Streaming is a bit more involved.
- First, load some example Avro data into Kafka with
cat data/cricket.json | kafka-avro-console-producer --broker-list localhost:19092 --topic cricket_avro --property value.schema="$(jq -r tostring data/cricket.avsc)"
- In the Scala code, we create and register a custom UDF called
deserializeand use it in two different ways: once in the creation of
valueDataFrameand the other in the creation of
jsonDf. This custom UDF is using a simple implementation of the Confluent
- To make the data more useful, we convert to a DataFrame by using the Confluent Kafka Schema Registry. In particular, check out the creation of
avroDf. A couple of things to note here. We seem to have to compute two conversions: 1) deserialize from Avro to JSON and then 2) convert from JSON with
from_jsonfunction similar to previous JSON example but using a DataType from the
spark-avrolibrary this time.
I don’t honestly know if this the most efficient straightforward way when using Avro formatted data with Kafka and Spark Structured Streaming, but I definitely want/need to use the Schema Registry. If you have some suggestions, please let me know.
Also, as noted in the source code, it appears there might be a different option available from Databricks’ available version of the
from_avrofunction. I’ll try it out in the next post.
Spark Structured Streaming Kafka Deploy Example
The build.sbt and project/assembly.sbt files are set to build and deploy to an external Spark cluster. As shown in the demo, just run
assembly and then deploy the jar.
Spark Structured Streaming Kafka Example Conclusion
As mentioned above, RDDs have evolved quite a bit in the last few years. Kafka has evolved quite a bit as well. However, one aspect which doesn’t seem to have evolved much is the Spark Kafka integration. As you see in the SBT file, the integration is still using 0.10 of the Kafka API. It doesn’t matter for this example, but it does prevent us from using more advanced Kafka constructs like Transaction support introduced in 0.11. In other words, it doesn’t appear we can effectively set the `isolation level` to `read_committed` from Spark Kafka consumer in other words.
The Bigger Picture
Hopefully, these examples are helpful for your particular use case(s). I’d be curious to hear more about what you are attempting to do with Spark reading from Kafka. Do you plan to build a Stream Processor where you will be writing results back to Kafka? Or, will you be writing results to an object store or data warehouse and not back to Kafka?
My definition of a Stream Processor in this case is taking source data from an Event Log (Kafka in this case), performing some processing on it, and then writing the results back to Kafka. These results could be utilized downstream from Microservice or used in Kafka Connect to sink the results into an analytic data store.
While I’m obviously a fan of Spark, I’m curious to hear your reasons to use Spark with Kafka. You have other options, so I’m interested in hearing from you. Let me know in the comments below.
- Source https://github.com/supergloo/spark-streaming-examples
- Multiple Broker Kafka Cluster with Schema Registry https://github.com/tmcgrath/docker-for-demos/tree/master/confluent-3-broker-cluster
- Structured Streaming Kafka Integration Guide https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- With schema registry https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry
Featured image credit https://pixabay.com/photos/water-rapids-stream-cascade-872016/