PySpark实战:aggregate操作

来自CloudWiki
跳转至: 导航搜索

介绍

aggregate操作是一个动作操作,

调用形式为rdd.aggregate(zeroValue,seqOp,combOp)

其中使用给定的seqOp函数和给定的零值zeroValue来聚合每个分区上的元素,

然后再利用combOp函数和给定的零值zeroValue汇总所有分区的结果。

基本版

代码

import findspark
findspark.init()

##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
##############################################
data=[1,3,5,7,9,11,13,15,17]
#2个分区
rdd=sc.parallelize(data,2)
#打印分区的值
print(rdd.glom().collect())
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
a=rdd.aggregate((0,0),seqOp,combOp)
#(81, 9)=(0,0)+(16,4)+(65,5)
print(a)
#求每个分区的最大值的和
fmax = lambda x, y : ( x[0] if x[0] > y else y , x[1]+1)
# zeroValue初始值形式是(x,y)
a=rdd.aggregate((0,0),fmax,combOp)
#(24, 9)=(0,0)+(7,4)+(17,5)
print(a)
#求每个分区的最小值的和
fmin = lambda x, y : ( x[0] if x[0] < y else y , x[1]+1)
a=rdd.aggregate((6,0),fmin,combOp)
#第一个分区和6比较,最小值是1
#第二个分区和6比较,最小值是6
#(13, 9)= (6,0)+(1,4)+(6,5)
print(a)
##############################################
sc.stop()

输出

[[1, 3, 5, 7], [9, 11, 13, 15, 17]]
(81, 9)
(24, 9)
(13, 9)

输出中间结果版

代码

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
##############################################
data=[1,3,5,7,9,11,13,15,17]
rdd=sc.parallelize(data,2)
def seqOp(x,y):
        print("seqOp x->",x)
        print("seqOp y->",y)
        print("seqResult",(x[0] + y, x[1] + 1))
        print()
        return (x[0] + y, x[1] + 1)
def combOp(x,y):
        print("combOp x->",x)
        print("combOp y->",y)
        print("comResult", (x[0] + y[0], x[1] + y[1]))
        print()
        return (x[0] + y[0], x[1] + y[1])
a=rdd.aggregate((0,0),seqOp,combOp)
print(a)
# (81, 9)
##############################################
sc.stop()

输出

seqOp x-> (0, 0)
seqOp y-> 9
seqResult (9, 1)

seqOp x-> (9, 1)
seqOp y-> 11
seqResult (20, 2)

seqOp x-> (20, 2)
seqOp y-> 13
seqResult (33, 3)

seqOp x-> (33, 3)
seqOp y-> 15
seqResult (48, 4)

seqOp x-> (48, 4)
seqOp y-> 17
seqResult (65, 5)

combOp x-> (0, 0)
combOp y-> (16, 4)
comResult (16, 4)

combOp x-> (16, 4)
combOp y-> (65, 5)
comResult (81, 9)

(81, 9)