PySpark实战:aggregateByKey操作

来自CloudWiki
跳转至: 导航搜索

介绍

aggregateByKey操作是一个变换操作,

它的调用形式为rdd.aggregateByKey(zeroValue,seqFunc,combFunc,numPartitons=None,partitionFunc=<function portable_has>

zeroValue: 每次按Key分组之后每个组的初始值,

seqFunc:函数用来对每个分区内的数据按照key分别进行逻辑计算

combFunc对经过seqFunc处理过的数据按照key分别进行逻辑计算

代码

import findspark
findspark.init()

##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
##############################################
data=[("a",1),("b",2),("a",3),("b",4),("a",5),("b",6),("a",7),("b",8),("a",9),("b",10)]
#2个分区
rdd=sc.parallelize(data,2)
#打印分区的值
#[[('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5)],
# [('b', 6), ('a', 7), ('b', 8), ('a', 9), ('b', 10)]]
print(rdd.glom().collect())
def seqFunc(x,y):
        # seqOp x-> 0
        # seqOp y-> 1
        print("seqOp x->",x)
        print("seqOp y->",y)
        print("seqResult",x+y)
        return x + y
def combFunc(x,y):
        #combOp x-> 6
        #combOp y-> 16
        #combOp x-> 9
        #combOp y-> 16
        print("combOp x->",x)
        print("combOp y->",y)
        print("combResult",x+y)
        return x + y
a=rdd.aggregateByKey(0,seqFunc,combFunc)
# [('b', 30), ('a', 25)]
print(a.collect())
##############################################
sc.stop()

输出

[[('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5)], [('b', 6), ('a', 7), ('b', 8), ('a', 9), ('b', 10)]]
seqOp x-> 0
seqOp y-> 1
seqResult 1
seqOp x-> 0
seqOp y-> 2
seqResult 2
seqOp x-> 1
seqOp y-> 3
seqResult 4
seqOp x-> 2
seqOp y-> 4
seqResult 6
seqOp x-> 4
seqOp y-> 5
seqResult 9
seqOp x-> 0
seqOp y-> 6
seqResult 6
seqOp x-> 0
seqOp y-> 7
seqResult 7
seqOp x-> 6
seqOp y-> 8
seqResult 14
seqOp x-> 7
seqOp y-> 9
seqResult 16
seqOp x-> 14
seqOp y-> 10
seqResult 24
combOp x-> 6
combOp y-> 24
combResult 30
combOp x-> 9
combOp y-> 16
combResult 25
[('b', 30), ('a', 25)]