Spark Broadcast and Accumulator Examples in Scala

Spark Broadcast and Accumulator Overview

So far, we’ve learned about distributing processing tasks across a Spark cluster.  But, let’s go a bit deeper into a couple of approaches you may need when designing distributed tasks.  I’d like to start with a question.  What do we do when the tasks running on Spark workers need to share certain variables and values?  I mean, let’s imagine we want each task to have access to common values, or to contribute to a common result, instead of simply returning action results independently back to the driver program.  If you are thinking of terms such as shared state or “stateful” vs. “stateless”, then you are on the right track.  Or, at least that’s how I think of Spark broadcast variables and accumulators.  In this post, we’ll discuss these two constructs for sharing variables across a Spark cluster and then review example Scala code.

Spark Shared Variables

When a function is passed to a Spark operation, it is executed on a remote cluster node.  The function works on separate copies of all the variables it uses.  These variables are copied to each machine, and updates made to them on the remote machines are not propagated back to the driver program.  Supporting general, read-write shared variables across tasks would be inefficient.  Nevertheless, Spark does provide two limited types of shared variables for two common usage patterns:

  • Broadcast variables
  • Accumulators

Broadcast Variables

Broadcast variables allow Spark developers to keep a read-only variable cached on each node, rather than shipping a copy of it with every task.  For instance, they can be used to give every node a copy of a large input dataset in an efficient manner.  Spark also attempts to distribute broadcast variables using efficient broadcast algorithms in order to reduce communication cost.

Actions in Spark are executed through a set of stages.  These stages are separated by distributed “shuffle” operations.  Within each stage, Spark automatically broadcasts the common data needed by tasks.  Data broadcast this way is cached in serialized form and deserialized before each task runs.  For this reason, creating broadcast variables explicitly is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.

Broadcast variables are created from a value v by calling SparkContext.broadcast(v).  The broadcast variable is a wrapper around v, and its contents are read by calling the value method.
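Here is a minimal sketch of that pattern.  The object name, the lookup table contents and the local[*] master are assumptions for illustration only; the Spark calls used are SparkContext.broadcast and the value method described above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  // Labels each status code using a lookup table shared through a broadcast variable.
  def labelCodes(sc: SparkContext, codes: Seq[Int]): Array[String] = {
    // Hypothetical lookup table, wrapped once on the driver.
    val statusLabels = sc.broadcast(Map(200 -> "OK", 404 -> "Not Found", 500 -> "Server Error"))

    sc.parallelize(codes)
      // Tasks only read the broadcast contents through .value.
      .map(code => statusLabels.value.getOrElse(code, "Unknown"))
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // local[*] is assumed here so the sketch can run without a cluster.
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastSketch").setMaster("local[*]"))
    try labelCodes(sc, Seq(200, 404, 302)).foreach(println)
    finally sc.stop()
  }
}
```

Once the value has been broadcast, use statusLabels.value inside the function passed to Spark rather than the original map, so the data is not shipped again with each task.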

 

Accumulators

As you might assume from the name, accumulators are variables that are only “added” to through an associative and commutative operation.  There are many uses for accumulators, including implementing counters or sums.  Spark natively supports accumulators of numeric types, and programmers can add support for other types.  If an accumulator is created with a name, it is displayed in the Spark UI, which can be useful for understanding the progress of running stages.

Accumulators are created from an initial value v by calling SparkContext.accumulator(v).  Tasks running on the cluster can then add to the accumulator using the add method or the += operator in Scala.  They cannot, however, read its value.  Only the driver program can read the accumulator’s value, using its value method.
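A minimal sketch of a counter follows.  One caveat: SparkContext.accumulator(v) has been deprecated since Spark 2.0 in favor of SparkContext.longAccumulator and the AccumulatorV2 API, so this sketch assumes Spark 2.x or later and uses longAccumulator.  The object name and the "blankLines" accumulator name are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  // Counts blank lines as a side effect of a foreach action.
  def countBlankLines(sc: SparkContext, lines: Seq[String]): Long = {
    // Naming the accumulator makes it visible in the Spark UI.
    val blankLines = sc.longAccumulator("blankLines")

    sc.parallelize(lines).foreach { line =>
      // Tasks can only add to the accumulator, never read it.
      if (line.trim.isEmpty) blankLines.add(1)
    }

    // Only the driver reads the merged result.
    blankLines.sum
  }

  def main(args: Array[String]): Unit = {
    // local[*] is assumed here so the sketch can run without a cluster.
    val sc = new SparkContext(new SparkConf().setAppName("AccumulatorSketch").setMaster("local[*]"))
    try println(countBlankLines(sc, Seq("a", "", "b", "   ")))
    finally sc.stop()
  }
}
```

The update happens inside foreach, which is an action.  Spark only guarantees that each task's update is applied exactly once for updates performed inside actions; updates made inside transformations such as map may be applied more than once if a task is re-executed.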

 

Spark Broadcast and Spark Accumulators Example Driver Program in Scala

With this background on broadcast variables and accumulators, let’s take a look at more extensive examples in Scala.  The context of the following example code is developing a web server log file analyzer for certain types of HTTP status codes.  We can easily imagine the advantages of using Spark when processing a large volume of log file data.  See the Resources section below for source code download links.

Let’s start with an object containing the main method and definition of one broadcast variable and numerous accumulators:

 

As you can hopefully see above, we plan to use the httpStatusList when determining which accumulator to update.
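To make the shape of such a driver concrete, here is a rough sketch, not the original listing.  Everything specific in it is an assumption: the category names, the contents returned by the stand-in populateHttpStatusList, the simplistic statusCode helper (which takes the place of AccessLogParser), and the use of longAccumulator from Spark 2.x or later.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object LogAnalyzerSketch {
  // Hypothetical stand-in for Utils.populateHttpStatusList: status code -> category.
  def populateHttpStatusList(): Map[Int, String] =
    Map(200 -> "success", 301 -> "redirect", 404 -> "clientError", 500 -> "serverError")

  // Hypothetical stand-in for AccessLogParser: assumes Common Log Format, where
  // the status code is the second-to-last whitespace-separated field.
  def statusCode(line: String): Option[Int] = {
    val fields = line.trim.split("\\s+")
    if (fields.length < 2) None else Try(fields(fields.length - 2).toInt).toOption
  }

  // Counts log lines per category: one broadcast variable, several accumulators.
  def analyze(sc: SparkContext, lines: Seq[String]): Map[String, Long] = {
    val httpStatusList = sc.broadcast(populateHttpStatusList())
    val categories = Seq("success", "redirect", "clientError", "serverError", "unknown")
    val accumulators = categories.map(name => name -> sc.longAccumulator(name)).toMap

    sc.parallelize(lines).foreach { line =>
      // The broadcast list decides which accumulator gets updated.
      val category = statusCode(line).flatMap(httpStatusList.value.get).getOrElse("unknown")
      accumulators(category).add(1)
    }

    // Back on the driver, read the merged totals.
    accumulators.map { case (name, acc) => name -> acc.sum }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LogAnalyzerSketch").setMaster("local[*]"))
    val sample = Seq(
      """127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /a HTTP/1.0" 200 2326""",
      """127.0.0.1 - - [10/Oct/2000:13:55:37 -0700] "GET /b HTTP/1.0" 404 512""")
    try analyze(sc, sample).foreach(println)
    finally sc.stop()
  }
}
```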

populateHttpStatusList is made available by the import of Utils and looks like

 

AccessLogParser could be considered a wrapper for regular expression boogie woogie as seen next

The real credit for this regex mastery goes to alvinj at https://github.com/alvinj and in particular https://github.com/alvinj/ScalaApacheAccessLogParser/blob/master/src/main/scala/AccessLogParser.scala
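For a feel of what a regex-based parser of this kind does, here is a much simplified sketch.  It is not alvinj's code: the pattern only covers the basic Common Log Format fields, and the object, case class and field names are hypothetical.

```scala
object AccessLogSketch {
  // Simplified, hypothetical pattern for the Common Log Format:
  // host ident user [timestamp] "request" status bytes
  private val LogLine =
    """^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+).*$""".r

  final case class AccessLogRecord(
      clientIp: String,
      dateTime: String,
      request: String,
      status: Int,
      bytesSent: String)

  // Returns None for any line that does not match the pattern.
  def parse(line: String): Option[AccessLogRecord] = line match {
    case LogLine(ip, _, _, dateTime, request, status, bytes) =>
      Some(AccessLogRecord(ip, dateTime, request, status.toInt, bytes))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val line = """127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326"""
    println(parse(line).map(_.status))
  }
}
```

Returning an Option keeps malformed lines from throwing inside a Spark task, and the None case is a natural place to bump an "unparsed lines" accumulator.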

Resources

 

Featured image credit https://flic.kr/p/wYHqe
