Spark combineByKey
RDD transformation is very similar to combiner in Hadoop MapReduce programming. In this post, we’ll discuss spark combineByKey example in depth and try to understand the importance of this function in detail.
Spark combineByKey
is a transformation operation on PairRDD (i.e. RDD with key/value pair). It is a wider operation as it requires shuffle in the last stage. As we have seen earlier in reduceByKey example that it internally combines elements by partition. The same combiner kind behavior is there in combineByKey
function.
Spark combineByKey is a generic function to combine the elements for each key using a custom set of aggregation functions
Internally spark combineByKey
function efficiently combines the values of a PairRDD
partition by applying aggregation function. The main objective of combineByKey transformation is transforming any PairRDD[(K,V)]
to the RDD[(K,C)]
where C is the result of any aggregation of all values under key K.
Let’s understand this transformation step by step.
Spark combineByKey argument functions
Spark combineByKey
function uses following three functions as an argument,
- createCombiner
- mergeValue
- mergeCombiners
To start with let’s say we have a RDD of studentName, subjectName and marks and we want to get the percentage of all students. So following will be the steps of solving it with combineByKey
transformation.

createCombiner function of combineByKey
- This function is a first argument of
combineByKey
function - It is a first aggregation step for each key
- It will be executed when any new key is found in a partition
- Execution of this lambda function is local to a partition of a node, on each individual values
In our case as we are calculating percentage, we need sum and count as an aggregation. So our createCombiner
function should initialize it with a tuple (sum, count). For initial aggregation, it should be (value, 1).
This function is similar to first argument (i.e. zeroVal) of aggregateByKey transformation.
mergeValue function of combineByKey
- Second function executes when next subsequent value is given to combiner
- It also executes locally on each partition of a node and combines all values
- Arguments of this function are a accumulator and a new value
- It combines a new value in existing accumulator
In our case mergeValue
has one accumulator tuple (sum, count). So whenever we get a new value the marks will be added to first element and second value (i.e. counter) will be incremented by 1.
This function is similar to second argument (i.e. seqOp) of aggregateByKey transformation.
mergeCombiner function of combineByKey
- Final function is used to combine how to merge two accumulators (i.e. combiners) of a single key across the partitions to generate final expected result
- Arguments are two accumulators (i.e. combiners)
- Merge results of a single key from different partitions
This function is similar to third argument (i.e. combOp) of aggregateByKey transformation.
Important Points
- Apache spark
combineByKey
is a transformation operation hence its evaluation is lazy - It is a wider operation as it shuffles data in the last stage of aggregation and creates another RDD
- Recommended to use when you need to do further aggregation on grouped data
- Use
combineByKey
when return type differs than source type (i.e. when you cannot usereduceByKey
)
Spark combineByKey Example in Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | // Creating PairRDD studentRDD with key value pairs val studentRDD = sc.parallelize(Array( ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3) //Defining createCombiner, mergeValue and mergeCombiner functions def createCombiner = (tuple: (String, Int)) => (tuple._2.toDouble, 1) def mergeValue = (accumulator: (Double, Int), element: (String, Int)) => (accumulator._1 + element._2, accumulator._2 + 1) def mergeCombiner = (accumulator1: (Double, Int), accumulator2: (Double, Int)) => (accumulator1._1 + accumulator2._1, accumulator1._2 + accumulator2._2) // use combineByKey for finding percentage val combRDD = studentRDD.map(t => (t._1, (t._2, t._3))) .combineByKey(createCombiner, mergeValue, mergeCombiner) .map(e => (e._1, e._2._1/e._2._2)) //Check the Outout combRDD.collect foreach println // Output // (Tina,76.5) // (Thomas,86.25) // (Jackeline,76.5) // (Joseph,82.5) // (Juan,64.0) // (Jimmy,77.0) // (Cory,65.0) |
PySpark combineByKey Example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | # PySpark combineByKey EXample # Creating PairRDD student_rdd with key value pairs student_rdd = sc.parallelize([ ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)], 3) # Defining createCombiner, mergeValue and mergeCombiner functions def createCombiner(tpl): return (tpl[1], 1) def mergeValue(accumulator, element): return (accumulator[0] + element[1], accumulator[1] + 1) def mergeCombiner(accumulator1, accumulator2): return (accumulator1[0] + accumulator2[0], accumulator1[1] + accumulator2[1]) comb_rdd = student_rdd.map(lambda t: (t[0], (t[1], t[2]))) \ .combineByKey(createCombiner, mergeValue, mergeCombiner) \ .map(lambda t: (t[0], t[1][0]/t[1][1])) # Check the Outout for tpl in comb_rdd.collect(): print(tpl) ## Output # ('Thomas', 86.25) # ('Tina', 76.5) # ('Jimmy', 77.0) # ('Juan', 64.0) # ('Joseph', 82.5) # ('Cory', 65.0) # ('Jackeline', 76.5) |
aggregateByKey
and combineByKey
transformations. Let’s find out..Difference between aggregateByKey and combineByKey transformations
Spark combineByKey
is a general transformation whereas internal implementation of transformations groupByKey
, reduceByKey
and aggregateByKey
uses combineByKey
.
aggregateByKey
- In
combineByKey
, aggregations such as sum, avg, etc. are performed for each key to reduce the shuffle data - If your aggregation function supports that reduction you can safely use
aggregateByKey
operation - The scenario is referred as map side combine when you perform aggregation with respect to a key locally in a partition itself
- Spark
aggregateByKey
is similar toreduceByKey
but the return RDD type is different foraggregateByKey
and you can provide initial values when performing aggregation
combineByKey
- Spark
combineByKey
transformation is flexible in performing map or reduce side combine - Usage of
combineByKey
transformation it is more complex - You always need to implement three functions:
createCombiner
,mergeValue
,mergeCombiner
Hope you like this post. Stay tuned for more post on other spark Transformations and Actions.
Leave a Reply