PySpark实战:累加器
来自CloudWiki
124.128.143.191(讨论)2021年7月10日 (六) 11:13的版本 (创建页面,内容为“==介绍== 除了广播变量外,Spark还提供了一种累加器用于在集群中共享暑假。累加器是一种只能利用关联操作做“加”操作的…”)
介绍
除了广播变量外,Spark还提供了一种累加器用于在集群中共享暑假。累加器是一种只能利用关联操作做“加”操作的变数,因此它能够快速执行操作。
累加器的一个常见作用是,在调试时对作业的执行过程中的相关事件进行计数,调用SparkContext.acumulator(v)方法可根据初始变量v创建出累加器对象。
代码
import findspark findspark.init() ############################################## from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local[1]") \ .appName("RDD Demo") \ .getOrCreate(); sc = spark.sparkContext ############################################# rdd = sc.range(1,101) #创建累加器,初始值0 acc = sc.accumulator(0) def fcounter(x): global acc if x % 2 == 0 : acc += 1 #unsupported operand type(s) for -= #acc -= 1 rdd_counter = rdd.map(fcounter) #获取累加器值 #0 print("累加器acc:") print(acc.value) #保证多次正确获取累加器值 rdd_counter.persist() #100 print("rdd统计个数:") print(rdd_counter.count()) #50 print("累加器acc:") print(acc.value) #100 print("rdd统计个数:") print(rdd_counter.count()) #50 print("累加器acc:") print(acc.value) ############################################## sc.stop()
- acc是累加器
- 第一个acc.value ,因为还未真正执行动作,因此acc.value初始值为0
- 当执行rdd_counter.count()进行求和后,实际上这时候才触发fcounter函数的逻辑,所以acc.value=50
- 使用累加器时,为了保证准确性,只能使用一次动作。
- 当使用多次动作时,rdd_counter.persist()一行保证了累加器只执行一次,不重复累加
试一试:把rdd_counter.persist()一行注释掉,acc.value会有什么不同 ?
输出
累加器acc: 0 rdd统计个数: 100 累加器acc: 50 rdd统计个数: 100 累加器acc: 50