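package com.spnotes.spark

import org.apache.spark.{Logging, SparkConf, SparkContext}

/**
 * Created by sunilpatil on 2/16/16.
 *
 * Shows how mapPartitions, reduce and fold behave on an RDD with 3 partitions.
 * Note: org.apache.spark.Logging is public in Spark 1.x; Spark 2.0 made it
 * internal, so newer Spark versions need a different logger.
 */
object HelloSparkPartitions extends Logging {
  def main(argv: Array[String]): Unit = {
    // Run locally with a single worker thread so the log output is easy to follow
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("HelloSparkPartitions")
    val sparkContext = new SparkContext(sparkConf)
    // 8 records split into 3 partitions
    val input = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8), 3)

    println("********************** mapPartitions *******************")
    input.mapPartitions(mapPartition).collect()

    println("********************** reduce *******************")
    println("input.reduce " + input.reduce((x, y) => add(x, y)))

    println("********************** fold *******************")
    // 10 is deliberately not a neutral zero value, to show where fold applies it
    println("input.fold " + input.fold(10)((x, y) => add(x, y)))

    sparkContext.stop()
  }

  // Logs every call so the order of additions is visible in the output
  def add(x: Int, y: Int): Int = {
    logError(s"Inside add -> $x, $y")
    x + y
  }

  def mapPartition(iterator: Iterator[Int]): Iterator[Int] = {
    logDebug("Inside mapPartition ")
    // An Iterator can be traversed only once: print each record lazily while
    // passing it through, instead of returning an already exhausted iterator
    iterator.map { record => println(record); record }
  }
}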
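One detail worth knowing about mapPartitions: a Scala Iterator can be traversed only once. A function that calls foreach on its partition iterator and then returns that same iterator hands Spark an exhausted iterator, so collect() receives no records. The plain-Scala sketch below needs no Spark; the object and method names are hypothetical. It contrasts that pattern with a lazy map that prints each record as it passes through.

```scala
// Plain Scala, no Spark dependency: shows that an Iterator is single-pass.
object IteratorOnce {
  // Prints and then returns its input: foreach consumes every element,
  // so the iterator handed back is already empty.
  def printThenReturn(it: Iterator[Int]): Iterator[Int] = {
    it.foreach(println)
    it
  }

  // Lazy alternative: each record is printed when the caller pulls it,
  // and it is still passed on to the caller.
  def printLazily(it: Iterator[Int]): Iterator[Int] =
    it.map { record => println(record); record }

  def main(args: Array[String]): Unit = {
    println(printThenReturn(Iterator(1, 2)).toList) // prints 1 and 2, then List()
    println(printLazily(Iterator(1, 2)).toList)     // prints 1 and 2, then List(1, 2)
  }
}
```

Buffering the partition with toList would also work, but the lazy map avoids holding a whole partition in memory.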
In this program I create an RDD of 8 records split into 3 partitions by calling sparkContext.parallelize(List(1,2,3,4,5,6,7,8),3). Then I call input.mapPartitions(mapPartition), which runs mapPartition once for each partition of the RDD and prints that partition's records one by one. The output shows that the RDD has 3 partitions: records 1 and 2 are in the first partition, records 3, 4 and 5 are in the second, and records 6, 7 and 8 are in the third.
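The split into 2, 3 and 3 records can be reproduced with a small plain-Scala sketch. It assumes parallelize cuts a sequence of n elements into k slices by position, giving slice i the elements from index i*n/k (inclusive) to (i+1)*n/k (exclusive), which is how I understand Spark's ParallelCollectionRDD slices sequences. PartitionSlices and slice are hypothetical names, not Spark API.

```scala
// Plain-Scala model of positional slicing (assumed to match parallelize).
object PartitionSlices {
  // Slice i holds the elements at positions [i * n / k, (i + 1) * n / k).
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * n) / numSlices).toInt
      val end = (((i + 1) * n) / numSlices).toInt
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    slice(List(1, 2, 3, 4, 5, 6, 7, 8), 3).zipWithIndex.foreach { case (part, i) =>
      val joined = part.mkString(", ")
      println(s"partition $i -> $joined")
    }
  }
}
```

For 8 elements and 3 slices the boundaries are 0, 2, 5 and 8, which gives the same grouping as the mapPartitions output.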
The next step is to call input.reduce((x,y) => add(x,y)), which uses add as the reduce function on the RDD. As the output shows, reduce works at two levels. Inside each partition it calls add on the first two records and then on the running sum and the next record (1+2 = 3; 3+4 = 7, 7+5 = 12; 6+7 = 13, 13+8 = 21). On the driver it then combines the per-partition results as they arrive (3+12 = 15, then 15+21 = 36).
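This two-level behaviour can be reproduced without Spark. The sketch below models reduce as reduceLeft within each partition, followed by a driver-side merge that folds each partition result into the running value. It assumes partitions finish in order, as they do with local[1]; simulateReduce and the calls buffer are hypothetical helpers, not Spark API. With the three partitions from the output it records the same seven add calls and returns 36.

```scala
import scala.collection.mutable.ListBuffer

// Plain-Scala model of the call pattern in the reduce log (no Spark dependency).
object ReduceSketch {
  // Every (x, y) pair passed to add, in call order (stands in for the log lines).
  val calls: ListBuffer[(Int, Int)] = ListBuffer.empty[(Int, Int)]

  def add(x: Int, y: Int): Int = {
    calls += ((x, y))
    x + y
  }

  // Assumed model: reduceLeft inside each non-empty partition, then merge each
  // partition result into the running driver value, in partition order.
  def simulateReduce(partitions: Seq[Seq[Int]], f: (Int, Int) => Int): Int =
    partitions
      .filter(_.nonEmpty)
      .foldLeft(Option.empty[Int]) { (acc, part) =>
        val partResult = part.reduceLeft(f)                 // executor side
        acc.map(f(_, partResult)).orElse(Some(partResult))  // driver side
      }
      .getOrElse(throw new UnsupportedOperationException("empty collection"))

  def main(args: Array[String]): Unit = {
    calls.clear()
    val result = simulateReduce(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6, 7, 8)), add)
    calls.foreach { case (x, y) => println(s"Inside add -> $x, $y") }
    println(s"input.reduce $result")
  }
}
```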
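The last part is the fold() method, which I call with an initial value of 10. As the output shows, fold() first folds each partition separately, starting from 10 (10+1+2 = 13, 10+3+4+5 = 22, 10+6+7+8 = 31). It then combines those partition results on the driver, again starting from 10 (10+13 = 23, 23+22 = 45, 45+31 = 76). The initial value is therefore added once per partition plus once more on the driver, so the result of fold() = initial value * (number of partitions + 1) + sum of the elements, here 10 * (3 + 1) + 36 = 76. For fold() to return the same result as reduce(), the initial value has to be the identity of the operation (0 for addition).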
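The same kind of plain-Scala sketch works for fold: each partition is folded starting from the zero value, and the driver also starts from the zero value when merging partition results. simulateFold is a hypothetical helper that models this under the same in-order assumption as above. With a zero value of 10 it reproduces the eleven add calls in the output and returns 76, while a zero value of 0 gives the same 36 as reduce.

```scala
import scala.collection.mutable.ListBuffer

// Plain-Scala model of the call pattern in the fold log (no Spark dependency).
object FoldSketch {
  // Every (x, y) pair passed to add, in call order.
  val calls: ListBuffer[(Int, Int)] = ListBuffer.empty[(Int, Int)]

  def add(x: Int, y: Int): Int = {
    calls += ((x, y))
    x + y
  }

  // Assumed model: each partition folds from zeroValue, and the driver also
  // starts from zeroValue when merging partition results in partition order.
  def simulateFold(partitions: Seq[Seq[Int]], zeroValue: Int)(op: (Int, Int) => Int): Int =
    partitions.foldLeft(zeroValue) { (driverAcc, part) =>
      val partResult = part.foldLeft(zeroValue)(op) // executor side
      op(driverAcc, partResult)                     // driver side
    }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6, 7, 8))
    calls.clear()
    val result = simulateFold(partitions, 10)(add)
    calls.foreach { case (x, y) => println(s"Inside add -> $x, $y") }
    println(s"input.fold $result")
    // zeroValue * (numPartitions + 1) + sum of the elements
    println(10 * (partitions.size + 1) + partitions.flatten.sum)
    // 0 is the identity for +, so this matches the reduce result
    println(simulateFold(partitions, 0)(_ + _))
  }
}
```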
********************** mapPartitions *******************
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
1
2
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
3
4
5
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
6
7
8
********************** reduce *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 1, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 7, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 12
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 6, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 15, 21
input.reduce 36
********************** fold *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 1
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 11, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 13
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 3
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 17, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 22
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 6
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 16, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 45, 31
input.fold 76