Apache Spark filter Example

In this Spark filter example, we’ll explore the filter method of Spark’s RDD class in all three languages: Scala, Java, and Python. The Spark filter operation is a transformation, so its evaluation is lazy. Let’s dig a bit deeper.

The Spark RDD filter function returns a new RDD containing only the elements that satisfy a given predicate.

As we know, Spark’s filter is a transformation operation on an RDD which accepts a predicate as an argument. A predicate is a function that accepts some parameter and returns a boolean value, true or false. The filter method applies this predicate to every element of the source RDD: the elements for which the predicate is not satisfied are dropped, and a new RDD is created from the elements that pass the predicate function. Let’s understand this with the following example.
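To make this concrete, here is a minimal Scala sketch (assuming a spark-shell session where sc is predefined; the name isEven is just an illustrative choice, not a Spark API):

// a predicate: accepts an Int and returns a Boolean
def isEven(n: Int): Boolean = n % 2 == 0

val source = sc.parallelize(1 to 5)
val evens = source.filter(isEven) // keeps only elements for which isEven returns true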

[Image: Apache Spark filter example]

As you can see in the above image, RDD X is the source RDD: it contains the elements 1 to 5 and has two partitions. The filter operation takes a predicate f(x) as an argument, in this case x % 2 == 0, which returns true for even elements and false for odd ones. RDD Y is the resulting RDD containing only the filtered (i.e. even) elements.

Important points to note:

  • filter is a transformation operation in Spark, hence it is lazily evaluated (see the sketch after this list)
  • It is a narrow operation, as it does not shuffle data from one partition to multiple partitions
  • filter accepts a predicate as an argument and drops the elements of the source RDD that do not satisfy the predicate
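The first two points can be observed directly in the Scala shell. A quick sketch, assuming Spark 1.6 or later (for getNumPartitions) and a spark-shell session where sc is predefined:

scala> val nums = sc.parallelize(1 to 10, 2)
scala> val evens = nums.filter(_ % 2 == 0) // returns immediately: no Spark job runs yet
scala> evens.getNumPartitions              // narrow operation: the 2 partitions are preserved
res0: Int = 2
scala> evens.collect                       // collect is an action, so the job runs here
res1: Array[Int] = Array(2, 4, 6, 8, 10)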

Let’s take some examples.

Spark filter Example Using Scala
scala> val x = sc.parallelize(1 to 10, 2)

// filter operation
scala> val y = x.filter(e => e % 2 == 0)
scala> y.collect
res0: Array[Int] = Array(2, 4, 6, 8, 10)

// rdd y can be rewritten with shorter syntax in Scala as
scala> val y = x.filter(_ % 2 == 0)
scala> y.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10)
Spark filter Example Using Java 8
package com.backtobazics.sparkexamples;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class FilterExample {
    public static void main(String[] args) throws Exception {
        // Loads master and app name from system properties,
        // e.g. when launched via spark-submit
        JavaSparkContext sc = new JavaSparkContext();

        // Filter predicate: returns true for even numbers
        Function<Integer, Boolean> filterPredicate = e -> e % 2 == 0;

        // Parallelized with 2 partitions
        JavaRDD<Integer> rddX = sc.parallelize(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
                2);

        // filter is a transformation and returns a new JavaRDD;
        // collect() is an action and returns the results as a List
        JavaRDD<Integer> rddY = rddX.filter(filterPredicate);
        List<Integer> filteredList = rddY.collect();

        System.out.println(filteredList); // [2, 4, 6, 8, 10]

        sc.stop();
    }
}
Spark filter Example Using Python
>>> x = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 2)

# filter operation 
>>> y = x.filter(lambda x: x % 2 == 0)
>>> y.collect()
[2, 4, 6, 8, 10]
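
As noted earlier, elements that fail the predicate are simply dropped, so inverting the predicate keeps the complement. A quick Scala sketch, reusing the RDD x from the Scala example above:

scala> val odds = x.filter(e => e % 2 != 0) // negated predicate: keeps the odd elements
scala> odds.collect
res2: Array[Int] = Array(1, 3, 5, 7, 9)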
