When you call the fold() method on an RDD, it can return a different result than you would normally expect, so I wanted to figure out how fold() actually works and built this simple application.
The first thing I do in the application is create a simple RDD with 8 values, from 1 to 8, and divide it into 3 partitions:
sparkContext.parallelize(List(1,2,3,4,5,6,7,8),3)
Then I call
input.mapPartitions(mapPartition)
to iterate through all the partitions in the RDD and print the records in each one. The output shows that the RDD has 3 partitions: 1 and 2 are in the first partition, 3, 4 and 5 are in the second partition, and 6, 7 and 8 are in the third partition.
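The 2/3/3 split above is what you get if a list of 8 elements is cut into 3 contiguous slices using integer-division boundaries. The following is a minimal plain-Scala sketch of that idea, with no Spark dependency. The `SliceSketch` object and its `slice` helper are hypothetical names of mine, and the assumption that parallelize slices a local List this way is mine as well; I only checked it against the output shown in this post.

```scala
object SliceSketch {
  // Hypothetical helper: cut `data` into `numSlices` contiguous chunks.
  // Slice i covers the index range [i * n / numSlices, (i + 1) * n / numSlices).
  // Assumption: this mirrors how the 8-element list ends up as 2 + 3 + 3 records.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices > 0, "numSlices must be positive")
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * n) / numSlices).toInt
      val end = (((i + 1) * n) / numSlices).toInt
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Print each simulated partition on its own line.
    slice(List(1, 2, 3, 4, 5, 6, 7, 8), 3).zipWithIndex.foreach {
      case (part, idx) => println(s"partition $idx: ${part.mkString(", ")}")
    }
  }
}
```

For the list used in this post the sketch yields the groups (1, 2), (3, 4, 5) and (6, 7, 8), the same grouping the mapPartitions output shows.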
The next step is to call
input.reduce((x,y)=> add(x,y))
which uses the
add
method as the reduce function on the RDD. As you can see in the output, within each partition reduce first calls add on the first 2 records and then keeps calling it with the running total and the next element. The per-partition totals (3, 12 and 21) are then added together to give 36.
The last part is the fold() method, which I call with an initial value of 10. As you can see from the output, fold() first takes 10 as the initial value in every partition and adds all the elements of that partition to it. It then combines the per-partition totals, once again starting from 10. Because of this, the result of fold() = initial value * (number of partitions + 1) + result of reduce, which here is 10 * (3 + 1) + 36 = 76.
********************** mapPartitions *******************
[Stage 0:>(0 + 0) / 3]2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
1
2
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
3
4
5
2016-02-17 10:22:13 DEBUG HelloSparkPartitions:63 - Inside mapPartition
6
7
8
********************** reduce *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 1, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 7, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 3, 12
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 6, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 15, 21
input.reduce 36
********************** fold *******************
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 1
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 11, 2
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 13
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 3
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 13, 4
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 17, 5
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 22
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 10, 6
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 16, 7
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 23, 8
2016-02-17 10:22:13 ERROR HelloSparkPartitions:75 - Inside add -> 45, 31
input.fold 76
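To double-check the arithmetic in the fold log above, here is a small plain-Scala simulation with no Spark dependency. It is only a sketch of the behaviour seen in this output, not Spark's actual implementation: the `FoldSketch` object and the `simulateFold` helper are hypothetical names, the partitions are hard-coded to match the mapPartitions output, and a real cluster may merge partition results in a different order as tasks finish.

```scala
object FoldSketch {
  // Hypothetical helper: fold every partition starting from `zero`, then
  // merge the per-partition results, starting from `zero` one more time.
  def simulateFold[T](partitions: Seq[Seq[T]], zero: T)(op: (T, T) => T): T = {
    val perPartition = partitions.map(_.foldLeft(zero)(op))
    perPartition.foldLeft(zero)(op)
  }

  def main(args: Array[String]): Unit = {
    // Same layout as the RDD in this post: 3 partitions holding 1 to 8.
    val partitions = Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6, 7, 8))
    val add = (x: Int, y: Int) => x + y

    // Zero value 10 is applied 3 + 1 times: 10 * 4 + 36 = 76.
    println(simulateFold(partitions, 10)(add))
    // Zero value 0 is the identity for addition, so the result equals reduce: 36.
    println(simulateFold(partitions, 0)(add))
  }
}
```

With 10 as the initial value the per-partition totals are 13, 22 and 31, matching the log, and the final merge gives 76. With 0 the simulation gives 36, the same as reduce.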
Hi,
You shouldn't use 10 as the initial value. The value you provide should be the identity element for your operation (e.g., 0 for +, 1 for *, or an empty list for concatenation). This is specified in the Spark API:
"Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value"."
Regards
Tomás