PySpark实战:累加器

来自CloudWiki
跳转至: 导航搜索

介绍

除了广播变量外,Spark还提供了一种累加器用于在集群中共享暑假。累加器是一种只能利用关联操作做“加”操作的变数,因此它能够快速执行操作。

累加器的一个常见作用是,在调试时对作业的执行过程中的相关事件进行计数,调用SparkContext.acumulator(v)方法可根据初始变量v创建出累加器对象。

代码

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
rdd = sc.range(1,101)
#创建累加器,初始值0
acc = sc.accumulator(0)
def fcounter(x):
        global acc
        if x % 2 == 0 :
                acc += 1
                #unsupported operand type(s) for -=
                #acc -= 1
rdd_counter =  rdd.map(fcounter)
#获取累加器值
#0
print("累加器acc:")
print(acc.value)
#保证多次正确获取累加器值
rdd_counter.persist()
#100
print("rdd统计个数:")
print(rdd_counter.count())
#50
print("累加器acc:")
print(acc.value)
#100
print("rdd统计个数:")
print(rdd_counter.count())
#50
print("累加器acc:")
print(acc.value)
##############################################
sc.stop()


  • acc是累加器
  • 第一个acc.value ,因为还未真正执行动作,因此acc.value初始值为0
  • 当执行rdd_counter.count()进行求和后,实际上这时候才触发fcounter函数的逻辑,所以acc.value=50
  • 使用累加器时,为了保证准确性,只能使用一次动作。
  • 当使用多次动作时,rdd_counter.persist()一行保证了累加器只执行一次,不重复累加

试一试:把rdd_counter.persist()一行注释掉,acc.value会有什么不同 ?

输出

累加器acc:
0
rdd统计个数:
100
累加器acc:
50
rdd统计个数:
100
累加器acc:
50