Apache Spark aggregateByKey Example

In this Spark aggregateByKey example post, we will see how aggregateByKey can be a better alternative to the groupByKey transformation when an aggregation is involved. The most common problem while working with key-value pairs is grouping values and aggregating them with respect to a common key, and Spark’s aggregateByKey transformation addresses this problem in a very intuitive way.

Any RDD with key-value pair data is referred to as a PairRDD in Spark. For aggregation-style transformations on a PairRDD, the initial step is grouping values with respect to a common key.

The Spark aggregateByKey function aggregates the values of each key, using given combine functions and a neutral “zero value”.

The aggregateByKey function aggregates the values for each key and can return a value of a different type for that key. Say we have an RDD of tuples of student, subject and the marks scored in that subject, and we want an output RDD with each student and their maximum marks or overall percentage. Such transformations can be done using the aggregateByKey function.

Before we look at the aggregateByKey examples in Scala and Python, let’s have a look at this transformation function in detail.

The aggregateByKey function in Spark accepts a total of 3 parameters (a sketch of its signature follows this list):

  1. Initial value or zero value
    • It can be 0 if the aggregation is a sum of all values
    • It can be Double.MaxValue if the aggregation objective is to find the minimum value
    • It can be Double.MinValue if the aggregation objective is to find the maximum value
    • Or it can be an empty List or Map if we just want such a collection as the output for each key
  2. Sequence operation function, which transforms/merges data of one type [V] into another type [U]
  3. Combination operation function, which merges multiple transformed values of type [U] into a single value of type [U]
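For reference, the simplest Scala overload of aggregateByKey on an RDD[(K, V)] looks roughly like this (overloads that additionally take a Partitioner or a number of partitions also exist):

    // zeroValue: the initial accumulator of type U
    // seqOp:     merges a value V into an accumulator U within a partition
    // combOp:    merges two accumulators U coming from different partitions
    def aggregateByKey[U: ClassTag](zeroValue: U)(
        seqOp: (U, V) => U,
        combOp: (U, U) => U): RDD[(K, U)]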

How to interpret the syntax of the aggregateByKey function?

Let’s take an example of finding the maximum marks a student has scored in any single subject, using aggregateByKey.

Here your source RDD will be of type PairRDD[String, (String, Double)], where the key is the student name with datatype String and the value is the subject name and marks with datatype (String, Double).
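As a minimal sketch (the student names and marks below are made-up sample data, and sc is the SparkContext available in a spark-shell session), such a pair RDD can be created like this:

    // Sample (student, (subject, marks)) records; names and numbers are illustrative only
    val studentMarks = sc.parallelize(Seq(
      ("Joseph", ("Maths", 83.0)),
      ("Joseph", ("Physics", 74.0)),
      ("Joseph", ("Chemistry", 91.0)),
      ("Jimmy",  ("Maths", 69.0)),
      ("Jimmy",  ("Physics", 62.0)),
      ("Jimmy",  ("Chemistry", 97.0))
    ))
    // studentMarks: RDD[(String, (String, Double))], i.e. a pair RDD keyed by student name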

Now, the following are the three parameters of the aggregateByKey function for this example (a code sketch follows the list):

  • zeroValue: As we are finding the maximum marks across all subjects, we should use Double.MinValue as the initial value of the accumulator
  • seqOp: The sequential operation finds the maximum marks within each partition (it operates on the data of a single partition)
  • combOp: The combiner operation finds the maximum of two already-aggregated values (it operates on the aggregated results of all partitions)
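Putting the three parameters together, a sketch of the full call, reusing the studentMarks RDD created above, could look like this:

    // zeroValue: start every accumulator at the smallest possible Double
    // seqOp:     within a partition, keep the larger of the accumulator and the current marks
    // combOp:    across partitions, keep the larger of the two partial maxima
    val maxMarks = studentMarks.aggregateByKey(Double.MinValue)(
      (acc, subjectMarks) => math.max(acc, subjectMarks._2),
      (acc1, acc2)        => math.max(acc1, acc2)
    )
    maxMarks.collect().foreach(println)
    // e.g. (Joseph,91.0) and (Jimmy,97.0) for the sample data above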

Why do we need two functions?

Looking at the above parameters, the fundamental question many of us ask is: why do we need two functions?

Let me help you understand this.

  • The first argument is clear, as it is the initial value of the accumulator
  • The second argument, seqOp, is an operation that aggregates all values within a single partition
  • Finally, the third argument, combOp, is similar to seqOp and further aggregates the already-aggregated values coming from different partitions

Now, let me make it very simple with the above example.

In the above example, the seqOp function is applied to each element of the PairRDD[String, (String, Double)]. As a result, it gives the maximum marks of a student across all subjects. Since your RDD is distributed, records of a single student may be spread across multiple partitions. Hence, the seqOp function is applied within a single partition and finds the maximum marks from that partition.

So what about finding maximum marks from different partitions?

Remember the combOp function? It takes care of finding the maximum marks for a student across multiple partitions. The combOp function is applied to the aggregated values of the different partitions (i.e. the outputs of seqOp). Finally, we end up with one aggregated value per key.
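To make this two-step flow concrete, here is a small local trace using plain Scala collections (no Spark needed), assuming Joseph’s records happen to land in two partitions as shown:

    // Hypothetical partition contents for the key "Joseph"
    val partition0 = Seq(("Maths", 83.0), ("Physics", 74.0))
    val partition1 = Seq(("Chemistry", 91.0))

    val seqOp  = (acc: Double, rec: (String, Double)) => math.max(acc, rec._2)
    val combOp = (acc1: Double, acc2: Double) => math.max(acc1, acc2)

    // seqOp runs independently inside each partition, starting from the zero value
    val max0 = partition0.foldLeft(Double.MinValue)(seqOp)   // 83.0
    val max1 = partition1.foldLeft(Double.MinValue)(seqOp)   // 91.0

    // combOp then merges the per-partition results into the final value for the key
    val overallMax = combOp(max0, max1)                      // 91.0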

(Figure: Apache Spark aggregateByKey transformation)

Important Points

  • Performance-wise, aggregateByKey is an optimized transformation, since it combines values on the map side before the shuffle
  • aggregateByKey is a wide transformation, i.e. it involves a shuffle
  • We should use aggregateByKey when aggregation is required and the value types of the input and output RDDs are different
  • We can use reduceByKey instead when the input and output value types are the same (see the short sketch below)
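To illustrate the last point, here is a short sketch: when the value type does not change, for example when simply summing marks per student, reduceByKey expresses the same aggregation more concisely (the marksOnly RDD here is made up for illustration):

    // (student, marks) pairs: input and output value types are both Double
    val marksOnly = sc.parallelize(Seq(("Joseph", 83.0), ("Joseph", 74.0), ("Jimmy", 69.0)))

    // reduceByKey is enough when the value type stays the same
    val totalMarks = marksOnly.reduceByKey(_ + _)

    // aggregateByKey needs a zero value and two functions even for the same result
    val totalMarksAgg = marksOnly.aggregateByKey(0.0)(_ + _, _ + _)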

The following examples will give you more clarity.

Spark aggregateByKey Examples in Scala

Let’s extend the above example by adding a bit more complexity. How about printing the overall percentage of every student using aggregateByKey?
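The original code listing is not reproduced here, so the following is a minimal sketch of how such an example could look, assuming marks are out of 100 and the percentage is simply the average over a student’s subjects (the data is the same illustrative sample as above):

    // (student, (subject, marks)) sample records, marks out of 100
    val studentMarks = sc.parallelize(Seq(
      ("Joseph", ("Maths", 83.0)), ("Joseph", ("Physics", 74.0)), ("Joseph", ("Chemistry", 91.0)),
      ("Jimmy",  ("Maths", 69.0)), ("Jimmy",  ("Physics", 62.0)), ("Jimmy",  ("Chemistry", 97.0))
    ))

    // Accumulator is (totalMarks, subjectCount); seqOp adds one subject's marks,
    // combOp merges partial totals and counts coming from different partitions
    val percentages = studentMarks
      .aggregateByKey((0.0, 0))(
        (acc, subjectMarks) => (acc._1 + subjectMarks._2, acc._2 + 1),
        (acc1, acc2)        => (acc1._1 + acc2._1, acc1._2 + acc2._2)
      )
      .mapValues { case (total, count) => total / count }   // average = percentage when marks are out of 100

    percentages.collect().foreach(println)
    // e.g. (Joseph,82.66666666666667) and (Jimmy,76.0) for this sample data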

Now let’s have a look at the same aggregateByKey examples in PySpark.

PySpark aggregateByKey Examples
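As in the Scala section, the original listing is not shown here; a rough PySpark equivalent of the percentage example, using the same illustrative sample data and the sc SparkContext of a pyspark shell, could look like this:

    # (student, (subject, marks)) sample records, marks out of 100
    student_marks = sc.parallelize([
        ("Joseph", ("Maths", 83.0)), ("Joseph", ("Physics", 74.0)), ("Joseph", ("Chemistry", 91.0)),
        ("Jimmy",  ("Maths", 69.0)), ("Jimmy",  ("Physics", 62.0)), ("Jimmy",  ("Chemistry", 97.0)),
    ])

    # Accumulator is (total_marks, subject_count)
    def seq_op(acc, subject_marks):
        return (acc[0] + subject_marks[1], acc[1] + 1)

    def comb_op(acc1, acc2):
        return (acc1[0] + acc2[0], acc1[1] + acc2[1])

    percentages = (student_marks
                   .aggregateByKey((0.0, 0), seq_op, comb_op)
                   .mapValues(lambda t: t[0] / t[1]))

    print(percentages.collect())
    # e.g. [('Joseph', 82.66666666666667), ('Jimmy', 76.0)]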

Hope you like this post. Stay tuned for more posts on RDD Transformations and Actions.
