PySpark实战:foreachPartition操作
来自CloudWiki
介绍
foreachPartition操作 是一个变换算子,
它的作用是对RDD每个分区中的元素按照func定义的逻辑进行处理
代码
import findspark findspark.init() ############################################## from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local[1]") \ .appName("RDD Demo") \ .getOrCreate(); sc = spark.sparkContext ############################################# def f(iter): for x in iter: print(x) sc.parallelize([1, 2, 3, 4, 5,6,7,8],2).foreachPartition(f) def f2(x): print(x) sc.parallelize([1, 2, 3, 4, 5,6,7,8],2).foreach(f2) ############################################## sc.stop()
- foreachPartitions操作是一次性处理一个partition的数据,效率比foreach高
- foreachPartitions读写数据库,性能比map高很多,不用反复创建数据库连接。
输出
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8