Spark Kinesis Example – Moving Beyond Word Count


If you are looking for a Spark with Kinesis example, you are in the right place.  This Spark Kinesis tutorial intends to help you become better at integrating the two.  In this post, I’m going to provide a custom Spark Kinesis code example and a screencast of running it.  We’re going to cover running, configuring, sending sample data, and AWS setup.  Finally, I’m going to list some links to content which helped me become more comfortable with Spark Kinesis code and configuration.  If you have questions or suggestions, please let me know in the comment form below.

Spark Kinesis Tutorial Example Overview

In this example, we’re going to simulate sensor devices recording their temperature to a Kinesis stream.  This Kinesis stream will be read by our Spark Scala program every 2 seconds, and the program will notify us of two things:

  1. If a sensor’s temperature is above 100
  2. The top two sensors’ temps over the previous 20 seconds

 

So, nothing too complicated, but close enough to a possible real-world scenario of reading and analyzing stream(s) of data and acting on certain results.
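
To make the scenario concrete, each record on the Kinesis stream is just a comma-separated string of sensor id, current temperature, and status, which the code below parses into a SensorData case class.  A few made-up example records:

sensor-1,118,OK
sensor-2,72,OK
sensor-3,131,WARN

With input like this, the program would flag sensor-1 and sensor-3 as running hot and rank them over the 20-second window.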

In this tutorial, here’s how we’re going to cover things, in the following order:

  1. Check assumptions
  2. Present the code (Scala) and configuration example
  3. Go through AWS Kinesis setup and also Amazon Kinesis Data Generator
  4. Run in IntelliJ
  5. How to build and deploy outside of IntelliJ
  6. Rejoice, savor the moment, and thank the author of this tutorial with a $150 PayPal donation

 

Sound good? I hope so, let’s begin.

Spark with Kinesis Tutorial Assumptions

I’m making the following assumptions about you when writing this tutorial.

  1. You have a basic understanding of Amazon Kinesis
  2. You have experience writing and deploying Apache Spark programs
  3. You have an AWS account and understand that using AWS costs money.  (AKA: moolah, cash money).  Not my money, your money.  It costs you or your company money to run in AWS.
  4. You have set your AWS access key id and secret key appropriately for your environment.  (One common approach is sketched just below this list.)
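
As a quick sketch of that last assumption: the code below relies on the AWS SDK’s DefaultAWSCredentialsProviderChain, which picks up credentials from (among other places) environment variables or your ~/.aws/credentials file.  The environment variable names are the standard ones; the values are placeholders:

export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>

See the credentials link in the Resources section at the bottom for the full list of options.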

 

If any of these assumptions are incorrect, you are probably going to struggle with this Spark Kinesis integration tutorial.

 

Spark with Kinesis Example Scala Code

 

Let’s start with the code…

Spark (Scala) with Kinesis
// Imports assumed for this example, based on the standard Spark 2.x Streaming,
// spark-streaming-kinesis-asl, AWS SDK, and Typesafe Config APIs
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kinesis.KinesisUtils

// Logging is assumed to be provided by the project (for example, a thin SLF4J wrapper);
// Spark 2's internal Logging trait is not public API
object SparkKinesisExample extends Logging {
 
  def main(args: Array[String]) {
 
    val conf = new SparkConf().setAppName("Kinesis Read Sensor Data")
    conf.setIfMissing("spark.master", "local[*]")
 
    // Typesafe config - load external config from src/main/resources/application.conf
    val kinesisConf = ConfigFactory.load.getConfig("kinesis")
 
    val appName = kinesisConf.getString("appName")
    val streamName = kinesisConf.getString("streamName")
    val endpointUrl = kinesisConf.getString("endpointUrl")
 
    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null,
      "No AWS credentials found. See http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
 
    val numStreams = numShards
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
 
    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
 
    val ssc = new StreamingContext(conf, batchInterval)
 
    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
    }
 
    // Union all the streams (in case numStreams > 1)
    val unionStreams = ssc.union(kinesisStreams)
 
    val sensorData = unionStreams.map { byteArray =>
      val Array(sensorId, temp, status) = new String(byteArray).split(",")
      SensorData(sensorId, temp.toInt, status)
    }
 
    val hotSensors: DStream[SensorData] = sensorData.filter(_.currentTemp > 100)
 
    hotSensors.print(1) // remove me if you want... this is just to spit out timestamps
 
    // Note: a bare map over a DStream is a lazy transformation and never executes
    // without an output operation, so print the hot readings from foreachRDD instead
    hotSensors.foreachRDD { rdd =>
      println("Sensors with Temp > 100")
      rdd.collect().foreach { sd =>
        println(s"Sensor id ${sd.id} has temp of ${sd.currentTemp}")
      }
    }
 
    // Hottest sensors over the last 20 seconds
    hotSensors.window(Seconds(20)).foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
 
      val hotSensorDF = rdd.toDF()
      hotSensorDF.createOrReplaceTempView("hot_sensors")
 
      val hottestOverTime = spark.sql("select * from hot_sensors order by currentTemp desc limit 5")
      hottestOverTime.show(2)
    }
 
    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))
 
    ssc.start()
    ssc.awaitTermination()
  }
}
case class SensorData(id: String, currentTemp: Int, status: String)
 

(Note: The entire code is available from my Github repo.  See links in the Resources section below.)

The first 20 or so lines of the main function are just setting things up.  For example, we are reading the particulars of the Kinesis stream (streamName, endpointURL, etc.) from a config file.  You will need to change the config variables in the file src/main/resources/application.conf to values appropriate for your Kinesis setup.
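
Here is a minimal sketch of what that file could contain, assuming only the keys the code actually reads (appName, streamName, endpointUrl) nested under a kinesis block; the values are placeholders for your own stream:

kinesis {
  appName = "sensor-data-reader"
  streamName = "your-stream-name"
  endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
}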

Then, we start utilizing the code provided by Amazon.  You’ll see reference to spark-streaming-kinesis-asl in the build.sbt file in the Github repo.
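
If you are building your own project rather than cloning the repo, the dependency is the standard Spark artifact; a sketch of the relevant build.sbt line (adjust the version to match your Spark and Scala versions):

// Spark's Kinesis integration, published separately from core spark-streaming because of its Amazon Software License dependency
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.2.0"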

Next, we create a dynamic number of streams kinesisStreams based on the number of shards configured in our Kinesis stream.  Make sure to view the screencast below for further insight on this subject.  In the screencast, I go over setting up Kinesis and running this program.

We utilize the createStream method on the KinesisUtils object (provided by the spark-streaming-kinesis-asl module) to register each stream.  We set the initial position in the stream to the very latest record.  In other words, don’t worry about anything that has already been added to the stream.

(Some of you might be wondering at this point… this doesn’t look like Spark Structured Streaming, does it?  And then you might be thinking we should be using Structured Streaming for anything new, right?  I’d say you’re right.  But, at the time of this writing, Structured Streaming for Kinesis is not available in Spark outside of Databricks.  Relevant links on this subject are below in the Resources section.)

After we create the streams, we perform a union so we can analyze all the streams as one.  After the union, we convert our stream from a DStream of Array[Byte] to a DStream[SensorData].  If you set up your Kinesis stream with 1 shard as I did, there will only be one stream.

From here, we perform our pseudo business logic.  We are looking for sensors which might be running hot (e.g. a temperature over 100).  If this were a real application, our code might trigger an event based on this temperature.

This example also gives us the opportunity to perform some Spark Streaming windowing.  Windowing allows us to analyze and consider data that previously arrived in the stream, and not only the data present at compute time (the current iteration of the micro-batch).  For example, we might want to determine the hottest two sensors over the past 20 seconds, and not just within a particular batch; with a 2-second batch interval, a 20-second window spans roughly the last 10 micro-batches.  To do this, consider the code starting at the hotSensors.window block.  Within the block, notice the import for implicits.  This allows us to convert our RDDs of SensorData to DataFrames later in the block.

That’s it.  I think the code speaks for itself, but as I mentioned above, let me know if you have any questions or suggestions for improvement.

AWS Kinesis Setup

Setting up Kinesis requires an AWS account.  In this example, it’s nothing fancy.  I created the stream in the us-west-2 region because that’s where the Kinesis Data Generator is, but I don’t think it really matters.  The Kinesis stream has just 1 shard (aka partition), with default settings for everything else.  I did need to modify security policies to get things working, which I show in the screencast.

Amazon Kinesis Data Generator Setup

I like being lazy sometimes.  So, when I found that Amazon provides a service to send fake data to Kinesis, I jumped all over it.  I mean, sure, I could write my own Java, Python or Scala program to do it, but using this Kinesis Generator was easier and faster.  I like how it has integrated Faker in order to provide dynamic data.
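
The only detail to get right is the record shape the Spark code expects: a comma-separated sensor id, temperature, and status.  A record template roughly along these lines should work; treat it as an illustrative sketch and check the Data Generator’s help page (linked in Resources) for the exact template and faker expression syntax:

sensor-{{random.number(10)}},{{random.number(150)}},OK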

Run Spark Kinesis Scala Code in IntelliJ

I’m going to run this in IntelliJ because it simulates how I work.  I like to develop and test in IntelliJ first before building and deploying a jar.  So, check out the screencast for some running-in-IntelliJ fun.  In order to run in IntelliJ, I’ve customized my build.sbt file and updated the Run/Debug configuration in IntelliJ to use the intellijRunner classpath.  Also, as previously mentioned, you need to update the configuration settings in the src/main/resources/application.conf file.

Finally, as previously mentioned, I assume you have your AWS security credentials (access key id and secret key) set up and confirmed working.  I don’t show how I set mine up in the screencast, but I include a link with more information in the Resources section below.

Build and Deploy Spark Kinesis Example

The entire project is configured with the sbt-assembly plugin.  See project/assembly.sbt.  To build a deployable jar, run the assembly sbt task.  Nothing fancy in the deploy then… just deploy with spark-submit and reference the com.supergloo.SparkKinesisExample class.
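
As a rough sketch, the submit looks something like the following; the assembly jar name and path depend on your build settings, and the master URL on your cluster, so both are placeholders here:

spark-submit \
  --class com.supergloo.SparkKinesisExample \
  --master spark://your-master-host:7077 \
  path/to/your-assembly.jar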

Cash Money Time

I like money.  This is the part where you send me $150.   I’ll wait here until you send it.  Thanks in advance.

 

Screencast

While I’m waiting for you to send me money, check out the screencast.

I’m still waiting for you to send the cash $$$ by the way.

 

Resources

  • Github repo with Spark 2 Streaming examples including this one above https://github.com/tmcgrath/spark-2-streaming
  • Amazon Kinesis Data Generator https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html
  • Structured Streaming for Kinesis is available from DataBricks https://databricks.com/blog/2017/08/09/apache-sparks-structured-streaming-with-amazon-kinesis-on-databricks.html
  • Structured Streaming for Kinesis appears to be not available in OSS https://issues.apache.org/jira/browse/SPARK-18165
  • Configuring AWS access credentials http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html

 

 

Featured image credit: https://flic.kr/p/6vfaHV

 
