PySpark实战:combineByKey操作
来自CloudWiki
背景
combineByKey操作是一个变换算子,是Spark中一个比较核心的高级函数
定义如下:
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)
如下解释下3个重要的函数参数:
- createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
- mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
- mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
代码
import findspark findspark.init() ############################################## from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local[1]") \ .appName("RDD Demo") \ .getOrCreate(); sc = spark.sparkContext ############################################# rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2),("b", 4)],2) #rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 2),("b", 4)],2) def to_list(a): #print(a) return [a] def append(a, b): print(a) print(b) a.append(b) return a def extend(a, b): print("======") print(a) print(b) a.extend(b) return a ret = sorted(rdd.combineByKey(to_list, append, extend).collect()) #[('a', [1, 2]), ('b', [3, 4])] print(ret) ############################################## sc.stop()
- to_list: 将V转换成[V]
- append(a,b)依据mergeValue:(C,V) => C ,在每个分区内进行合并
- extend(a,b)依据mergeCombiners: (C, C) => C 在不同分区间进行。
输出
====== [3] [4] ====== [1] [2] [('a', [1, 2]), ('b', [3, 4])]
函数中的print语句,可以做完理解函数参数的重要手段。