PySpark实战:foreachPartition操作

来自CloudWiki
跳转至: 导航搜索

介绍

foreachPartition操作 是一个变换算子,

它的作用是对RDD每个分区中的元素按照func定义的逻辑进行处理

代码

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
def f(iter):
    for x in iter:
         print(x)

sc.parallelize([1, 2, 3, 4, 5,6,7,8],2).foreachPartition(f)

def f2(x):
        print(x)
        
sc.parallelize([1, 2, 3, 4, 5,6,7,8],2).foreach(f2)
##############################################
sc.stop()

  • foreachPartitions操作是一次性处理一个partition的数据,效率比foreach高
  • foreachPartitions读写数据库,性能比map高很多,不用反复创建数据库连接。

输出

1

2

3

4

5

6

7

8

1

2

3

4

5

6

7

8