PySpark in Practice: the aggregate Operation
From CloudWiki
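Introduction
aggregate is an action: it triggers a job and returns its result to the driver. It is called as rdd.aggregate(zeroValue, seqOp, combOp).
First, the elements of each partition are aggregated with the given seqOp function, starting from the zero value zeroValue.
Then the results of all partitions are combined with the combOp function, again starting from zeroValue.
The result type may differ from the element type: in the examples below, int elements are aggregated into a (sum, count) tuple.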
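The two stages can be modeled in plain Python without Spark. This is a minimal local simulation, assuming each partition is given as an ordinary Python list; `simulate_aggregate` is a hypothetical helper, not a PySpark API. It folds each partition with seqOp starting from zeroValue, then folds the partition results with combOp, again starting from zeroValue.

```python
from functools import reduce


def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical local model of rdd.aggregate; not part of PySpark.

    Each element of `partitions` is a plain list standing in for one RDD partition.
    """
    # Stage 1: fold every partition separately, each starting from zero_value
    partition_results = [reduce(seq_op, part, zero_value) for part in partitions]
    # Stage 2: merge the partition results, starting from zero_value once more
    return reduce(comb_op, partition_results, zero_value)


# Same layout as printed by rdd.glom().collect() in the basic version
partitions = [[1, 3, 5, 7], [9, 11, 13, 15, 17]]
seq_op = lambda acc, y: (acc[0] + y, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

print(simulate_aggregate(partitions, (0, 0), seq_op, comb_op))  # (81, 9)
```

Because zeroValue is used once per partition and once more in the final merge, this model reproduces the (81, 9), (24, 9) and (13, 9) results of the basic version when given the same functions and zero values.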
Basic version
Code
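import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
##############################################
data = [1, 3, 5, 7, 9, 11, 13, 15, 17]
# 2 partitions
rdd = sc.parallelize(data, 2)
# Print the contents of each partition
print(rdd.glom().collect())

# seqOp folds one element into a (sum, count) accumulator within a partition
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
# combOp merges two (sum, count) accumulators
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
a = rdd.aggregate((0, 0), seqOp, combOp)
# (81, 9) = (0,0) + (16,4) + (65,5)
print(a)

# Sum of the maximum of each partition
fmax = lambda x, y: (x[0] if x[0] > y else y, x[1] + 1)
# zeroValue has the form (x, y): (running value, element count);
# 0 works as a starting maximum here because all elements are positive
a = rdd.aggregate((0, 0), fmax, combOp)
# (24, 9) = (0,0) + (7,4) + (17,5)
print(a)

# Sum of the minimum of each partition
fmin = lambda x, y: (x[0] if x[0] < y else y, x[1] + 1)
a = rdd.aggregate((6, 0), fmin, combOp)
# Partition 1 is also compared with 6; its minimum is 1
# Partition 2 is also compared with 6; its minimum is 6
# combOp starts from (6, 0) as well:
# (13, 9) = (6,0) + (1,4) + (6,5)
print(a)
##############################################
sc.stop()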
Output
[[1, 3, 5, 7], [9, 11, 13, 15, 17]]
(81, 9)
(24, 9)
(13, 9)
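The (13, 9) result is larger than the true sum of the two partition minimums (1 + 9 = 10) because the zero value 6 is not neutral: it replaces partition 2's real minimum of 9 and is added once more by combOp. A minimal sketch of one workaround, assuming the same two-partition layout, uses None as a "no value yet" marker so the zero value contributes nothing to either stage. The names `min_seq_op` and `sum_comb_op` are hypothetical, and the Spark run is replaced by a local `functools.reduce` model; the same two functions could be passed as `rdd.aggregate((None, 0), min_seq_op, sum_comb_op)`.

```python
from functools import reduce


def min_seq_op(acc, y):
    # acc is (current minimum or None, element count)
    current = y if acc[0] is None or y < acc[0] else acc[0]
    return (current, acc[1] + 1)


def sum_comb_op(a, b):
    # None (the zero value, or an empty partition) contributes 0 to the sum
    left = 0 if a[0] is None else a[0]
    right = 0 if b[0] is None else b[0]
    return (left + right, a[1] + b[1])


partitions = [[1, 3, 5, 7], [9, 11, 13, 15, 17]]  # layout printed by glom()
zero_value = (None, 0)

# Local model of aggregate: fold each partition, then merge with combOp
partial = [reduce(min_seq_op, part, zero_value) for part in partitions]
result = reduce(sum_comb_op, partial, zero_value)
print(partial)  # [(1, 4), (9, 5)]
print(result)   # (10, 9)
```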
Version that prints intermediate results
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
##############################################
data = [1, 3, 5, 7, 9, 11, 13, 15, 17]
rdd = sc.parallelize(data, 2)

def seqOp(x, y):
    # Called once per element inside a partition: x is the accumulator, y the element
    print("seqOp x->", x)
    print("seqOp y->", y)
    print("seqResult", (x[0] + y, x[1] + 1))
    print()
    return (x[0] + y, x[1] + 1)

def combOp(x, y):
    # Merges per-partition results; in PySpark this runs on the driver after they are collected
    print("combOp x->", x)
    print("combOp y->", y)
    print("comResult", (x[0] + y[0], x[1] + y[1]))
    print()
    return (x[0] + y[0], x[1] + y[1])

a = rdd.aggregate((0, 0), seqOp, combOp)
print(a)  # (81, 9)
##############################################
sc.stop()
Output
seqOp x-> (0, 0)
seqOp y-> 9
seqResult (9, 1)

seqOp x-> (9, 1)
seqOp y-> 11
seqResult (20, 2)

seqOp x-> (20, 2)
seqOp y-> 13
seqResult (33, 3)

seqOp x-> (33, 3)
seqOp y-> 15
seqResult (48, 4)

seqOp x-> (48, 4)
seqOp y-> 17
seqResult (65, 5)

combOp x-> (0, 0)
combOp y-> (16, 4)
comResult (16, 4)

combOp x-> (16, 4)
combOp y-> (65, 5)
comResult (81, 9)

(81, 9)
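In the captured output only the second partition's seqOp calls appear; the first partition's result (16, 4) is visible only as an input to combOp. The following is a minimal local simulation (plain Python, no Spark) that records every call in order, assuming the same partition layout [[1, 3, 5, 7], [9, 11, 13, 15, 17]]; the `trace` list and the `seq_op`/`comb_op` names are hypothetical. It shows the calls that build (16, 4). In a real cluster the partitions may be processed in a different order or in parallel, so the interleaving of actual seqOp output is not guaranteed.

```python
from functools import reduce

trace = []  # records (stage, accumulator, input, result) for every call


def seq_op(x, y):
    result = (x[0] + y, x[1] + 1)
    trace.append(("seqOp", x, y, result))
    return result


def comb_op(x, y):
    result = (x[0] + y[0], x[1] + y[1])
    trace.append(("combOp", x, y, result))
    return result


partitions = [[1, 3, 5, 7], [9, 11, 13, 15, 17]]
zero_value = (0, 0)

# Fold each partition in order, then merge the partial results
partial = [reduce(seq_op, part, zero_value) for part in partitions]
result = reduce(comb_op, partial, zero_value)

for stage, x, y, res in trace:
    print(stage, "x->", x, "y->", y, "result", res)
print(result)  # (81, 9)
```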