Kafka Streams Joins Examples


Performing Kafka Streams Joins presents interesting design options when implementing streaming architecture patterns.

There are numerous applicable scenarios, but let’s consider an application that needs to access multiple database tables or REST APIs in order to enrich a topic’s event records with context information.  For example, perhaps we could augment records in a topic of sensor events, each carrying a location and temperature, with the most current weather information for that location.  Furthermore, let’s say we require these location-based weather lookups to have extremely low processing latency, which we cannot achieve with a database or REST API lookup.  In this case, we may wish to leverage the Kafka Streams API to perform joins of such topics (sensor events and weather data events), rather than requiring lookups to remote databases or REST APIs.  This could result in improved processing latency.  (If not entirely obvious, this example assumes we are piping both sensor and weather events into Kafka topics.)

In this Kafka Streams joins examples tutorial, we’ll create and review sample code for various types of Kafka joins.  We’ll also demonstrate how to run each example.  The intention is a deeper dive into Kafka Streams joins to highlight possibilities for your use cases.

KAFKA STREAMS JOINS OPERATORS

When going through the Kafka Streams join examples below, it may be helpful to start with a visual representation of the expected results for each join operand.

[Diagram: Kafka Joins Operand Expected Results]

When we go through the examples of Kafka joins below, it may be helpful to keep the above diagram in mind.  The color blue represents the expected results when performing each of the Kafka-based joins.

Kafka Streams Joins Code Overview

We can implement Kafka joins in different ways.  In the following examples, we’ll cover the Kafka Streams DSL perspective.  With this approach, we’ll use the DSL’s abstractions such as KTable, KStream, and GlobalKTable.  We’ll cover various usage examples of these abstractions, but it’s important to note that, regardless of abstraction, joining streams involves the following three steps, sketched in code just after this list:

  1. Specifying at least two input streams which are read from Kafka topics
  2. Performing transformations on the joined streams to produce results
  3. Writing the results back to Kafka topics
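
Here is a minimal, hypothetical skeleton of those three steps using the Kafka Streams Scala DSL.  The topic names, the function name joinSkeleton, and the joiner are my own placeholders, not code from the tutorial’s repo:

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

def joinSkeleton(leftTopic: String,
                 rightTopic: String,
                 outputTopic: String): Topology = {
  val builder = new StreamsBuilder

  // 1. Specify at least two input streams read from Kafka topics
  val left = builder.stream[String, String](leftTopic)
  val right = builder.table[String, String](rightTopic)

  // 2. Transform by joining the two streams
  val joined = left.join(right)((leftValue, rightValue) => leftValue + "/" + rightValue)

  // 3. Write the results back to a Kafka topic
  joined.to(outputTopic)

  builder.build()
}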

 

In essence, we will be creating miniature stream processing applications for each one of the join examples.

But first, how should we think about our choices of KTable vs KStream vs GlobalKTable?

KTable treats each data record as an upsert: if the record’s key already exists in the table, the value is updated; if the key does not exist, it is inserted.  For those of you coming from relational databases, I like to think of KTable as a form of reference table.  In my experience, the value of a reference table was looking up the most recent value of a particular key, rather than the entire history of values for that key.
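
As a quick illustration of the upsert behavior (hypothetical topic and records, not from the tutorial’s repo), consider what a KTable holds after a handful of records arrive:

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder

// Suppose the topic receives, in order:
//   ("sensor-1", "MN"), ("sensor-2", "WI"), ("sensor-1", "CA")
// Because each record is treated as an upsert, the table retains only the
// latest value per key: sensor-1 -> CA, sensor-2 -> WI
val latestRegions: KTable[String, String] =
  builder.table[String, String]("user-regions")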

KStream, on the other hand, is designed for when you are concerned with the entire history of data events for particular keys.  Each data record is considered an insert (rather than an update or upsert, as in KTable).  For example, a KStream could be used to process each sensor temperature reading in order to produce an average temperature over a period of time; all the historical records are required to produce a reasonable average.  This is in contrast to a KTable, which you might use to look up the most recent temperature for each sensor in a particular region.  You wouldn’t use a KTable to calculate an average, because a KTable always returns only the most recent value for a key and is not concerned with each individual event the way a KStream is.
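
To make the distinction concrete, here is a hypothetical sketch of a per-sensor running average, ignoring windowing for brevity; the topic name is an assumption, and this is not code from the tutorial’s repo.  It derives two KTables from one KStream and joins them:

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder

// Every reading is an insert, so the stream preserves the full history
val readings: KStream[String, Long] =
  builder.stream[String, Long]("sensor-temperatures")

// Derive the running sum and running count per sensor key
val sums: KTable[String, Long] = readings.groupByKey.reduce(_ + _)
val counts: KTable[String, Long] = readings.groupByKey.count()

// Joining the two derived tables yields a running average per sensor
val averages: KTable[String, Double] =
  sums.join(counts)((sum, count) => sum.toDouble / count)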

GlobalKTable, as the name implies, is a form of KTable.  Unlike a regular KTable, in which each application instance holds only the data from the topic partitions assigned to it, a GlobalKTable holds the data from all partitions of the underlying topic.  As you can imagine, this has advantages but also performance-related considerations, including increased storage and increased network transmission requirements.  Other benefits of GlobalKTable include no co-partitioning requirement for joins, the ability to broadcast lookup data to all running instances of an application, and additional join operations, which we won’t cover in detail here because of the introductory nature of this tutorial.

We do not cover co-partitioning in this tutorial, but let me know if you’d like to explore it further.

TYPES OF KAFKA STREAMS JOINS

Keep in mind there are essentially two types of joins: windowed and non-windowed.  Windowing allows us to control how records that share the same key are grouped for the join.

In windowed joins, a windowing state store is used to retain all the records within a defined window boundary.  Old records in the state store are purged after a defined retention period; the default window retention period is one day.  I’ll note relevant windowing where applicable in the join examples below.
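
For instance, a window boundary and a non-default retention might be expressed as follows.  This is a sketch based on the Kafka Streams API around version 2.1; until() was the mechanism for overriding retention at the time, though it has since been deprecated in favor of grace periods and store-level retention:

import java.time.Duration
import org.apache.kafka.streams.kstream.JoinWindows

// Join records whose keys match and whose timestamps fall within 5 minutes
val fiveMinuteWindow = JoinWindows.of(Duration.ofMinutes(5L))

// The windowing state store retains records for one day by default;
// until() overrides that retention (deprecated in later releases)
val longerRetention = fiveMinuteWindow.until(Duration.ofDays(2L).toMillis)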

Kafka Streams Join Examples

Before we get into the Kafka Streams join source code examples, I’d like to show a quick screencast of running the examples, to help set some overall context and put you in a position to succeed.  As you’ll see, the examples are in Scala, but let me know if you’d like to see them converted to Java.

As you see in the screencast, we’re going to run all the Kafka Streams joins examples through Scala tests.  If you want some background on this approach, it may be helpful to check out the previous Kafka Streams Testing post.

Code Examples

Let’s start with three examples of KTable to KTable joins.  These, and the other Kafka join examples, are contained in the com.supergloo.KafkaStreamsJoins class.  (All source code is available for download; see the link in the References section below and the screencast above for further reference.)

Each of the KTable to KTable join examples is within a function whose name starts with kTableToKTable.  For example, the inner join example is within the kTableToKTableJoin function.

// Imports assumed for the Kafka Streams Scala DSL (not shown in the original listing)
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KTable, Materialized}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

def kTableToKTableJoin(inputTopic1: String,
                       inputTopic2: String,
                       storeName: String): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder

  val userRegions: KTable[String, String] = builder.table(inputTopic1)
  val regionMetrics: KTable[String, Long] = builder.table(inputTopic2)

  // Inner join, materialized to a named state store we can query in tests
  userRegions.join(regionMetrics,
    Materialized.as(storeName))((regionValue, metricValue) => regionValue + "/" + metricValue)

  builder.build()
}

What’s going on in the code above?

Using a StreamsBuilder, we construct two KTables and perform the inner join.  In the arguments we provide to the join function, we pass a specific instance of a StateStore via Materialized.as(storeName).   In essence, this StateStore is another KTable based on a Kafka topic.  I find it helps when I attempt to simplify the constructs behind the API.  In this case, we’re simply joining two topics based on keys at particular moments in time (message ordering in the topic).  The results of this join are stored in another Kafka topic for a period of time.  Finally, in the last portion of the call

((regionValue, metricValue) => regionValue + "/" + metricValue)

we’re providing an implementation of what to do with the values of each topic, based on the join of keys.  If this is confusing, it will make sense when you see the results we test for next.  (Also, to really drive it home, try changing “/” to “-”, for example, and re-run the tests to see the failures.)

How to run the Kafka join examples?

To run the Kafka join examples, check out the com.supergloo.KafkaStreamsJoinsSpec test class as shown in the screencast above.  Running this class will run all of the Kafka join examples.  For example, the following test will run the inner join example described above.

// -------  KTable to KTable Joins ------------ //
"KTable to KTable Inner join" should "save expected results to state store" in {

  val driver = new TopologyTestDriver(
    KafkaStreamsJoins.kTableToKTableJoin(inputTopicOne, inputTopicTwo, stateStore),
    config
  )

  driver.pipeInput(recordFactory.create(inputTopicOne, userRegions))
  driver.pipeInput(recordFactoryTwo.create(inputTopicTwo, sensorMetric))

  // Perform tests
  val store: KeyValueStore[String, String] = driver.getKeyValueStore(stateStore)

  store.get("sensor-1") shouldBe "MN/99"
  store.get("sensor-3-in-topic-one") shouldBe null
  store.get("sensor-99-in-topic-two") shouldBe null

  driver.close()
}

The expected results specific to the Kafka joins appear in these test assertions:

store.get("sensor-1") shouldBe "MN/99"
store.get("sensor-3-in-topic-one") shouldBe null
store.get("sensor-99-in-topic-two") shouldBe null

Pay attention to how these tests differ from the other KTable to KTable join tests later in the test code.

Windowing note: as you might expect, KTable to KTable joins are non-windowed, because of the nature of KTable, in which only the most recent value for each key is considered.

Next, let’s move on to the KStream to KTable join examples.  Following the overall code organization of join implementations and test examples described above, we can find three examples of these joins in functions starting with the name kStreamToKTable in KafkaStreamsJoins.  Similarly, we can find how to run them, and the differences in their tests, in the KafkaStreamsJoinsSpec class.

As you’ll see in the implementation of the KStream to KTable examples, the API usage is slightly different.  For example, in the inner join example:

val userRegions: KTable[String, String] = builder.table(inputTopic1)
val regionMetrics: KStream[String, Long] = builder.stream(inputTopic2)

regionMetrics.join(userRegions){(regionValue, metricValue) =>
  regionValue + "/" + metricValue
}.to(outputTopicName)

val outputTopic: KTable[String, String] =
  builder.table(
    outputTopicName,
    Materialized.as(storeName)
  )

In the example above, we don’t have the option to provide a StateStore in the join.  So, instead, we use the to function to pipe the results directly to a new topic.  Then, we materialize the StateStore by creating a KTable from that topic, so we can reference it in the tests.

When moving to the KStream to KStream examples, whose function names start with kStreamToKStream, notice that we now need to provide a JoinWindows.  For example:

regionMetrics.join(userRegions)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

The final two examples are KStream to GlobalKTable joins.  Again, the code is similar, but the key differences include how to create a GlobalKTable and the join function signature, as seen in the following.

val userRegions: GlobalKTable[String, String] = builder.globalTable(inputTopic1)
val regionMetrics: KStream[String, Long] = builder.stream(inputTopic2)

regionMetrics.join(userRegions)(
  (lk, rk) => lk,   // keyValueMapper: map each stream record to the GlobalKTable key
  ((regionValue, metricValue) => regionValue + "/" + metricValue)
).to(outputTopicName)

Constructing a GlobalKTable is simple enough that it doesn’t require elaboration.

The join function signature changes to require a keyValueMapper: (lk, rk) => lk.  This keyValueMapper is a function used to map the key/value pair from the KStream to the key of the GlobalKTable.  In this implementation, it’s nothing fancy: we simply use the key of the KStream (represented as lk) to match against the key of the GlobalKTable.

As you might expect based on the aforementioned description of KTable vs GlobalKTable, the tests in KStream to GlobalKTable joins are nearly identical to KStream to KTable examples.

Kafka Joins Examples Conclusion

Hopefully, you found these Kafka join examples helpful.  If you have any questions, or even better, suggestions on how to improve them, please let me know.

 

Noteworthy

As shown in the screencast above, the path is not smooth when a failed test occurs. If you run a test which fails and then attempt to re-run the tests, an Exception occurs and none of the tests pass.  The Exception is:

org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory for task 0_0

The only way I’ve found to resolve it is to delete the stale state directory: rm -rf /tmp/kafka-streams/testing/0_0/

This experience happens when running tests in both IntelliJ and SBT REPL.
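
One possible mitigation, though it’s an assumption on my part and not something verified in this repo’s tests, is to give each test run a fresh state directory so a stale lock can never be reused:

import java.nio.file.Files
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val config = new Properties()
// A unique temp state dir per run avoids contention on /tmp/kafka-streams
config.put(StreamsConfig.STATE_DIR_CONFIG,
  Files.createTempDirectory("kafka-streams-test").toString)

Wrapping driver.close() in a try/finally block, so the driver releases its lock even when an assertion fails, may also help.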

Let me know any suggestions to resolve.

 

References
  • My Kafka Streams Github repo with all source code described above
  • Streaming Architectures
  • Kafka Documentation on Joining

 

 
