PySpark实战:combineByKey操作

来自CloudWiki
跳转至: 导航搜索

背景

combineByKey操作是一个变换算子,是Spark中一个比较核心的高级函数

定义如下:

def combineByKey[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null)

如下解释下3个重要的函数参数:

  • createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
  • mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
  • mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)

代码

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2),("b", 4)],2)
#rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2),("b", 4)],2)
def to_list(a):
    #print(a)
    return [a]

def append(a, b):
    print(a)
    print(b)
    a.append(b)
    return a

def extend(a, b):
    print("======")
    print(a)
    print(b)
    a.extend(b)
    return a

ret = sorted(rdd.combineByKey(to_list, append, extend).collect())
#[('a', [1, 2]), ('b', [3, 4])]
print(ret)
##############################################
sc.stop()
  • to_list: 将V转换成[V]
  • append(a,b)依据mergeValue:(C,V) => C ,在每个分区内进行合并
  • extend(a,b)依据mergeCombiners: (C, C) => C 在不同分区间进行。

输出

======
[3]
[4]
======
[1]
[2]
[('a', [1, 2]), ('b', [3, 4])]

函数中的print语句,可以做完理解函数参数的重要手段。