查看“PySpark实战:累加器”的源代码
←
PySpark实战:累加器
跳转至:
导航
,
搜索
因为以下原因,您没有权限编辑本页:
您所请求的操作仅限于该用户组的用户使用:
用户
您可以查看与复制此页面的源代码。
==介绍== 除了广播变量外,Spark还提供了一种累加器用于在集群中共享暑假。累加器是一种只能利用关联操作做“加”操作的变数,因此它能够快速执行操作。 累加器的一个常见作用是,在调试时对作业的执行过程中的相关事件进行计数,调用SparkContext.acumulator(v)方法可根据初始变量v创建出累加器对象。 ==代码== <nowiki> import findspark findspark.init() ############################################## from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local[1]") \ .appName("RDD Demo") \ .getOrCreate(); sc = spark.sparkContext ############################################# rdd = sc.range(1,101) #创建累加器,初始值0 acc = sc.accumulator(0) def fcounter(x): global acc if x % 2 == 0 : acc += 1 #unsupported operand type(s) for -= #acc -= 1 rdd_counter = rdd.map(fcounter) #获取累加器值 #0 print("累加器acc:") print(acc.value) #保证多次正确获取累加器值 rdd_counter.persist() #100 print("rdd统计个数:") print(rdd_counter.count()) #50 print("累加器acc:") print(acc.value) #100 print("rdd统计个数:") print(rdd_counter.count()) #50 print("累加器acc:") print(acc.value) ############################################## sc.stop() </nowiki> *acc是累加器 *第一个acc.value ,因为还未真正执行动作,因此acc.value初始值为0 *当执行rdd_counter.count()进行求和后,实际上这时候才触发fcounter函数的逻辑,所以acc.value=50 *使用累加器时,为了保证准确性,只能使用一次动作。 *当使用多次动作时,rdd_counter.persist()一行保证了累加器只执行一次,不重复累加 试一试:把rdd_counter.persist()一行注释掉,acc.value会有什么不同 ? ==输出== <nowiki> 累加器acc: 0 rdd统计个数: 100 累加器acc: 50 rdd统计个数: 100 累加器acc: 50 </nowiki>
返回至
PySpark实战:累加器
。
导航菜单
个人工具
登录
命名空间
页面
讨论
变种
视图
阅读
查看源代码
查看历史
更多
搜索
导航
首页
最近更改
随机页面
帮助
工具
链入页面
相关更改
特殊页面
页面信息