Apache Spark groupBy Example

Spark's groupBy can be compared with GROUP BY in SQL. In Spark, groupBy is a transformation operation. Let's start with an overview and then walk through examples in Scala, Java and Python.

Spark RDD groupBy function returns an RDD of grouped items.

Spark's groupBy function is defined in the RDD class. It is a transformation operation, which means it is lazily evaluated. We need to pass a function (which defines the group for an element); it is applied to the source RDD and produces a new RDD containing the individual groups and the list of items belonging to each group. This is a wide operation because data shuffling may happen across the partitions.

This operation returns a new RDD made up of a key (the group) and the list of items of that group (in the form of an Iterable). The order of elements within a group may not be the same when you apply the same operation to the same RDD over and over. Additionally, this operation is considered very costly when you are trying to perform some aggregation on the grouped items.

Figure: Apache Spark groupBy Example

In the above image you can see that RDD X contains different words, spread across 2 partitions. groupBy accepts a function word => word.charAt(0), which returns the first character of each word; that character is used as the group key. The output RDD Y contains the group (the first character of the word) as the key and an Iterable of all words belonging to that group (i.e. all words starting with that character).

Important points to note:

  • groupBy is a transformation operation in Spark, hence it is lazily evaluated
  • It is a wide operation as it shuffles data across multiple partitions and creates another RDD
  • It is a costly operation because it does not use a combiner local to a partition to reduce the data transferred
  • It is not recommended when you need to do further aggregation on the grouped data (see the sketch after this list)
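As an illustration of the last point, here is a minimal Scala sketch (assuming a SparkContext sc, e.g. in spark-shell; the variable names are illustrative) that counts words per first character. The reduceByKey version is the usual recommendation because it combines values within each partition before the shuffle, whereas the groupBy version ships every word across the network first:

val words = sc.parallelize(Seq("Joseph", "Jimmy", "Tina", "Thomas",
  "James", "Cory", "Christine", "Jackeline", "Juan"), 3)

// Costly: every word is shuffled across the network, counting happens after the shuffle
val countsViaGroupBy = words.groupBy(word => word.charAt(0))
                            .mapValues(group => group.size)

// Usually preferred: map to (key, 1) pairs so values are combined
// within each partition before the shuffle
val countsViaReduceByKey = words.map(word => (word.charAt(0), 1))
                                .reduceByKey(_ + _)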

This function has three variants (a short sketch of each follows the list):

  1. groupBy(function)
  2. groupBy(function, numPartitions)
  3. groupBy(function, partitioner)
  • Variant 1 generates hash-partitioned output using the default partitioning of the parent RDD
  • Variant 2 generates hash-partitioned output with the number of partitions given by numPartitions
  • Variant 3 generates output partitioned by the Partitioner object referenced by partitioner
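A minimal Scala sketch of the three variants, assuming a SparkContext sc is available; the HashPartitioner and the partition count of 4 are illustrative choices, not values from the original examples:

import org.apache.spark.HashPartitioner

val words = sc.parallelize(Seq("Joseph", "Jimmy", "Tina", "Thomas"), 3)

// Variant 1: group key only; output uses the default hash partitioning
val g1 = words.groupBy((word: String) => word.charAt(0))

// Variant 2: additionally request 4 output partitions
val g2 = words.groupBy((word: String) => word.charAt(0), 4)

// Variant 3: control placement with an explicit Partitioner object
val g3 = words.groupBy((word: String) => word.charAt(0), new HashPartitioner(4))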

Let's look at some examples.

Spark groupBy Example Using Scala

// Basic groupBy example in Scala
scala> val x = sc.parallelize(Array("Joseph", "Jimmy", "Tina",
     | "Thomas", "James", "Cory",
     | "Christine", "Jackeline", "Juan"), 3)
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:21

// create a group per first character
scala> val y = x.groupBy(word => word.charAt(0))
y: org.apache.spark.rdd.RDD[(Char, Iterable[String])] = ShuffledRDD[18] at groupBy at <console>:23

scala> y.collect
res0: Array[(Char, Iterable[String])] = Array((T,CompactBuffer(Tina, Thomas)), (C,CompactBuffer(Cory,
 Christine)), (J,CompactBuffer(Joseph, Jimmy, James, Jackeline, Juan)))

// Another short syntax
scala> val y = x.groupBy(_.charAt(0))
y: org.apache.spark.rdd.RDD[(Char, Iterable[String])] = ShuffledRDD[3] at groupBy at <console>:23

scala> y.collect
res1: Array[(Char, Iterable[String])] = Array((T,CompactBuffer(Tina, Thomas)), (C,CompactBuffer(Cory,
 Christine)), (J,CompactBuffer(Joseph, Jimmy, James, Jackeline, Juan)))
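Since the order of the values inside each group is not guaranteed (see the note above), you may want to sort them before printing or comparing results. A small sketch building on the y RDD from the example above (the variable name sortedGroups is illustrative):

// Sort the words inside each group; the ordering of the groups themselves may still vary
val sortedGroups = y.mapValues(words => words.toList.sorted)
sortedGroups.collect.foreach(println)
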
Spark groupBy Example Using Java 8

package com.backtobazics.sparkexamples;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class GroupByExample {
    public static void main(String[] args) throws Exception {
        
        // Local mode configuration so the example can run standalone
        // (the master "local[*]" and the app name are illustrative choices)
        SparkConf conf = new SparkConf().setAppName("GroupByExample").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        // Parallelized with 3 partitions
        JavaRDD<String> rddX = sc.parallelize(
                Arrays.asList("Joseph", "Jimmy", "Tina",
                        "Thomas", "James", "Cory",
                        "Christine", "Jackeline", "Juan"), 3);
        
        JavaPairRDD<Character, Iterable<String>> rddY = rddX.groupBy(word -> word.charAt(0));
        
        System.out.println(rddY.collect());
        
        sc.stop();
    }
}

// Output:
// [(J,[Joseph, Jimmy, James, Jackeline, Juan]), (T,[Tina, Thomas]), (C,[Cory, Christine])] 

Spark groupBy Example Using Python

# Basic groupBy example in Python
# creating RDD x with words
>>> x = sc.parallelize(["Joseph", "Jimmy", "Tina",
... "Thomas", "James", "Cory",
... "Christine", "Jackeline", "Juan"], 3)

# Applying groupBy operation on x
>>> y = x.groupBy(lambda word: word[0])
>>> print([(t[0],[i for i in t[1]]) for t in y.collect()])
[('J', ['Joseph', 'Jimmy', 'James', 'Jackeline', 'Juan']), ('T', ['Tina', 'Thomas']), ('C', ['Cory', 'Christine'])]
