Apache Spark reduceByKey Example

Looking at a Spark reduceByKey example, we can say that reduceByKey is one step ahead of the reduce function in Spark, with the key difference that it is a transformation operation. Let’s understand this operation with some examples in Scala, Java and Python.

Spark RDD reduceByKey function merges the values for each key using an associative reduce function.

The reduceByKey function works only on RDDs whose elements are key-value pairs (i.e. RDDs of tuples). It is a transformation operation, which means it is lazily evaluated. We need to pass an associative function as a parameter; it is applied to the source RDD and produces a new RDD of key-value pairs with the reduced values. reduceByKey is a wide operation, as data may be shuffled across partitions.

The reduce function (which accepts two arguments and returns a single value) should be commutative and associative in the mathematical sense. Intuitively, this means the function produces the same result when applied repeatedly to the same RDD data across multiple partitions, irrespective of the order of the elements. Additionally, reduceByKey first merges values locally within each partition using this function and then sends the partial results across partitions to prepare the final output.
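As a minimal side sketch (not from the original article), the requirement is easy to see by trying a non-associative function: subtraction gives partition-dependent results, while addition does not.

// Sketch: why the reduce function must be associative and commutative.
val nums = sc.parallelize(Seq(("k", 1), ("k", 2), ("k", 3), ("k", 4)), 2)

// Safe: addition is associative and commutative, so the result is always ("k", 10)
nums.reduceByKey(_ + _).collect()

// Unsafe: subtraction is neither, so the result depends on how partial
// results are computed per partition and then merged
nums.reduceByKey(_ - _).collect()
nums.repartition(1).reduceByKey(_ - _).collect()  // generally differs from the 2-partition result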

[Image: Apache Spark reduceByKey example]

In the above image you can see that RDD X contains multiple paired elements, such as (a,1) and (b,1), spread across 3 partitions. reduceByKey accepts a function (accum, n) => (accum + n), which adds up the values for each key and returns a final RDD Y with the total count paired with each key. Before shuffling the data across the partitions, it performs the same aggregation locally within each partition.
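A small sketch (assuming the same data as the diagram) that uses mapPartitionsWithIndex to show how the pairs sit in the 3 partitions before the local aggregation and the shuffle:

// Mirror the diagram's data: 8 (key, 1) pairs spread over 3 partitions
val pairs = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1), ("a", 1),
                                 ("b", 1), ("b", 1), ("b", 1), ("b", 1)), 3)

// Tag each element with the index of the partition holding it
val tagged = pairs.mapPartitionsWithIndex((idx, it) => it.map(e => (idx, e)))
tagged.collect().foreach(println)   // e.g. (0,(a,1)), (0,(b,1)), (1,(a,1)), ...

// reduceByKey first sums the values per key inside each partition and only
// ships those partial sums across the network to build the final counts
pairs.reduceByKey(_ + _).collect()  // contains (a,3) and (b,5)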

Important points to note:

  • reduceByKey is a transformation operation in Spark, hence it is lazily evaluated
  • It is a wide operation, as it shuffles data across multiple partitions and creates another RDD
  • Before sending data across the partitions, it merges the data locally using the same associative function, which reduces the amount of data shuffled
  • It can only be used with RDDs whose elements are key-value pairs
  • It accepts a commutative and associative function as an argument
    • The parameter function should take two arguments of the same data type
    • The return type of the function must also be the same as the argument type (see the sketch after this list)
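A minimal sketch of that signature rule, using a hypothetical RDD[(String, Double)]: the function must take two Double arguments and return a Double.

// Value type is Double, so the reduce function is (Double, Double) => Double
val prices = sc.parallelize(Seq(("aapl", 10.5), ("goog", 20.0), ("aapl", 11.0)), 2)
val maxPerKey = prices.reduceByKey((a, b) => math.max(a, b))
maxPerKey.collect()  // keeps the largest value per key, e.g. (aapl,11.0), (goog,20.0)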

This function has three variants:

  1. reduceByKey(function)
  2. reduceByKey(function, [numPartitions])
  3. reduceByKey(partitioner, function)
  • Variant 1 generates hash-partitioned output using the RDD's existing or default partitioner
  • Variant 2 generates hash-partitioned output with the number of partitions given by numPartitions
  • Variant 3 generates output partitioned according to the Partitioner object referenced by partitioner

All of the above variants return an RDD of key-value pairs.
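A short sketch of the three call forms (the partition counts chosen here are arbitrary):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)), 3)

val v1 = pairs.reduceByKey(_ + _)                          // 1. existing/default partitioner
val v2 = pairs.reduceByKey(_ + _, 2)                       // 2. hash-partitioned into 2 partitions
val v3 = pairs.reduceByKey(new HashPartitioner(2), _ + _)  // 3. explicit Partitioner object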

Let’s look at some examples.

Spark reduceByKey Example Using Scala
// Basic reduceByKey example in Scala
// Creating PairRDD x with key value pairs
scala> val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),
     | ("a", 1), ("b", 1), ("b", 1),
     | ("b", 1), ("b", 1)), 3)
x: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:21

// Applying reduceByKey operation on x
scala> val y = x.reduceByKey((accum, n) => (accum + n))
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:23

scala> y.collect
res0: Array[(String, Int)] = Array((a,3), (b,5))

// Another way of applying associative function
scala> val y = x.reduceByKey(_ + _)
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:23

scala> y.collect
res1: Array[(String, Int)] = Array((a,3), (b,5))

// Define associative function separately
scala> def sumFunc(accum:Int, n:Int) =  accum + n
sumFunc: (accum: Int, n: Int)Int

scala> val y = x.reduceByKey(sumFunc)
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25

scala> y.collect
res2: Array[(String, Int)] = Array((a,3), (b,5))
Spark reduceByKey Example Using Java 8
package com.backtobazics.sparkexamples;

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class ReduceByKeyExample {
    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext();
        
        //Reduce Function for sum
        Function2<Integer, Integer, Integer> reduceSumFunc = (accum, n) -> (accum + n);
        
        
        // Parallelized with 3 partitions
        JavaRDD<String> x = sc.parallelize(
                        Arrays.asList("a", "b", "a", "a", "b", "b", "b", "b"),
                        3);
        
        // PairRDD with 3 partitions (inherited from x)
        // mapToPair maps the JavaRDD to a JavaPairRDD of (word, 1) tuples
        JavaPairRDD<String, Integer> rddX = 
                        x.mapToPair(e -> new Tuple2<String, Integer>(e, 1));
        
        // New JavaPairRDD 
        JavaPairRDD<String, Integer> rddY = rddX.reduceByKey(reduceSumFunc);
        
        //Print tuples
        for(Tuple2<String, Integer> element : rddY.collect()){
            System.out.println("("+element._1+", "+element._2+")");
        }
    }
}

// Output:
// (b, 5)
// (a, 3) 
Spark reduceByKey Example Using Python
# Basic reduceByKey example in Python
# creating PairRDD x with key value pairs
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)

# Applying reduceByKey operation on x
>>> y = x.reduceByKey(lambda accum, n: accum + n)
>>> y.collect()
[('b', 5), ('a', 3)]

# Define associative function separately 
>>> def sumFunc(accum, n):
...     return accum + n
...
>>> y = x.reduceByKey(sumFunc)
>>> y.collect()
[('b', 5), ('a', 3)]
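
The value does not have to be a single number. As a final sketch in Scala, the same pattern sums tuple values element-wise for each key:

// Each value is a 3-element tuple; the reduce function combines two
// tuples by adding their components pairwise.
val rdd = sc.parallelize(List(("a", (1, 1, 1)), ("b", (1, 1, 1)), ("a", (1, 1, 1)),
                              ("b", (1, 1, 1)), ("b", (1, 1, 1))))
rdd.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3)).collect()
// e.g. Array((a,(2,2,2)), (b,(3,3,3)))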

